You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ol...@apache.org on 2008/09/26 23:25:02 UTC
svn commit: r699506 - in
/incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer:
./ relationalOperators/
Author: olga
Date: Fri Sep 26 14:25:02 2008
New Revision: 699506
URL: http://svn.apache.org/viewvc?rev=699506&view=rev
Log:
missing illustrate files
Added:
incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/
incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/LocalLogToPhyTranslationVisitor.java
incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/
incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POCogroup.java
incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POSplit.java
incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POSplitOutput.java
Added: incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/LocalLogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/LocalLogToPhyTranslationVisitor.java?rev=699506&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/LocalLogToPhyTranslationVisitor.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/LocalLogToPhyTranslationVisitor.java Fri Sep 26 14:25:02 2008
@@ -0,0 +1,145 @@
+package org.apache.pig.backend.local.executionengine.physicalLayer;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.LogToPhyTranslationVisitor;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
+import org.apache.pig.backend.local.executionengine.physicalLayer.relationalOperators.POCogroup;
+import org.apache.pig.backend.local.executionengine.physicalLayer.relationalOperators.POSplit;
+import org.apache.pig.backend.local.executionengine.physicalLayer.relationalOperators.POSplitOutput;
+import org.apache.pig.data.DataType;
+import org.apache.pig.impl.logicalLayer.LOCogroup;
+import org.apache.pig.impl.logicalLayer.LOSplit;
+import org.apache.pig.impl.logicalLayer.LOSplitOutput;
+import org.apache.pig.impl.logicalLayer.LogicalOperator;
+import org.apache.pig.impl.logicalLayer.LogicalPlan;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.impl.plan.PlanWalker;
+import org.apache.pig.impl.plan.VisitorException;
+
+
+public class LocalLogToPhyTranslationVisitor extends LogToPhyTranslationVisitor {
+
+ private Log log = LogFactory.getLog(getClass());
+
+ public LocalLogToPhyTranslationVisitor(LogicalPlan plan) {
+ super(plan);
+ // TODO Auto-generated constructor stub
+ }
+
+ public Map<LogicalOperator, PhysicalOperator> getLogToPhyMap() {
+ return LogToPhyMap;
+ }
+
+ @Override
+ public void visit(LOCogroup cg) throws VisitorException {
+ String scope = cg.getOperatorKey().scope;
+ List<LogicalOperator> inputs = cg.getInputs();
+
+ POCogroup poc = new POCogroup(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), cg.getRequestedParallelism());
+
+ currentPlan.add(poc);
+
+ int count = 0;
+ Byte type = null;
+ for(LogicalOperator lo : inputs) {
+ List<LogicalPlan> plans = (List<LogicalPlan>) cg.getGroupByPlans().get(lo);
+
+ POLocalRearrange physOp = new POLocalRearrange(new OperatorKey(
+ scope, nodeGen.getNextNodeId(scope)), cg
+ .getRequestedParallelism());
+ List<PhysicalPlan> exprPlans = new ArrayList<PhysicalPlan>();
+ currentPlans.push(currentPlan);
+ for (LogicalPlan lp : plans) {
+ currentPlan = new PhysicalPlan();
+ PlanWalker<LogicalOperator, LogicalPlan> childWalker = mCurrentWalker
+ .spawnChildWalker(lp);
+ pushWalker(childWalker);
+ mCurrentWalker.walk(this);
+ exprPlans.add((PhysicalPlan) currentPlan);
+ popWalker();
+
+ }
+ currentPlan = currentPlans.pop();
+ physOp.setPlans(exprPlans);
+ physOp.setIndex(count++);
+ if (plans.size() > 1) {
+ type = DataType.TUPLE;
+ physOp.setKeyType(type);
+ } else {
+ type = exprPlans.get(0).getLeaves().get(0).getResultType();
+ physOp.setKeyType(type);
+ }
+ physOp.setResultType(DataType.TUPLE);
+
+ currentPlan.add(physOp);
+
+ try {
+ currentPlan.connect(LogToPhyMap.get(lo), physOp);
+ currentPlan.connect(physOp, poc);
+ } catch (PlanException e) {
+ log.error("Invalid physical operators in the physical plan"
+ + e.getMessage());
+ throw new VisitorException(e);
+ }
+
+ }
+ LogToPhyMap.put(cg, poc);
+ }
+
+ @Override
+ public void visit(LOSplit split) throws VisitorException {
+ String scope = split.getOperatorKey().scope;
+ PhysicalOperator physOp = new POSplit(new OperatorKey(scope, nodeGen
+ .getNextNodeId(scope)), split.getRequestedParallelism());
+
+ LogToPhyMap.put(split, physOp);
+
+ currentPlan.add(physOp);
+ PhysicalOperator from = LogToPhyMap.get(split.getPlan()
+ .getPredecessors(split).get(0));
+ try {
+ currentPlan.connect(from, physOp);
+ } catch (PlanException e) {
+ log.error("Invalid physical operator in the plan" + e.getMessage());
+ throw new VisitorException(e);
+ }
+ }
+
+ @Override
+ public void visit(LOSplitOutput split) throws VisitorException {
+ String scope = split.getOperatorKey().scope;
+ PhysicalOperator physOp = new POSplitOutput(new OperatorKey(scope, nodeGen
+ .getNextNodeId(scope)), split.getRequestedParallelism());
+ LogToPhyMap.put(split, physOp);
+
+ currentPlan.add(physOp);
+ currentPlans.push(currentPlan);
+ currentPlan = new PhysicalPlan();
+ PlanWalker<LogicalOperator, LogicalPlan> childWalker = mCurrentWalker
+ .spawnChildWalker(split.getConditionPlan());
+ pushWalker(childWalker);
+ mCurrentWalker.walk(this);
+ popWalker();
+
+ ((POSplitOutput) physOp).setPlan((PhysicalPlan) currentPlan);
+ currentPlan = currentPlans.pop();
+ currentPlan.add(physOp);
+ PhysicalOperator from = LogToPhyMap.get(split.getPlan()
+ .getPredecessors(split).get(0));
+ try {
+ currentPlan.connect(from, physOp);
+ } catch (PlanException e) {
+ log.error("Invalid physical operator in the plan" + e.getMessage());
+ throw new VisitorException(e);
+ }
+ }
+
+}
Added: incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POCogroup.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POCogroup.java?rev=699506&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POCogroup.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POCogroup.java Fri Sep 26 14:25:02 2008
@@ -0,0 +1,219 @@
+package org.apache.pig.backend.local.executionengine.physicalLayer.relationalOperators;
+
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.SortedDataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.pen.util.ExampleTuple;
+
+/**
+ * This is a local implementation of Cogroup.
+ * The inputs need to be connected to LocalRearranges, possibly by the
+ * logical to physical translator.
+ *
+ * This is a blocking operator. The outputs of the LRs are put into
+ * SortedDataBags, sorted on the keys. We then pull tuples out of these
+ * bags and construct the output.
+ *
+ * Every input tuple is read with its key at position 1 and its value tuple
+ * at position 2. Every output tuple holds the key at position 0 followed by
+ * one bag per input.
+ *
+ * @author shubhamc
+ */
+public class POCogroup extends PhysicalOperator {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Head tuple of each input; an entry is null once that input is exhausted. */
+    Tuple[] data = null;
+
+    /** Iterator over the sorted bag of each input; null until the inputs are read. */
+    Iterator<Tuple>[] its = null;
+
+    public POCogroup(OperatorKey k) {
+        super(k);
+    }
+
+    public POCogroup(OperatorKey k, int rp) {
+        super(k, rp);
+    }
+
+    public POCogroup(OperatorKey k, List<PhysicalOperator> inp) {
+        super(k, inp);
+    }
+
+    public POCogroup(OperatorKey k, int rp, List<PhysicalOperator> inp) {
+        super(k, rp, inp);
+    }
+
+    @Override
+    public void visit(PhyPlanVisitor v) throws VisitorException {
+        v.visitCogroup(this);
+    }
+
+    @Override
+    public String name() {
+        return "POCogroup[" + DataType.findTypeName(resultType) + "] - " + mKey.toString();
+    }
+
+    /**
+     * Returns the next group: the smallest key still pending on any input,
+     * together with one bag per input holding that input's values for the key.
+     *
+     * The first call (and the first call after an end-of-processing result)
+     * reads all inputs completely before producing anything.
+     *
+     * @param t not used by this operator
+     * @return a result with STATUS_OK and the group tuple, or STATUS_EOP once
+     *         every input is exhausted
+     * @throws ExecException if an input reports an error or a tuple field
+     *         cannot be accessed
+     */
+    @Override
+    public Result getNext(Tuple t) throws ExecException {
+        if (its == null) {
+            accumulateData();
+        }
+
+        Result res = new Result();
+
+        // getSmallest yields null exactly when every input head is null.
+        Tuple smallestTuple = getSmallest(data);
+        if (smallestTuple == null) {
+            res.returnStatus = POStatus.STATUS_EOP;
+            its = null; // forces the inputs to be read again on the next call
+            return res;
+        }
+
+        Comparator<Tuple> comp = new GroupComparator();
+        int size = data.length;
+
+        Tuple output = TupleFactory.getInstance().newTuple(size + 1);
+        output.set(0, smallestTuple.get(1));
+        for (int i = 1; i <= size; i++) {
+            output.set(i, BagFactory.getInstance().newDefaultBag());
+        }
+
+        ExampleTuple tOut = null;
+        if (lineageTracer != null) {
+            tOut = new ExampleTuple(output);
+            lineageTracer.insert(tOut);
+        }
+
+        // Keep sweeping over the inputs until no head carries the current key.
+        boolean loop = true;
+        while (loop) {
+            loop = false;
+            for (int i = 0; i < size; i++) {
+                if (data[i] == null || comp.compare(data[i], smallestTuple) != 0) {
+                    continue;
+                }
+                loop = true;
+                DataBag bag = (DataBag) output.get(i + 1);
+                Tuple temp = (Tuple) data[i].get(2);
+                // update lineage if it exists
+                if (lineageTracer != null) {
+                    if (((ExampleTuple) temp).synthetic) {
+                        tOut.synthetic = true;
+                    }
+                    lineageTracer.union(temp, tOut);
+                }
+                bag.add(temp);
+                data[i] = its[i].hasNext() ? its[i].next() : null;
+            }
+        }
+
+        res.result = (lineageTracer != null) ? tOut : output;
+        res.returnStatus = POStatus.STATUS_OK;
+        return res;
+    }
+
+    /**
+     * Drains every input into a bag sorted by key and positions each input on
+     * its first tuple. An input that produced no tuples gets a null head.
+     *
+     * @throws ExecException if an input returns STATUS_ERR
+     */
+    @SuppressWarnings("unchecked") // generic array creation; only Iterator<Tuple> values are stored
+    private void accumulateData() throws ExecException {
+        int size = inputs.size();
+        its = new Iterator[size];
+        data = new Tuple[size];
+        for (int i = 0; i < size; i++) {
+            PhysicalOperator source = inputs.get(i);
+            DataBag bag = new SortedDataBag(new GroupComparator());
+            for (Result input = source.getNext(dummyTuple);
+                    input.returnStatus != POStatus.STATUS_EOP;
+                    input = source.getNext(dummyTuple)) {
+                if (input.returnStatus == POStatus.STATUS_ERR) {
+                    throw new ExecException("Error accumulating output at local Cogroup operator");
+                }
+                bag.add((Tuple) input.result);
+            }
+            its[i] = bag.iterator();
+            // An empty input must not call next(); mark it as exhausted instead.
+            data[i] = its[i].hasNext() ? its[i].next() : null;
+        }
+    }
+
+    /**
+     * Finds the tuple with the smallest key among the non-null entries.
+     *
+     * @param data head tuple of each input; entries may be null
+     * @return the entry with the smallest key (the first one on ties), or
+     *         null if every entry is null
+     */
+    private Tuple getSmallest(Tuple[] data) {
+        Tuple smallest = null;
+        Comparator<Tuple> comp = new GroupComparator();
+
+        for (int i = 0; i < data.length; i++) {
+            if (data[i] == null) {
+                continue;
+            }
+            // Replace the candidate only when it is strictly greater than data[i].
+            if (smallest == null || comp.compare(smallest, data[i]) > 0) {
+                smallest = data[i];
+            }
+        }
+        return smallest;
+    }
+
+    @Override
+    public boolean supportsMultipleInputs() {
+        return true;
+    }
+
+    @Override
+    public boolean supportsMultipleOutputs() {
+        return false;
+    }
+
+    /**
+     * Orders tuples by their key (field 1) only, ignoring all other fields.
+     */
+    private static class GroupComparator implements Comparator<Tuple> {
+
+        public int compare(Tuple o1, Tuple o2) {
+            // We want to make it as efficient as possible by only comparing the keys
+            Object key1;
+            Object key2;
+            try {
+                key1 = o1.get(1);
+                key2 = o2.get(1);
+            } catch (ExecException e) {
+                throw new RuntimeException("Error comparing tuples", e);
+            }
+            return DataType.compare(key1, key2);
+        }
+
+    }
+
+}
Added: incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POSplit.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POSplit.java?rev=699506&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POSplit.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POSplit.java Fri Sep 26 14:25:02 2008
@@ -0,0 +1,87 @@
+package org.apache.pig.backend.local.executionengine.physicalLayer.relationalOperators;
+
+import java.util.List;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * POSplit is a blocking operator. It reads the data from its input into a
+ * databag and then returns the iterator of that bag to POSplitOutputs, which
+ * do the necessary filtering.
+ */
+public class POSplit extends PhysicalOperator {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Bag holding every tuple read from the input. */
+    DataBag data = null;
+
+    /** True once the input has been read completely into {@link #data}. */
+    boolean processingDone = false;
+
+    public POSplit(OperatorKey k, int rp, List<PhysicalOperator> inp) {
+        super(k, rp, inp);
+        data = BagFactory.getInstance().newDefaultBag();
+    }
+
+    public POSplit(OperatorKey k, int rp) {
+        this(k, rp, null);
+    }
+
+    public POSplit(OperatorKey k, List<PhysicalOperator> inp) {
+        this(k, -1, inp);
+    }
+
+    public POSplit(OperatorKey k) {
+        this(k, -1, null);
+    }
+
+    /**
+     * Reads the whole input into the bag on the first call, then hands out a
+     * fresh iterator over that bag on every call.
+     *
+     * @param t not used by this operator
+     * @return a result with STATUS_OK whose value is an iterator over the
+     *         accumulated tuples
+     * @throws ExecException if the input returns STATUS_ERR
+     */
+    @Override
+    public Result getNext(Tuple t) throws ExecException {
+        if (!processingDone) {
+            PhysicalOperator source = inputs.get(0);
+            Result input = source.getNext(dummyTuple);
+            while (input.returnStatus != POStatus.STATUS_EOP) {
+                if (input.returnStatus == POStatus.STATUS_ERR) {
+                    throw new ExecException("Error accumulating output at local Split operator");
+                }
+                data.add((Tuple) input.result);
+                input = source.getNext(dummyTuple);
+            }
+            processingDone = true;
+        }
+
+        Result out = new Result();
+        out.result = data.iterator();
+        out.returnStatus = POStatus.STATUS_OK;
+        return out;
+    }
+
+    @Override
+    public void visit(PhyPlanVisitor v) throws VisitorException {
+        v.visitSplit(this);
+    }
+
+    @Override
+    public String name() {
+        return "Split - " + mKey.toString();
+    }
+
+    @Override
+    public boolean supportsMultipleInputs() {
+        return false;
+    }
+
+    @Override
+    public boolean supportsMultipleOutputs() {
+        return true;
+    }
+
+}
Added: incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POSplitOutput.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POSplitOutput.java?rev=699506&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POSplitOutput.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POSplitOutput.java Fri Sep 26 14:25:02 2008
@@ -0,0 +1,114 @@
+package org.apache.pig.backend.local.executionengine.physicalLayer.relationalOperators;
+
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.pen.util.ExampleTuple;
+
+/**
+ * POSplitOutput reads from POSplit using an iterator and passes on only the
+ * tuples for which its condition plan evaluates to true.
+ */
+public class POSplitOutput extends PhysicalOperator {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Leaf of the condition plan; evaluated once per candidate tuple. */
+    PhysicalOperator compOp;
+
+    /** Condition plan that every candidate tuple is attached to. */
+    PhysicalPlan compPlan;
+
+    /** Iterator obtained from the input operator; null until the first call to getNext. */
+    Iterator<Tuple> it;
+
+    public POSplitOutput(OperatorKey k, int rp, List<PhysicalOperator> inp) {
+        super(k, rp, inp);
+    }
+
+    public POSplitOutput(OperatorKey k, int rp) {
+        super(k, rp);
+    }
+
+    public POSplitOutput(OperatorKey k, List<PhysicalOperator> inp) {
+        super(k, inp);
+    }
+
+    public POSplitOutput(OperatorKey k) {
+        super(k);
+    }
+
+    @Override
+    public void visit(PhyPlanVisitor v) throws VisitorException {
+        // Nothing is dispatched to the visitor for this operator.
+    }
+
+    /**
+     * Returns the next input tuple that satisfies the condition plan.
+     *
+     * On the first call the tuple iterator is fetched from the input
+     * operator. If that fetch does not return STATUS_OK, its result is
+     * returned unchanged and the fetch is retried on the next call.
+     *
+     * @param t passed through to the input operator when fetching the iterator
+     * @return the next matching tuple with STATUS_OK; STATUS_EOP when the
+     *         iterator is exhausted; or the condition's own result when it
+     *         is neither STATUS_OK nor STATUS_NULL
+     * @throws ExecException if the input or the condition plan fails
+     */
+    @Override
+    public Result getNext(Tuple t) throws ExecException {
+        if (it == null) {
+            Result upstream = getInputs().get(0).getNext(t);
+            if (upstream.returnStatus != POStatus.STATUS_OK) {
+                // No iterator is available; do not fall through and dereference null.
+                return upstream;
+            }
+            @SuppressWarnings("unchecked") // the input delivers an Iterator<Tuple> as its result
+            Iterator<Tuple> tuples = (Iterator<Tuple>) upstream.result;
+            it = tuples;
+        }
+
+        Result inp = new Result();
+        while (it.hasNext()) {
+            inp.result = it.next();
+            inp.returnStatus = POStatus.STATUS_OK;
+
+            compPlan.attachInput((Tuple) inp.result);
+
+            Result res = compOp.getNext(dummyBool);
+            if (res.returnStatus != POStatus.STATUS_OK
+                    && res.returnStatus != POStatus.STATUS_NULL) {
+                return res;
+            }
+
+            if (res.result != null && (Boolean) res.result) {
+                if (lineageTracer != null) {
+                    ExampleTuple tIn = (ExampleTuple) inp.result;
+                    lineageTracer.insert(tIn);
+                    lineageTracer.union(tIn, tIn);
+                }
+                return inp;
+            }
+        }
+
+        inp.returnStatus = POStatus.STATUS_EOP;
+        return inp;
+    }
+
+    @Override
+    public String name() {
+        return "POSplitOutput " + mKey.toString();
+    }
+
+    @Override
+    public boolean supportsMultipleInputs() {
+        return false;
+    }
+
+    @Override
+    public boolean supportsMultipleOutputs() {
+        return false;
+    }
+
+    /**
+     * Sets the condition plan and remembers its first leaf as the operator
+     * to evaluate.
+     *
+     * @param compPlan the condition plan
+     */
+    public void setPlan(PhysicalPlan compPlan) {
+        this.compPlan = compPlan;
+        this.compOp = compPlan.getLeaves().get(0);
+    }
+
+}