You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ol...@apache.org on 2008/08/14 01:47:35 UTC
svn commit: r685730 - in /incubator/pig/branches/types: ./
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/
Author: olga
Date: Wed Aug 13 16:47:34 2008
New Revision: 685730
URL: http://svn.apache.org/viewvc?rev=685730&view=rev
Log:
PIG-311: cross is broken
Modified:
incubator/pig/branches/types/CHANGES.txt
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java
Modified: incubator/pig/branches/types/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/CHANGES.txt?rev=685730&r1=685729&r2=685730&view=diff
==============================================================================
--- incubator/pig/branches/types/CHANGES.txt (original)
+++ incubator/pig/branches/types/CHANGES.txt Wed Aug 13 16:47:34 2008
@@ -149,3 +149,5 @@
PIG-367: convinience function for UDFs to name schema
PIG-368: making JobConf available to Load/Store UDFs
+
+ PIG-311: cross is broken
Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java?rev=685730&r1=685729&r2=685730&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java Wed Aug 13 16:47:34 2008
@@ -19,6 +19,7 @@
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -32,7 +33,10 @@
import org.apache.pig.FuncSpec;
import org.apache.pig.LoadFunc;
import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.PigContext;
+import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.*;
@@ -41,6 +45,7 @@
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.BinaryExpressionOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.UnaryExpressionOperator;
import org.apache.pig.builtin.BinStorage;
+import org.apache.pig.impl.builtin.GFCross;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.logicalLayer.*;
@@ -503,6 +508,119 @@
}
@Override
+ protected void visit(LOCross cs) throws VisitorException {
+ String scope = cs.getOperatorKey().scope;
+ List<LogicalOperator> inputs = cs.getInputs();
+
+ POGlobalRearrange poGlobal = new POGlobalRearrange(new OperatorKey(
+ scope, nodeGen.getNextNodeId(scope)), cs
+ .getRequestedParallelism());
+ POPackage poPackage = new POPackage(new OperatorKey(scope, nodeGen
+ .getNextNodeId(scope)), cs.getRequestedParallelism());
+
+ currentPlan.add(poGlobal);
+ currentPlan.add(poPackage);
+
+ int count = 0;
+
+ try {
+ currentPlan.connect(poGlobal, poPackage);
+ List<Boolean> flattenLst = Arrays.asList(true, true);
+
+ for (LogicalOperator op : inputs) {
+ List<PhysicalOperator> pop = Arrays.asList(LogToPhyMap.get(op));
+ PhysicalPlan fep1 = new PhysicalPlan();
+ ConstantExpression ce1 = new ConstantExpression(new OperatorKey(scope, nodeGen.getNextNodeId(scope)),cs.getRequestedParallelism());
+ Tuple ce1val = TupleFactory.getInstance().newTuple(2);
+ ce1val.set(0,inputs.size());
+ ce1val.set(1,count);
+ ce1.setValue(ce1val);
+ ce1.setResultType(DataType.TUPLE);
+
+ fep1.add(ce1);
+
+ POUserFunc gfc = new POUserFunc(new OperatorKey(scope, nodeGen.getNextNodeId(scope)),cs.getRequestedParallelism(), Arrays.asList((PhysicalOperator)ce1), new FuncSpec(GFCross.class.getName()));
+ gfc.setResultType(DataType.BAG);
+ fep1.addAsLeaf(gfc);
+
+ PhysicalPlan fep2 = new PhysicalPlan();
+ POProject feproj = new POProject(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), cs.getRequestedParallelism());
+ feproj.setResultType(DataType.TUPLE);
+ feproj.setStar(true);
+ feproj.setOverloaded(false);
+ fep2.add(feproj);
+ List<PhysicalPlan> fePlans = Arrays.asList(fep1, fep2);
+
+ POForEach fe = new POForEach(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), cs.getRequestedParallelism(), fePlans, flattenLst );
+ currentPlan.add(fe);
+ currentPlan.connect(LogToPhyMap.get(op), fe);
+
+ POLocalRearrange physOp = new POLocalRearrange(new OperatorKey(
+ scope, nodeGen.getNextNodeId(scope)), cs
+ .getRequestedParallelism());
+ List<PhysicalPlan> lrPlans = new ArrayList<PhysicalPlan>();
+ for(int i=0;i<inputs.size();i++){
+ PhysicalPlan lrp1 = new PhysicalPlan();
+ POProject lrproj1 = new POProject(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), cs.getRequestedParallelism(), i);
+ lrproj1.setOverloaded(false);
+ lrproj1.setResultType(DataType.INTEGER);
+ lrp1.add(lrproj1);
+ lrPlans.add(lrp1);
+ }
+
+ physOp.setCross(true);
+ physOp.setIndex(count++);
+ physOp.setKeyType(DataType.TUPLE);
+ physOp.setPlans(lrPlans);
+ physOp.setResultType(DataType.TUPLE);
+
+ currentPlan.add(physOp);
+ currentPlan.connect(fe, physOp);
+ currentPlan.connect(physOp, poGlobal);
+ }
+ } catch (PlanException e1) {
+ log.error("Invalid physical operators in the physical plan"
+ + e1.getMessage());
+ throw new VisitorException(e1);
+ } catch (ExecException e) {
+ log.error("Unable to create the constant tuple because " + e.getMessage());
+ throw new VisitorException(e);
+ }
+
+ poPackage.setKeyType(DataType.TUPLE);
+ poPackage.setResultType(DataType.TUPLE);
+ poPackage.setNumInps(count);
+ boolean inner[] = new boolean[count];
+ for (int i=0;i<count;i++) {
+ inner[i] = true;
+ }
+ poPackage.setInner(inner);
+
+ List<PhysicalPlan> fePlans = new ArrayList<PhysicalPlan>();
+ List<Boolean> flattenLst = new ArrayList<Boolean>();
+ for(int i=1;i<=count;i++){
+ PhysicalPlan fep1 = new PhysicalPlan();
+ POProject feproj1 = new POProject(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), cs.getRequestedParallelism(), i);
+ feproj1.setResultType(DataType.BAG);
+ feproj1.setOverloaded(false);
+ fep1.add(feproj1);
+ fePlans.add(fep1);
+ flattenLst.add(true);
+ }
+
+ POForEach fe = new POForEach(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), cs.getRequestedParallelism(), fePlans, flattenLst );
+ currentPlan.add(fe);
+ try{
+ currentPlan.connect(poPackage, fe);
+ }catch (PlanException e1) {
+ log.error("Invalid physical operators in the physical plan"
+ + e1.getMessage());
+ throw new VisitorException(e1);
+ }
+ LogToPhyMap.put(cs, fe);
+ }
+
+ @Override
public void visit(LOCogroup cg) throws VisitorException {
boolean currentPhysicalPlan = false;
String scope = cg.getOperatorKey().scope;
Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java?rev=685730&r1=685729&r2=685730&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java Wed Aug 13 16:47:34 2008
@@ -64,6 +64,8 @@
byte keyType;
private boolean mIsDistinct = false;
+
+ private boolean isCross = false;
// A place holder IndexedTuple used in distinct case where we really don't
// have any value to pass through. But hadoop gets cranky if we pass a
@@ -228,6 +230,10 @@
outPut.set(1, mFakeIndexedTuple);
return outPut;
} else {
+ if(isCross){
+ for(int i=0;i<plans.size();i++)
+ value.getAll().remove(0);
+ }
//Create the indexed tuple out of the value
//that is remaining in the input tuple
IndexedTuple it = new IndexedTuple(value, index);
@@ -284,5 +290,13 @@
return clone;
}
+ public boolean isCross() {
+ return isCross;
+ }
+
+ public void setCross(boolean isCross) {
+ this.isCross = isCross;
+ }
+
}