You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ya...@apache.org on 2010/12/17 23:08:09 UTC
svn commit: r1050503 [1/2] - in /pig/trunk: src/org/apache/pig/
src/org/apache/pig/backend/hadoop/executionengine/
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/ ...
Author: yanz
Date: Fri Dec 17 22:08:08 2010
New Revision: 1050503
URL: http://svn.apache.org/viewvc?rev=1050503&view=rev
Log:
PIG-1712: ILLUSTRATE rework (yanz)
Modified:
pig/trunk/src/org/apache/pig/PigServer.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java
pig/trunk/src/org/apache/pig/impl/logicalLayer/LOLimit.java
pig/trunk/src/org/apache/pig/impl/logicalLayer/LOUnion.java
pig/trunk/src/org/apache/pig/newplan/BaseOperatorPlan.java
pig/trunk/src/org/apache/pig/newplan/PlanEdge.java
pig/trunk/src/org/apache/pig/newplan/logical/relational/LOCogroup.java
pig/trunk/src/org/apache/pig/newplan/logical/relational/LOCross.java
pig/trunk/src/org/apache/pig/newplan/logical/relational/LODistinct.java
pig/trunk/src/org/apache/pig/newplan/logical/relational/LOFilter.java
pig/trunk/src/org/apache/pig/newplan/logical/relational/LOLimit.java
pig/trunk/src/org/apache/pig/newplan/logical/relational/LOLoad.java
pig/trunk/src/org/apache/pig/newplan/logical/relational/LOSort.java
pig/trunk/src/org/apache/pig/newplan/logical/relational/LOUnion.java
pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalPlan.java
pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalSchema.java
pig/trunk/src/org/apache/pig/pen/AugmentBaseDataVisitor.java
pig/trunk/src/org/apache/pig/pen/DerivedDataVisitor.java
pig/trunk/src/org/apache/pig/pen/EquivalenceClasses.java
pig/trunk/src/org/apache/pig/pen/ExampleGenerator.java
pig/trunk/src/org/apache/pig/pen/Illustrator.java
pig/trunk/src/org/apache/pig/pen/IllustratorAttacher.java
pig/trunk/src/org/apache/pig/pen/LineageTrimmingVisitor.java
pig/trunk/src/org/apache/pig/pen/LocalMapReduceSimulator.java
pig/trunk/src/org/apache/pig/pen/util/DisplayExamples.java
pig/trunk/src/org/apache/pig/pen/util/MetricEvaluation.java
pig/trunk/src/org/apache/pig/pen/util/PreOrderDepthFirstWalker.java
pig/trunk/test/org/apache/pig/test/TestExampleGenerator.java
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC18.gld
Modified: pig/trunk/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/PigServer.java?rev=1050503&r1=1050502&r2=1050503&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/PigServer.java (original)
+++ pig/trunk/src/org/apache/pig/PigServer.java Fri Dec 17 22:08:08 2010
@@ -102,6 +102,7 @@ import org.apache.pig.impl.util.Properti
import org.apache.pig.impl.util.Utils;
import org.apache.pig.newplan.logical.LogicalPlanMigrationVistor;
import org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer;
+import org.apache.pig.newplan.Operator;
import org.apache.pig.pen.ExampleGenerator;
import org.apache.pig.scripting.ScriptEngine;
import org.apache.pig.tools.grunt.GruntParser;
@@ -1125,7 +1126,7 @@ public class PigServer {
return currDAG.getAliasOp().keySet();
}
- public Map<LogicalOperator, DataBag> getExamples(String alias) throws IOException {
+ public Map<Operator, DataBag> getExamples(String alias) throws IOException {
LogicalPlan plan = null;
try {
if (currDAG.isBatchOn() && alias != null) {
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=1050503&r1=1050502&r2=1050503&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java Fri Dec 17 22:08:08 2010
@@ -107,10 +107,10 @@ public class HExecutionEngine {
// map from LOGICAL key to info about the execution
protected Map<OperatorKey, MapRedResult> materializedResults;
- protected Map<LogicalOperator, PhysicalOperator> logToPhyMap;
protected Map<LogicalOperator, LogicalRelationalOperator> opsMap;
protected Map<Operator, PhysicalOperator> newLogToPhyMap;
private Map<LOForEach, Map<LogicalOperator, LogicalRelationalOperator>> forEachInnerOpMap;
+ private org.apache.pig.newplan.logical.relational.LogicalPlan newPreoptimizedPlan;
public HExecutionEngine(PigContext pigContext) {
this.pigContext = pigContext;
@@ -265,6 +265,8 @@ public class HExecutionEngine {
opsMap = visitor.getOldToNewLOOpMap();
forEachInnerOpMap = visitor.getForEachInnerMap();
org.apache.pig.newplan.logical.relational.LogicalPlan newPlan = visitor.getNewLogicalPlan();
+ newPreoptimizedPlan =
+ new org.apache.pig.newplan.logical.relational.LogicalPlan(newPlan);
if (pigContext.inIllustrator) {
// disable all PO-specific optimizations
@@ -342,33 +344,28 @@ public class HExecutionEngine {
}
}
- public Map<LogicalOperator, PhysicalOperator> getLogToPhyMap() {
- if (logToPhyMap != null)
- return logToPhyMap;
- else if (newLogToPhyMap != null) {
- Map<LogicalOperator, PhysicalOperator> result = new HashMap<LogicalOperator, PhysicalOperator>();
- for (LogicalOperator lo: opsMap.keySet()) {
- result.put(lo, newLogToPhyMap.get(opsMap.get(lo)));
- }
- return result;
- } else
- return null;
- }
-
- public Map<LOForEach, Map<LogicalOperator, PhysicalOperator>> getForEachInnerLogToPhyMap() {
- Map<LOForEach, Map<LogicalOperator, PhysicalOperator>> result =
- new HashMap<LOForEach, Map<LogicalOperator, PhysicalOperator>>();
+ public Map<Operator, PhysicalOperator> getLogToPhyMap() {
+ return newLogToPhyMap;
+ }
+
+ public Map<org.apache.pig.newplan.logical.relational.LOForEach, Map<LogicalRelationalOperator, PhysicalOperator>> getForEachInnerLogToPhyMap() {
+ Map<org.apache.pig.newplan.logical.relational.LOForEach, Map<LogicalRelationalOperator, PhysicalOperator>> result =
+ new HashMap<org.apache.pig.newplan.logical.relational.LOForEach, Map<LogicalRelationalOperator, PhysicalOperator>>();
for (Map.Entry<LOForEach, Map<LogicalOperator, LogicalRelationalOperator>> entry :
forEachInnerOpMap.entrySet()) {
- Map<LogicalOperator, PhysicalOperator> innerOpMap = new HashMap<LogicalOperator, PhysicalOperator>();
+ Map<LogicalRelationalOperator, PhysicalOperator> innerOpMap = new HashMap<LogicalRelationalOperator, PhysicalOperator>();
for (Map.Entry<LogicalOperator, LogicalRelationalOperator> innerEntry : entry.getValue().entrySet()) {
- innerOpMap.put(innerEntry.getKey(), newLogToPhyMap.get(innerEntry.getValue()));
+ innerOpMap.put(innerEntry.getValue(), newLogToPhyMap.get(innerEntry.getValue()));
}
- result.put(entry.getKey(), innerOpMap);
+ result.put((org.apache.pig.newplan.logical.relational.LOForEach) (opsMap.get(entry.getKey())), innerOpMap);
}
return result;
}
+ public org.apache.pig.newplan.logical.relational.LogicalPlan getNewPlan() {
+ return newPreoptimizedPlan;
+ }
+
public static class SortInfoSetter extends LogicalRelationalNodesVisitor {
public SortInfoSetter(OperatorPlan plan) throws FrontendException {
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=1050503&r1=1050502&r2=1050503&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Fri Dec 17 22:08:08 2010
@@ -524,7 +524,7 @@ public class MapReduceLauncher extends L
if (isMultiQuery) {
// reduces the number of MROpers in the MR plan generated
// by multi-query (multi-store) script.
- MultiQueryOptimizer mqOptimizer = new MultiQueryOptimizer(plan);
+ MultiQueryOptimizer mqOptimizer = new MultiQueryOptimizer(plan, pc.inIllustrator);
mqOptimizer.visit();
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java?rev=1050503&r1=1050502&r2=1050503&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java Fri Dec 17 22:08:08 2010
@@ -20,8 +20,6 @@ package org.apache.pig.backend.hadoop.ex
import java.io.ByteArrayOutputStream;
import java.util.HashSet;
import java.util.Set;
-import java.util.Map;
-import java.util.HashMap;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.NodeIdGenerator;
@@ -32,6 +30,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.impl.plan.Operator;
import org.apache.pig.impl.plan.PlanException;
import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.MultiMap;
/**
* An operator model for a Map Reduce job.
@@ -150,7 +149,7 @@ public class MapReduceOper extends Opera
// Map of the physical operator in physical plan to the one in MR plan: only needed
// if the physical operator is changed/replaced in MR compilation due to, e.g., optimization
- public Map<PhysicalOperator, PhysicalOperator> phyToMRMap;
+ public MultiMap<PhysicalOperator, PhysicalOperator> phyToMRMap;
private static enum OPER_FEATURE {
NONE,
@@ -175,7 +174,7 @@ public class MapReduceOper extends Opera
scalars = new HashSet<PhysicalOperator>();
nig = NodeIdGenerator.getGenerator();
scope = k.getScope();
- phyToMRMap = new HashMap<PhysicalOperator, PhysicalOperator>();
+ phyToMRMap = new MultiMap<PhysicalOperator, PhysicalOperator>();
}
/*@Override
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java?rev=1050503&r1=1050502&r2=1050503&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java Fri Dec 17 22:08:08 2010
@@ -21,6 +21,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.Iterator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -77,12 +78,14 @@ class MultiQueryOptimizer extends MROpPl
private String scope;
- MultiQueryOptimizer(MROperPlan plan) {
+ private boolean inIllustrator = false;
+
+ MultiQueryOptimizer(MROperPlan plan, boolean inIllustrator) {
super(plan, new ReverseDependencyOrderWalker<MapReduceOper, MROperPlan>(plan));
nig = NodeIdGenerator.getGenerator();
List<MapReduceOper> roots = plan.getRoots();
scope = roots.get(0).getOperatorKey().getScope();
-
+ this.inIllustrator = inIllustrator;
log.info("MR plan size before optimization: " + plan.size());
}
@@ -290,13 +293,28 @@ class MultiQueryOptimizer extends MROpPl
PhysicalOperator opSucc = succ.mapPlan.getSuccessors(op).get(0);
PhysicalPlan clone = null;
try {
+ if (inIllustrator)
+ pl.setOpMap(succ.phyToMRMap);
clone = pl.clone();
+ if (inIllustrator)
+ pl.resetOpMap();
} catch (CloneNotSupportedException e) {
int errCode = 2127;
String msg = "Internal Error: Cloning of plan failed for optimization.";
throw new OptimizerException(msg, errCode, PigException.BUG, e);
}
succ.mapPlan.remove(op);
+
+ if (inIllustrator) {
+ // need to remove the LOAD since data from load on temporary files can't be handled by illustrator
+ for (Iterator<PhysicalOperator> it = pl.iterator(); it.hasNext(); )
+ {
+ PhysicalOperator po = it.next();
+ if (po instanceof POLoad)
+ succ.phyToMRMap.removeKey(po);
+ }
+ }
+
while (!clone.isEmpty()) {
PhysicalOperator oper = clone.getLeaves().get(0);
clone.remove(oper);
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java?rev=1050503&r1=1050502&r2=1050503&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java Fri Dec 17 22:08:08 2010
@@ -24,7 +24,6 @@ import java.io.PrintStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -37,6 +36,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.impl.plan.OperatorPlan;
import org.apache.pig.impl.plan.PlanException;
import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.MultiMap;
/**
*
@@ -57,6 +57,8 @@ public class PhysicalPlan extends Operat
// and that there is no more input expected.
public boolean endOfAllInput = false;
+ private MultiMap<PhysicalOperator, PhysicalOperator> opmap = null;
+
public PhysicalPlan() {
super();
}
@@ -224,6 +226,8 @@ public class PhysicalPlan extends Operat
for (PhysicalOperator op : mOps.keySet()) {
PhysicalOperator c = op.clone();
clone.add(c);
+ if (opmap != null)
+ opmap.put(op, c);
matches.put(op, c);
}
@@ -296,6 +300,12 @@ public class PhysicalPlan extends Operat
return clone;
}
+ public void setOpMap(MultiMap<PhysicalOperator, PhysicalOperator> opmap) {
+ this.opmap = opmap;
+ }
-
+ public void resetOpMap()
+ {
+ opmap = null;
+ }
}
Modified: pig/trunk/src/org/apache/pig/impl/logicalLayer/LOLimit.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOLimit.java?rev=1050503&r1=1050502&r2=1050503&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/logicalLayer/LOLimit.java (original)
+++ pig/trunk/src/org/apache/pig/impl/logicalLayer/LOLimit.java Fri Dec 17 22:08:08 2010
@@ -52,6 +52,10 @@ public class LOLimit extends RelationalO
return mPlan.getPredecessors(this).get(0);
}
+ public LogicalOperator getInput(LogicalPlan plan) {
+ return plan.getPredecessors(this).get(0);
+ }
+
public long getLimit() {
return mLimit;
}
Modified: pig/trunk/src/org/apache/pig/impl/logicalLayer/LOUnion.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOUnion.java?rev=1050503&r1=1050502&r2=1050503&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/logicalLayer/LOUnion.java (original)
+++ pig/trunk/src/org/apache/pig/impl/logicalLayer/LOUnion.java Fri Dec 17 22:08:08 2010
@@ -55,6 +55,10 @@ public class LOUnion extends RelationalO
return mPlan.getPredecessors(this);
}
+ public List<LogicalOperator> getInputs(LogicalPlan plan) {
+ return plan.getPredecessors(this);
+ }
+
@Override
public Schema getSchema() throws FrontendException {
if (!mIsSchemaComputed) {
Modified: pig/trunk/src/org/apache/pig/newplan/BaseOperatorPlan.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/BaseOperatorPlan.java?rev=1050503&r1=1050502&r2=1050503&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/BaseOperatorPlan.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/BaseOperatorPlan.java Fri Dec 17 22:08:08 2010
@@ -54,6 +54,18 @@ public abstract class BaseOperatorPlan i
softToEdges = new PlanEdge();
}
+ @SuppressWarnings("unchecked")
+ public BaseOperatorPlan(BaseOperatorPlan other) {
+ // (shallow) copy constructor
+ ops = (Set<Operator>) ((HashSet<Operator>) other.ops).clone();
+ roots = (List<Operator>) ((ArrayList) other.roots).clone();
+ leaves = (List<Operator>) ((ArrayList) other.leaves).clone();
+ fromEdges = other.fromEdges.shallowClone();
+ toEdges = other.toEdges.shallowClone();
+ softFromEdges = other.softFromEdges.shallowClone();
+ softToEdges = other.softToEdges.shallowClone();
+ }
+
/**
* Get number of nodes in the plan.
*/
Modified: pig/trunk/src/org/apache/pig/newplan/PlanEdge.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/PlanEdge.java?rev=1050503&r1=1050502&r2=1050503&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/PlanEdge.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/PlanEdge.java Fri Dec 17 22:08:08 2010
@@ -20,6 +20,7 @@ package org.apache.pig.newplan;
import java.util.ArrayList;
import java.util.Iterator;
+import java.util.Map;
import org.apache.pig.impl.util.MultiMap;
import org.apache.pig.impl.util.Pair;
@@ -100,4 +101,14 @@ public class PlanEdge extends MultiMap<O
return new Pair<Operator, Integer>(keeper, index);
}
+ public PlanEdge shallowClone() {
+ // shallow clone: elements not cloned
+ PlanEdge result = new PlanEdge();
+ for (Map.Entry<Operator, ArrayList<Operator>> entry : mMap.entrySet()) {
+ ArrayList<Operator> list = new ArrayList<Operator>();
+ list.addAll(entry.getValue());
+ result.put(entry.getKey(), list);
+ }
+ return result;
+ }
}
Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LOCogroup.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOCogroup.java?rev=1050503&r1=1050502&r2=1050503&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LOCogroup.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LOCogroup.java Fri Dec 17 22:08:08 2010
@@ -292,4 +292,8 @@ public class LOCogroup extends LogicalRe
groupKeyUidOnlySchema = null;
generatedInputUids = new HashMap<Integer,Long>();
}
+
+ public List<Operator> getInputs(LogicalPlan plan) {
+ return plan.getPredecessors(this);
+ }
}
Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LOCross.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOCross.java?rev=1050503&r1=1050502&r2=1050503&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LOCross.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LOCross.java Fri Dec 17 22:08:08 2010
@@ -21,6 +21,7 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.LogicalOperator;
import org.apache.pig.newplan.Operator;
import org.apache.pig.newplan.PlanVisitor;
@@ -93,4 +94,8 @@ public class LOCross extends LogicalRela
return false;
}
}
+
+ public List<Operator> getInputs() {
+ return plan.getPredecessors(this);
+ }
}
Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LODistinct.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LODistinct.java?rev=1050503&r1=1050502&r2=1050503&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LODistinct.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LODistinct.java Fri Dec 17 22:08:08 2010
@@ -59,4 +59,8 @@ public class LODistinct extends LogicalR
return false;
}
}
+
+ public Operator getInput(LogicalPlan plan) {
+ return plan.getPredecessors(this).get(0);
+ }
}
Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LOFilter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOFilter.java?rev=1050503&r1=1050502&r2=1050503&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LOFilter.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LOFilter.java Fri Dec 17 22:08:08 2010
@@ -73,5 +73,9 @@ public class LOFilter extends LogicalRel
return false;
}
}
+
+ public Operator getInput(LogicalPlan plan) {
+ return plan.getPredecessors(this).get(0);
+ }
}
Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LOLimit.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOLimit.java?rev=1050503&r1=1050502&r2=1050503&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LOLimit.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LOLimit.java Fri Dec 17 22:08:08 2010
@@ -18,6 +18,7 @@
package org.apache.pig.newplan.logical.relational;
import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.LogicalOperator;
import org.apache.pig.newplan.Operator;
import org.apache.pig.newplan.PlanVisitor;
@@ -69,4 +70,8 @@ public class LOLimit extends LogicalRela
else
return false;
}
+
+ public Operator getInput(LogicalPlan plan) {
+ return plan.getPredecessors(this).get(0);
+ }
}
Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LOLoad.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOLoad.java?rev=1050503&r1=1050502&r2=1050503&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LOLoad.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LOLoad.java Fri Dec 17 22:08:08 2010
@@ -45,6 +45,7 @@ public class LOLoad extends LogicalRelat
private List<Integer> requiredFields = null;
private boolean castInserted = false;
private LogicalSchema uidOnlySchema;
+ private String schemaFile = null;
/**
*
@@ -57,9 +58,15 @@ public class LOLoad extends LogicalRelat
super("LOLoad", plan);
scriptSchema = schema;
fs = loader;
+ if (loader != null)
+ schemaFile = loader.getFileName();
this.conf = conf;
}
+ public String getSchemaFile() {
+ return schemaFile;
+ }
+
public LoadFunc getLoadFunc() throws FrontendException {
try {
if (loadFunc == null && fs!=null) {
Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LOSort.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOSort.java?rev=1050503&r1=1050502&r2=1050503&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LOSort.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LOSort.java Fri Dec 17 22:08:08 2010
@@ -25,6 +25,7 @@ import org.apache.pig.FuncSpec;
import org.apache.pig.SortColInfo;
import org.apache.pig.SortInfo;
import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.LogicalOperator;
import org.apache.pig.newplan.Operator;
import org.apache.pig.newplan.OperatorPlan;
import org.apache.pig.newplan.PlanVisitor;
@@ -157,4 +158,8 @@ public class LOSort extends LogicalRelat
}
return checkEquality((LogicalRelationalOperator)other);
}
+
+ public Operator getInput(LogicalPlan plan) {
+ return plan.getPredecessors(this).get(0);
+ }
}
Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LOUnion.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOUnion.java?rev=1050503&r1=1050502&r2=1050503&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LOUnion.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LOUnion.java Fri Dec 17 22:08:08 2010
@@ -125,4 +125,12 @@ public class LOUnion extends LogicalRela
public void resetUid() {
uidMapping = new ArrayList<Pair<Long, Long>>();
}
+
+ public List<Operator> getInputs() {
+ return plan.getPredecessors(this);
+ }
+
+ public List<Operator> getInputs(LogicalPlan plan) {
+ return plan.getPredecessors(this);
+ }
}
Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalPlan.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalPlan.java?rev=1050503&r1=1050502&r2=1050503&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalPlan.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalPlan.java Fri Dec 17 22:08:08 2010
@@ -19,6 +19,7 @@
package org.apache.pig.newplan.logical.relational;
import java.io.PrintStream;
+import java.util.HashSet;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.newplan.BaseOperatorPlan;
@@ -32,6 +33,15 @@ import org.apache.pig.newplan.logical.op
* each relational operator.
*/
public class LogicalPlan extends BaseOperatorPlan {
+
+ public LogicalPlan(LogicalPlan other) {
+ // shallow copy constructor
+ super(other);
+ }
+
+ public LogicalPlan() {
+ super();
+ }
/**
* Equality is checked by calling equals on every leaf in the plan. This
Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalSchema.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalSchema.java?rev=1050503&r1=1050502&r2=1050503&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalSchema.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalSchema.java Fri Dec 17 22:08:08 2010
@@ -80,12 +80,12 @@ public class LogicalSchema {
if( schema == null ) {
return ( alias + uidString + ":bag{}" );
}
- return ( alias + uidString + ":bag{" + schema.toString() + "}" );
+ return ( alias + uidString + ":bag{" + schema.toString(verbose) + "}" );
} else if( type == DataType.TUPLE ) {
if( schema == null ) {
return ( alias + uidString + ":tuple{}" );
}
- return ( alias + uidString + ":tuple(" + schema.toString() + ")" );
+ return ( alias + uidString + ":tuple(" + schema.toString(verbose) + ")" );
}
return ( alias + uidString + ":" + DataType.findTypeName(type) );
}
Modified: pig/trunk/src/org/apache/pig/pen/AugmentBaseDataVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/pen/AugmentBaseDataVisitor.java?rev=1050503&r1=1050502&r2=1050503&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/pen/AugmentBaseDataVisitor.java (original)
+++ pig/trunk/src/org/apache/pig/pen/AugmentBaseDataVisitor.java Fri Dec 17 22:08:08 2010
@@ -26,6 +26,7 @@ import java.util.List;
import java.util.Map;
import java.util.HashSet;
import java.util.Set;
+import java.util.Collection;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -37,71 +38,75 @@ import org.apache.pig.data.DataByteArray
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
-import org.apache.pig.impl.logicalLayer.BinaryExpressionOperator;
-import org.apache.pig.impl.logicalLayer.ExpressionOperator;
+import org.apache.pig.newplan.logical.expression.BinaryExpression;
import org.apache.pig.impl.logicalLayer.FrontendException;
-import org.apache.pig.impl.logicalLayer.LOAdd;
-import org.apache.pig.impl.logicalLayer.LOAnd;
-import org.apache.pig.impl.logicalLayer.LOCast;
-import org.apache.pig.impl.logicalLayer.LOCogroup;
-import org.apache.pig.impl.logicalLayer.LOLimit;
-import org.apache.pig.impl.logicalLayer.LOConst;
-import org.apache.pig.impl.logicalLayer.LOCross;
-import org.apache.pig.impl.logicalLayer.LODistinct;
-import org.apache.pig.impl.logicalLayer.LODivide;
-import org.apache.pig.impl.logicalLayer.LOEqual;
-import org.apache.pig.impl.logicalLayer.LOFilter;
-import org.apache.pig.impl.logicalLayer.LOForEach;
-import org.apache.pig.impl.logicalLayer.LOGreaterThan;
-import org.apache.pig.impl.logicalLayer.LOGreaterThanEqual;
-import org.apache.pig.impl.logicalLayer.LOLesserThan;
-import org.apache.pig.impl.logicalLayer.LOLesserThanEqual;
-import org.apache.pig.impl.logicalLayer.LOLoad;
-import org.apache.pig.impl.logicalLayer.LOMod;
-import org.apache.pig.impl.logicalLayer.LOMultiply;
-import org.apache.pig.impl.logicalLayer.LONot;
-import org.apache.pig.impl.logicalLayer.LONotEqual;
-import org.apache.pig.impl.logicalLayer.LOOr;
-import org.apache.pig.impl.logicalLayer.LOProject;
-import org.apache.pig.impl.logicalLayer.LORegexp;
-import org.apache.pig.impl.logicalLayer.LOSort;
-import org.apache.pig.impl.logicalLayer.LOSplit;
-import org.apache.pig.impl.logicalLayer.LOStore;
-import org.apache.pig.impl.logicalLayer.LOSubtract;
-import org.apache.pig.impl.logicalLayer.LOUnion;
-import org.apache.pig.impl.logicalLayer.LOVisitor;
-import org.apache.pig.impl.logicalLayer.LogicalOperator;
-import org.apache.pig.impl.logicalLayer.LogicalPlan;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.newplan.logical.expression.AddExpression;
+import org.apache.pig.newplan.logical.expression.AndExpression;
+import org.apache.pig.newplan.logical.expression.CastExpression;
+import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.newplan.logical.relational.LOCogroup;
+import org.apache.pig.newplan.logical.relational.LOLimit;
+import org.apache.pig.newplan.logical.expression.ConstantExpression;
+import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
+import org.apache.pig.newplan.logical.relational.LOCross;
+import org.apache.pig.newplan.logical.relational.LODistinct;
+import org.apache.pig.newplan.logical.expression.DivideExpression;
+import org.apache.pig.newplan.logical.expression.EqualExpression;
+import org.apache.pig.newplan.logical.relational.LOFilter;
+import org.apache.pig.newplan.logical.relational.LOForEach;
+import org.apache.pig.newplan.logical.expression.GreaterThanExpression;
+import org.apache.pig.newplan.logical.expression.GreaterThanEqualExpression;
+import org.apache.pig.newplan.logical.expression.LessThanExpression;
+import org.apache.pig.newplan.logical.expression.LessThanEqualExpression;
+import org.apache.pig.newplan.logical.relational.LOLoad;
+import org.apache.pig.newplan.logical.relational.LogicalPlan;
+import org.apache.pig.newplan.logical.expression.ModExpression;
+import org.apache.pig.newplan.logical.expression.LogicalExpression;
+import org.apache.pig.newplan.logical.expression.MultiplyExpression;
+import org.apache.pig.newplan.logical.expression.NotExpression;
+import org.apache.pig.newplan.logical.expression.NotEqualExpression;
+import org.apache.pig.newplan.logical.expression.OrExpression;
+import org.apache.pig.newplan.logical.expression.ProjectExpression;
+import org.apache.pig.newplan.logical.expression.RegexExpression;
+import org.apache.pig.newplan.logical.relational.LOSort;
+import org.apache.pig.newplan.logical.relational.LOSplit;
+import org.apache.pig.newplan.logical.relational.LOStore;
+import org.apache.pig.newplan.logical.expression.SubtractExpression;
+import org.apache.pig.newplan.logical.relational.LOUnion;
+import org.apache.pig.newplan.OperatorPlan;
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.logical.relational.LogicalSchema;
+import org.apache.pig.newplan.logical.relational.LogicalRelationalNodesVisitor;
import org.apache.pig.pen.util.ExampleTuple;
import org.apache.pig.pen.util.PreOrderDepthFirstWalker;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
+import org.apache.pig.impl.util.MultiMap;
+import org.apache.pig.impl.io.FileSpec;
//This is used to generate synthetic data
//Synthetic data generation is done by making constraint tuples for each operator as we traverse the plan
//and try to replace the constraints with values as far as possible. We only deal with simple conditions right now
-public class AugmentBaseDataVisitor extends LOVisitor {
+public class AugmentBaseDataVisitor extends LogicalRelationalNodesVisitor {
Map<LOLoad, DataBag> baseData = null;
Map<LOLoad, DataBag> newBaseData = new HashMap<LOLoad, DataBag>();
- Map<LogicalOperator, DataBag> derivedData = null;
+ Map<Operator, DataBag> derivedData = null;
private boolean limit = false;
- private final Map<LogicalOperator, PhysicalOperator> logToPhysMap;
+ private final Map<Operator, PhysicalOperator> logToPhysMap;
private Map<LOLimit, Long> oriLimitMap;
- Map<LogicalOperator, DataBag> outputConstraintsMap = new HashMap<LogicalOperator, DataBag>();
+ Map<Operator, DataBag> outputConstraintsMap = new HashMap<Operator, DataBag>();
Log log = LogFactory.getLog(getClass());
// Augmentation moves from the leaves to root and hence needs a
// depthfirstwalker
- public AugmentBaseDataVisitor(LogicalPlan plan,
- Map<LogicalOperator, PhysicalOperator> logToPhysMap,
+ public AugmentBaseDataVisitor(OperatorPlan plan,
+ Map<Operator, PhysicalOperator> logToPhysMap,
Map<LOLoad, DataBag> baseData,
- Map<LogicalOperator, DataBag> derivedData) {
- super(plan, new PreOrderDepthFirstWalker<LogicalOperator, LogicalPlan>(
+ Map<Operator, DataBag> derivedData) throws FrontendException {
+ super(plan, new PreOrderDepthFirstWalker(
plan));
this.baseData = baseData;
this.derivedData = derivedData;
@@ -112,7 +117,43 @@ public class AugmentBaseDataVisitor exte
limit = true;
}
- public Map<LOLoad, DataBag> getNewBaseData() {
+ public Map<LOLoad, DataBag> getNewBaseData() throws ExecException {
+ // consolidate base data from different LOADs on the same inputs
+ MultiMap<FileSpec, DataBag> inputDataMap = new MultiMap<FileSpec, DataBag>();
+ for (Map.Entry<LOLoad, DataBag> e : newBaseData.entrySet()) {
+ inputDataMap.put(e.getKey().getFileSpec(), e.getValue());
+ }
+
+ int index = 0;
+ for (FileSpec fs : inputDataMap.keySet()) {
+ int maxSchemaSize = 0;
+ Tuple tupleOfMaxSchemaSize = null;
+ for (DataBag bag : inputDataMap.get(fs)) {
+ if (bag.size() > 0) {
+ int size = 0;
+ Tuple t = null;
+ t = bag.iterator().next();
+ size = t.size();
+ if (size > maxSchemaSize) {
+ maxSchemaSize = size;
+ tupleOfMaxSchemaSize = t;
+ }
+ }
+ }
+ for (DataBag bag : inputDataMap.get(fs)) {
+ if (bag.size() > 0) {
+ for (Iterator<Tuple> it = bag.iterator(); it.hasNext();) {
+ Tuple t = it.next();
+ for (int i = t.size(); i < maxSchemaSize; ++i) {
+ t.append(tupleOfMaxSchemaSize.get(i));
+ }
+ }
+ }
+ }
+ index++;
+ }
+
+
for (Map.Entry<LOLoad, DataBag> e : baseData.entrySet()) {
DataBag bag = newBaseData.get(e.getKey());
if (bag == null) {
@@ -129,8 +170,8 @@ public class AugmentBaseDataVisitor exte
}
@Override
- protected void visit(LOCogroup cg) throws VisitorException {
- if (limit && !((PreOrderDepthFirstWalker<LogicalOperator, LogicalPlan>) mCurrentWalker).getBranchFlag())
+ public void visit(LOCogroup cg) throws FrontendException {
+ if (limit && !((PreOrderDepthFirstWalker) currentWalker).getBranchFlag())
return;
// we first get the outputconstraints for the current cogroup
DataBag outputConstraints = outputConstraintsMap.get(cg);
@@ -141,16 +182,14 @@ public class AugmentBaseDataVisitor exte
List<List<Integer>> groupSpecs = new LinkedList<List<Integer>>();
int numCols = -1;
- int minGroupSize = (cg.getInputs().size() == 1) ? 1 : 2;
-
- for (LogicalOperator op : cg.getInputs()) {
- List<LogicalPlan> groupByPlans = (List<LogicalPlan>) cg
- .getGroupByPlans().get(op);
+ for (int index = 0; index < cg.getInputs((LogicalPlan)plan).size(); ++index) {
+ Collection<LogicalExpressionPlan> groupByPlans = (List<LogicalExpressionPlan>) cg
+ .getExpressionPlans().get(index);
List<Integer> groupCols = new ArrayList<Integer>();
- for (LogicalPlan plan : groupByPlans) {
- LogicalOperator leaf = plan.getLeaves().get(0);
- if (leaf instanceof LOProject) {
- groupCols.add(((LOProject) leaf).getCol());
+ for (LogicalExpressionPlan plan : groupByPlans) {
+ Operator leaf = plan.getSinks().get(0);
+ if (leaf instanceof ProjectExpression) {
+ groupCols.add(Integer.valueOf(((ProjectExpression) leaf).getColNum()));
} else {
ableToHandle = false;
break;
@@ -173,7 +212,7 @@ public class AugmentBaseDataVisitor exte
try {
if (ableToHandle) {
// we need to go through the output constraints first
- int numInputs = cg.getInputs().size();
+ int numInputs = cg.getInputs((LogicalPlan) plan).size();
if (outputConstraints != null) {
for (Iterator<Tuple> it = outputConstraints.iterator(); it
.hasNext();) {
@@ -182,19 +221,19 @@ public class AugmentBaseDataVisitor exte
for (int input = 0; input < numInputs; input++) {
- int numInputFields = cg.getInputs().get(input)
+ int numInputFields = ((LogicalRelationalOperator) cg.getInputs((LogicalPlan) plan).get(input))
.getSchema().size();
List<Integer> groupCols = groupSpecs.get(input);
DataBag output = outputConstraintsMap.get(cg
- .getInputs().get(input));
+ .getInputs((LogicalPlan) plan).get(input));
if (output == null) {
output = BagFactory.getInstance()
.newDefaultBag();
- outputConstraintsMap.put(cg.getInputs().get(
+ outputConstraintsMap.put(cg.getInputs((LogicalPlan) plan).get(
input), output);
}
- for (int i = 0; i < minGroupSize; i++) {
+ for (int i = 0; i < 2; i++) {
Tuple inputConstraint = GetGroupByInput(
groupLabel, groupCols, numInputFields);
if (inputConstraint != null)
@@ -212,18 +251,18 @@ public class AugmentBaseDataVisitor exte
Object groupLabel = groupTup.get(0);
for (int input = 0; input < numInputs; input++) {
- int numInputFields = cg.getInputs().get(input)
+ int numInputFields = ((LogicalRelationalOperator)cg.getInputs((LogicalPlan) plan).get(input))
.getSchema().size();
List<Integer> groupCols = groupSpecs.get(input);
DataBag output = outputConstraintsMap.get(cg
- .getInputs().get(input));
+ .getInputs((LogicalPlan) plan).get(input));
if (output == null) {
output = BagFactory.getInstance().newDefaultBag();
- outputConstraintsMap.put(cg.getInputs().get(input),
+ outputConstraintsMap.put(cg.getInputs((LogicalPlan) plan).get(input),
output);
}
- int numTupsToAdd = minGroupSize
+ int numTupsToAdd = 2
- (int) ((DataBag) groupTup.get(input + 1))
.size();
for (int i = 0; i < numTupsToAdd; i++) {
@@ -239,29 +278,29 @@ public class AugmentBaseDataVisitor exte
log
.error("Error visiting Cogroup during Augmentation phase of Example Generator! "
+ e.getMessage());
- throw new VisitorException(
+ throw new FrontendException(
"Error visiting Cogroup during Augmentation phase of Example Generator! "
+ e.getMessage());
}
}
@Override
- protected void visit(LOCross cs) throws VisitorException {
+ public void visit(LOCross cs) throws FrontendException {
}
@Override
- protected void visit(LODistinct dt) throws VisitorException {
- if (limit && !((PreOrderDepthFirstWalker<LogicalOperator, LogicalPlan>) mCurrentWalker).getBranchFlag())
+ public void visit(LODistinct dt) throws FrontendException {
+ if (limit && !((PreOrderDepthFirstWalker) currentWalker).getBranchFlag())
return;
DataBag outputConstraints = outputConstraintsMap.get(dt);
outputConstraintsMap.remove(dt);
- DataBag inputConstraints = outputConstraintsMap.get(dt.getInput());
+ DataBag inputConstraints = outputConstraintsMap.get(dt.getInput((LogicalPlan) plan));
if (inputConstraints == null) {
inputConstraints = BagFactory.getInstance().newDefaultBag();
- outputConstraintsMap.put(dt.getInput(), inputConstraints);
+ outputConstraintsMap.put(dt.getInput((LogicalPlan) plan), inputConstraints);
}
if (outputConstraints != null && outputConstraints.size() > 0) {
@@ -273,7 +312,7 @@ public class AugmentBaseDataVisitor exte
boolean emptyInputConstraints = inputConstraints.size() == 0;
if (emptyInputConstraints) {
- DataBag inputData = derivedData.get(dt.getInput());
+ DataBag inputData = derivedData.get(dt.getInput((LogicalPlan) plan));
for (Iterator<Tuple> it = inputData.iterator(); it.hasNext();)
{
inputConstraints.add(it.next());
@@ -300,22 +339,22 @@ public class AugmentBaseDataVisitor exte
}
@Override
- protected void visit(LOFilter filter) throws VisitorException {
- if (limit && !((PreOrderDepthFirstWalker<LogicalOperator, LogicalPlan>) mCurrentWalker).getBranchFlag())
+ public void visit(LOFilter filter) throws FrontendException {
+ if (limit && !((PreOrderDepthFirstWalker) currentWalker).getBranchFlag())
return;
DataBag outputConstraints = outputConstraintsMap.get(filter);
outputConstraintsMap.remove(filter);
- LogicalPlan filterCond = filter.getComparisonPlan();
- DataBag inputConstraints = outputConstraintsMap.get(filter.getInput());
+ LogicalExpressionPlan filterCond = filter.getFilterPlan();
+ DataBag inputConstraints = outputConstraintsMap.get(filter.getInput((LogicalPlan) plan));
if (inputConstraints == null) {
inputConstraints = BagFactory.getInstance().newDefaultBag();
- outputConstraintsMap.put(filter.getInput(), inputConstraints);
+ outputConstraintsMap.put(filter.getInput((LogicalPlan) plan), inputConstraints);
}
DataBag outputData = derivedData.get(filter);
- DataBag inputData = derivedData.get(filter.getInput());
+ DataBag inputData = derivedData.get(filter.getInput((LogicalPlan) plan));
try {
if (outputConstraints != null && outputConstraints.size() > 0) { // there
// 's
@@ -365,19 +404,19 @@ public class AugmentBaseDataVisitor exte
log
.error("Error visiting Load during Augmentation phase of Example Generator! "
+ e.getMessage());
- throw new VisitorException(
+ throw new FrontendException(
"Error visiting Load during Augmentation phase of Example Generator! "
+ e.getMessage());
}
}
@Override
- protected void visit(LOForEach forEach) throws VisitorException {
- if (limit && !((PreOrderDepthFirstWalker<LogicalOperator, LogicalPlan>) mCurrentWalker).getBranchFlag())
+ public void visit(LOForEach forEach) throws FrontendException {
+ if (limit && !((PreOrderDepthFirstWalker) currentWalker).getBranchFlag())
return;
DataBag outputConstraints = outputConstraintsMap.get(forEach);
outputConstraintsMap.remove(forEach);
- List<LogicalPlan> plans = forEach.getForEachPlans();
+ LogicalPlan plan = forEach.getInnerPlan();
boolean ableToHandle = true;
List<Integer> cols = new ArrayList<Integer>();
boolean cast = false;
@@ -386,20 +425,18 @@ public class AugmentBaseDataVisitor exte
// we dont have to do anything in this case
return;
- for (LogicalPlan plan : plans) {
- LogicalOperator op = plan.getLeaves().get(0);
- if (op instanceof LOCast) {
+
+ Operator op = plan.getSinks().get(0);
+ if (op instanceof CastExpression) {
cast = true;
- op = ((LOCast) op).getExpression();
+ op = ((CastExpression) op).getExpression();
}
- if (!(op instanceof LOProject)) {
+ if (!(op instanceof ProjectExpression)) {
ableToHandle = false;
- break;
} else {
- cols.add(((LOProject) op).getCol());
+ cols.add(Integer.valueOf(((ProjectExpression) op).getColNum()));
}
- }
if (ableToHandle) {
// we can only handle simple projections
@@ -409,26 +446,31 @@ public class AugmentBaseDataVisitor exte
Tuple outputConstraint = it.next();
try {
Tuple inputConstraint = BackPropConstraint(
- outputConstraint, cols, (forEach.getPlan()
- .getPredecessors(forEach)).get(0)
+ outputConstraint, cols, ((LogicalRelationalOperator)plan
+ .getPredecessors(forEach).get(0))
.getSchema(), cast);
output.add(inputConstraint);
} catch (Exception e) {
e.printStackTrace();
- throw new VisitorException(
+ throw new FrontendException(
"Operator error during Augmenting Phase in Example Generator "
+ e.getMessage());
}
}
- outputConstraintsMap.put(forEach.getPlan().getPredecessors(forEach)
+ outputConstraintsMap.put(plan.getPredecessors(forEach)
.get(0), output);
}
}
@Override
- protected void visit(LOLoad load) throws VisitorException {
+ public void visit(LOLoad load) throws FrontendException {
DataBag inputData = baseData.get(load);
+ // check if the inputData exists
+ if (inputData == null || inputData.size() == 0) {
+ log.error("No (valid) input data found!");
+ throw new RuntimeException("No (valid) input data found!");
+ }
DataBag newInputData = newBaseData.get(load);
if (newInputData == null) {
@@ -436,7 +478,7 @@ public class AugmentBaseDataVisitor exte
newBaseData.put(load, newInputData);
}
- Schema schema;
+ LogicalSchema schema;
try {
schema = load.getSchema();
if (schema == null)
@@ -447,17 +489,15 @@ public class AugmentBaseDataVisitor exte
log
.error("Error visiting Load during Augmentation phase of Example Generator! "
+ e.getMessage());
- throw new VisitorException(
+ throw new FrontendException(
"Error visiting Load during Augmentation phase of Example Generator! "
+ e.getMessage());
}
+
+ Tuple exampleTuple = inputData.iterator().next();
+
DataBag outputConstraints = outputConstraintsMap.get(load);
outputConstraintsMap.remove(load);
- // check if the inputData exists
- if (inputData == null || inputData.size() == 0) {
- log.error("No (valid) input data found!");
- throw new RuntimeException("No (valid) input data found!");
- }
// first of all, we are required to guarantee that there is at least one
// output tuple
@@ -469,7 +509,6 @@ public class AugmentBaseDataVisitor exte
// create example tuple to steal values from when we encounter
// "don't care" fields (i.e. null fields)
- Tuple exampleTuple = inputData.iterator().next();
System.out.println(exampleTuple.toString());
// run through output constraints; for each one synthesize a tuple and
@@ -505,7 +544,7 @@ public class AugmentBaseDataVisitor exte
log
.error("Error visiting Load during Augmentation phase of Example Generator! "
+ e.getMessage());
- throw new VisitorException(
+ throw new FrontendException(
"Error visiting Load during Augmentation phase of Example Generator! "
+ e.getMessage());
@@ -521,24 +560,14 @@ public class AugmentBaseDataVisitor exte
newInput = true;
}
} catch (ExecException e) {
- throw new VisitorException(
+ throw new FrontendException(
"Error visiting Load during Augmentation phase of Example Generator! "
+ e.getMessage());
}
}
-
- if (newInput) {
- for (Map.Entry<LOLoad, DataBag> entry : newBaseData.entrySet()) {
- LOLoad otherLoad = entry.getKey();
- if (otherLoad != load && otherLoad.getInputFile().equals(load.getInputFile())) {
- // different load sharing the same input file
- entry.getValue().addAll(newInputData);
- }
- }
- }
}
- private boolean inInput(Tuple newTuple, DataBag input, Schema schema) throws ExecException {
+ private boolean inInput(Tuple newTuple, DataBag input, LogicalSchema schema) throws ExecException {
boolean result;
for (Iterator<Tuple> iter = input.iterator(); iter.hasNext();) {
result = true;
@@ -556,43 +585,43 @@ public class AugmentBaseDataVisitor exte
}
@Override
- protected void visit(LOSort s) throws VisitorException {
- if (limit && !((PreOrderDepthFirstWalker<LogicalOperator, LogicalPlan>) mCurrentWalker).getBranchFlag())
+ public void visit(LOSort s) throws FrontendException {
+ if (limit && !((PreOrderDepthFirstWalker) currentWalker).getBranchFlag())
return;
DataBag outputConstraints = outputConstraintsMap.get(s);
outputConstraintsMap.remove(s);
if (outputConstraints == null)
- outputConstraintsMap.put(s.getInput(), BagFactory.getInstance()
+ outputConstraintsMap.put(s.getInput((LogicalPlan) plan), BagFactory.getInstance()
.newDefaultBag());
else
- outputConstraintsMap.put(s.getInput(), outputConstraints);
+ outputConstraintsMap.put(s.getInput((LogicalPlan) plan), outputConstraints);
}
@Override
- protected void visit(LOSplit split) throws VisitorException {
- if (limit && !((PreOrderDepthFirstWalker<LogicalOperator, LogicalPlan>) mCurrentWalker).getBranchFlag())
+ public void visit(LOSplit split) throws FrontendException {
+ if (limit && !((PreOrderDepthFirstWalker) currentWalker).getBranchFlag())
return;
}
@Override
- protected void visit(LOStore store) throws VisitorException {
- if (limit && !((PreOrderDepthFirstWalker<LogicalOperator, LogicalPlan>) mCurrentWalker).getBranchFlag())
+ public void visit(LOStore store) throws FrontendException {
+ if (limit && !((PreOrderDepthFirstWalker) currentWalker).getBranchFlag())
return;
DataBag outputConstraints = outputConstraintsMap.get(store);
if (outputConstraints == null) {
- outputConstraintsMap.put(store.getPlan().getPredecessors(store)
+ outputConstraintsMap.put(plan.getPredecessors(store)
.get(0), BagFactory.getInstance().newDefaultBag());
} else {
outputConstraintsMap.remove(store);
- outputConstraintsMap.put(store.getPlan().getPredecessors(store)
+ outputConstraintsMap.put(plan.getPredecessors(store)
.get(0), outputConstraints);
}
}
@Override
- protected void visit(LOUnion u) throws VisitorException {
- if (limit && !((PreOrderDepthFirstWalker<LogicalOperator, LogicalPlan>) mCurrentWalker).getBranchFlag())
+ public void visit(LOUnion u) throws FrontendException {
+ if (limit && !((PreOrderDepthFirstWalker) currentWalker).getBranchFlag())
return;
DataBag outputConstraints = outputConstraintsMap.get(u);
outputConstraintsMap.remove(u);
@@ -600,7 +629,7 @@ public class AugmentBaseDataVisitor exte
// we dont need to do anything
// we just find the inputs, create empty bags as their
// outputConstraints and return
- for (LogicalOperator op : u.getInputs()) {
+ for (Operator op : u.getInputs((LogicalPlan) plan)) {
DataBag constraints = BagFactory.getInstance().newDefaultBag();
outputConstraintsMap.put(op, constraints);
}
@@ -610,10 +639,10 @@ public class AugmentBaseDataVisitor exte
// since we have some outputConstraints, we apply them to the inputs
// round-robin
int count = 0;
- List<LogicalOperator> inputs = u.getInputs();
+ List<Operator> inputs = u.getInputs(((LogicalPlan) plan));
int noInputs = inputs.size();
- for (LogicalOperator op : inputs) {
+ for (Operator op : inputs) {
DataBag constraint = BagFactory.getInstance().newDefaultBag();
outputConstraintsMap.put(op, constraint);
}
@@ -626,7 +655,7 @@ public class AugmentBaseDataVisitor exte
}
@Override
- protected void visit(LOLimit lm) throws VisitorException {
+ public void visit(LOLimit lm) throws FrontendException {
if (!limit) // not augment for LIMIT in this traversal
return;
@@ -636,13 +665,13 @@ public class AugmentBaseDataVisitor exte
DataBag outputConstraints = outputConstraintsMap.get(lm);
outputConstraintsMap.remove(lm);
- DataBag inputConstraints = outputConstraintsMap.get(lm.getInput());
+ DataBag inputConstraints = outputConstraintsMap.get(lm.getInput((LogicalPlan) plan));
if (inputConstraints == null) {
inputConstraints = BagFactory.getInstance().newDefaultBag();
- outputConstraintsMap.put(lm.getInput(), inputConstraints);
+ outputConstraintsMap.put(lm.getInput((LogicalPlan) plan), inputConstraints);
}
- DataBag inputData = derivedData.get(lm.getInput());
+ DataBag inputData = derivedData.get(lm.getInput((LogicalPlan) plan));
if (outputConstraints != null && outputConstraints.size() > 0) { // there
// 's
@@ -662,7 +691,7 @@ public class AugmentBaseDataVisitor exte
// ... plus one more if only one
if (inputConstraints.size() == 1) {
inputConstraints.add(inputData.iterator().next());
- ((PreOrderDepthFirstWalker<LogicalOperator, LogicalPlan>) mCurrentWalker).setBranchFlag();
+ ((PreOrderDepthFirstWalker) currentWalker).setBranchFlag();
}
}
} else if (inputConstraints.size() == 0){
@@ -671,7 +700,7 @@ public class AugmentBaseDataVisitor exte
// ... plus one more if only one
if (inputConstraints.size() == 1) {
inputConstraints.add(inputData.iterator().next());
- ((PreOrderDepthFirstWalker<LogicalOperator, LogicalPlan>) mCurrentWalker).setBranchFlag();
+ ((PreOrderDepthFirstWalker) currentWalker).setBranchFlag();
}
}
POLimit poLimit = (POLimit) logToPhysMap.get(lm);
@@ -700,7 +729,7 @@ public class AugmentBaseDataVisitor exte
}
Tuple BackPropConstraint(Tuple outputConstraint, List<Integer> cols,
- Schema inputSchema, boolean cast) throws ExecException {
+ LogicalSchema inputSchema, boolean cast) throws ExecException {
Tuple inputConst = TupleFactory.getInstance().newTuple(
inputSchema.getFields().size());
@@ -732,8 +761,8 @@ public class AugmentBaseDataVisitor exte
// predicate
// (or null if unable to find such a tuple)
- ExampleTuple GenerateMatchingTuple(Schema schema, LogicalPlan plan,
- boolean invert) throws ExecException {
+ ExampleTuple GenerateMatchingTuple(LogicalSchema schema, LogicalExpressionPlan plan,
+ boolean invert) throws FrontendException, ExecException {
return GenerateMatchingTuple(TupleFactory.getInstance().newTuple(
schema.getFields().size()), plan, invert);
}
@@ -753,39 +782,39 @@ public class AugmentBaseDataVisitor exte
// what predicate it wants satisfied in a given field)
//
- ExampleTuple GenerateMatchingTuple(Tuple constraint, LogicalPlan predicate,
- boolean invert) throws ExecException {
+ ExampleTuple GenerateMatchingTuple(Tuple constraint, LogicalExpressionPlan predicate,
+ boolean invert) throws ExecException, FrontendException {
Tuple t = TupleFactory.getInstance().newTuple(constraint.size());
ExampleTuple tOut = new ExampleTuple(t);
for (int i = 0; i < t.size(); i++)
tOut.set(i, constraint.get(i));
- GenerateMatchingTupleHelper(tOut, (ExpressionOperator) predicate
- .getLeaves().get(0), invert);
+ GenerateMatchingTupleHelper(tOut, predicate
+ .getSources().get(0), invert);
tOut.synthetic = true;
return tOut;
}
- void GenerateMatchingTupleHelper(Tuple t, ExpressionOperator pred,
- boolean invert) throws ExecException {
- if (pred instanceof BinaryExpressionOperator)
- GenerateMatchingTupleHelper(t, (BinaryExpressionOperator) pred,
+ void GenerateMatchingTupleHelper(Tuple t, Operator pred,
+ boolean invert) throws FrontendException, ExecException {
+ if (pred instanceof BinaryExpression)
+ GenerateMatchingTupleHelper(t, (BinaryExpression) pred,
invert);
- else if (pred instanceof LONot)
- GenerateMatchingTupleHelper(t, (LONot) pred, invert);
+ else if (pred instanceof NotExpression)
+ GenerateMatchingTupleHelper(t, (NotExpression) pred, invert);
else
- throw new ExecException("Unknown operator in filter predicate");
+ throw new FrontendException("Unknown operator in filter predicate");
}
- void GenerateMatchingTupleHelper(Tuple t, BinaryExpressionOperator pred,
- boolean invert) throws ExecException {
+ void GenerateMatchingTupleHelper(Tuple t, BinaryExpression pred,
+ boolean invert) throws FrontendException, ExecException {
- if (pred instanceof LOAnd) {
- GenerateMatchingTupleHelper(t, (LOAnd) pred, invert);
+ if (pred instanceof AndExpression) {
+ GenerateMatchingTupleHelper(t, (AndExpression) pred, invert);
return;
- } else if (pred instanceof LOOr) {
- GenerateMatchingTupleHelper(t, (LOOr) pred, invert);
+ } else if (pred instanceof OrExpression) {
+ GenerateMatchingTupleHelper(t, (OrExpression) pred, invert);
return;
}
@@ -798,27 +827,26 @@ public class AugmentBaseDataVisitor exte
int leftCol = -1, rightCol = -1;
- if (pred instanceof LOAdd || pred instanceof LOSubtract
- || pred instanceof LOMultiply || pred instanceof LODivide
- || pred instanceof LOMod || pred instanceof LORegexp)
+ if (pred instanceof AddExpression || pred instanceof SubtractExpression
+ || pred instanceof MultiplyExpression || pred instanceof DivideExpression
+ || pred instanceof ModExpression || pred instanceof RegexExpression)
return; // We don't try to work around these operators right now
- if (pred.getLhsOperand() instanceof LOConst) {
+ if (pred.getLhs() instanceof ConstantExpression) {
leftIsConst = true;
- leftConst = ((LOConst) (pred.getLhsOperand())).getValue();
+ leftConst = ((ConstantExpression) (pred.getLhs())).getValue();
} else {
- LogicalOperator lhs = pred.getLhsOperand();
- if (lhs instanceof LOCast)
- lhs = ((LOCast) lhs).getExpression();
- // if (!(pred.getLhsOperand() instanceof LOProject && ((LOProject)
+ LogicalExpression lhs = pred.getLhs();
+ if (lhs instanceof CastExpression)
+ lhs = ((CastExpression) lhs).getExpression();
+ // if (!(pred.getLhsOperand() instanceof ProjectExpression && ((ProjectExpression)
// pred
// .getLhsOperand()).getProjection().size() == 1))
// return; // too hard
- if (!(lhs instanceof LOProject && ((LOProject) lhs).getProjection()
- .size() == 1))
+ if (!(lhs instanceof ProjectExpression))
return;
- leftCol = ((LOProject) lhs).getCol();
- leftDataType = ((LOProject) lhs).getType();
+ leftCol = ((ProjectExpression) lhs).getColNum();
+ leftDataType = ((ProjectExpression) lhs).getType();
Object d = t.get(leftCol);
if (d != null) {
@@ -827,22 +855,21 @@ public class AugmentBaseDataVisitor exte
}
}
- if (pred.getRhsOperand() instanceof LOConst) {
+ if (pred.getRhs() instanceof ConstantExpression) {
rightIsConst = true;
- rightConst = ((LOConst) (pred.getRhsOperand())).getValue();
+ rightConst = ((ConstantExpression) (pred.getRhs())).getValue();
} else {
- LogicalOperator rhs = pred.getRhsOperand();
- if (rhs instanceof LOCast)
- rhs = ((LOCast) rhs).getExpression();
- // if (!(pred.getRhsOperand() instanceof LOProject && ((LOProject)
+ Operator rhs = pred.getRhs();
+ if (rhs instanceof CastExpression)
+ rhs = ((CastExpression) rhs).getExpression();
+ // if (!(pred.getRhsOperand() instanceof ProjectExpression && ((ProjectExpression)
// pred
// .getRhsOperand()).getProjection().size() == 1))
// return; // too hard
- if (!(rhs instanceof LOProject && ((LOProject) rhs).getProjection()
- .size() == 1))
+ if (!(rhs instanceof ProjectExpression))
return;
- rightCol = ((LOProject) rhs).getCol();
- rightDataType = ((LOProject) rhs).getType();
+ rightCol = ((ProjectExpression) rhs).getColNum();
+ rightDataType = ((ProjectExpression) rhs).getType();
Object d = t.get(rightCol);
if (d != null) {
@@ -858,7 +885,7 @@ public class AugmentBaseDataVisitor exte
// convert some nulls to constants
if (!invert) {
- if (pred instanceof LOEqual) {
+ if (pred instanceof EqualExpression) {
if (leftIsConst) {
t.set(rightCol, generateData(rightDataType, leftConst
.toString()));
@@ -869,7 +896,7 @@ public class AugmentBaseDataVisitor exte
t.set(leftCol, generateData(leftDataType, "0"));
t.set(rightCol, generateData(rightDataType, "0"));
}
- } else if (pred instanceof LONotEqual) {
+ } else if (pred instanceof NotEqualExpression) {
if (leftIsConst) {
t.set(rightCol, generateData(rightDataType,
GetUnequalValue(leftConst).toString()));
@@ -880,8 +907,8 @@ public class AugmentBaseDataVisitor exte
t.set(leftCol, generateData(leftDataType, "0"));
t.set(rightCol, generateData(rightDataType, "1"));
}
- } else if (pred instanceof LOGreaterThan
- || pred instanceof LOGreaterThanEqual) {
+ } else if (pred instanceof GreaterThanExpression
+ || pred instanceof GreaterThanEqualExpression) {
if (leftIsConst) {
t.set(rightCol, generateData(rightDataType,
GetSmallerValue(leftConst).toString()));
@@ -892,8 +919,8 @@ public class AugmentBaseDataVisitor exte
t.set(leftCol, generateData(leftDataType, "1"));
t.set(rightCol, generateData(rightDataType, "0"));
}
- } else if (pred instanceof LOLesserThan
- || pred instanceof LOLesserThanEqual) {
+ } else if (pred instanceof LessThanExpression
+ || pred instanceof LessThanEqualExpression) {
if (leftIsConst) {
t.set(rightCol, generateData(rightDataType, GetLargerValue(
leftConst).toString()));
@@ -906,7 +933,7 @@ public class AugmentBaseDataVisitor exte
}
}
} else {
- if (pred instanceof LOEqual) {
+ if (pred instanceof EqualExpression) {
if (leftIsConst) {
t.set(rightCol, generateData(rightDataType,
GetUnequalValue(leftConst).toString()));
@@ -917,7 +944,7 @@ public class AugmentBaseDataVisitor exte
t.set(leftCol, generateData(leftDataType, "0"));
t.set(rightCol, generateData(rightDataType, "1"));
}
- } else if (pred instanceof LONotEqual) {
+ } else if (pred instanceof NotEqualExpression) {
if (leftIsConst) {
t.set(rightCol, generateData(rightDataType, leftConst
.toString()));
@@ -928,8 +955,8 @@ public class AugmentBaseDataVisitor exte
t.set(leftCol, generateData(leftDataType, "0"));
t.set(rightCol, generateData(rightDataType, "0"));
}
- } else if (pred instanceof LOGreaterThan
- || pred instanceof LOGreaterThanEqual) {
+ } else if (pred instanceof GreaterThanExpression
+ || pred instanceof GreaterThanEqualExpression) {
if (leftIsConst) {
t.set(rightCol, generateData(rightDataType, GetLargerValue(
leftConst).toString()));
@@ -940,8 +967,8 @@ public class AugmentBaseDataVisitor exte
t.set(leftCol, generateData(leftDataType, "0"));
t.set(rightCol, generateData(rightDataType, "1"));
}
- } else if (pred instanceof LOLesserThan
- || pred instanceof LOLesserThanEqual) {
+ } else if (pred instanceof LessThanExpression
+ || pred instanceof LessThanEqualExpression) {
if (leftIsConst) {
t.set(rightCol, generateData(rightDataType,
GetSmallerValue(leftConst).toString()));
@@ -957,27 +984,27 @@ public class AugmentBaseDataVisitor exte
}
- void GenerateMatchingTupleHelper(Tuple t, LOAnd op, boolean invert)
- throws ExecException {
- ExpressionOperator input = op.getLhsOperand();
+ void GenerateMatchingTupleHelper(Tuple t, AndExpression op, boolean invert)
+ throws FrontendException, ExecException {
+ Operator input = op.getLhs();
GenerateMatchingTupleHelper(t, input, invert);
- input = op.getRhsOperand();
+ input = op.getRhs();
GenerateMatchingTupleHelper(t, input, invert);
}
- void GenerateMatchingTupleHelper(Tuple t, LOOr op, boolean invert)
- throws ExecException {
- ExpressionOperator input = op.getLhsOperand();
+ void GenerateMatchingTupleHelper(Tuple t, OrExpression op, boolean invert)
+ throws FrontendException, ExecException {
+ Operator input = op.getLhs();
GenerateMatchingTupleHelper(t, input, invert);
- input = op.getRhsOperand();
+ input = op.getRhs();
GenerateMatchingTupleHelper(t, input, invert);
}
- void GenerateMatchingTupleHelper(Tuple t, LONot op, boolean invert)
- throws ExecException {
- ExpressionOperator input = op.getOperand();
+ void GenerateMatchingTupleHelper(Tuple t, NotExpression op, boolean invert)
+ throws FrontendException, ExecException {
+ LogicalExpression input = op.getExpression();
GenerateMatchingTupleHelper(t, input, !invert);
}
Modified: pig/trunk/src/org/apache/pig/pen/DerivedDataVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/pen/DerivedDataVisitor.java?rev=1050503&r1=1050502&r2=1050503&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/pen/DerivedDataVisitor.java (original)
+++ pig/trunk/src/org/apache/pig/pen/DerivedDataVisitor.java Fri Dec 17 22:08:08 2010
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.pig.pen;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORead;
-import org.apache.pig.data.BagFactory;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.logicalLayer.FrontendException;
-import org.apache.pig.impl.logicalLayer.LOCogroup;
-import org.apache.pig.impl.logicalLayer.LOCross;
-import org.apache.pig.impl.logicalLayer.LODistinct;
-import org.apache.pig.impl.logicalLayer.LOFilter;
-import org.apache.pig.impl.logicalLayer.LOForEach;
-import org.apache.pig.impl.logicalLayer.LOLimit;
-import org.apache.pig.impl.logicalLayer.LOLoad;
-import org.apache.pig.impl.logicalLayer.LOSort;
-import org.apache.pig.impl.logicalLayer.LOSplit;
-import org.apache.pig.impl.logicalLayer.LOStore;
-import org.apache.pig.impl.logicalLayer.LOUnion;
-import org.apache.pig.impl.logicalLayer.LOVisitor;
-import org.apache.pig.impl.logicalLayer.LogicalOperator;
-import org.apache.pig.impl.logicalLayer.LogicalPlan;
-import org.apache.pig.impl.logicalLayer.PlanSetter;
-import org.apache.pig.impl.logicalLayer.validators.LogicalPlanValidationExecutor;
-import org.apache.pig.impl.plan.CompilationMessageCollector;
-import org.apache.pig.impl.plan.DependencyOrderWalker;
-import org.apache.pig.impl.plan.OperatorKey;
-import org.apache.pig.impl.plan.PlanException;
-import org.apache.pig.impl.plan.VisitorException;
-import org.apache.pig.impl.util.IdentityHashSet;
-import org.apache.pig.pen.util.DependencyOrderLimitedWalker;
-import org.apache.pig.pen.util.LineageTracer;
-
-
-//This class is used to pass data through the entire plan and save the intermediates results.
-public class DerivedDataVisitor {
-
- Map<LogicalOperator, DataBag> derivedData = new HashMap<LogicalOperator, DataBag>();
- PhysicalPlan physPlan = null;
- Map<LOLoad, DataBag> baseData = null;
-
- Map<LogicalOperator, PhysicalOperator> LogToPhyMap = null;
- Log log = LogFactory.getLog(getClass());
-
- Map<LogicalOperator, Collection<IdentityHashSet<Tuple>>> OpToEqClasses = null;
- Collection<IdentityHashSet<Tuple>> EqClasses = null;
-
- LineageTracer lineage = new LineageTracer();
-
- public DerivedDataVisitor(LogicalPlan plan, PigContext pc,
- Map<LOLoad, DataBag> baseData,
- PhysicalPlan physPlan) {
-
- this.baseData = baseData;
-
- OpToEqClasses = new HashMap<LogicalOperator, Collection<IdentityHashSet<Tuple>>>();
- EqClasses = new LinkedList<IdentityHashSet<Tuple>>();
-
- this.physPlan = physPlan;
- // if(logToPhyMap == null)
- // compilePlan(plan);
- // else
- // LogToPhyMap = logToPhyMap;
-
- }
-
- public DerivedDataVisitor(LogicalOperator op, PigContext pc,
- Map<LOLoad, DataBag> baseData,
- Map<LogicalOperator, PhysicalOperator> logToPhyMap,
- PhysicalPlan physPlan) {
- this.baseData = baseData;
-
- OpToEqClasses = new HashMap<LogicalOperator, Collection<IdentityHashSet<Tuple>>>();
- EqClasses = new LinkedList<IdentityHashSet<Tuple>>();
-
- LogToPhyMap = logToPhyMap;
- this.physPlan = physPlan;
- }
-}
Modified: pig/trunk/src/org/apache/pig/pen/EquivalenceClasses.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/pen/EquivalenceClasses.java?rev=1050503&r1=1050502&r2=1050503&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/pen/EquivalenceClasses.java (original)
+++ pig/trunk/src/org/apache/pig/pen/EquivalenceClasses.java Fri Dec 17 22:08:08 2010
@@ -29,56 +29,56 @@ import java.util.Iterator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.logicalLayer.LOForEach;
-import org.apache.pig.impl.logicalLayer.LOCross;
-import org.apache.pig.impl.logicalLayer.LogicalOperator;
-import org.apache.pig.impl.logicalLayer.LogicalPlan;
+import org.apache.pig.newplan.logical.relational.LOForEach;
+import org.apache.pig.newplan.logical.relational.LOCross;
+import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
+import org.apache.pig.newplan.logical.relational.LogicalPlan;
+import org.apache.pig.newplan.Operator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.impl.util.IdentityHashSet;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.*;
-import org.apache.pig.impl.plan.VisitorException;
//These methods are used to generate equivalence classes given the operator name and the output from the operator
//For example, it gives out 2 eq. classes for filter, one that passes the filter and one that doesn't
public class EquivalenceClasses {
- public static Map<LogicalOperator, Collection<IdentityHashSet<Tuple>>> getLoToEqClassMap(PhysicalPlan plan,
- LogicalPlan lp, Map<LogicalOperator, PhysicalOperator> logToPhyMap,
- Map<LogicalOperator, DataBag> logToDataMap,
- Map<LOForEach, Map<LogicalOperator, PhysicalOperator>> forEachInnerLogToPhyMap,
+ public static Map<LogicalRelationalOperator, Collection<IdentityHashSet<Tuple>>> getLoToEqClassMap(PhysicalPlan plan,
+ LogicalPlan lp, Map<Operator, PhysicalOperator> logToPhyMap,
+ Map<Operator, DataBag> logToDataMap,
+ Map<LOForEach, Map<LogicalRelationalOperator, PhysicalOperator>> forEachInnerLogToPhyMap,
final HashMap<PhysicalOperator, Collection<IdentityHashSet<Tuple>>> poToEqclassesMap)
- throws VisitorException {
- Map<LogicalOperator, Collection<IdentityHashSet<Tuple>>> ret =
- new HashMap<LogicalOperator, Collection<IdentityHashSet<Tuple>>>();
- List<LogicalOperator> roots = lp.getRoots();
- HashSet<LogicalOperator> seen = new HashSet<LogicalOperator>();
- for(LogicalOperator lo: roots) {
+ {
+ Map<LogicalRelationalOperator, Collection<IdentityHashSet<Tuple>>> ret =
+ new HashMap<LogicalRelationalOperator, Collection<IdentityHashSet<Tuple>>>();
+ List<Operator> roots = lp.getSources();
+ HashSet<Operator> seen = new HashSet<Operator>();
+ for(Operator lo: roots) {
getEqClasses(plan, lo, lp, logToPhyMap, ret, poToEqclassesMap, logToDataMap, forEachInnerLogToPhyMap, seen);
}
return ret;
}
- private static void getEqClasses(PhysicalPlan plan, LogicalOperator parent, LogicalPlan lp,
- Map<LogicalOperator, PhysicalOperator> logToPhyMap, Map<LogicalOperator,
+ private static void getEqClasses(PhysicalPlan plan, Operator parent, LogicalPlan lp,
+ Map<Operator, PhysicalOperator> logToPhyMap, Map<LogicalRelationalOperator,
Collection<IdentityHashSet<Tuple>>> result,
final HashMap<PhysicalOperator, Collection<IdentityHashSet<Tuple>>> poToEqclassesMap,
- Map<LogicalOperator, DataBag> logToDataMap,
- Map<LOForEach, Map<LogicalOperator, PhysicalOperator>> forEachInnerLogToPhyMap,
- HashSet<LogicalOperator> seen) throws VisitorException {
+ Map<Operator, DataBag> logToDataMap,
+ Map<LOForEach, Map<LogicalRelationalOperator, PhysicalOperator>> forEachInnerLogToPhyMap,
+ HashSet<Operator> seen) {
if (parent instanceof LOForEach) {
if (poToEqclassesMap.get(logToPhyMap.get(parent)) != null) {
LinkedList<IdentityHashSet<Tuple>> eqClasses = new LinkedList<IdentityHashSet<Tuple>>();
eqClasses.addAll(poToEqclassesMap.get(logToPhyMap.get(parent)));
- for (Map.Entry<LogicalOperator, PhysicalOperator> entry : forEachInnerLogToPhyMap.get(parent).entrySet()) {
+ for (Map.Entry<LogicalRelationalOperator, PhysicalOperator> entry : forEachInnerLogToPhyMap.get(parent).entrySet()) {
if (poToEqclassesMap.get(entry.getValue()) != null)
eqClasses.addAll(poToEqclassesMap.get(entry.getValue()));
}
- result.put(parent, eqClasses);
+ result.put((LogicalRelationalOperator) parent, eqClasses);
}
} else if (parent instanceof LOCross) {
boolean ok = true;
- for (LogicalOperator input : ((LOCross) parent).getInputs()) {
+ for (Operator input : ((LOCross) parent).getInputs()) {
if (logToDataMap.get(input).size() < 2) {
// only if all inputs have at least more than two tuples will all outputs be added to the eq. class
ok = false;
@@ -92,12 +92,12 @@ public class EquivalenceClasses {
eqClass.add(it.next());
}
eqClasses.add(eqClass);
- result.put(parent, eqClasses);
+ result.put((LogicalRelationalOperator) parent, eqClasses);
} else {
LinkedList<IdentityHashSet<Tuple>> eqClasses = new LinkedList<IdentityHashSet<Tuple>>();
IdentityHashSet<Tuple> eqClass = new IdentityHashSet<Tuple>();
eqClasses.add(eqClass);
- result.put(parent, eqClasses);
+ result.put((LogicalRelationalOperator)parent, eqClasses);
}
} else {
Collection<IdentityHashSet<Tuple>> eqClasses = poToEqclassesMap.get(logToPhyMap.get(parent));
@@ -108,11 +108,11 @@ public class EquivalenceClasses {
eqClasses.add(new IdentityHashSet<Tuple>());
}
}
- result.put(parent, eqClasses);
+ result.put((LogicalRelationalOperator)parent, eqClasses);
}
// result.put(parent, getEquivalenceClasses(plan, parent, lp, logToPhyMap, poToEqclassesMap));
if (lp.getSuccessors(parent) != null) {
- for (LogicalOperator lo : lp.getSuccessors(parent)) {
+ for (Operator lo : lp.getSuccessors(parent)) {
if (!seen.contains(lo)) {
seen.add(lo);
getEqClasses(plan, lo, lp, logToPhyMap, result, poToEqclassesMap, logToDataMap, forEachInnerLogToPhyMap, seen);