Posted to commits@pig.apache.org by ch...@apache.org on 2013/11/28 09:56:34 UTC
svn commit: r1546314 [2/6] - in /pig/branches/tez:
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer...
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=1546314&r1=1546313&r2=1546314&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Thu Nov 28 08:56:33 2013
@@ -57,6 +57,8 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.JoinPackager;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.LitePackager;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCounter;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCross;
@@ -65,7 +67,6 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POGlobalRearrange;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POJoinPackage;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
@@ -73,8 +74,6 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PONative;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage.PackageType;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackageLite;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartitionRearrange;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORank;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin;
@@ -83,6 +82,8 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.Packager;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.Packager.PackageType;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
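The import changes above track the packaging refactor in this commit: POJoinPackage, POPackageLite and POPackage.PackageType give way to the Packager hierarchy (Packager, LitePackager, JoinPackager), and package-level settings move from POPackage onto its Packager. A minimal sketch of the new call pattern, mirroring the visitPackage, visitDistinct and visitSkewedJoin hunks further down (the surrounding context such as scope, nig and curMROp is taken from MRCompiler itself; this is illustrative, not part of the diff):

    // After this commit, packaging details are configured through the
    // Packager attached to POPackage, rather than on POPackage directly
    // (pre-commit style: pkg.setKeyType(...), pkg.setDistinct(...)).
    POPackage pkg = new POPackage(new OperatorKey(scope, nig.getNextNodeId(scope)));
    Packager pkgr = pkg.getPkgr();
    pkgr.setKeyType(DataType.TUPLE);          // key type now lives on the Packager
    pkgr.setDistinct(true);                   // likewise the distinct flag
    pkgr.setInner(new boolean[] { false });   // and the inner-join flags
    pkg.setNumInps(1);                        // input count stays on POPackage

    // The package type is also queried via the Packager:
    if (pkg.getPkgr().getPackageType() == Packager.PackageType.JOIN) {
        curMROp.markRegularJoin();
    }
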
@@ -115,21 +116,21 @@ import org.apache.pig.newplan.logical.re
/**
* The compiler that compiles a given physical plan
- * into a DAG of MapReduce operators which can then
+ * into a DAG of MapReduce operators which can then
* be converted into the JobControl structure.
- *
+ *
* Is implemented as a visitor of the PhysicalPlan it
* is compiling.
- *
+ *
* Currently supports all operators except the MR Sort
- * operator
- *
- * Uses a predecessor based depth first traversal.
+ * operator
+ *
+ * Uses a predecessor based depth first traversal.
* To compile an operator, first compiles
* the predecessors into MapReduce Operators and tries to
* merge the current operator into one of them. The goal
* being to keep the number of MROpers to a minimum.
- *
+ *
* It also merges multiple Map jobs, created by compiling
* the inputs individually, into a single job. Here a new
* map job is created and then the contents of the previous
@@ -137,28 +138,28 @@ import org.apache.pig.newplan.logical.re
* the previous map plans, should be manually moved over. So,
* if you are adding something new take care about this.
* Ex of this is in requestedParallelism
- *
- * Only in case of blocking operators and splits, a new
+ *
+ * Only in case of blocking operators and splits, a new
* MapReduce operator is started using a store-load combination
* to connect the two operators. Whenever this happens
* care is taken to add the MROper into the MRPlan and connect it
* appropriately.
- *
+ *
*
*/
public class MRCompiler extends PhyPlanVisitor {
PigContext pigContext;
-
+
//The plan that is being compiled
PhysicalPlan plan;
//The plan of MapReduce Operators
MROperPlan MRPlan;
-
+
//The current MapReduce Operator
//that is being compiled
MapReduceOper curMROp;
-
+
//The output of compiling the inputs
MapReduceOper[] compiledInputs = null;
@@ -166,38 +167,38 @@ public class MRCompiler extends PhyPlanV
//maintained they will haunt you.
//During the traversal a split is the only
//operator that can be revisited from a different
- //path. So this map stores the split job. So
+ //path. So this map stores the split job. So
//whenever we hit the split, we create a new MROper
//and connect the split job using load-store and also
//in the MRPlan
Map<OperatorKey, MapReduceOper> splitsSeen;
-
+
NodeIdGenerator nig;
private String scope;
-
+
private Random r;
-
+
private UDFFinder udfFinder;
-
+
private CompilationMessageCollector messageCollector = null;
-
+
private Map<PhysicalOperator,MapReduceOper> phyToMROpMap;
-
+
public static final String USER_COMPARATOR_MARKER = "user.comparator.func:";
-
+
private static final Log LOG = LogFactory.getLog(MRCompiler.class);
-
+
public static final String FILE_CONCATENATION_THRESHOLD = "pig.files.concatenation.threshold";
public static final String OPTIMISTIC_FILE_CONCATENATION = "pig.optimistic.files.concatenation";
-
+
private int fileConcatenationThreshold = 100;
private boolean optimisticFileConcatenation = false;
-
+
public MRCompiler(PhysicalPlan plan) throws MRCompilerException {
this(plan,null);
}
-
+
public MRCompiler(PhysicalPlan plan,
PigContext pigContext) throws MRCompilerException {
super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan));
@@ -211,14 +212,14 @@ public class MRCompiler extends PhyPlanV
udfFinder = new UDFFinder();
List<PhysicalOperator> roots = plan.getRoots();
if((roots == null) || (roots.size() <= 0)) {
- int errCode = 2053;
- String msg = "Internal error. Did not find roots in the physical plan.";
- throw new MRCompilerException(msg, errCode, PigException.BUG);
+ int errCode = 2053;
+ String msg = "Internal error. Did not find roots in the physical plan.";
+ throw new MRCompilerException(msg, errCode, PigException.BUG);
}
scope = roots.get(0).getOperatorKey().getScope();
messageCollector = new CompilationMessageCollector() ;
phyToMROpMap = new HashMap<PhysicalOperator, MapReduceOper>();
-
+
fileConcatenationThreshold = Integer.parseInt(pigContext.getProperties()
.getProperty(FILE_CONCATENATION_THRESHOLD, "100"));
optimisticFileConcatenation = pigContext.getProperties().getProperty(
@@ -226,23 +227,23 @@ public class MRCompiler extends PhyPlanV
LOG.info("File concatenation threshold: " + fileConcatenationThreshold
+ " optimistic? " + optimisticFileConcatenation);
}
-
+
public void aggregateScalarsFiles() throws PlanException, IOException {
List<MapReduceOper> mrOpList = new ArrayList<MapReduceOper>();
for(MapReduceOper mrOp: MRPlan) {
mrOpList.add(mrOp);
}
-
- Configuration conf =
+
+ Configuration conf =
ConfigurationUtil.toConfiguration(pigContext.getProperties());
boolean combinable = !conf.getBoolean("pig.noSplitCombination", false);
-
+
Map<FileSpec, MapReduceOper> seen = new HashMap<FileSpec, MapReduceOper>();
-
+
for(MapReduceOper mrOp: mrOpList) {
- for(PhysicalOperator scalar: mrOp.scalars) {
+ for(PhysicalOperator scalar: mrOp.scalars) {
MapReduceOper mro = phyToMROpMap.get(scalar);
- if (scalar instanceof POStore) {
+ if (scalar instanceof POStore) {
FileSpec oldSpec = ((POStore)scalar).getSFile();
MapReduceOper mro2 = seen.get(oldSpec);
boolean hasSeen = false;
@@ -256,17 +257,17 @@ public class MRCompiler extends PhyPlanV
: (mro.requestedParallelism >= fileConcatenationThreshold))) {
PhysicalPlan pl = mro.reducePlan.isEmpty() ? mro.mapPlan : mro.reducePlan;
FileSpec newSpec = getTempFileSpec();
-
+
// replace oldSpec in mro with newSpec
new FindStoreNameVisitor(pl, newSpec, oldSpec).visit();
-
+
POStore newSto = getStore();
newSto.setSFile(oldSpec);
- if (MRPlan.getPredecessors(mrOp)!=null &&
+ if (MRPlan.getPredecessors(mrOp)!=null &&
MRPlan.getPredecessors(mrOp).contains(mro))
MRPlan.disconnect(mro, mrOp);
- MapReduceOper catMROp = getConcatenateJob(newSpec, mro, newSto);
- MRPlan.connect(catMROp, mrOp);
+ MapReduceOper catMROp = getConcatenateJob(newSpec, mro, newSto);
+ MRPlan.connect(catMROp, mrOp);
seen.put(oldSpec, catMROp);
} else {
if (!hasSeen) seen.put(oldSpec, mro);
@@ -275,11 +276,11 @@ public class MRCompiler extends PhyPlanV
}
}
}
-
+
public void randomizeFileLocalizer(){
FileLocalizer.setR(new Random());
}
-
+
/**
* Used to get the compiled plan
* @return map reduce plan built by the compiler
@@ -287,7 +288,7 @@ public class MRCompiler extends PhyPlanV
public MROperPlan getMRPlan() {
return MRPlan;
}
-
+
/**
* Used to get the plan that was compiled
* @return physical plan
@@ -296,11 +297,11 @@ public class MRCompiler extends PhyPlanV
public PhysicalPlan getPlan() {
return plan;
}
-
+
public CompilationMessageCollector getMessageCollector() {
- return messageCollector;
+ return messageCollector;
}
-
+
/**
* The front-end method that the user calls to compile
* the plan. Assumes that all submitted plans have a Store
@@ -337,16 +338,16 @@ public class MRCompiler extends PhyPlanV
}
ops.addAll(nativeMRs);
Collections.sort(ops);
-
+
for (PhysicalOperator op : ops) {
compile(op);
}
-
+
connectSoftLink();
-
+
return MRPlan;
}
-
+
public void connectSoftLink() throws PlanException, IOException {
for (PhysicalOperator op : plan) {
if (plan.getSoftLinkPredecessors(op)!=null) {
@@ -362,7 +363,7 @@ public class MRCompiler extends PhyPlanV
}
}
}
-
+
/**
* Compiles the plan below op into a MapReduce Operator
* and stores it in curMROp.
@@ -376,8 +377,8 @@ public class MRCompiler extends PhyPlanV
//An artifact of the Visitor. Need to save
//this so that it is not overwritten.
MapReduceOper[] prevCompInp = compiledInputs;
-
- //Compile each predecessor into the MROper and
+
+ //Compile each predecessor into the MROper and
//store them away so that we can use them for compiling
//op.
List<PhysicalOperator> predecessors = plan.getPredecessors(op);
@@ -405,7 +406,7 @@ public class MRCompiler extends PhyPlanV
PhysicalOperator p = predecessors.get(0);
MapReduceOper oper = null;
if(p instanceof POStore || p instanceof PONative){
- oper = phyToMROpMap.get(p);
+ oper = phyToMROpMap.get(p);
}else{
int errCode = 2126;
String msg = "Predecessor of load should be a store or mapreduce operator. Got "+p.getClass();
@@ -422,7 +423,7 @@ public class MRCompiler extends PhyPlanV
phyToMROpMap.put(op, curMROp);
return;
}
-
+
Collections.sort(predecessors);
compiledInputs = new MapReduceOper[predecessors.size()];
int i = -1;
@@ -449,35 +450,35 @@ public class MRCompiler extends PhyPlanV
phyToMROpMap.put(op, curMROp);
return;
}
-
+
//Now we have the inputs compiled. Do something
//with the input oper op.
op.visit(this);
if(op.getRequestedParallelism() > curMROp.requestedParallelism ) {
- // we don't want to change prallelism for skewed join due to sampling
- // and pre-allocated reducers for skewed keys
- if (!curMROp.isSkewedJoin()) {
- curMROp.requestedParallelism = op.getRequestedParallelism();
- }
+ // we don't want to change prallelism for skewed join due to sampling
+ // and pre-allocated reducers for skewed keys
+ if (!curMROp.isSkewedJoin()) {
+ curMROp.requestedParallelism = op.getRequestedParallelism();
+ }
}
compiledInputs = prevCompInp;
}
-
+
private MapReduceOper getMROp(){
return new MapReduceOper(new OperatorKey(scope,nig.getNextNodeId(scope)));
}
-
+
private NativeMapReduceOper getNativeMROp(String mrJar, String[] parameters) {
return new NativeMapReduceOper(new OperatorKey(scope,nig.getNextNodeId(scope)), mrJar, parameters);
}
-
+
private POLoad getLoad(){
POLoad ld = new POLoad(new OperatorKey(scope,nig.getNextNodeId(scope)));
ld.setPc(pigContext);
ld.setIsTmpLoad(true);
return ld;
}
-
+
private POStore getStore(){
POStore st = new POStore(new OperatorKey(scope,nig.getNextNodeId(scope)));
// mark store as tmp store. These could be removed by the
@@ -485,13 +486,13 @@ public class MRCompiler extends PhyPlanV
st.setIsTmpStore(true);
return st;
}
-
+
/**
* A map MROper is an MROper whose map plan is still open
* for taking more non-blocking operators.
* A reduce MROper is an MROper whose map plan is done but
* the reduce plan is open for taking more non-blocking opers.
- *
+ *
* Used for compiling non-blocking operators. The logic here
* is simple. If there is a single input, just push the operator
* into whichever phase is open. Otherwise, we merge the compiled
@@ -500,14 +501,14 @@ public class MRCompiler extends PhyPlanV
* as reduce plans can't be merged.
* Then we add the input oper op into the merged map MROper's map plan
* as a leaf and connect the reduce MROpers using store-load combinations
- * to the input operator which is the leaf. Also care is taken to
+ * to the input operator which is the leaf. Also care is taken to
* connect the MROpers according to the dependencies.
* @param op
* @throws PlanException
* @throws IOException
*/
private void nonBlocking(PhysicalOperator op) throws PlanException, IOException{
-
+
if (compiledInputs.length == 1) {
//For speed
MapReduceOper mro = compiledInputs[0];
@@ -517,29 +518,29 @@ public class MRCompiler extends PhyPlanV
mro.reducePlan.addAsLeaf(op);
} else {
int errCode = 2022;
- String msg = "Both map and reduce phases have been done. This is unexpected while compiling.";
+ String msg = "Both map and reduce phases have been done. This is unexpected while compiling.";
throw new PlanException(msg, errCode, PigException.BUG);
}
curMROp = mro;
} else {
List<MapReduceOper> mergedPlans = merge(compiledInputs);
-
+
//The first MROper is always the merged map MROper
MapReduceOper mro = mergedPlans.remove(0);
//Push the input operator into the merged map MROper
mro.mapPlan.addAsLeaf(op);
-
+
//Connect all the reduce MROpers
if(mergedPlans.size()>0)
connRedOper(mergedPlans, mro);
-
+
//return the compiled MROper
curMROp = mro;
}
}
-
+
private void addToMap(PhysicalOperator op) throws PlanException, IOException{
-
+
if (compiledInputs.length == 1) {
//For speed
MapReduceOper mro = compiledInputs[0];
@@ -547,7 +548,7 @@ public class MRCompiler extends PhyPlanV
mro.mapPlan.addAsLeaf(op);
} else if (mro.isMapDone() && !mro.isReduceDone()) {
FileSpec fSpec = getTempFileSpec();
-
+
POStore st = getStore();
st.setSFile(fSpec);
mro.reducePlan.addAsLeaf(st);
@@ -557,35 +558,35 @@ public class MRCompiler extends PhyPlanV
compiledInputs[0] = mro;
} else {
int errCode = 2022;
- String msg = "Both map and reduce phases have been done. This is unexpected while compiling.";
+ String msg = "Both map and reduce phases have been done. This is unexpected while compiling.";
throw new PlanException(msg, errCode, PigException.BUG);
}
curMROp = mro;
} else {
List<MapReduceOper> mergedPlans = merge(compiledInputs);
-
+
//The first MROper is always the merged map MROper
MapReduceOper mro = mergedPlans.remove(0);
//Push the input operator into the merged map MROper
mro.mapPlan.addAsLeaf(op);
-
+
//Connect all the reduce MROpers
if(mergedPlans.size()>0)
connRedOper(mergedPlans, mro);
-
+
//return the compiled MROper
curMROp = mro;
}
}
-
+
/**
* Used for compiling blocking operators. If there is a single input
* and its map phase is still open, then close it so that further
* operators can be compiled into the reduce phase. If its reduce phase
* is open, add a store and close it. Start a new map MROper into which
- * further operators can be compiled into.
- *
- * If there are multiple inputs, the logic
+ * further operators can be compiled into.
+ *
+ * If there are multiple inputs, the logic
* is to merge all map MROpers into one map MROper and retain
* the reduce MROpers. Since the operator is blocking, it has
* to be a Global Rerrange at least now. This operator need not
@@ -593,15 +594,15 @@ public class MRCompiler extends PhyPlanV
* But this creates the map-reduce boundary. So the merged map MROper
* is closed and its reduce phase is started. Depending on the number
* of reduce MROpers and the number of pipelines in the map MRoper
- * a Union operator is inserted whenever necessary. This also leads to the
+ * a Union operator is inserted whenever necessary. This also leads to the
* possibility of empty map plans. So have to be careful while handling
* it in the PigMapReduce class. If there are no map
* plans, then a new one is created as a side effect of the merge
* process. If there are no reduce MROpers, and only a single pipeline
- * in the map, then no union oper is added. Otherwise a Union oper is
- * added to the merged map MROper to which all the reduce MROpers
+ * in the map, then no union oper is added. Otherwise a Union oper is
+ * added to the merged map MROper to which all the reduce MROpers
* are connected by store-load combinations. Care is taken
- * to connect the MROpers in the MRPlan.
+ * to connect the MROpers in the MRPlan.
* @param op
* @throws IOException
* @throws PlanException
@@ -615,7 +616,7 @@ public class MRCompiler extends PhyPlanV
}
else if(mro.isMapDone() && !mro.isReduceDone()){
FileSpec fSpec = getTempFileSpec();
-
+
POStore st = getStore();
st.setSFile(fSpec);
mro.reducePlan.addAsLeaf(st);
@@ -627,7 +628,7 @@ public class MRCompiler extends PhyPlanV
else{
List<MapReduceOper> mergedPlans = merge(compiledInputs);
MapReduceOper mro = mergedPlans.remove(0);
-
+
if(mergedPlans.size()>0)
mro.setMapDoneMultiple(true);
else
@@ -639,13 +640,13 @@ public class MRCompiler extends PhyPlanV
curMROp = mro;
}
}
-
+
/**
* Connect the reduce MROpers to the leaf node in the map MROper mro
* by adding appropriate loads
* @param mergedPlans - The list of reduce MROpers
* @param mro - The map MROper
- * @throws PlanException
+ * @throws PlanException
* @throws IOException
*/
private void connRedOper(List<MapReduceOper> mergedPlans, MapReduceOper mro) throws PlanException, IOException{
@@ -668,8 +669,8 @@ public class MRCompiler extends PhyPlanV
MRPlan.connect(mmro, mro);
}
}
-
-
+
+
/**
* Force an end to the current map reduce job with a store into a temporary
* file.
@@ -699,16 +700,16 @@ public class MRCompiler extends PhyPlanV
}
return mro;
}
-
+
/**
* Starts a new MRoper and connects it to the old
- * one by load-store. The assumption is that the
+ * one by load-store. The assumption is that the
* store is already inserted into the old MROper.
* @param fSpec
* @param old
* @return
* @throws IOException
- * @throws PlanException
+ * @throws PlanException
*/
private MapReduceOper startNew(FileSpec fSpec, MapReduceOper old) throws PlanException{
POLoad ld = getLoad();
@@ -719,7 +720,7 @@ public class MRCompiler extends PhyPlanV
MRPlan.connect(old, ret);
return ret;
}
-
+
/**
* Returns a temporary DFS Path
* @return
@@ -729,18 +730,18 @@ public class MRCompiler extends PhyPlanV
return new FileSpec(FileLocalizer.getTemporaryPath(pigContext).toString(),
new FuncSpec(Utils.getTmpFileCompressorName(pigContext)));
}
-
+
/**
* Merges the map MROpers in the compiledInputs into a single
* merged map MRoper and returns a List with the merged map MROper
* as the first oper and the rest being reduce MROpers.
- *
+ *
* Care is taken to remove the map MROpers that are merged from the
* MRPlan and their connections moved over to the merged map MROper.
- *
+ *
* Merge is implemented as a sequence of binary merges.
- * merge(PhyPlan finPlan, List<PhyPlan> lst) := finPlan,merge(p) foreach p in lst
- *
+ * merge(PhyPlan finPlan, List<PhyPlan> lst) := finPlan,merge(p) foreach p in lst
+ *
* @param compiledInputs
* @return
* @throws PlanException
@@ -749,11 +750,11 @@ public class MRCompiler extends PhyPlanV
private List<MapReduceOper> merge(MapReduceOper[] compiledInputs)
throws PlanException {
List<MapReduceOper> ret = new ArrayList<MapReduceOper>();
-
+
MapReduceOper mergedMap = getMROp();
ret.add(mergedMap);
MRPlan.add(mergedMap);
-
+
Set<MapReduceOper> toBeConnected = new HashSet<MapReduceOper>();
List<MapReduceOper> remLst = new ArrayList<MapReduceOper>();
@@ -772,12 +773,12 @@ public class MRCompiler extends PhyPlanV
ret.add(mro);
} else {
int errCode = 2027;
- String msg = "Both map and reduce phases have been done. This is unexpected for a merge.";
+ String msg = "Both map and reduce phases have been done. This is unexpected for a merge.";
throw new PlanException(msg, errCode, PigException.BUG);
}
}
merge(ret.get(0).mapPlan, mpLst);
-
+
Iterator<MapReduceOper> it = toBeConnected.iterator();
while(it.hasNext())
MRPlan.connect(it.next(), mergedMap);
@@ -804,12 +805,12 @@ public class MRCompiler extends PhyPlanV
for (PhysicalOperator op : opsToChange) {
phyToMROpMap.put(op, mergedMap);
}
-
+
MRPlan.remove(rmro);
}
return ret;
}
-
+
/**
* The merge of a list of map plans
* @param <O>
@@ -831,24 +832,24 @@ public class MRCompiler extends PhyPlanV
ScalarPhyFinder scalarPhyFinder = new ScalarPhyFinder(plan);
scalarPhyFinder.visit();
curMROp.scalars.addAll(scalarPhyFinder.getScalars());
-
+
//Process UDFs
udfFinder.setPlan(plan);
udfFinder.visit();
curMROp.UDFs.addAll(udfFinder.getUDFs());
}
}
-
-
+
+
/* The visitOp methods that decide what to do with the current operator */
-
+
/**
* Compiles a split operator. The logic is to
* close the split job by replacing the split oper by
* a store and creating a new Map MRoper and return
* that as the current MROper to which other operators
* would be compiled into. The new MROper would be connected
- * to the split job by load-store. Also add the split oper
+ * to the split job by load-store. Also add the split oper
* to the splitsSeen map.
* @param op - The split operator
* @throws VisitorException
@@ -868,7 +869,7 @@ public class MRCompiler extends PhyPlanV
throw new MRCompilerException(msg, errCode, PigException.BUG, e);
}
}
-
+
@Override
public void visitLoad(POLoad op) throws VisitorException{
try{
@@ -880,7 +881,7 @@ public class MRCompiler extends PhyPlanV
throw new MRCompilerException(msg, errCode, PigException.BUG, e);
}
}
-
+
@Override
public void visitNative(PONative op) throws VisitorException{
// We will explode the native operator here to add a new MROper for native Mapreduce job
@@ -897,7 +898,7 @@ public class MRCompiler extends PhyPlanV
throw new MRCompilerException(msg, errCode, PigException.BUG, e);
}
}
-
+
@Override
public void visitStore(POStore op) throws VisitorException{
try{
@@ -911,7 +912,7 @@ public class MRCompiler extends PhyPlanV
throw new MRCompilerException(msg, errCode, PigException.BUG, e);
}
}
-
+
@Override
public void visitFilter(POFilter op) throws VisitorException{
try{
@@ -924,7 +925,7 @@ public class MRCompiler extends PhyPlanV
throw new MRCompilerException(msg, errCode, PigException.BUG, e);
}
}
-
+
@Override
public void visitCross(POCross op) throws VisitorException {
try{
@@ -948,7 +949,7 @@ public class MRCompiler extends PhyPlanV
throw new MRCompilerException(msg, errCode, PigException.BUG, e);
}
}
-
+
@Override
public void visitLimit(POLimit op) throws VisitorException{
try{
@@ -959,14 +960,14 @@ public class MRCompiler extends PhyPlanV
mro.limitPlan = op.getLimitPlan();
}
if (!mro.isMapDone()) {
- // if map plan is open, add a limit for optimization, eventually we
- // will add another limit to reduce plan
+ // if map plan is open, add a limit for optimization, eventually we
+ // will add another limit to reduce plan
if (!pigContext.inIllustrator)
{
mro.mapPlan.addAsLeaf(op);
mro.setMapDone(true);
}
-
+
if (mro.reducePlan.isEmpty())
{
MRUtil.simpleConnectMapToReduce(mro, scope, nig);
@@ -982,15 +983,15 @@ public class MRCompiler extends PhyPlanV
}
else
{
- messageCollector.collect("Something in the reduce plan while map plan is not done. Something wrong!",
- MessageType.Warning, PigWarning.REDUCE_PLAN_NOT_EMPTY_WHILE_MAP_PLAN_UNDER_PROCESS);
+ messageCollector.collect("Something in the reduce plan while map plan is not done. Something wrong!",
+ MessageType.Warning, PigWarning.REDUCE_PLAN_NOT_EMPTY_WHILE_MAP_PLAN_UNDER_PROCESS);
}
} else if (mro.isMapDone() && !mro.isReduceDone()) {
- // limit should add into reduce plan
+ // limit should add into reduce plan
mro.reducePlan.addAsLeaf(op);
} else {
- messageCollector.collect("Both map and reduce phases have been done. This is unexpected while compiling!",
- MessageType.Warning, PigWarning.UNREACHABLE_CODE_BOTH_MAP_AND_REDUCE_PLANS_PROCESSED);
+ messageCollector.collect("Both map and reduce phases have been done. This is unexpected while compiling!",
+ MessageType.Warning, PigWarning.UNREACHABLE_CODE_BOTH_MAP_AND_REDUCE_PLANS_PROCESSED);
}
phyToMROpMap.put(op, mro);
}catch(Exception e){
@@ -1018,24 +1019,24 @@ public class MRCompiler extends PhyPlanV
@Override
public void visitCollectedGroup(POCollectedGroup op) throws VisitorException {
-
+
if(!curMROp.mapDone){
-
+
List<PhysicalOperator> roots = curMROp.mapPlan.getRoots();
if(roots.size() != 1){
int errCode = 2171;
String errMsg = "Expected one but found more then one root physical operator in physical plan.";
throw new MRCompilerException(errMsg,errCode,PigException.BUG);
}
-
+
PhysicalOperator phyOp = roots.get(0);
if(! (phyOp instanceof POLoad)){
int errCode = 2172;
String errMsg = "Expected physical operator at root to be POLoad. Found : "+phyOp.getClass().getCanonicalName();
throw new MRCompilerException(errMsg,errCode,PigException.BUG);
}
-
-
+
+
LoadFunc loadFunc = ((POLoad)phyOp).getLoadFunc();
try {
if(!(CollectableLoadFunc.class.isAssignableFrom(loadFunc.getClass()))){
@@ -1062,19 +1063,19 @@ public class MRCompiler extends PhyPlanV
int errCode = 2034;
String msg = "Error compiling operator " + op.getClass().getSimpleName();
throw new MRCompilerException(msg, errCode, PigException.BUG, e);
- }
+ }
}
else if(!curMROp.reduceDone){
- int errCode=2250;
+ int errCode=2250;
String msg = "Blocking operators are not allowed before Collected Group. Consider dropping using 'collected'.";
- throw new MRCompilerException(msg, errCode, PigException.BUG);
+ throw new MRCompilerException(msg, errCode, PigException.BUG);
}
else{
int errCode = 2022;
String msg = "Both map and reduce phases have been done. This is unexpected while compiling.";
- throw new MRCompilerException(msg, errCode, PigException.BUG);
+ throw new MRCompilerException(msg, errCode, PigException.BUG);
}
-
+
}
@Override
@@ -1093,7 +1094,7 @@ public class MRCompiler extends PhyPlanV
throw new MRCompilerException(msg, errCode, PigException.BUG, e);
}
}
-
+
@Override
public void visitGlobalRearrange(POGlobalRearrange op) throws VisitorException{
try{
@@ -1106,15 +1107,15 @@ public class MRCompiler extends PhyPlanV
throw new MRCompilerException(msg, errCode, PigException.BUG, e);
}
}
-
+
@Override
public void visitPackage(POPackage op) throws VisitorException{
try{
nonBlocking(op);
phyToMROpMap.put(op, curMROp);
- if (op.getPackageType() == PackageType.JOIN) {
+ if (op.getPkgr().getPackageType() == PackageType.JOIN) {
curMROp.markRegularJoin();
- } else if (op.getPackageType() == PackageType.GROUP) {
+ } else if (op.getPkgr().getPackageType() == PackageType.GROUP) {
if (op.getNumInps() == 1) {
curMROp.markGroupBy();
} else if (op.getNumInps() > 1) {
@@ -1127,7 +1128,7 @@ public class MRCompiler extends PhyPlanV
throw new MRCompilerException(msg, errCode, PigException.BUG, e);
}
}
-
+
@Override
public void visitUnion(POUnion op) throws VisitorException{
try{
@@ -1139,7 +1140,7 @@ public class MRCompiler extends PhyPlanV
throw new MRCompilerException(msg, errCode, PigException.BUG, e);
}
}
-
+
/**
* This is an operator which will have multiple inputs(= to number of join inputs)
* But it prunes off all inputs but the fragment input and creates separate MR jobs
@@ -1156,7 +1157,7 @@ public class MRCompiler extends PhyPlanV
replFiles[i] = getTempFileSpec();
}
op.setReplFiles(replFiles);
-
+
curMROp = phyToMROpMap.get(op.getInputs().get(op.getFragment()));
for(int i=0;i<compiledInputs.length;i++){
@@ -1165,33 +1166,33 @@ public class MRCompiler extends PhyPlanV
continue;
POStore str = getStore();
str.setSFile(replFiles[i]);
-
- Configuration conf =
+
+ Configuration conf =
ConfigurationUtil.toConfiguration(pigContext.getProperties());
boolean combinable = !conf.getBoolean("pig.noSplitCombination", false);
-
- if (!mro.isMapDone()) {
- if (combinable && hasTooManyInputFiles(mro, conf)) {
+
+ if (!mro.isMapDone()) {
+ if (combinable && hasTooManyInputFiles(mro, conf)) {
POStore tmpSto = getStore();
FileSpec fSpec = getTempFileSpec();
- tmpSto.setSFile(fSpec);
+ tmpSto.setSFile(fSpec);
mro.mapPlan.addAsLeaf(tmpSto);
- mro.setMapDoneSingle(true);
- MapReduceOper catMROp = getConcatenateJob(fSpec, mro, str);
+ mro.setMapDoneSingle(true);
+ MapReduceOper catMROp = getConcatenateJob(fSpec, mro, str);
MRPlan.connect(catMROp, curMROp);
} else {
mro.mapPlan.addAsLeaf(str);
- mro.setMapDoneSingle(true);
+ mro.setMapDoneSingle(true);
MRPlan.connect(mro, curMROp);
}
} else if (mro.isMapDone() && !mro.isReduceDone()) {
if (combinable && (mro.requestedParallelism >= fileConcatenationThreshold)) {
POStore tmpSto = getStore();
FileSpec fSpec = getTempFileSpec();
- tmpSto.setSFile(fSpec);
+ tmpSto.setSFile(fSpec);
mro.reducePlan.addAsLeaf(tmpSto);
mro.setReduceDone(true);
- MapReduceOper catMROp = getConcatenateJob(fSpec, mro, str);
+ MapReduceOper catMROp = getConcatenateJob(fSpec, mro, str);
MRPlan.connect(catMROp, curMROp);
} else {
mro.reducePlan.addAsLeaf(str);
@@ -1202,15 +1203,15 @@ public class MRCompiler extends PhyPlanV
int errCode = 2022;
String msg = "Both map and reduce phases have been done. This is unexpected while compiling.";
throw new PlanException(msg, errCode, PigException.BUG);
- }
+ }
}
-
+
if (!curMROp.isMapDone()) {
curMROp.mapPlan.addAsLeaf(op);
} else if (curMROp.isMapDone() && !curMROp.isReduceDone()) {
curMROp.reducePlan.addAsLeaf(op);
} else {
- int errCode = 2022;
+ int errCode = 2022;
String msg = "Both map and reduce phases have been done. This is unexpected while compiling.";
throw new PlanException(msg, errCode, PigException.BUG);
}
@@ -1229,33 +1230,33 @@ public class MRCompiler extends PhyPlanV
throw new MRCompilerException(msg, errCode, PigException.BUG, e);
}
}
-
+
@SuppressWarnings("unchecked")
private boolean hasTooManyInputFiles(MapReduceOper mro, Configuration conf) {
if (pigContext == null || pigContext.getExecType() == ExecType.LOCAL) {
return false;
}
-
+
if (mro instanceof NativeMapReduceOper) {
return optimisticFileConcatenation ? false : true;
}
-
+
PhysicalPlan mapPlan = mro.mapPlan;
-
+
List<PhysicalOperator> roots = mapPlan.getRoots();
if (roots == null || roots.size() == 0) return false;
-
+
int numFiles = 0;
boolean ret = false;
try {
for (PhysicalOperator root : roots) {
POLoad ld = (POLoad) root;
String fileName = ld.getLFile().getFileName();
-
+
if(UriUtil.isHDFSFile(fileName)){
- // Only if the input is an hdfs file, this optimization is
+ // Only if the input is an hdfs file, this optimization is
// useful (to reduce load on namenode)
-
+
//separate out locations separated by comma
String [] locations = LoadFunc.getPathStrings(fileName);
for(String location : locations){
@@ -1281,14 +1282,14 @@ public class MRCompiler extends PhyPlanV
List<MapReduceOper> preds = MRPlan.getPredecessors(mro);
if (preds != null && preds.size() == 1) {
MapReduceOper pred = preds.get(0);
- if (!pred.reducePlan.isEmpty()) {
+ if (!pred.reducePlan.isEmpty()) {
numFiles += pred.requestedParallelism;
} else { // map-only job
ret = hasTooManyInputFiles(pred, conf);
break;
}
- } else if (!optimisticFileConcatenation) {
- // can't determine the number of input files.
+ } else if (!optimisticFileConcatenation) {
+ // can't determine the number of input files.
// Treat it as having too manyfiles
numFiles = fileConcatenationThreshold;
break;
@@ -1298,31 +1299,31 @@ public class MRCompiler extends PhyPlanV
}
}
} catch (IOException e) {
- LOG.warn("failed to get number of input files", e);
+ LOG.warn("failed to get number of input files", e);
} catch (InterruptedException e) {
- LOG.warn("failed to get number of input files", e);
+ LOG.warn("failed to get number of input files", e);
}
-
+
LOG.info("number of input files: " + numFiles);
return ret ? true : (numFiles >= fileConcatenationThreshold);
}
-
+
/*
* Use Mult File Combiner to concatenate small input files
*/
private MapReduceOper getConcatenateJob(FileSpec fSpec, MapReduceOper old, POStore str)
throws PlanException, ExecException {
-
+
MapReduceOper mro = startNew(fSpec, old);
mro.mapPlan.addAsLeaf(str);
mro.setMapDone(true);
-
+
LOG.info("Insert a file-concatenation job");
-
+
return mro;
- }
-
- /** Leftmost relation is referred as base relation (this is the one fed into mappers.)
+ }
+
+ /** Leftmost relation is referred as base relation (this is the one fed into mappers.)
* First, close all MROpers except for first one (referred as baseMROPer)
* Then, create a MROper which will do indexing job (idxMROper)
* Connect idxMROper before the mappedMROper in the MRPlan.
@@ -1334,20 +1335,20 @@ public class MRCompiler extends PhyPlanV
if(compiledInputs.length < 2){
int errCode=2251;
String errMsg = "Merge Cogroup work on two or more relations." +
- "To use map-side group-by on single relation, use 'collected' qualifier.";
+ "To use map-side group-by on single relation, use 'collected' qualifier.";
throw new MRCompilerException(errMsg, errCode);
}
-
+
List<FuncSpec> funcSpecs = new ArrayList<FuncSpec>(compiledInputs.length-1);
List<String> fileSpecs = new ArrayList<String>(compiledInputs.length-1);
List<String> loaderSigns = new ArrayList<String>(compiledInputs.length-1);
-
+
try{
- // Iterate through all the MROpers, disconnect side MROPers from
+ // Iterate through all the MROpers, disconnect side MROPers from
// MROPerPlan and collect all the information needed in different lists.
-
+
for(int i=0 ; i < compiledInputs.length; i++){
-
+
MapReduceOper mrOper = compiledInputs[i];
PhysicalPlan mapPlan = mrOper.mapPlan;
if(mapPlan.getRoots().size() != 1){
@@ -1362,18 +1363,18 @@ public class MRCompiler extends PhyPlanV
String errMsg = "Expected physical operator at root to be POLoad. Found : "+rootPOOp.getClass().getCanonicalName();
throw new MRCompilerException(errMsg,errCode);
}
-
+
POLoad sideLoader = (POLoad)rootPOOp;
FileSpec loadFileSpec = sideLoader.getLFile();
FuncSpec funcSpec = loadFileSpec.getFuncSpec();
LoadFunc loadfunc = sideLoader.getLoadFunc();
if(i == 0){
-
+
if(!(CollectableLoadFunc.class.isAssignableFrom(loadfunc.getClass()))){
- int errCode = 2252;
+ int errCode = 2252;
throw new MRCompilerException("Base loader in Cogroup must implement CollectableLoadFunc.", errCode);
}
-
+
((CollectableLoadFunc)loadfunc).ensureAllKeyInstancesInSameSplit();
continue;
}
@@ -1381,30 +1382,30 @@ public class MRCompiler extends PhyPlanV
int errCode = 2253;
throw new MRCompilerException("Side loaders in cogroup must implement IndexableLoadFunc.", errCode);
}
-
+
funcSpecs.add(funcSpec);
fileSpecs.add(loadFileSpec.getFileName());
loaderSigns.add(sideLoader.getSignature());
MRPlan.remove(mrOper);
}
-
+
poCoGrp.setSideLoadFuncs(funcSpecs);
poCoGrp.setSideFileSpecs(fileSpecs);
poCoGrp.setLoaderSignatures(loaderSigns);
-
+
// Use map-reduce operator of base relation for the cogroup operation.
MapReduceOper baseMROp = phyToMROpMap.get(poCoGrp.getInputs().get(0));
if(baseMROp.mapDone || !baseMROp.reducePlan.isEmpty()){
int errCode = 2254;
throw new MRCompilerException("Currently merged cogroup is not supported after blocking operators.", errCode);
}
-
+
// Create new map-reduce operator for indexing job and then configure it.
MapReduceOper indexerMROp = getMROp();
FileSpec idxFileSpec = getIndexingJob(indexerMROp, baseMROp, poCoGrp.getLRInnerPlansOf(0));
poCoGrp.setIdxFuncSpec(idxFileSpec.getFuncSpec());
poCoGrp.setIndexFileName(idxFileSpec.getFileName());
-
+
baseMROp.mapPlan.addAsLeaf(poCoGrp);
for (FuncSpec funcSpec : funcSpecs)
baseMROp.UDFs.add(funcSpec.toString());
@@ -1436,41 +1437,41 @@ public class MRCompiler extends PhyPlanV
throw new MRCompilerException(errMsg, errCode,e);
}
}
-
+
// Sets up the indexing job for map-side cogroups.
- private FileSpec getIndexingJob(MapReduceOper indexerMROp,
+ private FileSpec getIndexingJob(MapReduceOper indexerMROp,
final MapReduceOper baseMROp, final List<PhysicalPlan> mapperLRInnerPlans)
throws MRCompilerException, PlanException, ExecException, IOException, CloneNotSupportedException {
-
+
// First replace loader with MergeJoinIndexer.
PhysicalPlan baseMapPlan = baseMROp.mapPlan;
- POLoad baseLoader = (POLoad)baseMapPlan.getRoots().get(0);
+ POLoad baseLoader = (POLoad)baseMapPlan.getRoots().get(0);
FileSpec origLoaderFileSpec = baseLoader.getLFile();
FuncSpec funcSpec = origLoaderFileSpec.getFuncSpec();
LoadFunc loadFunc = baseLoader.getLoadFunc();
-
+
if (! (OrderedLoadFunc.class.isAssignableFrom(loadFunc.getClass()))){
int errCode = 1104;
String errMsg = "Base relation of merge-coGroup must implement " +
- "OrderedLoadFunc interface. The specified loader "
+ "OrderedLoadFunc interface. The specified loader "
+ funcSpec + " doesn't implement it";
throw new MRCompilerException(errMsg,errCode);
}
-
+
String[] indexerArgs = new String[6];
indexerArgs[0] = funcSpec.toString();
indexerArgs[1] = ObjectSerializer.serialize((Serializable)mapperLRInnerPlans);
indexerArgs[3] = baseLoader.getSignature();
indexerArgs[4] = baseLoader.getOperatorKey().scope;
- indexerArgs[5] = Boolean.toString(false); // we care for nulls.
-
+ indexerArgs[5] = Boolean.toString(false); // we care for nulls.
+
PhysicalPlan phyPlan;
- if (baseMapPlan.getSuccessors(baseLoader) == null
+ if (baseMapPlan.getSuccessors(baseLoader) == null
|| baseMapPlan.getSuccessors(baseLoader).isEmpty()){
// Load-Load-Cogroup case.
- phyPlan = null;
+ phyPlan = null;
}
-
+
else{ // We got something. Yank it and set it as inner plan.
phyPlan = baseMapPlan.clone();
PhysicalOperator root = phyPlan.getRoots().get(0);
@@ -1485,20 +1486,20 @@ public class MRCompiler extends PhyPlanV
new FuncSpec(MergeJoinIndexer.class.getName(), indexerArgs)));
indexerMROp.mapPlan.add(idxJobLoader);
indexerMROp.UDFs.add(baseLoader.getLFile().getFuncSpec().toString());
-
- // Loader of mro will return a tuple of form -
+
+ // Loader of mro will return a tuple of form -
// (key1, key2, .. , WritableComparable, splitIndex). See MergeJoinIndexer for details.
-
- // After getting an index entry in each mapper, send all of them to one
+
+ // After getting an index entry in each mapper, send all of them to one
// reducer where they will be sorted on the way by Hadoop.
MRUtil.simpleConnectMapToReduce(indexerMROp, scope, nig);
-
+
indexerMROp.requestedParallelism = 1; // we need exactly one reducer for indexing job.
-
- // We want to use typed tuple comparator for this job, instead of default
- // raw binary comparator used by Pig, to make sure index entries are
+
+ // We want to use typed tuple comparator for this job, instead of default
+ // raw binary comparator used by Pig, to make sure index entries are
// sorted correctly by Hadoop.
- indexerMROp.useTypedComparator(true);
+ indexerMROp.useTypedComparator(true);
POStore st = getStore();
FileSpec strFile = getTempFileSpec();
@@ -1508,7 +1509,7 @@ public class MRCompiler extends PhyPlanV
return strFile;
}
-
+
/** Since merge-join works on two inputs there are exactly two MROper predecessors identified as left and right.
* Instead of merging two operators, both are used to generate a MR job each. First MR oper is run to generate on-the-fly index on right side.
* Second is used to actually do the join. First MR oper is identified as rightMROper and second as curMROper.
@@ -1530,13 +1531,13 @@ public class MRCompiler extends PhyPlanV
}
curMROp = phyToMROpMap.get(joinOp.getInputs().get(0));
-
+
MapReduceOper rightMROpr = null;
if(curMROp.equals(compiledInputs[0]))
rightMROpr = compiledInputs[1];
else
rightMROpr = compiledInputs[0];
-
+
// We will first operate on right side which is indexer job.
// First yank plan of the compiled right input and set that as an inner plan of right operator.
PhysicalPlan rightPipelinePlan;
@@ -1547,18 +1548,18 @@ public class MRCompiler extends PhyPlanV
String errMsg = "Expected one but found more then one root physical operator in physical plan.";
throw new MRCompilerException(errMsg,errCode,PigException.BUG);
}
-
+
PhysicalOperator rightLoader = rightMapPlan.getRoots().get(0);
if(! (rightLoader instanceof POLoad)){
int errCode = 2172;
String errMsg = "Expected physical operator at root to be POLoad. Found : "+rightLoader.getClass().getCanonicalName();
throw new MRCompilerException(errMsg,errCode);
}
-
+
if (rightMapPlan.getSuccessors(rightLoader) == null || rightMapPlan.getSuccessors(rightLoader).isEmpty())
// Load - Join case.
- rightPipelinePlan = null;
-
+ rightPipelinePlan = null;
+
else{ // We got something on right side. Yank it and set it as inner plan of right input.
rightPipelinePlan = rightMapPlan.clone();
PhysicalOperator root = rightPipelinePlan.getRoots().get(0);
@@ -1567,8 +1568,8 @@ public class MRCompiler extends PhyPlanV
rightMapPlan.trimBelow(rightLoader);
}
}
-
- else if(!rightMROpr.reduceDone){
+
+ else if(!rightMROpr.reduceDone){
// Indexer must run in map. If we are in reduce, close it and start new MROper.
// No need of yanking in this case. Since we are starting brand new MR Operator and it will contain nothing.
POStore rightStore = getStore();
@@ -1577,18 +1578,18 @@ public class MRCompiler extends PhyPlanV
rightMROpr.reducePlan.addAsLeaf(rightStore);
rightMROpr.setReduceDone(true);
rightMROpr = startNew(rightStrFile, rightMROpr);
- rightPipelinePlan = null;
+ rightPipelinePlan = null;
}
-
+
else{
int errCode = 2022;
String msg = "Both map and reduce phases have been done. This is unexpected while compiling.";
throw new PlanException(msg, errCode, PigException.BUG);
}
-
+
joinOp.setupRightPipeline(rightPipelinePlan);
- rightMROpr.requestedParallelism = 1; // we need exactly one reducer for indexing job.
-
+ rightMROpr.requestedParallelism = 1; // we need exactly one reducer for indexing job.
+
// At this point, we must be operating on map plan of right input and it would contain nothing else other then a POLoad.
POLoad rightLoader = (POLoad)rightMROpr.mapPlan.getRoots().get(0);
joinOp.setSignature(rightLoader.getSignature());
@@ -1598,7 +1599,7 @@ public class MRCompiler extends PhyPlanV
joinOp.setRightLoaderFuncSpec(rightLoader.getLFile().getFuncSpec());
joinOp.setRightInputFileName(rightLoader.getLFile().getFileName());
udfs.add(rightLoader.getLFile().getFuncSpec().toString());
-
+
// we don't need the right MROper since
// the right loader is an IndexableLoadFunc which can handle the index
// itself
@@ -1607,15 +1608,15 @@ public class MRCompiler extends PhyPlanV
compiledInputs[0] = null;
} else if(rightMROpr == compiledInputs[1]) {
compiledInputs[1] = null;
- }
+ }
rightMROpr = null;
-
- // validate that the join keys in merge join are only
- // simple column projections or '*' and not expression - expressions
- // cannot be handled when the index is built by the storage layer on the sorted
- // data when the sorted data (and corresponding index) is written.
- // So merge join will be restricted not have expressions as
- // join keys
+
+ // validate that the join keys in merge join are only
+ // simple column projections or '*' and not expression - expressions
+ // cannot be handled when the index is built by the storage layer on the sorted
+ // data when the sorted data (and corresponding index) is written.
+ // So merge join will be restricted not have expressions as
+ // join keys
int numInputs = mPlan.getPredecessors(joinOp).size(); // should be 2
for(int i = 0; i < numInputs; i++) {
List<PhysicalPlan> keyPlans = joinOp.getInnerPlansOf(i);
@@ -1632,23 +1633,23 @@ public class MRCompiler extends PhyPlanV
}
} else {
LoadFunc loadFunc = rightLoader.getLoadFunc();
- //Replacing POLoad with indexer is disabled for 'merge-sparse' joins. While
+ //Replacing POLoad with indexer is disabled for 'merge-sparse' joins. While
//this feature would be useful, the current implementation of DefaultIndexableLoader
//is not designed to handle multiple calls to seekNear. Specifically, it rereads the entire index
//for each call. Some refactoring of this class is required - and then the check below could be removed.
- if (joinOp.getJoinType() == LOJoin.JOINTYPE.MERGESPARSE) {
+ if (joinOp.getJoinType() == LOJoin.JOINTYPE.MERGESPARSE) {
int errCode = 1104;
String errMsg = "Right input of merge-join must implement IndexableLoadFunc. " +
"The specified loader " + loadFunc + " doesn't implement it";
throw new MRCompilerException(errMsg,errCode);
- }
-
+ }
+
// Replace POLoad with indexer.
if (! (OrderedLoadFunc.class.isAssignableFrom(loadFunc.getClass()))){
int errCode = 1104;
String errMsg = "Right input of merge-join must implement " +
- "OrderedLoadFunc interface. The specified loader "
+ "OrderedLoadFunc interface. The specified loader "
+ loadFunc + " doesn't implement it";
throw new MRCompilerException(errMsg,errCode);
}
@@ -1663,22 +1664,22 @@ public class MRCompiler extends PhyPlanV
indexerArgs[3] = rightLoader.getSignature();
indexerArgs[4] = rightLoader.getOperatorKey().scope;
indexerArgs[5] = Boolean.toString(true);
-
+
FileSpec lFile = new FileSpec(rightLoader.getLFile().getFileName(),new FuncSpec(MergeJoinIndexer.class.getName(), indexerArgs));
rightLoader.setLFile(lFile);
-
- // Loader of mro will return a tuple of form -
+
+ // Loader of mro will return a tuple of form -
// (keyFirst1, keyFirst2, .. , position, splitIndex) See MergeJoinIndexer
MRUtil.simpleConnectMapToReduce(rightMROpr, scope, nig);
rightMROpr.useTypedComparator(true);
-
+
POStore st = getStore();
FileSpec strFile = getTempFileSpec();
st.setSFile(strFile);
rightMROpr.reducePlan.addAsLeaf(st);
rightMROpr.setReduceDone(true);
-
+
// set up the DefaultIndexableLoader for the join operator
String[] defaultIndexableLoaderArgs = new String[5];
defaultIndexableLoaderArgs[0] = origRightLoaderFileSpec.getFuncSpec().toString();
@@ -1687,17 +1688,17 @@ public class MRCompiler extends PhyPlanV
defaultIndexableLoaderArgs[3] = joinOp.getOperatorKey().scope;
defaultIndexableLoaderArgs[4] = origRightLoaderFileSpec.getFileName();
joinOp.setRightLoaderFuncSpec((new FuncSpec(DefaultIndexableLoader.class.getName(), defaultIndexableLoaderArgs)));
- joinOp.setRightInputFileName(origRightLoaderFileSpec.getFileName());
-
+ joinOp.setRightInputFileName(origRightLoaderFileSpec.getFileName());
+
joinOp.setIndexFile(strFile.getFileName());
udfs.add(origRightLoaderFileSpec.getFuncSpec().toString());
}
-
+
// We are done with right side. Lets work on left now.
// Join will be materialized in leftMROper.
- if(!curMROp.mapDone) // Life is easy
+ if(!curMROp.mapDone) // Life is easy
curMROp.mapPlan.addAsLeaf(joinOp);
-
+
else if(!curMROp.reduceDone){ // This is a map-side join. Close this MROper and start afresh.
POStore leftStore = getStore();
FileSpec leftStrFile = getTempFileSpec();
@@ -1707,7 +1708,7 @@ public class MRCompiler extends PhyPlanV
curMROp = startNew(leftStrFile, curMROp);
curMROp.mapPlan.addAsLeaf(joinOp);
}
-
+
else{
int errCode = 2022;
String msg = "Both map and reduce phases have been done. This is unexpected while compiling.";
@@ -1749,30 +1750,31 @@ public class MRCompiler extends PhyPlanV
prjStar.setResultType(DataType.TUPLE);
prjStar.setStar(true);
ep.add(prjStar);
-
+
List<PhysicalPlan> eps = new ArrayList<PhysicalPlan>();
eps.add(ep);
-
+
POLocalRearrange lr = new POLocalRearrange(new OperatorKey(scope,nig.getNextNodeId(scope)));
lr.setIndex(0);
lr.setKeyType(DataType.TUPLE);
lr.setPlans(eps);
lr.setResultType(DataType.TUPLE);
lr.setDistinct(true);
-
+
addToMap(lr);
-
+
blocking(op);
curMROp.customPartitioner = op.getCustomPartitioner();
-
+
POPackage pkg = new POPackage(new OperatorKey(scope,nig.getNextNodeId(scope)));
- pkg.setKeyType(DataType.TUPLE);
- pkg.setDistinct(true);
+ Packager pkgr = pkg.getPkgr();
+ pkgr.setKeyType(DataType.TUPLE);
+ pkgr.setDistinct(true);
pkg.setNumInps(1);
- boolean[] inner = {false};
- pkg.setInner(inner);
+ boolean[] inner = {false};
+ pkgr.setInner(inner);
curMROp.reducePlan.add(pkg);
-
+
List<PhysicalPlan> eps1 = new ArrayList<PhysicalPlan>();
List<Boolean> flat1 = new ArrayList<Boolean>();
PhysicalPlan ep1 = new PhysicalPlan();
@@ -1798,160 +1800,161 @@ public class MRCompiler extends PhyPlanV
throw new MRCompilerException(msg, errCode, PigException.BUG, e);
}
}
-
+
@Override
public void visitSkewedJoin(POSkewedJoin op) throws VisitorException {
- try {
- if (compiledInputs.length != 2) {
- int errCode = 2255;
- throw new VisitorException("POSkewedJoin operator has " + compiledInputs.length + " inputs. It should have 2.", errCode);
- }
-
- //change plan to store the first join input into a temp file
- FileSpec fSpec = getTempFileSpec();
- MapReduceOper mro = compiledInputs[0];
- POStore str = getStore();
- str.setSFile(fSpec);
- if (!mro.isMapDone()) {
- mro.mapPlan.addAsLeaf(str);
- mro.setMapDoneSingle(true);
- } else if (mro.isMapDone() && !mro.isReduceDone()) {
- mro.reducePlan.addAsLeaf(str);
- mro.setReduceDone(true);
- } else {
- int errCode = 2022;
- String msg = "Both map and reduce phases have been done. This is unexpected while compiling.";
- throw new PlanException(msg, errCode, PigException.BUG);
- }
-
- FileSpec partitionFile = getTempFileSpec();
- int rp = op.getRequestedParallelism();
-
- Pair<MapReduceOper, Integer> sampleJobPair = getSkewedJoinSampleJob(op, mro, fSpec, partitionFile, rp);
- rp = sampleJobPair.second;
-
- // set parallelism of SkewedJoin as the value calculated by sampling job
- // if "parallel" is specified in join statement, "rp" is equal to that number
- // if not specified, use the value that sampling process calculated
- // based on default.
- op.setRequestedParallelism(rp);
-
- // load the temp file for first table as input of join
- MapReduceOper[] joinInputs = new MapReduceOper[] {startNew(fSpec, sampleJobPair.first), compiledInputs[1]};
- MapReduceOper[] rearrangeOutputs = new MapReduceOper[2];
-
- compiledInputs = new MapReduceOper[] {joinInputs[0]};
- // run POLocalRearrange for first join table
- POLocalRearrange lr = new POLocalRearrange(new OperatorKey(scope,nig.getNextNodeId(scope)), rp);
- try {
- lr.setIndex(0);
- } catch (ExecException e) {
- int errCode = 2058;
- String msg = "Unable to set index on newly created POLocalRearrange.";
- throw new PlanException(msg, errCode, PigException.BUG, e);
- }
-
- List<PhysicalOperator> l = plan.getPredecessors(op);
- MultiMap<PhysicalOperator, PhysicalPlan> joinPlans = op.getJoinPlans();
- List<PhysicalPlan> groups = joinPlans.get(l.get(0));
- // check the type of group keys, if there are more than one field, the key is TUPLE.
- byte type = DataType.TUPLE;
- if (groups.size() == 1) {
- type = groups.get(0).getLeaves().get(0).getResultType();
- }
-
- lr.setKeyType(type);
- lr.setPlans(groups);
- lr.setResultType(DataType.TUPLE);
-
- lr.visit(this);
- if(lr.getRequestedParallelism() > curMROp.requestedParallelism)
- curMROp.requestedParallelism = lr.getRequestedParallelism();
- rearrangeOutputs[0] = curMROp;
-
- compiledInputs = new MapReduceOper[] {joinInputs[1]};
- // if the map for current input is already closed, then start a new job
- if (compiledInputs[0].isMapDone() && !compiledInputs[0].isReduceDone()) {
- FileSpec f = getTempFileSpec();
- POStore s = getStore();
- s.setSFile(f);
- compiledInputs[0].reducePlan.addAsLeaf(s);
- compiledInputs[0].setReduceDone(true);
- compiledInputs[0] = startNew(f, compiledInputs[0]);
- }
-
- // run POPartitionRearrange for second join table
- POPartitionRearrange pr =
- new POPartitionRearrange(new OperatorKey(scope,nig.getNextNodeId(scope)), rp);
- pr.setPigContext(pigContext);
- lr = pr;
- try {
- lr.setIndex(1);
- } catch (ExecException e) {
- int errCode = 2058;
- String msg = "Unable to set index on newly created POLocalRearrange.";
- throw new PlanException(msg, errCode, PigException.BUG, e);
- }
-
- groups = joinPlans.get(l.get(1));
- lr.setPlans(groups);
- lr.setKeyType(type);
- lr.setResultType(DataType.BAG);
-
- lr.visit(this);
- if(lr.getRequestedParallelism() > curMROp.requestedParallelism)
- curMROp.requestedParallelism = lr.getRequestedParallelism();
- rearrangeOutputs[1] = curMROp;
- compiledInputs = rearrangeOutputs;
-
-
- // create POGlobalRearrange
- POGlobalRearrange gr = new POGlobalRearrange(new OperatorKey(scope,nig.getNextNodeId(scope)), rp);
- // Skewed join has its own special partitioner
- gr.setResultType(DataType.TUPLE);
- gr.visit(this);
- if(gr.getRequestedParallelism() > curMROp.requestedParallelism)
- curMROp.requestedParallelism = gr.getRequestedParallelism();
- compiledInputs = new MapReduceOper[] {curMROp};
-
- // create POPakcage
- POPackage pkg = new POPackage(new OperatorKey(scope,nig.getNextNodeId(scope)), rp);
- pkg.setKeyType(type);
- pkg.setResultType(DataType.TUPLE);
- pkg.setNumInps(2);
- boolean [] inner = op.getInnerFlags();
- pkg.setInner(inner);
- pkg.visit(this);
- compiledInputs = new MapReduceOper[] {curMROp};
-
- // create POForEach
- List<PhysicalPlan> eps = new ArrayList<PhysicalPlan>();
- List<Boolean> flat = new ArrayList<Boolean>();
-
- PhysicalPlan ep;
- // Add corresponding POProjects
- for (int i=0; i < 2; i++ ) {
- ep = new PhysicalPlan();
- POProject prj = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
- prj.setColumn(i+1);
- prj.setOverloaded(false);
- prj.setResultType(DataType.BAG);
- ep.add(prj);
- eps.add(ep);
- if (!inner[i]) {
- // Add an empty bag for outer join
- CompilerUtils.addEmptyBagOuterJoin(ep, op.getSchema(i));
- }
- flat.add(true);
- }
-
- POForEach fe = new POForEach(new OperatorKey(scope,nig.getNextNodeId(scope)), -1, eps, flat);
- fe.setResultType(DataType.TUPLE);
-
- fe.visit(this);
-
- curMROp.setSkewedJoinPartitionFile(partitionFile.getFileName());
- phyToMROpMap.put(op, curMROp);
+ try {
+ if (compiledInputs.length != 2) {
+ int errCode = 2255;
+ throw new VisitorException("POSkewedJoin operator has " + compiledInputs.length + " inputs. It should have 2.", errCode);
+ }
+
+ //change plan to store the first join input into a temp file
+ FileSpec fSpec = getTempFileSpec();
+ MapReduceOper mro = compiledInputs[0];
+ POStore str = getStore();
+ str.setSFile(fSpec);
+ if (!mro.isMapDone()) {
+ mro.mapPlan.addAsLeaf(str);
+ mro.setMapDoneSingle(true);
+ } else if (mro.isMapDone() && !mro.isReduceDone()) {
+ mro.reducePlan.addAsLeaf(str);
+ mro.setReduceDone(true);
+ } else {
+ int errCode = 2022;
+ String msg = "Both map and reduce phases have been done. This is unexpected while compiling.";
+ throw new PlanException(msg, errCode, PigException.BUG);
+ }
+
+ FileSpec partitionFile = getTempFileSpec();
+ int rp = op.getRequestedParallelism();
+
+ Pair<MapReduceOper, Integer> sampleJobPair = getSkewedJoinSampleJob(op, mro, fSpec, partitionFile, rp);
+ rp = sampleJobPair.second;
+
+ // set parallelism of SkewedJoin as the value calculated by sampling job
+ // if "parallel" is specified in join statement, "rp" is equal to that number
+ // if not specified, use the value that sampling process calculated
+ // based on default.
+ op.setRequestedParallelism(rp);
+
+ // load the temp file for first table as input of join
+ MapReduceOper[] joinInputs = new MapReduceOper[] {startNew(fSpec, sampleJobPair.first), compiledInputs[1]};
+ MapReduceOper[] rearrangeOutputs = new MapReduceOper[2];
+
+ compiledInputs = new MapReduceOper[] {joinInputs[0]};
+ // run POLocalRearrange for first join table
+ POLocalRearrange lr = new POLocalRearrange(new OperatorKey(scope,nig.getNextNodeId(scope)), rp);
+ try {
+ lr.setIndex(0);
+ } catch (ExecException e) {
+ int errCode = 2058;
+ String msg = "Unable to set index on newly created POLocalRearrange.";
+ throw new PlanException(msg, errCode, PigException.BUG, e);
+ }
+
+ List<PhysicalOperator> l = plan.getPredecessors(op);
+ MultiMap<PhysicalOperator, PhysicalPlan> joinPlans = op.getJoinPlans();
+ List<PhysicalPlan> groups = joinPlans.get(l.get(0));
+ // check the type of the group keys; if there is more than one field, the key is a TUPLE.
+ byte type = DataType.TUPLE;
+ if (groups.size() == 1) {
+ type = groups.get(0).getLeaves().get(0).getResultType();
+ }
+
+ lr.setKeyType(type);
+ lr.setPlans(groups);
+ lr.setResultType(DataType.TUPLE);
+
+ lr.visit(this);
+ if(lr.getRequestedParallelism() > curMROp.requestedParallelism)
+ curMROp.requestedParallelism = lr.getRequestedParallelism();
+ rearrangeOutputs[0] = curMROp;
+
+ compiledInputs = new MapReduceOper[] {joinInputs[1]};
+ // if the map for current input is already closed, then start a new job
+ if (compiledInputs[0].isMapDone() && !compiledInputs[0].isReduceDone()) {
+ FileSpec f = getTempFileSpec();
+ POStore s = getStore();
+ s.setSFile(f);
+ compiledInputs[0].reducePlan.addAsLeaf(s);
+ compiledInputs[0].setReduceDone(true);
+ compiledInputs[0] = startNew(f, compiledInputs[0]);
+ }
+
+ // run POPartitionRearrange for second join table
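+ // POPartitionRearrange partitions the second input using the key distribution
+ // produced by the sampling job (wired up via setSkewedJoinPartitionFile below)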
+ POPartitionRearrange pr =
+ new POPartitionRearrange(new OperatorKey(scope,nig.getNextNodeId(scope)), rp);
+ pr.setPigContext(pigContext);
+ lr = pr;
+ try {
+ lr.setIndex(1);
+ } catch (ExecException e) {
+ int errCode = 2058;
+ String msg = "Unable to set index on newly created POLocalRearrange.";
+ throw new PlanException(msg, errCode, PigException.BUG, e);
+ }
+
+ groups = joinPlans.get(l.get(1));
+ lr.setPlans(groups);
+ lr.setKeyType(type);
+ lr.setResultType(DataType.BAG);
+
+ lr.visit(this);
+ if(lr.getRequestedParallelism() > curMROp.requestedParallelism)
+ curMROp.requestedParallelism = lr.getRequestedParallelism();
+ rearrangeOutputs[1] = curMROp;
+ compiledInputs = rearrangeOutputs;
+
+
+ // create POGlobalRearrange
+ POGlobalRearrange gr = new POGlobalRearrange(new OperatorKey(scope,nig.getNextNodeId(scope)), rp);
+ // Skewed join has its own special partitioner
+ gr.setResultType(DataType.TUPLE);
+ gr.visit(this);
+ if(gr.getRequestedParallelism() > curMROp.requestedParallelism)
+ curMROp.requestedParallelism = gr.getRequestedParallelism();
+ compiledInputs = new MapReduceOper[] {curMROp};
+
+ // create POPackage
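+ // key type and inner-join flags are now set on the package's Packager
+ // (obtained via pkg.getPkgr()) rather than on POPackage itself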
+ POPackage pkg = new POPackage(new OperatorKey(scope,nig.getNextNodeId(scope)), rp);
+ Packager pkgr = pkg.getPkgr();
+ pkgr.setKeyType(type);
+ pkg.setResultType(DataType.TUPLE);
+ pkg.setNumInps(2);
+ boolean [] inner = op.getInnerFlags();
+ pkgr.setInner(inner);
+ pkg.visit(this);
+ compiledInputs = new MapReduceOper[] {curMROp};
+
+ // create POForEach
+ List<PhysicalPlan> eps = new ArrayList<PhysicalPlan>();
+ List<Boolean> flat = new ArrayList<Boolean>();
+
+ PhysicalPlan ep;
+ // Add corresponding POProjects
+ for (int i=0; i < 2; i++ ) {
+ ep = new PhysicalPlan();
+ POProject prj = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
+ prj.setColumn(i+1);
+ prj.setOverloaded(false);
+ prj.setResultType(DataType.BAG);
+ ep.add(prj);
+ eps.add(ep);
+ if (!inner[i]) {
+ // Add an empty bag for outer join
+ CompilerUtils.addEmptyBagOuterJoin(ep, op.getSchema(i));
+ }
+ flat.add(true);
+ }
+
+ POForEach fe = new POForEach(new OperatorKey(scope,nig.getNextNodeId(scope)), -1, eps, flat);
+ fe.setResultType(DataType.TUPLE);
+
+ fe.visit(this);
+
+ curMROp.setSkewedJoinPartitionFile(partitionFile.getFileName());
+ phyToMROpMap.put(op, curMROp);
}catch(PlanException e) {
int errCode = 2034;
String msg = "Error compiling operator " + op.getClass().getSimpleName();
@@ -1972,11 +1975,11 @@ public class MRCompiler extends PhyPlanV
FileSpec quantFile = getTempFileSpec();
int rp = op.getRequestedParallelism();
Pair<POProject, Byte>[] fields = getSortCols(op.getSortPlans());
- Pair<MapReduceOper, Integer> quantJobParallelismPair =
+ Pair<MapReduceOper, Integer> quantJobParallelismPair =
getQuantileJob(op, mro, fSpec, quantFile, rp);
- curMROp = getSortJob(op, quantJobParallelismPair.first, fSpec, quantFile,
+ curMROp = getSortJob(op, quantJobParallelismPair.first, fSpec, quantFile,
quantJobParallelismPair.second, fields);
-
+
if(op.isUDFComparatorUsed){
curMROp.UDFs.add(op.getMSortFunc().getFuncSpec().toString());
curMROp.isUDFComparatorUsed = true;
@@ -2068,7 +2071,7 @@ public class MRCompiler extends PhyPlanV
String msg = "No expression plan found in POSort.";
throw new PlanException(msg, errCode, PigException.BUG);
}
-
+
private MapReduceOper getSortJob(
POSort sort,
MapReduceOper quantJob,
@@ -2083,11 +2086,11 @@ public class MRCompiler extends PhyPlanV
long limit = sort.getLimit();
mro.limit = limit;
-
+
List<PhysicalPlan> eps1 = new ArrayList<PhysicalPlan>();
byte keyType = DataType.UNKNOWN;
-
+
boolean[] sortOrder;
List<Boolean> sortOrderList = sort.getMAscCols();
@@ -2139,13 +2142,13 @@ public class MRCompiler extends PhyPlanV
throw new PlanException(msg, errCode, PigException.BUG, ve);
}
}
-
+
POLocalRearrange lr = new POLocalRearrange(new OperatorKey(scope,nig.getNextNodeId(scope)));
try {
lr.setIndex(0);
} catch (ExecException e) {
- int errCode = 2058;
- String msg = "Unable to set index on newly created POLocalRearrange.";
+ int errCode = 2058;
+ String msg = "Unable to set index on newly created POLocalRearrange.";
throw new PlanException(msg, errCode, PigException.BUG, e);
}
lr.setKeyType((fields == null || fields.length>1) ? DataType.TUPLE :
@@ -2154,16 +2157,18 @@ public class MRCompiler extends PhyPlanV
lr.setResultType(DataType.TUPLE);
lr.addOriginalLocation(sort.getAlias(), sort.getOriginalLocations());
mro.mapPlan.addAsLeaf(lr);
-
+
mro.setMapDone(true);
-
+
if (limit!=-1) {
- POPackageLite pkg_c = new POPackageLite(new OperatorKey(scope,nig.getNextNodeId(scope)));
- pkg_c.setKeyType((fields.length>1) ? DataType.TUPLE : keyType);
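+ // the combine-phase package is now a plain POPackage configured with a
+ // LitePackager, replacing the former POPackageLite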
+ POPackage pkg_c = new POPackage(new OperatorKey(scope,
+ nig.getNextNodeId(scope)));
+ LitePackager pkgr = new LitePackager();
+ pkgr.setKeyType((fields.length > 1) ? DataType.TUPLE : keyType);
+ pkg_c.setPkgr(pkgr);
pkg_c.setNumInps(1);
- //pkg.setResultType(DataType.TUPLE);
mro.combinePlan.add(pkg_c);
-
+
List<PhysicalPlan> eps_c1 = new ArrayList<PhysicalPlan>();
List<Boolean> flat_c1 = new ArrayList<Boolean>();
PhysicalPlan ep_c1 = new PhysicalPlan();
@@ -2174,38 +2179,41 @@ public class MRCompiler extends PhyPlanV
ep_c1.add(prj_c1);
eps_c1.add(ep_c1);
flat_c1.add(true);
- POForEach fe_c1 = new POForEach(new OperatorKey(scope,nig.getNextNodeId(scope)),
- -1, eps_c1, flat_c1);
+ POForEach fe_c1 = new POForEach(new OperatorKey(scope,nig.getNextNodeId(scope)),
+ -1, eps_c1, flat_c1);
fe_c1.setResultType(DataType.TUPLE);
mro.combinePlan.addAsLeaf(fe_c1);
-
+
POLimit pLimit = new POLimit(new OperatorKey(scope,nig.getNextNodeId(scope)));
- pLimit.setLimit(limit);
- mro.combinePlan.addAsLeaf(pLimit);
-
+ pLimit.setLimit(limit);
+ mro.combinePlan.addAsLeaf(pLimit);
+
List<PhysicalPlan> eps_c2 = new ArrayList<PhysicalPlan>();
eps_c2.addAll(sort.getSortPlans());
-
- POLocalRearrange lr_c2 = new POLocalRearrange(new OperatorKey(scope,nig.getNextNodeId(scope)));
- try {
+
+ POLocalRearrange lr_c2 = new POLocalRearrange(new OperatorKey(scope,nig.getNextNodeId(scope)));
+ try {
lr_c2.setIndex(0);
} catch (ExecException e) {
- int errCode = 2058;
- String msg = "Unable to set index on newly created POLocalRearrange.";
+ int errCode = 2058;
+ String msg = "Unable to set index on newly created POLocalRearrange.";
throw new PlanException(msg, errCode, PigException.BUG, e);
}
- lr_c2.setKeyType((fields.length>1) ? DataType.TUPLE : keyType);
- lr_c2.setPlans(eps_c2);
- lr_c2.setResultType(DataType.TUPLE);
- mro.combinePlan.addAsLeaf(lr_c2);
- }
-
- POPackageLite pkg = new POPackageLite(new OperatorKey(scope,nig.getNextNodeId(scope)));
- pkg.setKeyType((fields == null || fields.length>1) ? DataType.TUPLE :
- keyType);
- pkg.setNumInps(1);
+ lr_c2.setKeyType((fields.length>1) ? DataType.TUPLE : keyType);
+ lr_c2.setPlans(eps_c2);
+ lr_c2.setResultType(DataType.TUPLE);
+ mro.combinePlan.addAsLeaf(lr_c2);
+ }
+
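+ // the reduce-side package likewise pairs POPackage with a LitePackager
+ // in place of the former POPackageLite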
+ POPackage pkg = new POPackage(new OperatorKey(scope,
+ nig.getNextNodeId(scope)));
+ LitePackager pkgr = new LitePackager();
+ pkgr.setKeyType((fields == null || fields.length > 1) ? DataType.TUPLE
+ : keyType);
+ pkg.setPkgr(pkgr);
+ pkg.setNumInps(1);
mro.reducePlan.add(pkg);
-
+
PhysicalPlan ep = new PhysicalPlan();
POProject prj = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
prj.setColumn(1);
@@ -2222,13 +2230,12 @@ public class MRCompiler extends PhyPlanV
mro.phyToMRMap.put(sort, nfe1);
if (limit!=-1)
{
- POLimit pLimit2 = new POLimit(new OperatorKey(scope,nig.getNextNodeId(scope)));
- pLimit2.setLimit(limit);
- mro.reducePlan.addAsLeaf(pLimit2);
- mro.phyToMRMap.put(sort, pLimit2);
+ POLimit pLimit2 = new POLimit(new OperatorKey(scope,nig.getNextNodeId(scope)));
+ pLimit2.setLimit(limit);
+ mro.reducePlan.addAsLeaf(pLimit2);
+ mro.phyToMRMap.put(sort, pLimit2);
}
-// ep1.add(innGen);
return mro;
}
@@ -2238,13 +2245,13 @@ public class MRCompiler extends PhyPlanV
FileSpec lFile,
FileSpec quantFile,
int rp) throws PlanException, VisitorException {
-
+
POSort sort = new POSort(inpSort.getOperatorKey(), inpSort
.getRequestedParallelism(), null, inpSort.getSortPlans(),
inpSort.getMAscCols(), inpSort.getMSortFunc());
sort.addOriginalLocation(inpSort.getAlias(), inpSort.getOriginalLocations());
-
- // Turn the asc/desc array into an array of strings so that we can pass it
+
+ // Turn the asc/desc array into an array of strings so that we can pass it
// to the FindQuantiles function.
List<Boolean> ascCols = inpSort.getMAscCols();
String[] ascs = new String[ascCols.size()];
@@ -2261,65 +2268,65 @@ public class MRCompiler extends PhyPlanV
ctorArgs[j+1] = ascs[j];
}
}
-
+
return getSamplingJob(sort, prevJob, null, lFile, quantFile, rp, null, FindQuantiles.class.getName(), ctorArgs, RandomSampleLoader.class.getName());
}
-
+
/**
* Create Sampling job for skewed join.
*/
- private Pair<MapReduceOper, Integer> getSkewedJoinSampleJob(POSkewedJoin op, MapReduceOper prevJob,
- FileSpec lFile, FileSpec sampleFile, int rp ) throws PlanException, VisitorException {
-
- MultiMap<PhysicalOperator, PhysicalPlan> joinPlans = op.getJoinPlans();
-
- List<PhysicalOperator> l = plan.getPredecessors(op);
- List<PhysicalPlan> groups = joinPlans.get(l.get(0));
- List<Boolean> ascCol = new ArrayList<Boolean>();
- for(int i=0; i<groups.size(); i++) {
- ascCol.add(false);
- }
-
- POSort sort = new POSort(op.getOperatorKey(), op.getRequestedParallelism(), null, groups, ascCol, null);
-
- // set up transform plan to get keys and memory size of input tuples
- // it first adds all the plans to get key columns,
- List<PhysicalPlan> transformPlans = new ArrayList<PhysicalPlan>();
- transformPlans.addAll(groups);
-
- // then it adds a column for memory size
- POProject prjStar = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
+ private Pair<MapReduceOper, Integer> getSkewedJoinSampleJob(POSkewedJoin op, MapReduceOper prevJob,
+ FileSpec lFile, FileSpec sampleFile, int rp ) throws PlanException, VisitorException {
+
+ MultiMap<PhysicalOperator, PhysicalPlan> joinPlans = op.getJoinPlans();
+
+ List<PhysicalOperator> l = plan.getPredecessors(op);
+ List<PhysicalPlan> groups = joinPlans.get(l.get(0));
+ List<Boolean> ascCol = new ArrayList<Boolean>();
+ for(int i=0; i<groups.size(); i++) {
+ ascCol.add(false);
+ }
+
+ POSort sort = new POSort(op.getOperatorKey(), op.getRequestedParallelism(), null, groups, ascCol, null);
+
+ // set up transform plan to get keys and memory size of input tuples
+ // it first adds all the plans to get key columns,
+ List<PhysicalPlan> transformPlans = new ArrayList<PhysicalPlan>();
+ transformPlans.addAll(groups);
+
+ // then it adds a column for memory size
+ POProject prjStar = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
prjStar.setResultType(DataType.TUPLE);
- prjStar.setStar(true);
-
+ prjStar.setStar(true);
+
List<PhysicalOperator> ufInps = new ArrayList<PhysicalOperator>();
ufInps.add(prjStar);
-
- PhysicalPlan ep = new PhysicalPlan();
- POUserFunc uf = new POUserFunc(new OperatorKey(scope,nig.getNextNodeId(scope)), -1, ufInps,
- new FuncSpec(GetMemNumRows.class.getName(), (String[])null));
- uf.setResultType(DataType.TUPLE);
- ep.add(uf);
- ep.add(prjStar);
- ep.connect(prjStar, uf);
-
- transformPlans.add(ep);
-
- try{
- // pass configurations to the User Function
- String per = pigContext.getProperties().getProperty("pig.skewedjoin.reduce.memusage",
+
+ PhysicalPlan ep = new PhysicalPlan();
+ POUserFunc uf = new POUserFunc(new OperatorKey(scope,nig.getNextNodeId(scope)), -1, ufInps,
+ new FuncSpec(GetMemNumRows.class.getName(), (String[])null));
+ uf.setResultType(DataType.TUPLE);
+ ep.add(uf);
+ ep.add(prjStar);
+ ep.connect(prjStar, uf);
+
+ transformPlans.add(ep);
+
+ try{
+ // pass configurations to the User Function
+ String per = pigContext.getProperties().getProperty("pig.skewedjoin.reduce.memusage",
String.valueOf(PartitionSkewedKeys.DEFAULT_PERCENT_MEMUSAGE));
- String mc = pigContext.getProperties().getProperty("pig.skewedjoin.reduce.maxtuple", "0");
- String inputFile = lFile.getFileName();
+ String mc = pigContext.getProperties().getProperty("pig.skewedjoin.reduce.maxtuple", "0");
+ String inputFile = lFile.getFileName();
+
+ return getSamplingJob(sort, prevJob, transformPlans, lFile, sampleFile, rp, null,
[... 440 lines stripped ...]