You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ga...@apache.org on 2008/05/13 23:11:24 UTC
svn commit: r656011 [5/5] - in /incubator/pig/branches/types: ./ lib/
src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/
src/org/apache/pig/data/ src/org/apache/pig/impl/io/
src/org/apache/pig/impl/logicalLayer/ src/org/apache/pig/i...
Modified: incubator/pig/branches/types/test/org/apache/pig/test/utils/GenPhyOp.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/utils/GenPhyOp.java?rev=656011&r1=656010&r2=656011&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/utils/GenPhyOp.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/utils/GenPhyOp.java Tue May 13 14:11:21 2008
@@ -17,33 +17,42 @@
*/
package org.apache.pig.test.utils;
+import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.builtin.BinStorage;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.logicalLayer.OperatorKey;
+import org.apache.pig.impl.mapReduceLayer.MapReduceOper;
import org.apache.pig.impl.physicalLayer.plans.ExprPlan;
import org.apache.pig.impl.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.impl.physicalLayer.topLevelOperators.POFilter;
import org.apache.pig.impl.physicalLayer.topLevelOperators.POForEach;
import org.apache.pig.impl.physicalLayer.topLevelOperators.POGenerate;
-import org.apache.pig.impl.physicalLayer.topLevelOperators.PhysicalOperator;
-// import org.apache.pig.impl.physicalLayer.topLevelOperators.POGlobalRearrange;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POGlobalRearrange;
import org.apache.pig.impl.physicalLayer.topLevelOperators.POLoad;
import org.apache.pig.impl.physicalLayer.topLevelOperators.POLocalRearrange;
-// import org.apache.pig.impl.physicalLayer.topLevelOperators.POPackage;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POPackage;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POSplit;
import org.apache.pig.impl.physicalLayer.topLevelOperators.POStore;
-// import org.apache.pig.impl.physicalLayer.topLevelOperators.StartMap;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POUnion;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.PhysicalOperator;
import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.ConstantExpression;
-import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.ExpressionOperator;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.POCast;
import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.POProject;
-// import
-// org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.binaryExprOps.comparators.EqualToExpr;
-// import
-// org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.binaryExprOps.comparators.GTOrEqualToExpr;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.binaryExprOps.arithmeticOperators.Add;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.binaryExprOps.arithmeticOperators.Divide;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.binaryExprOps.arithmeticOperators.Mod;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.binaryExprOps.arithmeticOperators.Multiply;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.binaryExprOps.arithmeticOperators.Subtract;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.binaryExprOps.comparators.ComparisonOperator;
import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.binaryExprOps.comparators.EqualToExpr;
import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.binaryExprOps.comparators.GTOrEqualToExpr;
import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.binaryExprOps.comparators.GreaterThanExpr;
@@ -52,9 +61,17 @@
import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.binaryExprOps.comparators.NotEqualToExpr;
import org.apache.pig.impl.plan.PlanException;
-public class GenPhyOp {
+public class GenPhyOp{
static Random r = new Random();
-
+
+ // Comparison selectors accepted as the CompType argument of
+ // topFilterOpWithProj(col, rhsVal, CompType) and topFilterOpWithProjWithCast.
+ public static final byte GTE = 1; // selects compGTOrEqualToExpr()
+ public static final byte GT = 2; // selects compGreaterThanExpr()
+ public static final byte LTE = 3; // selects compLTOrEqualToExpr()
+ public static final byte LT = 4; // selects compLessThanExpr()
+
+ // Shared PigContext handed to the POLoad/POStore operators built here and
+ // used when creating temporary file specs; null until setPc(...) is called.
+ public static PigContext pc;
+// public static MiniCluster cluster = MiniCluster.buildCluster();
+
public static ConstantExpression exprConst() {
ConstantExpression ret = new ConstantExpression(new OperatorKey("", r
.nextLong()));
@@ -106,6 +123,17 @@
return ret;
}
+ /**
+  * Creates a bare POGlobalRearrange whose operator key has an empty scope
+  * and an id drawn from the shared Random.
+  */
+ public static POGlobalRearrange topGlobalRearrangeOp() {
+     final OperatorKey key = new OperatorKey("", r.nextLong());
+     return new POGlobalRearrange(key);
+ }
+
+ /**
+  * Creates a bare POPackage whose operator key has an empty scope and an
+  * id drawn from the shared Random.
+  */
+ public static POPackage topPackageOp() {
+     final OperatorKey key = new OperatorKey("", r.nextLong());
+     return new POPackage(key);
+ }
+
public static POForEach topForEachOp() {
POForEach ret = new POForEach(new OperatorKey("", r
.nextLong()));
@@ -117,6 +145,11 @@
return ret;
}
+ /**
+  * Creates a bare POUnion whose operator key has an empty scope and an id
+  * drawn from the shared Random.
+  */
+ public static POUnion topUnionOp() {
+     final OperatorKey key = new OperatorKey("", r.nextLong());
+     return new POUnion(key);
+ }
+
/**
* creates the POGenerate operator for
* generate grpCol, *.
@@ -168,6 +201,65 @@
/**
* creates the POGenerate operator for
+ * generate grpCol, *.
+ *
+ * @param grpCol - The column to be grouped on
+ * @param sample - The sample tuple that is used to infer
+ * result types and #projects for *
+ * @return - The POGenerate operator which has the exprplan
+ * for generate grpCol, * set.
+ * @throws ExecException
+ * @throws PlanException
+ */
+ public static POGenerate topGenerateOpWithExPlanLR(int grpCol, Tuple sample) throws ExecException, PlanException {
+     // First plan: project the grouping column and cast it to the type the
+     // sample tuple reports for that column.
+     POProject prj1 = new POProject(new OperatorKey("", r.nextLong()), -1, grpCol);
+     prj1.setResultType(sample.getType(grpCol));
+     prj1.setOverloaded(false);
+
+     POCast cst = new POCast(new OperatorKey("", r.nextLong()));
+     cst.setResultType(sample.getType(grpCol));
+
+     // Nothing produced by this generate is flattened.
+     List<Boolean> toBeFlattened = new LinkedList<Boolean>();
+     toBeFlattened.add(false);
+
+     ExprPlan plan1 = new ExprPlan();
+     plan1.add(prj1);
+     plan1.add(cst);
+     plan1.connect(prj1, cst);
+
+     List<ExprPlan> inputs = new LinkedList<ExprPlan>();
+     inputs.add(plan1);
+
+     // The "*" part: one project -> cast plan per column of the sample.
+     // A plain index loop replaces the former for-each over an array that
+     // was never populated and only served as a counter.
+     final int numCols = sample.size();
+     for (int col = 0; col < numCols; col++) {
+         POProject project = new POProject(new OperatorKey("", r.nextLong()), -1, col);
+         project.setResultType(sample.getType(col));
+         project.setOverloaded(false);
+
+         POCast cast = new POCast(new OperatorKey("", r.nextLong()));
+         cast.setResultType(sample.getType(col));
+
+         ExprPlan pl = new ExprPlan();
+         pl.add(project);
+         pl.add(cast);
+         pl.connect(project, cast);
+
+         toBeFlattened.add(false);
+         inputs.add(pl);
+     }
+
+     return new POGenerate(new OperatorKey("", r.nextLong()), inputs, toBeFlattened);
+ }
+
+ /**
+ * creates the POGenerate operator for
* 'generate field'.
*
* @param field - The column to be generated
@@ -200,6 +292,129 @@
}
/**
+ * creates the POGenerate operator for
+ * 'generate flatten(field)'.
+ *
+ * @param field - The column to be flattened; its result type
+ * is fixed to bag, so no sample tuple is needed
+ * @return - The POGenerate operator which has the exprplan
+ * for 'generate flatten(field)' set.
+ * @throws ExecException
+ */
+ public static POGenerate topGenerateOpWithExPlanForFeFlat(int field) throws ExecException {
+     // Single projection of the requested column, typed as a bag.
+     final POProject bagProject = new POProject(new OperatorKey("", r.nextLong()), -1, field);
+     bagProject.setResultType(DataType.BAG);
+     bagProject.setOverloaded(false);
+
+     final ExprPlan projectPlan = new ExprPlan();
+     projectPlan.add(bagProject);
+
+     final List<ExprPlan> plans = new LinkedList<ExprPlan>();
+     plans.add(projectPlan);
+
+     // The one generated column is marked for flattening.
+     final List<Boolean> flattenFlags = new LinkedList<Boolean>();
+     flattenFlags.add(true);
+
+     return new POGenerate(new OperatorKey("", r.nextLong()), plans, flattenFlags);
+ }
+
+ /**
+ * creates the POGenerate operator for
+ * 'generate field[0] field[1] ...'.
+ *
+ * @param fields - The columns to be generated
+ * @param sample - The sample tuple that is used to infer
+ * result type
+ * @return - The POGenerate operator which has the exprplan
+ * for 'generate field[0] field[1]' set.
+ * @throws ExecException
+ * @throws PlanException
+ */
+ public static POGenerate topGenerateOpWithExPlanForFe(int[] fields, Tuple sample) throws ExecException, PlanException {
+     // This variant flattens nothing: build an all-false flag list, one
+     // entry per requested column.
+     List<Boolean> toBeFlattened = new LinkedList<Boolean>();
+     for (int i = 0; i < fields.length; i++) {
+         toBeFlattened.add(false);
+     }
+
+     // The project -> cast plans are built exactly as in the overload that
+     // takes an explicit flatten list, so delegate instead of duplicating
+     // that logic here.
+     return topGenerateOpWithExPlanForFe(fields, sample, toBeFlattened);
+ }
+
+ /**
+ * creates the POGenerate operator for
+ * 'generate field[0] field[1] ...'.
+ * with the flatten list as specified
+ * @param fields - The columns to be generated
+ * @param toBeFlattened - The columns to be flattened
+ * @param sample - The sample tuple that is used to infer
+ * result type
+ * @return - The POGenerate operator which has the exprplan
+ * for 'generate field[0] field[1]' set.
+ * @throws ExecException
+ * @throws PlanException
+ */
+ public static POGenerate topGenerateOpWithExPlanForFe(int[] fields, Tuple sample, List<Boolean> toBeFlattened) throws ExecException, PlanException {
+     final int count = fields.length;
+
+     // All projections are created before any cast, so they draw their
+     // operator keys from the shared Random first.
+     final POProject[] projections = new POProject[count];
+     for (int idx = 0; idx < count; idx++) {
+         projections[idx] = new POProject(new OperatorKey("", r.nextLong()), -1, fields[idx]);
+         projections[idx].setResultType(sample.getType(fields[idx]));
+         projections[idx].setOverloaded(false);
+     }
+
+     // One plan per column: projection feeding a cast to the sampled type.
+     final List<ExprPlan> inputs = new LinkedList<ExprPlan>();
+     for (int idx = 0; idx < count; idx++) {
+         final ExprPlan plan = new ExprPlan();
+         plan.add(projections[idx]);
+
+         final POCast cast = new POCast(new OperatorKey("", r.nextLong()));
+         cast.setResultType(sample.getType(fields[idx]));
+         plan.add(cast);
+
+         plan.connect(projections[idx], cast);
+         inputs.add(plan);
+     }
+
+     // The caller-supplied flatten flags are passed through untouched.
+     return new POGenerate(new OperatorKey("", r.nextLong()), inputs, toBeFlattened);
+ }
+
+ /**
* creates the POLocalRearrange operator with the given index for
* group by grpCol
* @param index - The input index of this POLocalRearrange operator
@@ -208,7 +423,28 @@
* @return - The POLocalRearrange operator
* @throws ExecException
*/
- public static POLocalRearrange topLocalRearrangeOPWithPlan(int index, int grpCol, Tuple sample) throws ExecException{
+ public static POLocalRearrange topLocalRearrangeOPWithPlan(int index, int grpCol, Tuple sample) throws ExecException, PlanException{
+     // Inner plan consists solely of the "generate grpCol, *" operator
+     // built by the LR (project + cast) generator.
+     final POGenerate generate = topGenerateOpWithExPlanLR(grpCol, sample);
+     final PhysicalPlan<PhysicalOperator> innerPlan = new PhysicalPlan<PhysicalOperator>();
+     innerPlan.add(generate);
+
+     final POLocalRearrange rearrange = topLocalRearrangeOp();
+     rearrange.setPlan(innerPlan);
+     rearrange.setIndex(index);
+     rearrange.setResultType(DataType.TUPLE);
+     return rearrange;
+ }
+
+ /**
+ * creates the POLocalRearrange operator with the given index for
+ * group by grpCol
+ * @param index - The input index of this POLocalRearrange operator
+ * @param grpCol - The column to be grouped on
+ * @param sample - Sample tuple needed for topGenerateOpWithExPlan
+ * @return - The POLocalRearrange operator
+ * @throws ExecException
+ */
+ public static POLocalRearrange topLocalRearrangeOPWithPlanPlain(int index, int grpCol, Tuple sample) throws ExecException, PlanException{
POGenerate gen = topGenerateOpWithExPlan(grpCol, sample);
PhysicalPlan<PhysicalOperator> pp = new PhysicalPlan<PhysicalOperator>();
pp.add(gen);
@@ -228,7 +464,7 @@
* @return - The POForEach operator
* @throws ExecException
*/
- public static POForEach topForEachOPWithPlan(int field, Tuple sample) throws ExecException{
+ public static POForEach topForEachOPWithPlan(int field, Tuple sample) throws ExecException, PlanException{
POGenerate gen = topGenerateOpWithExPlanForFe(field, sample);
PhysicalPlan<PhysicalOperator> pp = new PhysicalPlan<PhysicalOperator>();
pp.add(gen);
@@ -239,8 +475,67 @@
return ret;
}
+ /**
+ * creates the POForEach operator for
+ * foreach A generate field[0] field[1]
+ * @param fields - The columns to be generated
+ * @param sample - Sample tuple needed for topGenerateOpWithExPlanForFe
+ * @return - The POForEach operator
+ * @throws ExecException
+ */
+ public static POForEach topForEachOPWithPlan(int[] fields, Tuple sample) throws ExecException, PlanException{
+     // The foreach's inner plan holds just the generate operator.
+     final PhysicalPlan<PhysicalOperator> innerPlan = new PhysicalPlan<PhysicalOperator>();
+     final POGenerate generate = topGenerateOpWithExPlanForFe(fields, sample);
+     innerPlan.add(generate);
+
+     final POForEach forEach = topForEachOp();
+     forEach.setPlan(innerPlan);
+     forEach.setResultType(DataType.TUPLE);
+     return forEach;
+ }
+
+ /**
+ * creates the POForEach operator for
+ * foreach A generate field[0] field[1]
+ * @param fields - The columns to be generated
+ * @param sample - Sample tuple needed for topGenerateOpWithExPlanForFe
+ * @param toBeFlattened - The flatten flags passed on to topGenerateOpWithExPlanForFe
+ * @return - The POForEach operator
+ * @throws ExecException
+ */
+ public static POForEach topForEachOPWithPlan(int[] fields, Tuple sample, List<Boolean> toBeFlattened) throws ExecException, PlanException{
+     // Generate operator built with the caller's flatten flags.
+     final POGenerate generate = topGenerateOpWithExPlanForFe(fields, sample, toBeFlattened);
+
+     final PhysicalPlan<PhysicalOperator> innerPlan = new PhysicalPlan<PhysicalOperator>();
+     innerPlan.add(generate);
+
+     final POForEach forEach = topForEachOp();
+     forEach.setPlan(innerPlan);
+     forEach.setResultType(DataType.TUPLE);
+     return forEach;
+ }
+
+ /**
+ * creates the POForEach operator for
+ * foreach A generate flatten(field)
+ * @param field - The column to be flattened
+ * @return - The POForEach operator
+ * @throws ExecException
+ */
+ public static POForEach topForEachOPWithPlan(int field) throws ExecException, PlanException{
+     // Inner plan: the single "generate flatten(field)" operator.
+     final POGenerate flattenGen = topGenerateOpWithExPlanForFeFlat(field);
+     final PhysicalPlan<PhysicalOperator> innerPlan = new PhysicalPlan<PhysicalOperator>();
+     innerPlan.add(flattenGen);
+
+     final POForEach forEach = topForEachOp();
+     forEach.setPlan(innerPlan);
+     forEach.setResultType(DataType.TUPLE);
+     return forEach;
+ }
+
+
+ /**
+  * Creates a POLoad with a random operator key and hands it the shared
+  * PigContext stored in pc (null unless setPc has been called).
+  */
public static POLoad topLoadOp() {
POLoad ret = new POLoad(new OperatorKey("", r.nextLong()));
+ ret.setPc(pc);
return ret;
}
@@ -250,7 +545,7 @@
}
public static POFilter topFilterOpWithExPlan(int lhsVal, int rhsVal)
- throws PlanException {
+ throws ExecException, PlanException {
POFilter ret = new POFilter(new OperatorKey("", r.nextLong()));
ConstantExpression ce1 = GenPhyOp.exprConst();
@@ -278,7 +573,7 @@
}
public static POFilter topFilterOpWithProj(int col, int rhsVal)
- throws PlanException {
+ throws ExecException, PlanException {
POFilter ret = new POFilter(new OperatorKey("", r.nextLong()));
POProject proj = exprProject();
@@ -306,26 +601,219 @@
return ret;
}
+
+ /**
+  * Creates a POFilter whose expression plan compares the projection of
+  * column col (typed as integer) against the constant rhsVal.
+  *
+  * @param col - The column to project as the left-hand side
+  * @param rhsVal - The constant used as the right-hand side
+  * @param CompType - One of GenPhyOp.GTE, GT, LTE or LT
+  * @return - The POFilter operator with its plan set
+  * @throws IllegalArgumentException if CompType is not one of the four
+  * selectors (previously this surfaced as a NullPointerException)
+  * @throws ExecException
+  * @throws PlanException
+  */
+ public static POFilter topFilterOpWithProj(int col, int rhsVal,
+         byte CompType) throws ExecException, PlanException {
+     POFilter ret = new POFilter(new OperatorKey("", r.nextLong()));
+
+     POProject proj = exprProject();
+     proj.setResultType(DataType.INTEGER);
+     proj.setColumn(col);
+     proj.setOverloaded(false);
+
+     ConstantExpression ce2 = GenPhyOp.exprConst();
+     ce2.setValue(rhsVal);
+
+     ComparisonOperator cop;
+     switch (CompType) {
+     case GenPhyOp.GTE:
+         cop = GenPhyOp.compGTOrEqualToExpr();
+         break;
+     case GenPhyOp.GT:
+         cop = GenPhyOp.compGreaterThanExpr();
+         break;
+     case GenPhyOp.LTE:
+         cop = GenPhyOp.compLTOrEqualToExpr();
+         break;
+     case GenPhyOp.LT:
+         cop = GenPhyOp.compLessThanExpr();
+         break;
+     default:
+         // Fail with a message naming the bad selector instead of a bare
+         // NullPointerException on the first use of cop below.
+         throw new IllegalArgumentException("Unknown comparison type: " + CompType);
+     }
+
+     cop.setLhs(proj);
+     cop.setRhs(ce2);
+     cop.setOperandType(DataType.INTEGER);
+
+     // Both operands feed the comparison operator.
+     ExprPlan ep = new ExprPlan();
+     ep.add(proj);
+     ep.add(ce2);
+     ep.add(cop);
+
+     ep.connect(proj, cop);
+     ep.connect(ce2, cop);
+
+     ret.setPlan(ep);
+
+     return ret;
+ }
+
+ /**
+  * Creates a POFilter like topFilterOpWithProj(col, rhsVal, CompType), except
+  * that a POCast sits between the projection and the comparison: the plan is
+  * wired proj -> cast -> comparison, with the constant as the other input.
+  *
+  * @param col - The column to project
+  * @param rhsVal - The constant set as the comparison's right-hand side
+  * @param CompType - One of GenPhyOp.GTE, GT, LTE or LT
+  * @return - The POFilter operator with its plan set
+  * @throws ExecException
+  * @throws PlanException
+  */
+ public static POFilter topFilterOpWithProjWithCast(int col, int rhsVal, byte CompType)
+ throws ExecException, PlanException {
+ POFilter ret = new POFilter(new OperatorKey("", r.nextLong()));
+
+ POProject proj = exprProject();
+ proj.setResultType(DataType.INTEGER);
+ proj.setColumn(col);
+ proj.setOverloaded(false);
+
+ ConstantExpression ce2 = GenPhyOp.exprConst();
+ ce2.setValue(rhsVal);
+
+ // NOTE(review): there is no default branch, so any other CompType leaves
+ // cop null and the cop.setLhs(...) call below throws NullPointerException.
+ ComparisonOperator cop = null;
+ switch(CompType){
+ case GenPhyOp.GTE:
+ cop = GenPhyOp.compGTOrEqualToExpr();
+ break;
+ case GenPhyOp.GT:
+ cop = GenPhyOp.compGreaterThanExpr();
+ break;
+ case GenPhyOp.LTE:
+ cop = GenPhyOp.compLTOrEqualToExpr();
+ break;
+ case GenPhyOp.LT:
+ cop = GenPhyOp.compLessThanExpr();
+ break;
+ }
+
+ // NOTE(review): unlike the generate helpers in this class, no result type
+ // is set on this cast - confirm that is intended.
+ POCast cst = new POCast(new OperatorKey("",r.nextLong()));
+
+ // The cast, not the raw projection, is the comparison's left-hand side.
+ cop.setLhs(cst);
+ cop.setRhs(ce2);
+ cop.setOperandType(DataType.INTEGER);
+
+ ExprPlan ep = new ExprPlan();
+ ep.add(cst);
+ ep.add(proj);
+ ep.add(ce2);
+ ep.add(cop);
+
+ ep.connect(proj, cst);
+ ep.connect(cst, cop);
+ ep.connect(ce2, cop);
+
+ ret.setPlan(ep);
- //
- // public static POGlobalRearrange topGlobalRearrangeOp(){
- // POGlobalRearrange ret = new POGlobalRearrange(new
- // OperatorKey("",r.nextLong()));
- // return ret;
- // }
- //
- // public static POPackage topPackageOp(){
- // POPackage ret = new POPackage(new OperatorKey("",r.nextLong()));
- // return ret;
- // }
- //
+ return ret;
+ }
+
+ /**
+  * Creates a POStore with a random operator key and hands it the shared
+  * PigContext stored in pc (null unless setPc has been called).
+  */
public static POStore topStoreOp() {
POStore ret = new POStore(new OperatorKey("", r.nextLong()));
+ ret.setPc(pc);
+ return ret;
+ }
+
+ /**
+  * Replaces the Random from which every operator key in this class is drawn.
+  *
+  * @param r - The Random to use from now on
+  */
+ public static void setR(Random r) {
+     GenPhyOp.r = r;
+ }
+
+ /**
+  * Creates a MapReduceOper whose operator key has an empty scope and an id
+  * drawn from the shared Random.
+  */
+ public static MapReduceOper MROp() {
+     final OperatorKey key = new OperatorKey("", r.nextLong());
+     return new MapReduceOper(key);
+ }
+
+ /**
+  * Builds a FileSpec that pairs a temporary path obtained from FileLocalizer
+  * (using the shared PigContext) with the BinStorage class name.
+  *
+  * @throws IOException
+  */
+ private static FileSpec getTempFileSpec() throws IOException {
+     final String tempPath = FileLocalizer.getTemporaryPath(null, pc).toString();
+     final String storageClass = BinStorage.class.getName();
+     return new FileSpec(tempPath, storageClass);
+ }
+
+ /**
+  * Creates a POSplit with a random operator key whose split store is a
+  * freshly obtained temporary file spec (see getTempFileSpec).
+  *
+  * @throws IOException
+  */
+ public static POSplit topSplitOp() throws IOException{
+ POSplit ret = new POSplit(new OperatorKey("",r.nextLong()));
+ ret.setSplitStore(getTempFileSpec());
return ret;
}
- //
- // public static StartMap topStartMapOp(){
- // StartMap ret = new StartMap(new OperatorKey("",r.nextLong()));
- // return ret;
- // }
+
+ /**
+  * Builds a three-operator plan wired as
+  * local rearrange -> global rearrange -> package.
+  *
+  * @throws ExecException
+  * @throws PlanException
+  */
+ public static PhysicalPlan<PhysicalOperator> grpChain() throws ExecException, PlanException{
+     final PhysicalPlan<PhysicalOperator> plan = new PhysicalPlan<PhysicalOperator>();
+
+     // Operators are created in chain order so they draw keys in that order.
+     final PhysicalOperator[] stages = {
+         GenPhyOp.topLocalRearrangeOp(),
+         GenPhyOp.topGlobalRearrangeOp(),
+         GenPhyOp.topPackageOp()
+     };
+
+     for (PhysicalOperator stage : stages) {
+         plan.add(stage);
+     }
+     // Link each stage to its successor.
+     for (int i = 0; i + 1 < stages.length; i++) {
+         plan.connect(stages[i], stages[i + 1]);
+     }
+
+     return plan;
+ }
+
+ /**
+  * Builds a four-operator plan wired as
+  * load -> local rearrange -> global rearrange -> package.
+  *
+  * @throws ExecException
+  * @throws PlanException
+  */
+ public static PhysicalPlan<PhysicalOperator> loadedGrpChain() throws ExecException, PlanException{
+     final PhysicalPlan<PhysicalOperator> plan = new PhysicalPlan<PhysicalOperator>();
+
+     // Operators are created in chain order so they draw keys in that order.
+     final PhysicalOperator[] stages = {
+         GenPhyOp.topLoadOp(),
+         GenPhyOp.topLocalRearrangeOp(),
+         GenPhyOp.topGlobalRearrangeOp(),
+         GenPhyOp.topPackageOp()
+     };
+
+     for (PhysicalOperator stage : stages) {
+         plan.add(stage);
+     }
+     // Link each stage to its successor.
+     for (int i = 0; i + 1 < stages.length; i++) {
+         plan.connect(stages[i], stages[i + 1]);
+     }
+
+     return plan;
+ }
+
+ /**
+  * Builds a two-operator plan in which a load feeds a filter.
+  *
+  * @throws ExecException
+  * @throws PlanException
+  */
+ public static PhysicalPlan<PhysicalOperator> loadedFilter() throws ExecException, PlanException{
+     final PhysicalPlan<PhysicalOperator> plan = new PhysicalPlan<PhysicalOperator>();
+
+     final POLoad load = GenPhyOp.topLoadOp();
+     final POFilter filter = GenPhyOp.topFilterOp();
+
+     plan.add(load);
+     plan.add(filter);
+     plan.connect(load, filter);
+
+     return plan;
+ }
+
+ /**
+  * Builds an expression plan over seven constants c0..c6 (valued 0..6),
+  * wired as: add(c0, c1), divide(c2, c3), subtract(add, divide),
+  * mod(c4, c5), multiply(mod, c6), and a final multiply(subtract, multiply).
+  *
+  * @throws PlanException
+  */
+ public static ExprPlan arithPlan() throws PlanException{
+     final ExprPlan plan = new ExprPlan();
+
+     // Leaves: constant k carries the value k.
+     final ConstantExpression[] consts = new ConstantExpression[7];
+     int k = 0;
+     while (k < consts.length) {
+         consts[k] = GenPhyOp.exprConst();
+         consts[k].setValue(k);
+         plan.add(consts[k]);
+         k++;
+     }
+
+     final Add sum = new Add(getOK());
+     plan.add(sum);
+     plan.connect(consts[0], sum);
+     plan.connect(consts[1], sum);
+
+     final Divide quotient = new Divide(getOK());
+     plan.add(quotient);
+     plan.connect(consts[2], quotient);
+     plan.connect(consts[3], quotient);
+
+     final Subtract difference = new Subtract(getOK());
+     plan.add(difference);
+     plan.connect(sum, difference);
+     plan.connect(quotient, difference);
+
+     final Mod remainder = new Mod(getOK());
+     plan.add(remainder);
+     plan.connect(consts[4], remainder);
+     plan.connect(consts[5], remainder);
+
+     final Multiply innerProduct = new Multiply(getOK());
+     plan.add(innerProduct);
+     plan.connect(remainder, innerProduct);
+     plan.connect(consts[6], innerProduct);
+
+     // Root of the expression.
+     final Multiply outerProduct = new Multiply(getOK());
+     plan.add(outerProduct);
+     plan.connect(difference, outerProduct);
+     plan.connect(innerProduct, outerProduct);
+
+     return plan;
+ }
+
+ /** Returns a new operator key with an empty scope and an id drawn from the shared Random. */
+ private static OperatorKey getOK() {
+     final long id = r.nextLong();
+     return new OperatorKey("", id);
+ }
+
+ /**
+  * Sets the PigContext that this class hands to the load and store
+  * operators it creates and uses for temporary file specs.
+  *
+  * @param pc - The PigContext to share
+  */
+ public static void setPc(PigContext pc) {
+     GenPhyOp.pc = pc;
+ }
}
Modified: incubator/pig/branches/types/test/org/apache/pig/test/utils/TestHelper.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/utils/TestHelper.java?rev=656011&r1=656010&r2=656011&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/utils/TestHelper.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/utils/TestHelper.java Tue May 13 14:11:21 2008
@@ -17,13 +17,24 @@
*/
package org.apache.pig.test.utils;
+import java.io.IOException;
+import java.io.InputStream;
import java.util.Iterator;
+import java.util.Random;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.DefaultBagFactory;
import org.apache.pig.data.DefaultTuple;
import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.logicalLayer.OperatorKey;
+import org.apache.pig.impl.physicalLayer.POStatus;
+import org.apache.pig.impl.physicalLayer.Result;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POLoad;
/**
* Will contain static methods that will be useful
@@ -31,6 +42,7 @@
*
*/
public class TestHelper {
+ public static int dispAfterNumTuples = 1000;
public static boolean bagContains(DataBag db, Tuple t) {
Iterator<Tuple> iter = db.iterator();
for (Tuple tuple : db) {
@@ -43,7 +55,8 @@
public static boolean compareBags(DataBag db1, DataBag db2) {
if (db1.size() != db2.size())
return false;
-
+
+ int i=-1;
boolean equal = true;
for (Tuple tuple : db2) {
boolean contains = false;
@@ -57,6 +70,8 @@
equal = false;
break;
}
+ /*if(++i%dispAfterNumTuples==0)
+ System.out.println(i/dispAfterNumTuples);*/
}
return equal;
}
@@ -71,4 +86,129 @@
}
return ret;
}
+
+ /**
+  * Returns a new default bag holding, for every tuple of db2, a tuple made
+  * of only the listed fields, in the order given.
+  *
+  * @param db2 - The bag to project
+  * @param fields - Indexes of the fields to keep
+  * @throws ExecException
+  */
+ public static DataBag projectBag(DataBag db2, int[] fields) throws ExecException {
+     final DataBag projected = DefaultBagFactory.getInstance().newDefaultBag();
+     for (Tuple source : db2) {
+         final Tuple narrowed = new DefaultTuple();
+         for (int k = 0; k < fields.length; k++) {
+             narrowed.append(source.get(fields[k]));
+         }
+         projected.add(narrowed);
+     }
+     return projected;
+ }
+
+ /**
+  * Compares the full contents of two streams byte by byte.
+  *
+  * @param exp - Stream holding the expected bytes
+  * @param act - Stream holding the actual bytes
+  * @return 0 if both streams hold the same bytes; otherwise 1 or -1,
+  * depending on whether exp has the larger (signed) byte at the first
+  * difference, or is the longer stream when one is a prefix of the other
+  * @throws IOException if reading either stream fails
+  */
+ public static int compareInputStreams(InputStream exp, InputStream act) throws IOException{
+     byte[] bExp = new byte[4096], bAct = new byte[4096];
+
+     while (true) {
+         // Fill both buffers as far as possible so that short reads on one
+         // side cannot misalign the comparison.
+         int nExp = readChunk(exp, bExp);
+         int nAct = readChunk(act, bAct);
+
+         // Only compare bytes that were actually read in this round.
+         int common = Math.min(nExp, nAct);
+         for (int i = 0; i < common; i++) {
+             if (bExp[i] != bAct[i])
+                 return bExp[i] > bAct[i] ? 1 : -1;
+         }
+
+         // One stream ended before the other: the longer one compares greater.
+         if (nExp != nAct)
+             return nExp > nAct ? 1 : -1;
+
+         // A partially filled buffer means both streams are exhausted.
+         if (nExp < bExp.length)
+             return 0;
+     }
+ }
+
+ /** Reads until buf is full or the stream ends; returns the number of bytes read. */
+ private static int readChunk(InputStream in, byte[] buf) throws IOException {
+     int total = 0;
+     while (total < buf.length) {
+         int n = in.read(buf, total, buf.length - total);
+         if (n == -1)
+             break;
+         total += n;
+     }
+     return total;
+ }
+
+ /**
+  * Orders two byte arrays first by length, then by the first differing
+  * element (bytes compared as signed values).
+  *
+  * @return 1 if b1 orders after b2, -1 if before, 0 if they are equal
+  */
+ public static int compareByteArray(byte[] b1, byte[] b2){
+     // The longer array always wins, whatever the contents.
+     if (b1.length != b2.length) {
+         return b1.length > b2.length ? 1 : -1;
+     }
+     for (int idx = 0; idx < b1.length; idx++) {
+         if (b1[idx] != b2[idx]) {
+             return b1[idx] > b2[idx] ? 1 : -1;
+         }
+     }
+     return 0;
+ }
+
+ /*public static boolean areFilesSame(FileSpec expLocal, FileSpec actHadoop, PigContext pc, int dispAftNumTuples) throws ExecException, IOException{
+ Random r = new Random();
+
+ POLoad ldExp = new POLoad(new OperatorKey("", r.nextLong()));
+ ldExp.setPc(pc);
+ ldExp.setLFile(expLocal);
+
+ POLoad ldAct = new POLoad(new OperatorKey("", r.nextLong()));
+ ldAct.setPc(pc);
+ ldAct.setLFile(actHadoop);
+
+ Tuple t = null;
+ int numActTuples = -1;
+ boolean matches = true;
+ for(Result resAct=ldAct.getNext(t);resAct.returnStatus!=POStatus.STATUS_EOP;resAct=ldAct.getNext(t)){
+ Tuple tupAct = (Tuple)resAct.result;
+ ++numActTuples;
+ boolean found = false;
+ for(Result resExp=ldExp.getNext(t);resExp.returnStatus!=POStatus.STATUS_EOP;resExp=ldExp.getNext(t)){
+ Tuple tupExp = (Tuple)resExp.result;
+ if(tupAct.compareTo(tupExp)==0){
+ found = true;
+ ldExp.tearDown();
+ break;
+ }
+ }
+ if(!found){
+ matches = false;
+ break;
+ }
+ if(numActTuples%dispAftNumTuples ==0)
+ System.out.println(numActTuples/dispAftNumTuples);
+ }
+
+ int numExpTuples = -1;
+ while(ldExp.getNext(t).returnStatus!=POStatus.STATUS_EOP)
+ ++numExpTuples;
+
+ return (matches && numActTuples==numExpTuples);
+ }*/
+
+ /**
+  * Loads both files through POLoad, trims every field of every tuple, and
+  * reports whether the two resulting bags have the same number of tuples
+  * and are considered equal by compareBags.
+  *
+  * @param expLocal - File spec of the expected output
+  * @param actHadoop - File spec of the actual output
+  * @param pc - PigContext given to both loaders
+  * @throws ExecException
+  * @throws IOException
+  */
+ public static boolean areFilesSame(FileSpec expLocal, FileSpec actHadoop, PigContext pc) throws ExecException, IOException{
+     final Random keyGen = new Random();
+
+     final POLoad expectedLoader = new POLoad(new OperatorKey("", keyGen.nextLong()));
+     expectedLoader.setPc(pc);
+     expectedLoader.setLFile(expLocal);
+
+     final POLoad actualLoader = new POLoad(new OperatorKey("", keyGen.nextLong()));
+     actualLoader.setPc(pc);
+     actualLoader.setLFile(actHadoop);
+
+     // getNext is always called with a null tuple argument.
+     final Tuple unused = null;
+
+     // Drain the actual output first, then the expected output.
+     final DataBag actualBag = DefaultBagFactory.getInstance().newDefaultBag();
+     int actualCount = 0;
+     for (Result res = actualLoader.getNext(unused);
+             res.returnStatus != POStatus.STATUS_EOP;
+             res = actualLoader.getNext(unused)) {
+         actualCount++;
+         actualBag.add(trimTuple((Tuple) res.result));
+     }
+
+     final DataBag expectedBag = DefaultBagFactory.getInstance().newDefaultBag();
+     int expectedCount = 0;
+     for (Result res = expectedLoader.getNext(unused);
+             res.returnStatus != POStatus.STATUS_EOP;
+             res = expectedLoader.getNext(unused)) {
+         expectedCount++;
+         expectedBag.add(trimTuple((Tuple) res.result));
+     }
+
+     // Differing tuple counts short-circuit the bag comparison.
+     return actualCount == expectedCount && compareBags(expectedBag, actualBag);
+ }
+
+ /**
+  * Returns a new tuple in which every field of t, cast to DataByteArray,
+  * is replaced by a DataByteArray built from its trimmed string form.
+  */
+ private static Tuple trimTuple(Tuple t){
+     final Tuple trimmed = TupleFactory.getInstance().newTuple();
+     for (Object field : t.getAll()) {
+         // Every field is cast to DataByteArray; any other type fails the cast.
+         final String text = ((DataByteArray) field).toString().trim();
+         trimmed.append(new DataByteArray(text.getBytes()));
+     }
+     return trimmed;
+ }
}