You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ol...@apache.org on 2008/09/26 23:25:02 UTC

svn commit: r699506 - in /incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer: ./ relationalOperators/

Author: olga
Date: Fri Sep 26 14:25:02 2008
New Revision: 699506

URL: http://svn.apache.org/viewvc?rev=699506&view=rev
Log:
missing illustrate files

Added:
    incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/
    incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/LocalLogToPhyTranslationVisitor.java
    incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/
    incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POCogroup.java
    incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POSplit.java
    incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POSplitOutput.java

Added: incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/LocalLogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/LocalLogToPhyTranslationVisitor.java?rev=699506&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/LocalLogToPhyTranslationVisitor.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/LocalLogToPhyTranslationVisitor.java Fri Sep 26 14:25:02 2008
@@ -0,0 +1,145 @@
+package org.apache.pig.backend.local.executionengine.physicalLayer;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.LogToPhyTranslationVisitor;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
+import org.apache.pig.backend.local.executionengine.physicalLayer.relationalOperators.POCogroup;
+import org.apache.pig.backend.local.executionengine.physicalLayer.relationalOperators.POSplit;
+import org.apache.pig.backend.local.executionengine.physicalLayer.relationalOperators.POSplitOutput;
+import org.apache.pig.data.DataType;
+import org.apache.pig.impl.logicalLayer.LOCogroup;
+import org.apache.pig.impl.logicalLayer.LOSplit;
+import org.apache.pig.impl.logicalLayer.LOSplitOutput;
+import org.apache.pig.impl.logicalLayer.LogicalOperator;
+import org.apache.pig.impl.logicalLayer.LogicalPlan;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.impl.plan.PlanWalker;
+import org.apache.pig.impl.plan.VisitorException;
+
+
+public class LocalLogToPhyTranslationVisitor extends LogToPhyTranslationVisitor {
+
+    private Log log = LogFactory.getLog(getClass());
+    
+    public LocalLogToPhyTranslationVisitor(LogicalPlan plan) {
+	super(plan);
+	// TODO Auto-generated constructor stub
+    }
+    
+    public Map<LogicalOperator, PhysicalOperator> getLogToPhyMap() {
+	return LogToPhyMap;
+    }
+    
+    @Override
+    public void visit(LOCogroup cg) throws VisitorException {
+	String scope = cg.getOperatorKey().scope;
+        List<LogicalOperator> inputs = cg.getInputs();
+        
+        POCogroup poc = new POCogroup(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), cg.getRequestedParallelism());
+        
+        currentPlan.add(poc);
+        
+        int count = 0;
+        Byte type = null;
+        for(LogicalOperator lo : inputs) {
+            List<LogicalPlan> plans = (List<LogicalPlan>) cg.getGroupByPlans().get(lo);
+            
+            POLocalRearrange physOp = new POLocalRearrange(new OperatorKey(
+                    scope, nodeGen.getNextNodeId(scope)), cg
+                    .getRequestedParallelism());
+            List<PhysicalPlan> exprPlans = new ArrayList<PhysicalPlan>();
+            currentPlans.push(currentPlan);
+            for (LogicalPlan lp : plans) {
+                currentPlan = new PhysicalPlan();
+                PlanWalker<LogicalOperator, LogicalPlan> childWalker = mCurrentWalker
+                        .spawnChildWalker(lp);
+                pushWalker(childWalker);
+                mCurrentWalker.walk(this);
+                exprPlans.add((PhysicalPlan) currentPlan);
+                popWalker();
+
+            }
+            currentPlan = currentPlans.pop();
+            physOp.setPlans(exprPlans);
+            physOp.setIndex(count++);
+            if (plans.size() > 1) {
+                type = DataType.TUPLE;
+                physOp.setKeyType(type);
+            } else {
+                type = exprPlans.get(0).getLeaves().get(0).getResultType();
+                physOp.setKeyType(type);
+            }
+            physOp.setResultType(DataType.TUPLE);
+
+            currentPlan.add(physOp);
+
+            try {
+                currentPlan.connect(LogToPhyMap.get(lo), physOp);
+                currentPlan.connect(physOp, poc);
+            } catch (PlanException e) {
+                log.error("Invalid physical operators in the physical plan"
+                        + e.getMessage());
+                throw new VisitorException(e);
+            }
+            
+        }
+        LogToPhyMap.put(cg, poc);
+    }
+    
+    @Override
+    public void visit(LOSplit split) throws VisitorException {
+	String scope = split.getOperatorKey().scope;
+        PhysicalOperator physOp = new POSplit(new OperatorKey(scope, nodeGen
+                .getNextNodeId(scope)), split.getRequestedParallelism());
+        
+        LogToPhyMap.put(split, physOp);
+
+        currentPlan.add(physOp);
+        PhysicalOperator from = LogToPhyMap.get(split.getPlan()
+                .getPredecessors(split).get(0));
+        try {
+            currentPlan.connect(from, physOp);
+        } catch (PlanException e) {
+            log.error("Invalid physical operator in the plan" + e.getMessage());
+            throw new VisitorException(e);
+        }
+    }
+    
+    @Override
+    public void visit(LOSplitOutput split) throws VisitorException {
+	String scope = split.getOperatorKey().scope;
+        PhysicalOperator physOp = new POSplitOutput(new OperatorKey(scope, nodeGen
+                .getNextNodeId(scope)), split.getRequestedParallelism());
+        LogToPhyMap.put(split, physOp);
+
+        currentPlan.add(physOp);
+        currentPlans.push(currentPlan);
+        currentPlan = new PhysicalPlan();
+        PlanWalker<LogicalOperator, LogicalPlan> childWalker = mCurrentWalker
+                .spawnChildWalker(split.getConditionPlan());
+        pushWalker(childWalker);
+        mCurrentWalker.walk(this);
+        popWalker();
+
+        ((POSplitOutput) physOp).setPlan((PhysicalPlan) currentPlan);
+        currentPlan = currentPlans.pop();
+        currentPlan.add(physOp);
+        PhysicalOperator from = LogToPhyMap.get(split.getPlan()
+                .getPredecessors(split).get(0));
+        try {
+            currentPlan.connect(from, physOp);
+        } catch (PlanException e) {
+            log.error("Invalid physical operator in the plan" + e.getMessage());
+            throw new VisitorException(e);
+        }
+    }
+
+}

