You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ya...@apache.org on 2010/12/13 20:11:04 UTC
svn commit: r1045314 [3/5] - in /pig/trunk: ./ src/org/apache/pig/
src/org/apache/pig/backend/hadoop/executionengine/
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/ src...
Modified: pig/trunk/src/org/apache/pig/pen/AugmentBaseDataVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/pen/AugmentBaseDataVisitor.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/pen/AugmentBaseDataVisitor.java (original)
+++ pig/trunk/src/org/apache/pig/pen/AugmentBaseDataVisitor.java Mon Dec 13 19:11:00 2010
@@ -24,10 +24,13 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.HashSet;
+import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataByteArray;
@@ -41,6 +44,7 @@ import org.apache.pig.impl.logicalLayer.
import org.apache.pig.impl.logicalLayer.LOAnd;
import org.apache.pig.impl.logicalLayer.LOCast;
import org.apache.pig.impl.logicalLayer.LOCogroup;
+import org.apache.pig.impl.logicalLayer.LOLimit;
import org.apache.pig.impl.logicalLayer.LOConst;
import org.apache.pig.impl.logicalLayer.LOCross;
import org.apache.pig.impl.logicalLayer.LODistinct;
@@ -69,10 +73,10 @@ import org.apache.pig.impl.logicalLayer.
import org.apache.pig.impl.logicalLayer.LogicalOperator;
import org.apache.pig.impl.logicalLayer.LogicalPlan;
import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.impl.plan.DepthFirstWalker;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.pen.util.ExampleTuple;
import org.apache.pig.pen.util.PreOrderDepthFirstWalker;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
//This is used to generate synthetic data
//Synthetic data generation is done by making constraint tuples for each operator as we traverse the plan
@@ -83,6 +87,9 @@ public class AugmentBaseDataVisitor exte
Map<LOLoad, DataBag> baseData = null;
Map<LOLoad, DataBag> newBaseData = new HashMap<LOLoad, DataBag>();
Map<LogicalOperator, DataBag> derivedData = null;
+ private boolean limit = false;
+ private final Map<LogicalOperator, PhysicalOperator> logToPhysMap;
+ private Map<LOLimit, Long> oriLimitMap;
Map<LogicalOperator, DataBag> outputConstraintsMap = new HashMap<LogicalOperator, DataBag>();
@@ -91,15 +98,20 @@ public class AugmentBaseDataVisitor exte
// Augmentation moves from the leaves to root and hence needs a
// depthfirstwalker
public AugmentBaseDataVisitor(LogicalPlan plan,
+ Map<LogicalOperator, PhysicalOperator> logToPhysMap, // logical -> physical ops; visit(LOLimit) looks up and rewrites the POLimit here
Map<LOLoad, DataBag> baseData,
Map<LogicalOperator, DataBag> derivedData) {
super(plan, new PreOrderDepthFirstWalker<LogicalOperator, LogicalPlan>(
plan));
this.baseData = baseData;
this.derivedData = derivedData;
-
+ this.logToPhysMap = logToPhysMap;
}
+ public void setLimit() { // switch this visitor into the LIMIT-only augmentation pass
+ this.limit = true;
+ }
+
public Map<LOLoad, DataBag> getNewBaseData() {
for (Map.Entry<LOLoad, DataBag> e : baseData.entrySet()) {
DataBag bag = newBaseData.get(e.getKey());
@@ -112,8 +124,14 @@ public class AugmentBaseDataVisitor exte
return newBaseData;
}
+ public Map<LOLimit, Long> getOriLimitMap() { // limits as they were before visit(LOLimit) rewrote them; null until visit(LOLimit) runs in the LIMIT pass
+ return oriLimitMap;
+ }
+
@Override
protected void visit(LOCogroup cg) throws VisitorException {
+ if (limit && !((PreOrderDepthFirstWalker<LogicalOperator, LogicalPlan>) mCurrentWalker).getBranchFlag())
+ return;
// we first get the outputconstraints for the current cogroup
DataBag outputConstraints = outputConstraintsMap.get(cg);
outputConstraintsMap.remove(cg);
@@ -234,11 +252,58 @@ public class AugmentBaseDataVisitor exte
@Override
protected void visit(LODistinct dt) throws VisitorException {
+ // LIMIT pass: only operators on the branch flagged by visit(LOLimit) are augmented
+ if (limit && !((PreOrderDepthFirstWalker<LogicalOperator, LogicalPlan>) mCurrentWalker).getBranchFlag())
+ return;
+
+ DataBag outputConstraints = outputConstraintsMap.get(dt);
+ outputConstraintsMap.remove(dt);
+ DataBag inputConstraints = outputConstraintsMap.get(dt.getInput());
+ if (inputConstraints == null) {
+ inputConstraints = BagFactory.getInstance().newDefaultBag();
+ outputConstraintsMap.put(dt.getInput(), inputConstraints);
+ }
+
+ // every output constraint is passed to the input unchanged
+ if (outputConstraints != null && outputConstraints.size() > 0) {
+ for (Iterator<Tuple> it = outputConstraints.iterator(); it.hasNext();)
+ {
+ inputConstraints.add(it.next());
+ }
+ }
+
+ // no constraints at all: fall back to the input's derived data, if any was derived
+ if (inputConstraints.size() == 0) {
+ DataBag inputData = derivedData.get(dt.getInput());
+ if (inputData != null) {
+ inputConstraints.addAll(inputData);
+ }
+ }
+ // Track duplicates with a flag: testing it.hasNext() after the loop would
+ // miss a duplicate that happens to be the last tuple in the bag.
+ Set<Tuple> distinctSet = new HashSet<Tuple>();
+ boolean hasDuplicate = false;
+ for (Iterator<Tuple> it = inputConstraints.iterator(); !hasDuplicate && it.hasNext();) {
+ hasDuplicate = !distinctSet.add(it.next());
+ }
+ if (!hasDuplicate && inputConstraints.size() > 0)
+ {
+ // no duplicates found: add a synthetic copy of the first constraint
+ Tuple first = inputConstraints.iterator().next();
+ Tuple src = (first instanceof ExampleTuple) ? ((ExampleTuple) first).toTuple() : first;
+ Tuple tgt = TupleFactory.getInstance().newTuple(src.getAll());
+ ExampleTuple inputConstraint = new ExampleTuple(tgt);
+ inputConstraint.synthetic = true;
+ inputConstraints.add(inputConstraint);
+ }
}
@Override
protected void visit(LOFilter filter) throws VisitorException {
+ if (limit && !((PreOrderDepthFirstWalker<LogicalOperator, LogicalPlan>) mCurrentWalker).getBranchFlag())
+ return;
+
DataBag outputConstraints = outputConstraintsMap.get(filter);
outputConstraintsMap.remove(filter);
@@ -308,6 +373,8 @@ public class AugmentBaseDataVisitor exte
@Override
protected void visit(LOForEach forEach) throws VisitorException {
+ if (limit && !((PreOrderDepthFirstWalker<LogicalOperator, LogicalPlan>) mCurrentWalker).getBranchFlag())
+ return;
DataBag outputConstraints = outputConstraintsMap.get(forEach);
outputConstraintsMap.remove(forEach);
List<LogicalPlan> plans = forEach.getForEachPlans();
@@ -388,8 +455,8 @@ public class AugmentBaseDataVisitor exte
outputConstraintsMap.remove(load);
// check if the inputData exists
if (inputData == null || inputData.size() == 0) {
- log.error("No input data found!");
- throw new RuntimeException("No input data found!");
+ log.error("No (valid) input data found!");
+ throw new RuntimeException("No (valid) input data found!");
}
// first of all, we are required to guarantee that there is at least one
@@ -403,11 +470,13 @@ public class AugmentBaseDataVisitor exte
// create example tuple to steal values from when we encounter
// "don't care" fields (i.e. null fields)
Tuple exampleTuple = inputData.iterator().next();
+ System.out.println(exampleTuple.toString());
// run through output constraints; for each one synthesize a tuple and
// add it to the base data
// (while synthesizing individual fields, try to match fields that exist
// in the real data)
+ boolean newInput = false;
for (Iterator<Tuple> it = outputConstraints.iterator(); it.hasNext();) {
Tuple outputConstraint = it.next();
@@ -423,10 +492,15 @@ public class AugmentBaseDataVisitor exte
try {
for (int i = 0; i < inputTuple.size(); i++) {
Object d = outputConstraint.get(i);
- if (d == null)
+ if (d == null && i < exampleTuple.size())
d = exampleTuple.get(i);
inputTuple.set(i, d);
}
+ if (outputConstraint instanceof ExampleTuple)
+ inputTuple.synthetic = ((ExampleTuple) outputConstraint).synthetic;
+ else
+ // raw tuple should have been synthesized
+ inputTuple.synthetic = true;
} catch (ExecException e) {
log
.error("Error visiting Load during Augmentation phase of Example Generator! "
@@ -436,15 +510,55 @@ public class AugmentBaseDataVisitor exte
+ e.getMessage());
}
- if (!inputTuple.equals(exampleTuple))
- inputTuple.synthetic = true;
-
- newInputData.add(inputTuple);
+ try {
+ if (inputTuple.synthetic || !inInput(inputTuple, inputData, schema))
+ {
+ inputTuple.synthetic = true;
+
+ newInputData.add(inputTuple);
+
+ if (!newInput)
+ newInput = true;
+ }
+ } catch (ExecException e) {
+ throw new VisitorException(
+ "Error visiting Load during Augmentation phase of Example Generator! "
+ + e.getMessage());
+ }
+ }
+
+ if (newInput) {
+ for (Map.Entry<LOLoad, DataBag> entry : newBaseData.entrySet()) {
+ LOLoad otherLoad = entry.getKey();
+ if (otherLoad != load && otherLoad.getInputFile().equals(load.getInputFile())) {
+ // different load sharing the same input file
+ entry.getValue().addAll(newInputData);
+ }
+ }
}
}
+ // Returns true if some tuple in input agrees with newTuple on every schema field.
+ // Null fields compare equal to null; tuples too short for the schema never match.
+ private boolean inInput(Tuple newTuple, DataBag input, Schema schema) throws ExecException {
+ int numFields = (schema == null) ? newTuple.size() : schema.size();
+ for (Iterator<Tuple> iter = input.iterator(); iter.hasNext();) {
+ Tuple candidate = iter.next();
+ boolean match = candidate.size() >= numFields && newTuple.size() >= numFields;
+ for (int i = 0; match && i < numFields; ++i) {
+ Object mine = newTuple.get(i), theirs = candidate.get(i);
+ match = (mine == null) ? theirs == null : mine.equals(theirs);
+ }
+ if (match)
+ return true;
+ }
+ return false;
+ }
+
@Override
protected void visit(LOSort s) throws VisitorException {
+ if (limit && !((PreOrderDepthFirstWalker<LogicalOperator, LogicalPlan>) mCurrentWalker).getBranchFlag())
+ return;
DataBag outputConstraints = outputConstraintsMap.get(s);
outputConstraintsMap.remove(s);
@@ -457,11 +571,14 @@ public class AugmentBaseDataVisitor exte
@Override
protected void visit(LOSplit split) throws VisitorException {
-
+ if (limit && !((PreOrderDepthFirstWalker<LogicalOperator, LogicalPlan>) mCurrentWalker).getBranchFlag())
+ return; // NOTE(review): nothing follows this guard, so SPLIT adds no constraints in either pass
}
@Override
protected void visit(LOStore store) throws VisitorException {
+ if (limit && !((PreOrderDepthFirstWalker<LogicalOperator, LogicalPlan>) mCurrentWalker).getBranchFlag())
+ return;
DataBag outputConstraints = outputConstraintsMap.get(store);
if (outputConstraints == null) {
outputConstraintsMap.put(store.getPlan().getPredecessors(store)
@@ -475,6 +592,8 @@ public class AugmentBaseDataVisitor exte
@Override
protected void visit(LOUnion u) throws VisitorException {
+ if (limit && !((PreOrderDepthFirstWalker<LogicalOperator, LogicalPlan>) mCurrentWalker).getBranchFlag())
+ return;
DataBag outputConstraints = outputConstraintsMap.get(u);
outputConstraintsMap.remove(u);
if (outputConstraints == null || outputConstraints.size() == 0) {
@@ -506,6 +625,61 @@ public class AugmentBaseDataVisitor exte
}
+ // LIMIT pass only: push LIMIT's output constraints to its input and lower the
+ // limit to one less than the number of input constraints, so at least one is cut.
+ @Override
+ protected void visit(LOLimit lm) throws VisitorException {
+ if (!limit) // not augment for LIMIT in this traversal
+ return;
+
+ if (oriLimitMap == null)
+ oriLimitMap = new HashMap<LOLimit, Long>();
+
+ DataBag outputConstraints = outputConstraintsMap.get(lm);
+ outputConstraintsMap.remove(lm);
+
+ DataBag inputConstraints = outputConstraintsMap.get(lm.getInput());
+ if (inputConstraints == null) {
+ inputConstraints = BagFactory.getInstance().newDefaultBag();
+ outputConstraintsMap.put(lm.getInput(), inputConstraints);
+ }
+
+ DataBag inputData = derivedData.get(lm.getInput());
+ // padding needs at least one real input tuple; guard against a missing/empty bag
+ boolean haveInputData = inputData != null && inputData.size() > 0;
+
+ if (outputConstraints != null && outputConstraints.size() > 0) {
+ // one or more output constraints: generate corresponding input constraints
+ for (Iterator<Tuple> it = outputConstraints.iterator(); it.hasNext();) {
+ inputConstraints.add(it.next());
+ // ... plus one more if only one
+ if (inputConstraints.size() == 1 && haveInputData) {
+ inputConstraints.add(inputData.iterator().next());
+ ((PreOrderDepthFirstWalker<LogicalOperator, LogicalPlan>) mCurrentWalker).setBranchFlag();
+ }
+ }
+ } else if (inputConstraints.size() == 0 && haveInputData) {
+ // add all input to input constraints ...
+ inputConstraints.addAll(inputData);
+ // ... plus one more if only one
+ if (inputConstraints.size() == 1) {
+ inputConstraints.add(inputData.iterator().next());
+ ((PreOrderDepthFirstWalker<LogicalOperator, LogicalPlan>) mCurrentWalker).setBranchFlag();
+ }
+ }
+
+ PhysicalOperator physOp = logToPhysMap.get(lm);
+ if (!(physOp instanceof POLimit))
+ throw new VisitorException(
+ "Error visiting Limit during Augmentation phase of Example Generator! No POLimit found.");
+ POLimit poLimit = (POLimit) physOp;
+ // record the user's limit only once, before it is overwritten below
+ if (!oriLimitMap.containsKey(lm))
+ oriLimitMap.put(lm, Long.valueOf(poLimit.getLimit()));
+ poLimit.setLimit(inputConstraints.size()-1);
+ lm.setLimit(poLimit.getLimit());
+ }
+
Tuple GetGroupByInput(Object groupLabel, List<Integer> groupCols,
int numFields) throws ExecException {
Tuple t = TupleFactory.getInstance().newTuple(numFields);
Modified: pig/trunk/src/org/apache/pig/pen/DerivedDataVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/pen/DerivedDataVisitor.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/pen/DerivedDataVisitor.java (original)
+++ pig/trunk/src/org/apache/pig/pen/DerivedDataVisitor.java Mon Dec 13 19:11:00 2010
@@ -67,7 +67,7 @@ import org.apache.pig.pen.util.LineageTr
//This class is used to pass data through the entire plan and save the intermediates results.
-public class DerivedDataVisitor extends LOVisitor {
+public class DerivedDataVisitor {
Map<LogicalOperator, DataBag> derivedData = new HashMap<LogicalOperator, DataBag>();
PhysicalPlan physPlan = null;
@@ -83,16 +83,13 @@ public class DerivedDataVisitor extends
public DerivedDataVisitor(LogicalPlan plan, PigContext pc,
Map<LOLoad, DataBag> baseData,
- Map<LogicalOperator, PhysicalOperator> logToPhyMap,
PhysicalPlan physPlan) {
- super(plan, new DependencyOrderWalker<LogicalOperator, LogicalPlan>(
- plan));
+
this.baseData = baseData;
OpToEqClasses = new HashMap<LogicalOperator, Collection<IdentityHashSet<Tuple>>>();
EqClasses = new LinkedList<IdentityHashSet<Tuple>>();
- LogToPhyMap = logToPhyMap;
this.physPlan = physPlan;
// if(logToPhyMap == null)
// compilePlan(plan);
@@ -105,9 +102,6 @@ public class DerivedDataVisitor extends
Map<LOLoad, DataBag> baseData,
Map<LogicalOperator, PhysicalOperator> logToPhyMap,
PhysicalPlan physPlan) {
- super(op.getPlan(),
- new DependencyOrderLimitedWalker<LogicalOperator, LogicalPlan>(
- op, op.getPlan()));
this.baseData = baseData;
OpToEqClasses = new HashMap<LogicalOperator, Collection<IdentityHashSet<Tuple>>>();
@@ -115,364 +109,5 @@ public class DerivedDataVisitor extends
LogToPhyMap = logToPhyMap;
this.physPlan = physPlan;
- // if(logToPhyMap == null)
- // compilePlan(op.getPlan());
- // else
- // LogToPhyMap = logToPhyMap;
- }
-
- public void setOperatorToEvaluate(LogicalOperator op) {
- mCurrentWalker = new DependencyOrderLimitedWalker<LogicalOperator, LogicalPlan>(
- op, op.getPlan());
- }
-
- @Override
- protected void visit(LOCogroup cg) throws VisitorException {
- // evaluateOperator(cg);
- // there is a slightly different code path for cogroup because of the
- // local rearranges
- PhysicalOperator physOp = LogToPhyMap.get(cg);
- Random r = new Random();
- // get the list of original inputs
-
- // List<PhysicalOperator> inputs = physOp.getInputs();
- List<PhysicalOperator> inputs = new ArrayList<PhysicalOperator>();
- PhysicalPlan phy = new PhysicalPlan();
- phy.add(physOp);
-
- // for(PhysicalOperator input : physOp.getInputs()) {
- for (PhysicalOperator input : physPlan.getPredecessors(physOp)) {
- inputs.add(input.getInputs().get(0));
- // input.setInputs(null);
- phy.add(input);
- try {
- phy.connect(input, physOp);
- } catch (PlanException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- log.error("Error connecting " + input.name() + " to "
- + physOp.name());
- }
- }
-
- physOp.setLineageTracer(lineage);
-
- // replace the original inputs by POReads
- for (int i = 0; i < inputs.size(); i++) {
- DataBag bag = derivedData.get(cg.getInputs().get(i));
- PORead por = new PORead(new OperatorKey("", r.nextLong()), bag);
- phy.add(por);
- try {
- phy.connect(por, physOp.getInputs().get(i));
- } catch (PlanException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- log.error("Error connecting " + por.name() + " to "
- + physOp.name());
- }
- }
-
- DataBag output = BagFactory.getInstance().newDefaultBag();
- Tuple t = null;
- try {
- for (Result res = physOp.getNext(t); res.returnStatus != POStatus.STATUS_EOP; res = physOp
- .getNext(t)) {
- output.add((Tuple) res.result);
- }
- } catch (ExecException e) {
- log.error("Error evaluating operator : " + physOp.name());
- }
- derivedData.put(cg, output);
-
- try {
- Collection<IdentityHashSet<Tuple>> eq = EquivalenceClasses
- .getEquivalenceClasses(cg, derivedData);
- EqClasses.addAll(eq);
- OpToEqClasses.put(cg, eq);
- } catch (ExecException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- log
- .error("Error updating equivalence classes while evaluating operators. \n"
- + e.getMessage());
- }
-
- // re-attach the original operators
- // for(int i = 0; i < inputs.size(); i++) {
- // try {
- // physPlan.connect(inputs.get(i), physOp.getInputs().get(i));
- //
- // } catch (PlanException e) {
- // // TODO Auto-generated catch block
- // e.printStackTrace();
- // log.error("Error connecting " + inputs.get(i).name() + " to " +
- // physOp.getInputs().get(i).name());
- // }
- // }
- physOp.setLineageTracer(null);
- }
-
- @Override
- protected void visit(LOCross cs) throws VisitorException {
- evaluateOperator(cs);
- }
-
- @Override
- protected void visit(LODistinct dt) throws VisitorException {
- evaluateOperator(dt);
- }
-
- @Override
- protected void visit(LOFilter filter) throws VisitorException {
- evaluateOperator(filter);
- }
-
- @Override
- protected void visit(LOForEach forEach) throws VisitorException {
- evaluateOperator(forEach);
- }
-
- @Override
- protected void visit(LOLoad load) throws VisitorException {
- derivedData.put(load, baseData.get(load));
-
- Collection<IdentityHashSet<Tuple>> eq = EquivalenceClasses
- .getEquivalenceClasses(load, derivedData);
- EqClasses.addAll(eq);
- OpToEqClasses.put(load, eq);
-
- for (Iterator<Tuple> it = derivedData.get(load).iterator(); it
- .hasNext();) {
- lineage.insert(it.next());
- }
-
}
-
- @Override
- protected void visit(LOSplit split) throws VisitorException {
- evaluateOperator(split);
- }
-
- @Override
- protected void visit(LOStore store) throws VisitorException {
- derivedData.put(store, derivedData.get(store.getPlan().getPredecessors(
- store).get(0)));
- }
-
- @Override
- protected void visit(LOUnion u) throws VisitorException {
- evaluateOperator(u);
- }
-
- @Override
- protected void visit(LOLimit l) throws VisitorException {
- evaluateOperator(l);
- }
-
- @Override
- protected void visit(LOSort sort) throws VisitorException {
- evaluateOperator(sort);
- }
-
- // private void compilePlan(LogicalPlan plan) {
- //
- // plan = refineLogicalPlan(plan);
- //
- // LocalLogToPhyTranslationVisitor visitor = new
- // LocalLogToPhyTranslationVisitor(plan);
- // visitor.setPigContext(pc);
- // try {
- // visitor.visit();
- // } catch (VisitorException e) {
- // // TODO Auto-generated catch block
- // e.printStackTrace();
- // log.error("Error visiting the logical plan in ExampleGenerator");
- // }
- // physPlan = visitor.getPhysicalPlan();
- // LogToPhyMap = visitor.getLogToPhyMap();
- // }
- //
- // private LogicalPlan refineLogicalPlan(LogicalPlan plan) {
- // PlanSetter ps = new PlanSetter(plan);
- // try {
- // ps.visit();
- //
- // } catch (VisitorException e) {
- // // TODO Auto-generated catch block
- // e.printStackTrace();
- // }
- //
- // // run through validator
- // CompilationMessageCollector collector = new CompilationMessageCollector()
- // ;
- // FrontendException caught = null;
- // try {
- // LogicalPlanValidationExecutor validator =
- // new LogicalPlanValidationExecutor(plan, pc);
- // validator.validate(plan, collector);
- // } catch (FrontendException fe) {
- // // Need to go through and see what the collector has in it. But
- // // remember what we've caught so we can wrap it into what we
- // // throw.
- // caught = fe;
- // }
- //
- //
- // return plan;
- //
- // }
-
- private void evaluateOperator(LogicalOperator op) {
- PhysicalOperator physOp = LogToPhyMap.get(op);
- Random r = new Random();
- // get the list of original inputs
-
- List<PhysicalOperator> inputs = physOp.getInputs();
- physOp.setInputs(null);
- physOp.setLineageTracer(lineage);
- PhysicalPlan phy = new PhysicalPlan();
- phy.add(physOp);
-
- // replace the original inputs by POReads
- for (LogicalOperator l : op.getPlan().getPredecessors(op)) {
- DataBag bag = derivedData.get(l);
- PORead por = new PORead(new OperatorKey("", r.nextLong()), bag);
- phy.add(por);
- try {
- phy.connect(por, physOp);
- } catch (PlanException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- log.error("Error connecting " + por.name() + " to "
- + physOp.name());
- }
- }
-
- DataBag output = BagFactory.getInstance().newDefaultBag();
- Tuple t = null;
- try {
- for (Result res = physOp.getNext(t); res.returnStatus != POStatus.STATUS_EOP; res = physOp
- .getNext(t)) {
- output.add((Tuple) res.result);
- }
- } catch (ExecException e) {
- log.error("Error evaluating operator : " + physOp.name());
- }
- derivedData.put(op, output);
-
- try {
- Collection<IdentityHashSet<Tuple>> eq = EquivalenceClasses
- .getEquivalenceClasses(op, derivedData);
- EqClasses.addAll(eq);
- OpToEqClasses.put(op, eq);
- } catch (ExecException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- log
- .error("Error updating equivalence classes while evaluating operators. \n"
- + e.getMessage());
- }
-
- // re-attach the original operators
- physOp.setInputs(inputs);
- physOp.setLineageTracer(null);
- }
-
- public DataBag evaluateIsolatedOperator(LOCogroup op,
- List<DataBag> inputBags) {
- if (op.getPlan().getPredecessors(op).size() > inputBags.size())
- return null;
-
- int count = 0;
- for (LogicalOperator inputs : op.getPlan().getPredecessors(op)) {
- derivedData.put(inputs, inputBags.get(count++));
- }
-
- return evaluateIsolatedOperator(op);
-
- }
-
- public DataBag evaluateIsolatedOperator(LOCogroup op) {
- // return null if the inputs are not already evaluated
- for (LogicalOperator in : op.getPlan().getPredecessors(op)) {
- if (derivedData.get(in) == null)
- return null;
- }
-
- LineageTracer oldLineage = this.lineage;
- this.lineage = new LineageTracer();
-
- PhysicalOperator physOp = LogToPhyMap.get(op);
- Random r = new Random();
- // get the list of original inputs
- // List<PhysicalOperator> inputs = physOp.getInputs();
- List<PhysicalOperator> inputs = new ArrayList<PhysicalOperator>();
- PhysicalPlan phy = new PhysicalPlan();
- phy.add(physOp);
-
- for (PhysicalOperator input : physOp.getInputs()) {
- inputs.add(input.getInputs().get(0));
- input.setInputs(null);
- phy.add(input);
- try {
- phy.connect(input, physOp);
- } catch (PlanException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- log.error("Error connecting " + input.name() + " to "
- + physOp.name());
- }
- }
- physOp.setLineageTracer(lineage);
-
- physOp.setLineageTracer(null);
-
- // replace the original inputs by POReads
- for (int i = 0; i < inputs.size(); i++) {
- DataBag bag = derivedData.get(op.getInputs().get(i));
- PORead por = new PORead(new OperatorKey("", r.nextLong()), bag);
- phy.add(por);
- try {
- phy.connect(por, physOp.getInputs().get(i));
- } catch (PlanException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- log.error("Error connecting " + por.name() + " to "
- + physOp.name());
- }
- }
-
- // replace the original inputs by POReads
- // for(LogicalOperator l : op.getPlan().getPredecessors(op)) {
- // DataBag bag = derivedData.get(l);
- // PORead por = new PORead(new OperatorKey("", r.nextLong()), bag);
- // phy.add(por);
- // try {
- // phy.connect(por, physOp);
- // } catch (PlanException e) {
- // // TODO Auto-generated catch block
- // e.printStackTrace();
- // log.error("Error connecting " + por.name() + " to " + physOp.name());
- // }
- // }
-
- DataBag output = BagFactory.getInstance().newDefaultBag();
- Tuple t = null;
- try {
- for (Result res = physOp.getNext(t); res.returnStatus != POStatus.STATUS_EOP; res = physOp
- .getNext(t)) {
- output.add((Tuple) res.result);
- }
- } catch (ExecException e) {
- log.error("Error evaluating operator : " + physOp.name());
- }
-
- this.lineage = oldLineage;
-
- physOp.setInputs(inputs);
- physOp.setLineageTracer(null);
-
- return output;
- }
-
}
Modified: pig/trunk/src/org/apache/pig/pen/EquivalenceClasses.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/pen/EquivalenceClasses.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/pen/EquivalenceClasses.java (original)
+++ pig/trunk/src/org/apache/pig/pen/EquivalenceClasses.java Mon Dec 13 19:11:00 2010
@@ -19,174 +19,105 @@
package org.apache.pig.pen;
import java.util.Collection;
-import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
+import java.util.HashMap;
+import java.util.List;
+import java.util.HashSet;
+import java.util.Iterator;
-import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.logicalLayer.LOCogroup;
-import org.apache.pig.impl.logicalLayer.LOFilter;
import org.apache.pig.impl.logicalLayer.LOForEach;
-import org.apache.pig.impl.logicalLayer.LOLoad;
-import org.apache.pig.impl.logicalLayer.LOSort;
-import org.apache.pig.impl.logicalLayer.LOSplit;
-import org.apache.pig.impl.logicalLayer.LOUnion;
+import org.apache.pig.impl.logicalLayer.LOCross;
import org.apache.pig.impl.logicalLayer.LogicalOperator;
+import org.apache.pig.impl.logicalLayer.LogicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.impl.util.IdentityHashSet;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.*;
+import org.apache.pig.impl.plan.VisitorException;
//These methods are used to generate equivalence classes given the operator name and the output from the operator
//For example, it gives out 2 eq. classes for filter, one that passes the filter and one that doesn't
public class EquivalenceClasses {
- public static Collection<IdentityHashSet<Tuple>> getEquivalenceClasses(
- LogicalOperator op, Map<LogicalOperator, DataBag> derivedData)
- throws ExecException {
- if (op instanceof LOCogroup)
- return getEquivalenceClasses((LOCogroup) op, derivedData);
- else if (op instanceof LOForEach)
- return getEquivalenceClasses((LOForEach) op, derivedData);
- else if (op instanceof LOFilter)
- return getEquivalenceClasses((LOFilter) op, derivedData);
- else if (op instanceof LOSort)
- return getEquivalenceClasses((LOSort) op, derivedData);
- else if (op instanceof LOSplit)
- return getEquivalenceClasses((LOSplit) op, derivedData);
- else if (op instanceof LOUnion)
- return getEquivalenceClasses((LOUnion) op, derivedData);
- else if (op instanceof LOLoad)
- return getEquivalenceClasses((LOLoad) op, derivedData);
- throw new RuntimeException("Unrecognized logical operator.");
- }
-
- static Collection<IdentityHashSet<Tuple>> getEquivalenceClasses(LOLoad op,
- Map<LogicalOperator, DataBag> derivedData) {
- // Since its a load, all the tuples belong to a single equivalence class
- Collection<IdentityHashSet<Tuple>> equivClasses = new LinkedList<IdentityHashSet<Tuple>>();
- IdentityHashSet<Tuple> input = new IdentityHashSet<Tuple>();
-
- equivClasses.add(input);
-
- DataBag output = derivedData.get(op);
-
- for (Iterator<Tuple> it = output.iterator(); it.hasNext();) {
- Tuple t = it.next();
-
- input.add(t);
- }
-
- return equivClasses;
- }
-
- static Collection<IdentityHashSet<Tuple>> getEquivalenceClasses(
- LOCogroup op, Map<LogicalOperator, DataBag> derivedData)
- throws ExecException {
- Collection<IdentityHashSet<Tuple>> equivClasses = new LinkedList<IdentityHashSet<Tuple>>();
- IdentityHashSet<Tuple> acceptableGroups = new IdentityHashSet<Tuple>();
-
- equivClasses.add(acceptableGroups);
-
- for (Iterator<Tuple> it = derivedData.get(op).iterator(); it.hasNext();) {
- Tuple t = it.next();
-
- boolean isAcceptable;
-
- if (t.size() == 2) {
- isAcceptable = (((DataBag) (t.get(1))).size() >= 2);
+
+ public static Map<LogicalOperator, Collection<IdentityHashSet<Tuple>>> getLoToEqClassMap(PhysicalPlan plan,
+ LogicalPlan lp, Map<LogicalOperator, PhysicalOperator> logToPhyMap,
+ Map<LogicalOperator, DataBag> logToDataMap,
+ Map<LOForEach, Map<LogicalOperator, PhysicalOperator>> forEachInnerLogToPhyMap,
+ final HashMap<PhysicalOperator, Collection<IdentityHashSet<Tuple>>> poToEqclassesMap)
+ throws VisitorException {
+ // Depth-first from every root of the logical plan; the shared 'visited' set makes
+ // getEqClasses process an operator reachable along several paths only once.
+ Map<LogicalOperator, Collection<IdentityHashSet<Tuple>>> eqClassesByOp =
+ new HashMap<LogicalOperator, Collection<IdentityHashSet<Tuple>>>();
+ HashSet<LogicalOperator> visited = new HashSet<LogicalOperator>();
+ for (LogicalOperator root : lp.getRoots())
+ getEqClasses(plan, root, lp, logToPhyMap, eqClassesByOp, poToEqclassesMap, logToDataMap, forEachInnerLogToPhyMap, visited);
+ return eqClassesByOp;
+ }
+
+ private static void getEqClasses(PhysicalPlan plan, LogicalOperator parent, LogicalPlan lp, // fills result for parent, then recurses into unseen successors
+ Map<LogicalOperator, PhysicalOperator> logToPhyMap, Map<LogicalOperator,
+ Collection<IdentityHashSet<Tuple>>> result,
+ final HashMap<PhysicalOperator, Collection<IdentityHashSet<Tuple>>> poToEqclassesMap,
+ Map<LogicalOperator, DataBag> logToDataMap,
+ Map<LOForEach, Map<LogicalOperator, PhysicalOperator>> forEachInnerLogToPhyMap,
+ HashSet<LogicalOperator> seen) throws VisitorException {
+ if (parent instanceof LOForEach) {
+ if (poToEqclassesMap.get(logToPhyMap.get(parent)) != null) {
+ LinkedList<IdentityHashSet<Tuple>> eqClasses = new LinkedList<IdentityHashSet<Tuple>>();
+ eqClasses.addAll(poToEqclassesMap.get(logToPhyMap.get(parent)));
+ for (Map.Entry<LogicalOperator, PhysicalOperator> entry : forEachInnerLogToPhyMap.get(parent).entrySet()) { // NOTE(review): assumes every LOForEach has an inner-plan map
+ if (poToEqclassesMap.get(entry.getValue()) != null)
+ eqClasses.addAll(poToEqclassesMap.get(entry.getValue()));
+ }
+ result.put(parent, eqClasses);
+ }
+ } else if (parent instanceof LOCross) {
+ boolean ok = true;
+ for (LogicalOperator input : ((LOCross) parent).getInputs()) {
+ if (logToDataMap.get(input) == null || logToDataMap.get(input).size() < 2) {
+ // only if every input has at least two tuples are all outputs added to the eq. class
+ ok = false;
+ break;
+ }
+ }
+ if (ok) {
+ LinkedList<IdentityHashSet<Tuple>> eqClasses = new LinkedList<IdentityHashSet<Tuple>>();
+ IdentityHashSet<Tuple> eqClass = new IdentityHashSet<Tuple>();
+ for (Iterator<Tuple> it = logToDataMap.get(parent).iterator(); it.hasNext();) {
+ eqClass.add(it.next());
+ }
+ eqClasses.add(eqClass);
+ result.put(parent, eqClasses);
} else {
- isAcceptable = true;
- for (int field = 1; field < t.size(); field++) {
- DataBag bag = (DataBag) t.get(field);
- if (bag.size() == 0) {
- isAcceptable = false;
- break;
- }
+ LinkedList<IdentityHashSet<Tuple>> eqClasses = new LinkedList<IdentityHashSet<Tuple>>();
+ IdentityHashSet<Tuple> eqClass = new IdentityHashSet<Tuple>();
+ eqClasses.add(eqClass);
+ result.put(parent, eqClasses);
+ }
+ } else {
+ Collection<IdentityHashSet<Tuple>> eqClasses = poToEqclassesMap.get(logToPhyMap.get(parent));
+ if (eqClasses == null) {
+ eqClasses = new LinkedList<IdentityHashSet<Tuple>>(); // stays empty unless parent maps to a POPackage
+ int size = (logToPhyMap.get(parent) instanceof POPackage) ? ((POPackage) logToPhyMap.get(parent)).getNumInps() : 0;
+ for (int i = 0; i < size; i++) {
+ eqClasses.add(new IdentityHashSet<Tuple>());
}
}
-
- if (isAcceptable)
- acceptableGroups.add(t);
-
+ result.put(parent, eqClasses);
}
- return equivClasses;
- }
-
- static Collection<IdentityHashSet<Tuple>> getEquivalenceClasses(
- LOForEach op, Map<LogicalOperator, DataBag> derivedData) {
- Collection<IdentityHashSet<Tuple>> equivClasses = new LinkedList<IdentityHashSet<Tuple>>();
-
- IdentityHashSet<Tuple> equivClass = new IdentityHashSet<Tuple>();
- equivClasses.add(equivClass);
-
- for (Iterator<Tuple> it = derivedData.get(op).iterator(); it.hasNext();) {
- equivClass.add(it.next());
- }
-
- return equivClasses;
- }
-
- static Collection<IdentityHashSet<Tuple>> getEquivalenceClasses(
- LOFilter op, Map<LogicalOperator, DataBag> derivedData) {
- Collection<IdentityHashSet<Tuple>> equivClasses = new LinkedList<IdentityHashSet<Tuple>>();
-
- IdentityHashSet<Tuple> pass = new IdentityHashSet<Tuple>();
- IdentityHashSet<Tuple> fail = new IdentityHashSet<Tuple>();
-
- for (Iterator<Tuple> it = derivedData.get(op).iterator(); it.hasNext();) {
- pass.add(it.next());
- }
-
- LogicalOperator input = op.getInput();
-
- for (Iterator<Tuple> it = derivedData.get(input).iterator(); it
- .hasNext();) {
- Tuple t = it.next();
- if (pass.contains(t))
- continue;
- else
- fail.add(t);
- }
-
- equivClasses.add(pass);
- equivClasses.add(fail);
-
- return equivClasses;
-
- }
-
- static Collection<IdentityHashSet<Tuple>> getEquivalenceClasses(LOSort op,
- Map<LogicalOperator, DataBag> derivedData) {
- //We don't create any eq. class for sort
- IdentityHashSet<Tuple> temp = new IdentityHashSet<Tuple>();
- Collection<IdentityHashSet<Tuple>> output = new LinkedList<IdentityHashSet<Tuple>>();
- output.add(temp);
- return output;
- }
-
- static Collection<IdentityHashSet<Tuple>> getEquivalenceClasses(LOSplit op,
- Map<LogicalOperator, DataBag> derivedData) {
- throw new RuntimeException(
- "LOSplit not supported yet in example generator.");
- }
-
- static Collection<IdentityHashSet<Tuple>> getEquivalenceClasses(LOUnion op,
- Map<LogicalOperator, DataBag> derivedData) {
-
- // make one equivalence class per input relation
-
- Collection<IdentityHashSet<Tuple>> equivClasses = new LinkedList<IdentityHashSet<Tuple>>();
-
- for (LogicalOperator input : op.getInputs()) {
- IdentityHashSet<Tuple> equivClass = new IdentityHashSet<Tuple>();
-
- for (Iterator<Tuple> it = derivedData.get(input).iterator(); it
- .hasNext();) {
- equivClass.add(it.next());
+ // depth-first over successors; 'seen' ensures each successor is processed only once
+ if (lp.getSuccessors(parent) != null) {
+ for (LogicalOperator lo : lp.getSuccessors(parent)) {
+ if (!seen.contains(lo)) {
+ seen.add(lo);
+ getEqClasses(plan, lo, lp, logToPhyMap, result, poToEqclassesMap, logToDataMap, forEachInnerLogToPhyMap, seen);
+ }
}
- equivClasses.add(equivClass);
}
-
- return equivClasses;
}
}
Modified: pig/trunk/src/org/apache/pig/pen/ExampleGenerator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/pen/ExampleGenerator.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/pen/ExampleGenerator.java (original)
+++ pig/trunk/src/org/apache/pig/pen/ExampleGenerator.java Mon Dec 13 19:11:00 2010
@@ -19,54 +19,77 @@
package org.apache.pig.pen;
import java.util.HashMap;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Collection;
+import java.util.Iterator;
+import java.io.IOException;
+import org.apache.pig.impl.util.IdentityHashSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.pig.ExecType;
import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
-import org.apache.pig.data.BagFactory;
+import org.apache.pig.backend.hadoop.executionengine.HExecutionEngine;
+
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
+import org.apache.pig.PigException;
+import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.LOForEach;
import org.apache.pig.impl.logicalLayer.LOLoad;
import org.apache.pig.impl.logicalLayer.LogicalOperator;
import org.apache.pig.impl.logicalLayer.LogicalPlan;
-import org.apache.pig.impl.logicalLayer.PlanSetter;
-import org.apache.pig.impl.logicalLayer.optimizer.LogicalOptimizer;
import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.impl.logicalLayer.validators.LogicalPlanValidationExecutor;
-import org.apache.pig.impl.plan.CompilationMessageCollector;
+import org.apache.pig.impl.logicalLayer.LOSort;
+import org.apache.pig.impl.logicalLayer.LOLimit;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.pen.util.DisplayExamples;
-import org.apache.pig.pen.util.FunctionalLogicalOptimizer;
import org.apache.pig.pen.util.LineageTracer;
+/**
+ * This class is used to generate example tuples for the ILLUSTRATE purpose
+ *
+ *
+ */
public class ExampleGenerator {
LogicalPlan plan;
- Map<LOLoad, DataBag> baseData;
+ Map<LOLoad, DataBag> baseData = null;
PigContext pigContext;
- Map<LogicalOperator, PhysicalOperator> LogToPhyMap;
PhysicalPlan physPlan;
+ PhysicalPlanResetter physPlanReseter;
+ private HExecutionEngine execEngine;
+ private LocalMapReduceSimulator localMRRunner;
Log log = LogFactory.getLog(getClass());
private int MAX_RECORDS = 10000;
+
+ private Map<LogicalOperator, PhysicalOperator> logToPhyMap;
+ private Map<PhysicalOperator, LogicalOperator> poLoadToLogMap;
+ private Map<PhysicalOperator, LogicalOperator> poToLogMap;
+ private HashMap<PhysicalOperator, Collection<IdentityHashSet<Tuple>>> poToEqclassesMap;
+ private LineageTracer lineage;
+ private Map<LogicalOperator, DataBag> logToDataMap = null;
+ private Map<LOForEach, Map<LogicalOperator, DataBag>> forEachInnerLogToDataMap;
+ Map<LOForEach, Map<LogicalOperator, PhysicalOperator>> forEachInnerLogToPhyMap;
+ Map<LOLimit, Long> oriLimitMap = null;
+ Map<POLoad, Schema> poLoadToSchemaMap;
public ExampleGenerator(LogicalPlan plan, PigContext hadoopPigContext) {
this.plan = plan;
// pigContext = new PigContext(ExecType.LOCAL, hadoopPigContext
// .getProperties());
pigContext = hadoopPigContext;
+ // pigContext.setExecType(ExecType.LOCAL);
+ FileLocalizer.setInitialized(false);
try {
pigContext.connect();
} catch (ExecException e) {
@@ -74,167 +97,283 @@ public class ExampleGenerator {
+ e.getLocalizedMessage());
}
-
+ execEngine = new HExecutionEngine(pigContext);
+ localMRRunner = new LocalMapReduceSimulator();
+ poLoadToSchemaMap = new HashMap<POLoad, Schema>();
}
+ public LineageTracer getLineage() {
+ return lineage;
+ }
+
+ public Map<LogicalOperator, PhysicalOperator> getLogToPhyMap() {
+ return logToPhyMap;
+ }
+
public void setMaxRecords(int max) {
MAX_RECORDS = max;
}
- public Map<LogicalOperator, DataBag> getExamples() {
-
- compilePlan(plan);
-
+ public Map<LogicalOperator, DataBag> getExamples() throws IOException, InterruptedException {
+ if (pigContext.getProperties().getProperty("pig.usenewlogicalplan", "true").equals("false"))
+ throw new ExecException("ILLUSTRATE must use the new logical plan!");
+ pigContext.inIllustrator = true;
+ physPlan = compilePlan(plan);
+ physPlanReseter = new PhysicalPlanResetter(physPlan);
List<LogicalOperator> loads = plan.getRoots();
+ List<PhysicalOperator> pRoots = physPlan.getRoots();
+ if (loads.size() != pRoots.size())
+ throw new ExecException("Logical and Physical plans have different number of roots");
+ logToPhyMap = execEngine.getLogToPhyMap();
+ forEachInnerLogToPhyMap = execEngine.getForEachInnerLogToPhyMap();
+ poLoadToLogMap = new HashMap<PhysicalOperator, LogicalOperator>();
+ logToDataMap = new HashMap<LogicalOperator, DataBag>();
+ poToLogMap = new HashMap<PhysicalOperator, LogicalOperator>();
+
+ // set up foreach inner data map
+ forEachInnerLogToDataMap = new HashMap<LOForEach, Map<LogicalOperator, DataBag>>();
+ for (Map.Entry<LOForEach, Map<LogicalOperator, PhysicalOperator>> entry : forEachInnerLogToPhyMap.entrySet()) {
+ Map<LogicalOperator, DataBag> innerMap = new HashMap<LogicalOperator, DataBag>();
+ forEachInnerLogToDataMap.put(entry.getKey(), innerMap);
+ }
+ for (LogicalOperator load : loads)
+ {
+ poLoadToLogMap.put(logToPhyMap.get(load), load);
+ }
+
+ boolean hasLimit = false;
+ for (LogicalOperator lo : logToPhyMap.keySet()) {
+ poToLogMap.put(logToPhyMap.get(lo), lo);
+ if (!hasLimit && lo instanceof LOLimit)
+ hasLimit = true;
+ }
+
try {
readBaseData(loads);
} catch (ExecException e) {
- // TODO Auto-generated catch block
log.error("Error reading data. " + e.getMessage());
- throw new RuntimeException(e.getMessage());
+ throw e;
} catch (FrontendException e) {
- // TODO Auto-generated catch block
log.error("Error reading data. " + e.getMessage());
+ throw new RuntimeException(e.getMessage());
}
- DerivedDataVisitor derivedData = null;
+ Map<LogicalOperator, DataBag> derivedData = null;
try {
// create derived data and trim base data
LineageTrimmingVisitor trimmer = new LineageTrimmingVisitor(plan,
- baseData, LogToPhyMap, physPlan, pigContext);
+ baseData, this, logToPhyMap, physPlan, pigContext);
trimmer.visit();
// System.out.println(
// "Obtained the first level derived and trimmed data");
// create new derived data from trimmed basedata
- derivedData = new DerivedDataVisitor(plan, null, baseData,
- trimmer.LogToPhyMap, physPlan);
- derivedData.visit();
+ derivedData = getData(physPlan);
// System.out.println(
// "Got new derived data from the trimmed base data");
// augment base data
AugmentBaseDataVisitor augment = new AugmentBaseDataVisitor(plan,
- baseData, derivedData.derivedData);
+ logToPhyMap, baseData, derivedData);
augment.visit();
this.baseData = augment.getNewBaseData();
// System.out.println("Obtained augmented base data");
// create new derived data and trim the base data after augmenting
// base data with synthetic tuples
- trimmer = new LineageTrimmingVisitor(plan, baseData,
- derivedData.LogToPhyMap, physPlan, pigContext);
+ trimmer = new LineageTrimmingVisitor(plan, baseData, this,
+ logToPhyMap, physPlan, pigContext);
trimmer.visit();
// System.out.println("Final trimming");
// create the final version of derivedData to give to the output
- derivedData = new DerivedDataVisitor(plan, null, baseData,
- trimmer.LogToPhyMap, physPlan);
- derivedData.visit();
+ derivedData = getData(physPlan);
// System.out.println("Obtaining final derived data for output");
+
+ if (hasLimit)
+ {
+ augment.setLimit();
+ augment.visit();
+ this.baseData = augment.getNewBaseData();
+ oriLimitMap = augment.getOriLimitMap();
+ derivedData = getData();
+ }
} catch (VisitorException e) {
- // TODO Auto-generated catch block
+ e.printStackTrace(System.out);
log.error("Visitor exception while creating example data "
+ e.getMessage());
+ throw new RuntimeException(e.getMessage());
}
// DisplayExamples.printSimple(plan.getLeaves().get(0),
// derivedData.derivedData);
System.out.println(DisplayExamples.printTabular(plan,
- derivedData.derivedData));
- return derivedData.derivedData;
+ derivedData, forEachInnerLogToDataMap));
+ pigContext.inIllustrator = false;
+ return derivedData;
}
- private void readBaseData(List<LogicalOperator> loads) throws ExecException, FrontendException {
- baseData = new HashMap<LOLoad, DataBag>();
+ private void readBaseData(List<LogicalOperator> loads) throws IOException, InterruptedException, FrontendException, ExecException {
+ PhysicalPlan thisPhyPlan = new PhysicalPlan();
for (LogicalOperator op : loads) {
Schema schema = op.getSchema();
if(schema == null) {
throw new ExecException("Example Generator requires a schema. Please provide a schema while loading data.");
}
-
- DataBag opBaseData = BagFactory.getInstance().newDefaultBag();
-
- POLoad poLoad = (POLoad) LogToPhyMap.get(op);
-// PigContext oldPC = poLoad.getPc();
-// poLoad.setPc(pigContext);
-
- poLoad.setLineageTracer(new LineageTracer());
-
- Tuple t = null;
- int count = 0;
- for (Result res = poLoad.getNext(t); res.returnStatus != POStatus.STATUS_EOP
- && count < MAX_RECORDS; res = poLoad.getNext(t)) {
- if (res.returnStatus == POStatus.STATUS_NULL)
- continue;
- if (res.returnStatus == POStatus.STATUS_ERR) {
- log.error("Error reading Tuple");
- } else {
- opBaseData.add((Tuple) res.result);
- count++;
- }
-
+ poLoadToSchemaMap.put((POLoad)logToPhyMap.get(op), schema);
+ thisPhyPlan.add(logToPhyMap.get(op));
+ }
+ baseData = null;
+ Map<LogicalOperator, DataBag> result = getData(thisPhyPlan);
+ baseData = new HashMap<LOLoad, DataBag>();
+ for (LogicalOperator lo : result.keySet()) {
+ if (lo instanceof LOLoad) {
+ baseData.put((LOLoad) lo, result.get(lo));
}
- baseData.put((LOLoad) op, opBaseData);
- // poLoad.setPc(oldPC);
- poLoad.setLineageTracer(null);
}
-
}
- private void compilePlan(LogicalPlan plan) {
-
- plan = refineLogicalPlan(plan);
-
- LocalLogToPhyTranslationVisitor visitor = new LocalLogToPhyTranslationVisitor(
- plan);
- visitor.setPigContext(pigContext);
- try {
- visitor.visit();
- } catch (VisitorException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- log.error("Error visiting the logical plan in ExampleGenerator");
+ PhysicalPlan compilePlan(LogicalPlan plan) throws ExecException, FrontendException {
+ return execEngine.compile(plan, null);
+ }
+
+ public Map<LogicalOperator, DataBag> getData() throws IOException, InterruptedException {
+ return getData(physPlan);
+ }
+
+ private Map<LogicalOperator, DataBag> getData(PhysicalPlan plan) throws PigException, IOException, InterruptedException
+ {
+ // get data on a physical plan possibly trimmed of one branch
+ lineage = new LineageTracer();
+ IllustratorAttacher attacher = new IllustratorAttacher(plan, lineage, MAX_RECORDS, poLoadToSchemaMap, pigContext);
+ attacher.visit();
+ if (oriLimitMap != null) {
+ for (Map.Entry<LOLimit, Long> entry : oriLimitMap.entrySet()) {
+ logToPhyMap.get(entry.getKey()).getIllustrator().setOriginalLimit(entry.getValue());
+ }
}
- physPlan = visitor.getPhysicalPlan();
- LogToPhyMap = visitor.getLogToPhyMap();
+ getLogToDataMap(attacher.getDataMap());
+ if (baseData != null ) {
+ setLoadDataMap();
+ physPlanReseter.visit();
+ }
+ localMRRunner.launchPig(plan, baseData, poLoadToLogMap, lineage, attacher, this, pigContext);
+ if (baseData == null)
+ poToEqclassesMap = attacher.poToEqclassesMap;
+ else {
+ for (Map.Entry<PhysicalOperator, Collection<IdentityHashSet<Tuple>>> entry : attacher.poToEqclassesMap.entrySet()) {
+ if(!(entry.getKey() instanceof POLoad))
+ poToEqclassesMap.put(entry.getKey(), entry.getValue());
+ }
+ }
+ if (baseData != null)
+ // only for non derived data generation
+ phyToMRTransform(plan, attacher.getDataMap());
+ return logToDataMap;
}
-
- private LogicalPlan refineLogicalPlan(LogicalPlan plan) {
- PlanSetter ps = new PlanSetter(plan);
- try {
- ps.visit();
-
- } catch (VisitorException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
+
+ public Map<LogicalOperator, DataBag> getData(Map<LOLoad, DataBag> newBaseData) throws Exception
+ {
+ baseData = newBaseData;
+ return getData(physPlan);
+ }
+
+ private void phyToMRTransform(PhysicalPlan plan, Map<PhysicalOperator, DataBag> phyToDataMap) {
+ // remap the LO to PO as result of the MR compilation may have changed PO in the MR plans
+ Map<PhysicalOperator, PhysicalOperator> phyToMRMap = localMRRunner.getPhyToMRMap();
+ for (Map.Entry<PhysicalOperator, LogicalOperator> entry : poToLogMap.entrySet()) {
+ if (phyToMRMap.get(entry.getKey()) != null) {
+ PhysicalOperator poInMR = phyToMRMap.get(entry.getKey());
+ logToDataMap.put(entry.getValue(), phyToDataMap.get(poInMR));
+ poToEqclassesMap.put(entry.getKey(), poToEqclassesMap.get(poInMR));
+ }
}
-
- // run through validator
- CompilationMessageCollector collector = new CompilationMessageCollector();
- FrontendException caught = null;
- try {
- boolean isBeforeOptimizer = true;
- LogicalPlanValidationExecutor validator = new LogicalPlanValidationExecutor(
- plan, pigContext, isBeforeOptimizer);
- validator.validate(plan, collector);
-
- FunctionalLogicalOptimizer optimizer = new FunctionalLogicalOptimizer(
- plan);
- optimizer.optimize();
-
- isBeforeOptimizer = false;
- validator = new LogicalPlanValidationExecutor(
- plan, pigContext, isBeforeOptimizer);
- validator.validate(plan, collector);
- } catch (FrontendException fe) {
- // Need to go through and see what the collector has in it. But
- // remember what we've caught so we can wrap it into what we
- // throw.
- caught = fe;
+ }
+
+ private void getLogToDataMap(Map<PhysicalOperator, DataBag> phyToDataMap) {
+ logToDataMap.clear();
+ for (LogicalOperator lo : logToPhyMap.keySet()) {
+ if (logToPhyMap.get(lo) != null)
+ logToDataMap.put(lo, phyToDataMap.get(logToPhyMap.get(lo)));
}
+
+ // set the LO-to-Data mapping for the ForEach inner plans
+ for (Map.Entry<LOForEach, Map<LogicalOperator, DataBag>> entry : forEachInnerLogToDataMap.entrySet()) {
+ entry.getValue().clear();
+ for (Map.Entry<LogicalOperator, PhysicalOperator> innerEntry : forEachInnerLogToPhyMap.get(entry.getKey()).entrySet()) {
+ entry.getValue().put(innerEntry.getKey(), phyToDataMap.get(innerEntry.getValue()));
+ }
+ }
+ }
+
+ private void setLoadDataMap() {
+ // This function sets up the LO-TO-Data map, eq. class, and lineage for the base data used in the coming runner
+ // this must be called after logToDataMap has been properly (re)set and before the runner is started
+ if (baseData != null) {
+ if (poToEqclassesMap == null)
+ poToEqclassesMap = new HashMap<PhysicalOperator, Collection<IdentityHashSet<Tuple>>>();
+ else
+ poToEqclassesMap.clear();
+ for (LOLoad lo : baseData.keySet()) {
+ logToDataMap.get(lo).addAll(baseData.get(lo));
+ LinkedList<IdentityHashSet<Tuple>> equivalenceClasses = new LinkedList<IdentityHashSet<Tuple>>();
+ IdentityHashSet<Tuple> equivalenceClass = new IdentityHashSet<Tuple>();
+ equivalenceClasses.add(equivalenceClass);
+ for (Tuple t : baseData.get(lo)) {
+ lineage.insert(t);
+ equivalenceClass.add(t);
+ }
+ poToEqclassesMap.put(logToPhyMap.get(lo), equivalenceClasses);
+ }
+ }
+ }
+
+ public Collection<IdentityHashSet<Tuple>> getEqClasses() throws VisitorException {
+ Map<LogicalOperator, Collection<IdentityHashSet<Tuple>>> logToEqclassesMap = getLoToEqClassMap();
+ LinkedList<IdentityHashSet<Tuple>> ret = new LinkedList<IdentityHashSet<Tuple>>();
+ for (Map.Entry<LogicalOperator, Collection<IdentityHashSet<Tuple>>> entry :
+ logToEqclassesMap.entrySet()) {
+ if (entry.getValue() != null)
+ ret.addAll(entry.getValue());
+ }
+ return ret;
+ }
- return plan;
-
+ public Map<LogicalOperator, Collection<IdentityHashSet<Tuple>>> getLoToEqClassMap() throws VisitorException {
+ Map<LogicalOperator, Collection<IdentityHashSet<Tuple>>> ret =
+ EquivalenceClasses.getLoToEqClassMap(physPlan, plan, logToPhyMap, logToDataMap, forEachInnerLogToPhyMap, poToEqclassesMap);
+ // eq classes adjustments based upon logical operators
+
+ for (Map.Entry<LogicalOperator, Collection<IdentityHashSet<Tuple>>> entry :ret.entrySet())
+ {
+ if (entry.getKey() instanceof LOSort) {
+ Collection<IdentityHashSet<Tuple>> eqClasses = entry.getValue();
+ for (Iterator<IdentityHashSet<Tuple>> it = eqClasses.iterator(); it.hasNext(); ) {
+ Object t = null;
+ IdentityHashSet<Tuple> eqClass = it.next();
+ if (eqClass.size() == 1) {
+ eqClass.clear();
+ continue;
+ }
+ boolean first = true, allIdentical = true;
+ for (Iterator<Tuple> it1 = eqClass.iterator(); it1.hasNext();)
+ {
+ if (first) {
+ first = false;
+ t = it1.next();
+ } else {
+ if (!it1.next().equals(t)) {
+ allIdentical = false;
+ break;
+ }
+ }
+ }
+ if (allIdentical)
+ eqClass.clear();
+ }
+ }
+ }
+
+ return ret;
}
}
Added: pig/trunk/src/org/apache/pig/pen/FakeRawKeyValueIterator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/pen/FakeRawKeyValueIterator.java?rev=1045314&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/pen/FakeRawKeyValueIterator.java (added)
+++ pig/trunk/src/org/apache/pig/pen/FakeRawKeyValueIterator.java Mon Dec 13 19:11:00 2010
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.pen;
+
+import org.apache.hadoop.mapred.RawKeyValueIterator;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.util.Progress;
+/** Stub {@link RawKeyValueIterator} with no keys, values or progress; {@link #next()} always returns the flag given to the constructor. */
+public class FakeRawKeyValueIterator implements RawKeyValueIterator {
+ private boolean hasData;
+ /** @param hasData value returned unchanged by every call to {@link #next()} */
+ public FakeRawKeyValueIterator(boolean hasData) {
+ this.hasData = hasData;
+ }
+ /** Always returns {@code null}; this stub holds no key data. */
+ @Override
+ public DataInputBuffer getKey() {
+ return null;
+ }
+
+ @Override
+ public void close() {
+ // This stub holds no resources, so there is nothing to release.
+ }
+ /** Always returns {@code null}; no progress is tracked. */
+ @Override
+ public Progress getProgress() {
+ return null;
+ }
+ /** Always returns {@code null}; this stub holds no value data. */
+ @Override
+ public DataInputBuffer getValue() {
+ return null;
+ }
+ /** Returns the constructor flag unchanged, so with {@code true} it never reports exhaustion (NOTE(review): presumably callers stop iterating by other means - confirm). */
+ @Override
+ public boolean next() {
+ return hasData;
+ }
+}
Added: pig/trunk/src/org/apache/pig/pen/Illustrable.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/pen/Illustrable.java?rev=1045314&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/pen/Illustrable.java (added)
+++ pig/trunk/src/org/apache/pig/pen/Illustrable.java Mon Dec 13 19:11:00 2010
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.pen;
+
+import org.apache.pig.data.Tuple;
+/** Contract for objects that take part in ILLUSTRATE example generation (NOTE(review): presumably implemented by physical operators - confirm). */
+public interface Illustrable {
+ public void setIllustrator(Illustrator illustrator); // presumably the Illustrator this object reports example data to
+ /**
+ * Marks up an input/output pair so that it can be shown by ILLUSTRATE.
+ * @param in input tuple
+ * @param out output tuple, before it is wrapped in an ExampleTuple
+ * @param eqClassIndex index into the illustrator's equivalence classes
+ *
+ * @return the marked-up tuple
+ */
+ public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex);
+}
Added: pig/trunk/src/org/apache/pig/pen/Illustrator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/pen/Illustrator.java?rev=1045314&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/pen/Illustrator.java (added)
+++ pig/trunk/src/org/apache/pig/pen/Illustrator.java Mon Dec 13 19:11:00 2010
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.pen;
+
+import java.util.LinkedList;
+import java.util.ArrayList;
+
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.pen.util.LineageTracer;
+import org.apache.pig.impl.util.IdentityHashSet;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+/**
+ * Helper used by physical operators to generate example tuples for ILLUSTRATE:
+ * it holds lineage, equivalence classes, collected data and an optional record ceiling.
+ */
+
+public class Illustrator {
+
+ private LineageTracer lineage;
+ private LinkedList<IdentityHashSet<Tuple>> equivalenceClasses;
+ // all input tuples for an expression
+ private IdentityHashSet<Tuple> inputs = null;
+
+ private DataBag data; // tuples collected through addData
+ private int maxRecords = -1; // -1 disables the ceiling enforced by ceilingCheck
+ private int recCounter = 0; // records counted so far by ceilingCheck
+ private IllustratorAttacher attacher; // receives eq. classes registered via setEquivalenceClasses
+ private ArrayList<Boolean[]> subExpResults;
+ private Boolean[] subExpResult; // single-slot holder written by setSubExpResult
+ private boolean eqClassesShared; // set by setEqClassesShared; never cleared in this class
+ private long oriLimit = -1; // original LIMIT value; -1 until setOriginalLimit is called
+ private Schema schema; // null unless the 6-argument constructor supplies one
+ /** Creates an illustrator with no record ceiling and no schema; {@code hadoopPigContext} is currently unused. */
+ public Illustrator(LineageTracer lineage, LinkedList<IdentityHashSet<Tuple>> equivalenceClasses, IllustratorAttacher attacher, PigContext hadoopPigContext) {
+ this.lineage = lineage;
+ this.equivalenceClasses = equivalenceClasses;
+ data = BagFactory.getInstance().newDefaultBag();
+ this.attacher = attacher;
+ subExpResults = new ArrayList<Boolean[]>();
+ subExpResult = new Boolean[1];
+ schema = null;
+ }
+ /** Like the 4-argument constructor, but also caps records at {@code maxRecords} (see ceilingCheck) and keeps {@code schema}. */
+ public Illustrator(LineageTracer lineage, LinkedList<IdentityHashSet<Tuple>> equivalenceClasses, int maxRecords, IllustratorAttacher attacher,
+ Schema schema, PigContext hadoopPigContext) {
+ this(lineage, equivalenceClasses, attacher, hadoopPigContext);
+ this.maxRecords = maxRecords;
+ this.schema = schema;
+ }
+ /** Returns the live, mutable list (created empty; this class never adds to it). */
+ public ArrayList<Boolean[]> getSubExpResults() {
+ return subExpResults;
+ }
+ /** Returns the single-slot array written by setSubExpResult; the slot is null until the first call. */
+ Boolean[] getSubExpResult() {
+ return subExpResult;
+ }
+
+ public LineageTracer getLineage() {
+ return lineage;
+ }
+
+ public LinkedList<IdentityHashSet<Tuple>> getEquivalenceClasses() {
+ return equivalenceClasses;
+ }
+
+ public void setSubExpResult(boolean result) {
+ subExpResult[0] = result;
+ }
+ /** Replaces the equivalence classes and also registers them for {@code po} in the attacher's poToEqclassesMap. */
+ public void setEquivalenceClasses(LinkedList<IdentityHashSet<Tuple>> eqClasses, PhysicalOperator po) {
+ equivalenceClasses = eqClasses;
+ attacher.poToEqclassesMap.put(po, eqClasses);
+ }
+ /** Counts one more record and returns false once more than maxRecords have been counted; when maxRecords is -1 nothing is counted and true is returned. */
+ public boolean ceilingCheck() {
+ if (maxRecords != -1 && ++recCounter > maxRecords)
+ return false;
+ else
+ return true;
+ }
+ /** Returns the accumulated inputs, or null if addInputs has never been called. */
+ public IdentityHashSet<Tuple> getInputs() {
+ return inputs;
+ }
+ /** Adds all of {@code inputs} to the accumulated set, creating the set on first use (NOTE(review): a null argument is not guarded against). */
+ public void addInputs(IdentityHashSet<Tuple> inputs) {
+ if (this.inputs == null)
+ this.inputs = new IdentityHashSet<Tuple>();
+ this.inputs.addAll(inputs);
+ }
+
+ public void addData(Tuple t) {
+ data.add(t);
+ }
+
+ public DataBag getData() {
+ return data;
+ }
+ /** Returns the value given to setOriginalLimit, or -1 if it was never set. */
+ public long getOriginalLimit() {
+ return oriLimit;
+ }
+
+ public void setOriginalLimit(long oriLimit) {
+ this.oriLimit = oriLimit;
+ }
+ /** Marks the equivalence classes as shared; this class offers no way to unset the flag. */
+ public void setEqClassesShared() {
+ eqClassesShared = true;
+ }
+
+ public boolean getEqClassesShared() {
+ return eqClassesShared;
+ }
+
+ public Schema getSchema() {
+ return schema;
+ }
+}