Added: incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POCogroup.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POCogroup.java?rev=699506&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POCogroup.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POCogroup.java Fri Sep 26 14:25:02 2008
@@ -0,0 +1,219 @@
+package org.apache.pig.backend.local.executionengine.physicalLayer.relationalOperators;
+
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.SortedDataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.pen.util.ExampleTuple;
+
+/** This is a local implementation of Cogroup.
+ * The inputs need to be connected to LocalRearranges possibly by the
+ * logical to physical translator.
+ * 
+ * This is a blocking operator. The outputs of LRs are put into
+ * SortedDataBags. They are sorted on the keys. We then start pulling
+ * tuple out of these bags and start constructing output.
+ * 
+ * @author shubhamc
+ *
+ */
+public class POCogroup extends PhysicalOperator {
+    
+    Tuple[] data = null;
+    Iterator<Tuple>[] its = null;
+
+    public POCogroup(OperatorKey k) {
+	super(k);
+	// TODO Auto-generated constructor stub
+    }
+
+    public POCogroup(OperatorKey k, int rp) {
+	super(k, rp);
+	// TODO Auto-generated constructor stub
+    }
+
+    public POCogroup(OperatorKey k, List<PhysicalOperator> inp) {
+	super(k, inp);
+    }
+
+    public POCogroup(OperatorKey k, int rp, List<PhysicalOperator> inp) {
+	super(k, rp, inp);
+    }
+
+    @Override
+    public void visit(PhyPlanVisitor v) throws VisitorException {
+	// TODO Auto-generated method stub
+	v.visitCogroup(this);
+
+    }
+
+    @Override
+    public String name() {
+	// TODO Auto-generated method stub
+	return "POCogroup" + "[" + DataType.findTypeName(resultType) + "]" +" - " + mKey.toString();
+    }
+    
+    @Override
+    public Result getNext(Tuple t) throws ExecException{
+	if(its == null) {
+	    accumulateData();
+	}
+	
+	boolean done = true;
+	Result res = new Result();
+	for(int i = 0; i < data.length; i++) {
+	    done &= (data[i] == null);
+	}
+	if(done) {
+	    res.returnStatus = POStatus.STATUS_EOP;
+	    its = null;
+	    return res;
+	}
+	
+	Tuple smallestTuple = getSmallest(data);
+	Comparator<Tuple> comp = new groupComparator();
+	
+	int size = data.length;
+	
+	Tuple output = TupleFactory.getInstance().newTuple(size + 1);
+	
+	output.set(0, smallestTuple.get(1));
+	for(int i = 1; i < size + 1; i++) {
+	    output.set(i, BagFactory.getInstance().newDefaultBag());
+	}
+	ExampleTuple tOut = null;
+	if(lineageTracer != null) {
+	    tOut = new ExampleTuple(output);
+	    lineageTracer.insert(tOut);
+	}
+	
+	boolean loop = true;
+	
+	while(loop) {
+	    loop = false;
+	    for(int i = 0; i < size; i++) {
+		if(data[i] != null && comp.compare(data[i], smallestTuple) == 0) {
+		    loop = true;
+		    DataBag bag = (DataBag) output.get(i + 1);
+		    //update lineage if it exists
+		    //Tuple temp = ((IndexedTuple) data[i].get(1)).toTuple();
+		    Tuple temp = (Tuple) data[i].get(2);
+		    if(lineageTracer != null) {
+			if(((ExampleTuple)temp).synthetic) tOut.synthetic = true;
+			lineageTracer.union(temp, tOut);
+		    }
+		    //bag.add(((IndexedTuple) data[i].get(1)).toTuple());
+		    bag.add(temp);
+		    if(its[i].hasNext()) 
+			data[i] = its[i].next();
+		    else
+			data[i] = null;
+			
+		    
+		}
+	    }
+	}
+	if(lineageTracer != null)
+	    res.result = tOut;
+	else
+	    res.result = output;
+	
+	res.returnStatus = POStatus.STATUS_OK;
+	
+	return res;
+    }
+    
+    private void accumulateData() throws ExecException {
+	int size = inputs.size();
+	its = new Iterator[size];
+	data = new Tuple[size];
+	for(int i = 0; i < size; i++) {
+	    DataBag bag = new SortedDataBag(new groupComparator());
+	    for(Result input = inputs.get(i).getNext(dummyTuple); input.returnStatus != POStatus.STATUS_EOP; input = inputs.get(i).getNext(dummyTuple)) {
+		if(input.returnStatus == POStatus.STATUS_ERR) {
+		    throw new ExecException("Error accumulating output at local Cogroup operator");
+		}
+		bag.add((Tuple) input.result);
+	    }
+	    its[i] = bag.iterator();
+	    data[i] = its[i].next();
+	}
+	
+    }
+    
+//    private Tuple getSmallest(Tuple[] data) {
+//	Tuple t = (Tuple) data[0];
+//	Comparator<Tuple> comp = new groupComparator();
+//	for(int i = 1; i < data.length; i++) {
+//	    if(comp.compare(t, (Tuple) data[i]) < 0) 
+//		t = data[i];
+//	}
+//	return t;
+//    }
+    
+    private Tuple getSmallest(Tuple[] data) {
+	Tuple t = null;
+	Comparator<Tuple> comp = new groupComparator();
+	
+	for(int i = 0; i < data.length; i++) {
+	    if(data[i] == null) continue;
+	    if(t == null) {
+		t = data[i];
+		continue; //since the previous data was probably null so we dont really need a comparison
+	    }
+	    if(comp.compare(t, (Tuple) data[i]) < 0) 
+		t = data[i];
+	}
+	return t;
+    }
+
+    @Override
+    public boolean supportsMultipleInputs() {
+	// TODO Auto-generated method stub
+	return true;
+    }
+
+    @Override
+    public boolean supportsMultipleOutputs() {
+	// TODO Auto-generated method stub
+	return false;
+    }
+    
+    private class groupComparator implements Comparator<Tuple> {
+
+	public int compare(Tuple o1, Tuple o2) {
+	    //We want to make it as efficient as possible by only comparing the keys
+	    Object t1 = null;
+	    Object t2 = null;
+	    try {
+		t1 = o1.get(1);
+		t2 = o2.get(1);
+	    
+	    } catch (ExecException e) {
+		// TODO Auto-generated catch block
+		throw new RuntimeException("Error comparing tuples");
+	    }
+	    
+	    return DataType.compare(t1, t2);
+	}
+	
+	public boolean equals(Object obj) {
+	    return this.equals(obj);
+	}
+	
+    }
+
+}

Added: incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POSplit.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POSplit.java?rev=699506&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POSplit.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POSplit.java Fri Sep 26 14:25:02 2008
@@ -0,0 +1,87 @@
+package org.apache.pig.backend.local.executionengine.physicalLayer.relationalOperators;
+
+import java.util.List;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * POSplit is a blocking operator. It reads the data from its input into a
+ * databag and then returns the iterator of that bag to POSplitOutputs, which
+ * do the necessary filtering.
+ */
+public class POSplit extends PhysicalOperator {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Bag holding every tuple read from the input. */
+    DataBag data = null;
+
+    /** True once the input has been drained into {@code data}. */
+    boolean processingDone = false;
+
+    /**
+     * Main constructor; the other constructors delegate to it.
+     *
+     * @param k operator key, handed to the superclass
+     * @param rp requested parallelism, handed to the superclass
+     * @param inp input operators, handed to the superclass
+     */
+    public POSplit(OperatorKey k, int rp, List<PhysicalOperator> inp) {
+        super(k, rp, inp);
+        data = BagFactory.getInstance().newDefaultBag();
+    }
+
+    /** Same as the main constructor with a null input list. */
+    public POSplit(OperatorKey k, int rp) {
+        this(k, rp, null);
+    }
+
+    /** Same as the main constructor with a requested parallelism of -1. */
+    public POSplit(OperatorKey k, List<PhysicalOperator> inp) {
+        this(k, -1, inp);
+    }
+
+    /** Same as the main constructor with a requested parallelism of -1 and a null input list. */
+    public POSplit(OperatorKey k) {
+        this(k, -1, null);
+    }
+
+    /**
+     * Drains the first input into the bag on the first call, then returns a
+     * fresh iterator over that bag on every call.
+     *
+     * @param t not used
+     * @return a STATUS_OK result whose payload is an iterator over the bag
+     * @throws ExecException if the input returns STATUS_ERR
+     */
+    @Override
+    public Result getNext(Tuple t) throws ExecException {
+        if (!processingDone) {
+            Result input = inputs.get(0).getNext(dummyTuple);
+            while (input.returnStatus != POStatus.STATUS_EOP) {
+                if (input.returnStatus == POStatus.STATUS_ERR) {
+                    throw new ExecException("Error accumulating output at local Split operator");
+                }
+                data.add((Tuple) input.result);
+                input = inputs.get(0).getNext(dummyTuple);
+            }
+            processingDone = true;
+        }
+
+        Result res = new Result();
+        res.result = data.iterator();
+        res.returnStatus = POStatus.STATUS_OK;
+        return res;
+    }
+
+    /** Dispatches to {@code visitSplit} on the given visitor. */
+    @Override
+    public void visit(PhyPlanVisitor v) throws VisitorException {
+        v.visitSplit(this);
+    }
+
+    /** @return "Split - " followed by the operator key */
+    @Override
+    public String name() {
+        return "Split - " + mKey.toString();
+    }
+
+    @Override
+    public boolean supportsMultipleInputs() {
+        return false;
+    }
+
+    @Override
+    public boolean supportsMultipleOutputs() {
+        return true;
+    }
+
+}

Added: incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POSplitOutput.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POSplitOutput.java?rev=699506&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POSplitOutput.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POSplitOutput.java Fri Sep 26 14:25:02 2008
@@ -0,0 +1,114 @@
+package org.apache.pig.backend.local.executionengine.physicalLayer.relationalOperators;
+
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.pen.util.ExampleTuple;
+
+/**
+ * POSplitOutput reads from POSplit using an iterator and returns only the
+ * tuples for which its condition plan evaluates to true.
+ */
+public class POSplitOutput extends PhysicalOperator {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Leaf of the condition plan; evaluated once per input tuple. */
+    PhysicalOperator compOp;
+
+    /** Condition plan; each candidate tuple is attached to it before evaluation. */
+    PhysicalPlan compPlan;
+
+    /** Iterator obtained from the input operator; null until the first successful fetch. */
+    Iterator<Tuple> it;
+
+    /**
+     * @param k operator key, handed to the superclass
+     * @param rp requested parallelism, handed to the superclass
+     * @param inp input operators, handed to the superclass
+     */
+    public POSplitOutput(OperatorKey k, int rp, List<PhysicalOperator> inp) {
+        super(k, rp, inp);
+    }
+
+    /**
+     * @param k operator key, handed to the superclass
+     * @param rp requested parallelism, handed to the superclass
+     */
+    public POSplitOutput(OperatorKey k, int rp) {
+        super(k, rp);
+    }
+
+    /**
+     * @param k operator key, handed to the superclass
+     * @param inp input operators, handed to the superclass
+     */
+    public POSplitOutput(OperatorKey k, List<PhysicalOperator> inp) {
+        super(k, inp);
+    }
+
+    /**
+     * @param k operator key, handed to the superclass
+     */
+    public POSplitOutput(OperatorKey k) {
+        super(k);
+    }
+
+    /**
+     * Intentionally does nothing.
+     * NOTE(review): unlike POSplit and POCogroup this operator does not call
+     * back into the visitor - confirm whether PhyPlanVisitor should get a
+     * dedicated visit method for it.
+     */
+    @Override
+    public void visit(PhyPlanVisitor v) throws VisitorException {
+    }
+
+    /**
+     * Returns the next input tuple that satisfies the condition plan.
+     *
+     * On the first call the iterator is fetched from the first input. If that
+     * input does not answer with STATUS_OK its result is returned unchanged,
+     * because there is no iterator to read from.
+     *
+     * @param t passed through to the input operator when fetching the iterator
+     * @return a STATUS_OK result holding a matching tuple, a STATUS_EOP result
+     *         when the iterator is exhausted, or the condition's own result
+     *         when its status is neither STATUS_OK nor STATUS_NULL
+     * @throws ExecException if the input or the condition evaluation fails
+     */
+    @Override
+    public Result getNext(Tuple t) throws ExecException {
+        if (it == null) {
+            Result upstream = getInputs().get(0).getNext(t);
+            if (upstream.returnStatus != POStatus.STATUS_OK) {
+                // Without an iterator the loop below would fail on a null
+                // reference; hand the upstream status back instead.
+                return upstream;
+            }
+            it = (Iterator<Tuple>) upstream.result;
+        }
+
+        Result inp = new Result();
+        while (true) {
+            if (!it.hasNext()) {
+                inp.returnStatus = POStatus.STATUS_EOP;
+                return inp;
+            }
+            inp.result = it.next();
+            inp.returnStatus = POStatus.STATUS_OK;
+
+            compPlan.attachInput((Tuple) inp.result);
+
+            Result res = compOp.getNext(dummyBool);
+            if (res.returnStatus != POStatus.STATUS_OK
+                    && res.returnStatus != POStatus.STATUS_NULL) {
+                return res;
+            }
+
+            // A null condition result is treated like false: the tuple is skipped.
+            if (res.result != null && ((Boolean) res.result).booleanValue()) {
+                if (lineageTracer != null) {
+                    ExampleTuple tIn = (ExampleTuple) inp.result;
+                    lineageTracer.insert(tIn);
+                    lineageTracer.union(tIn, tIn);
+                }
+                return inp;
+            }
+        }
+    }
+
+    /** @return "POSplitOutput " followed by the operator key */
+    @Override
+    public String name() {
+        return "POSplitOutput " + mKey.toString();
+    }
+
+    @Override
+    public boolean supportsMultipleInputs() {
+        return false;
+    }
+
+    @Override
+    public boolean supportsMultipleOutputs() {
+        return false;
+    }
+
+    /**
+     * Sets the condition plan and remembers its first leaf as the operator
+     * that is evaluated for every tuple.
+     *
+     * @param compPlan the condition plan; must have at least one leaf
+     */
+    public void setPlan(PhysicalPlan compPlan) {
+        this.compPlan = compPlan;
+        this.compOp = compPlan.getLeaves().get(0);
+    }
+
+}