You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2014/01/09 22:50:46 UTC
svn commit: r1556961 - in /pig/branches/tez: src/org/apache/pig/
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/
src/org/apache/pig/backend/hadoop/executionengine/tez/ test/e2e/pig/tests/
Author: daijy
Date: Thu Jan 9 21:50:46 2014
New Revision: 1556961
URL: http://svn.apache.org/r1556961
Log:
PIG-3647: Implement merge join in Tez
Added:
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompilerUtil.java
Modified:
pig/branches/tez/src/org/apache/pig/PigConfiguration.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java
pig/branches/tez/test/e2e/pig/tests/tez.conf
Modified: pig/branches/tez/src/org/apache/pig/PigConfiguration.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/PigConfiguration.java?rev=1556961&r1=1556960&r2=1556961&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/PigConfiguration.java (original)
+++ pig/branches/tez/src/org/apache/pig/PigConfiguration.java Thu Jan 9 21:50:46 2014
@@ -153,5 +153,21 @@ public class PigConfiguration {
* the distributed cache when doing fragment-replicated join
*/
public static final String PIG_JOIN_REPLICATED_MAX_BYTES = "pig.join.replicated.max.bytes";
+
+ /**
+ * Turns combine split files on or off
+ */
+ public static final String PIG_SPLIT_COMBINATION = "pig.splitCombination";
+
+ /**
+ * Whether combining of split files is turned off. This is for internal use only.
+ */
+ public static final String PIG_NO_SPLIT_COMBINATION = "pig.noSplitCombination";
+
+ /**
+ * Specifies the size, in bytes, of data to be processed by a single map.
+ * Smaller files are combined until this size is reached.
+ */
+ public static final String PIG_MAX_COMBINED_SPLIT_SIZE = "pig.maxCombinedSplitSize";
}
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java?rev=1556961&r1=1556960&r2=1556961&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java Thu Jan 9 21:50:46 2014
@@ -123,6 +123,8 @@ public class POMergeJoin extends Physica
private String signature;
+ private byte endOfRecordMark;
+
// This serves as the default TupleFactory
private transient TupleFactory mTupleFactory;
@@ -154,14 +156,23 @@ public class POMergeJoin extends Physica
LRs = new POLocalRearrange[2];
this.createJoinPlans(inpPlans,keyTypes);
this.indexFile = null;
- this.joinType = joinType;
+ this.joinType = joinType;
this.leftInputSchema = leftInputSchema;
this.mergedInputSchema = mergedInputSchema;
+ this.endOfRecordMark = POStatus.STATUS_EOP;
+ }
+
+ // Sets the status code this operator returns, together with a null result, where it used to return STATUS_EOP.
+ // Use POStatus.STATUS_EOP (the default assigned in the constructor) for MR and POStatus.STATUS_NULL for Tez.
+ // Reason: for MR, an EOP is sent at the end of every record;
+ // for Tez, only a single global EOP is used, so NULL is sent at the end of a record instead.
+ public void setEndOfRecordMark(byte endOfRecordMark) {
+ this.endOfRecordMark = endOfRecordMark;
}
/**
* Configures the Local Rearrange operators to get keys out of tuple.
- * @throws ExecException
+ * @throws ExecException
*/
private void createJoinPlans(MultiMap<PhysicalOperator, PhysicalPlan> inpPlans, List<List<Byte>> keyTypes) throws PlanException{
@@ -285,8 +296,8 @@ public class POMergeJoin extends Physica
curLeftKey = extractKeysFromTuple(curLeftInp, 0);
if(null == curLeftKey) // We drop the tuples which have null keys.
- return new Result(POStatus.STATUS_EOP, null);
-
+ return new Result(endOfRecordMark, null);
+
try {
seekInRightStream(curLeftKey);
} catch (IOException e) {
@@ -297,7 +308,7 @@ public class POMergeJoin extends Physica
leftTuples.add((Tuple)curLeftInp.result);
firstTime = false;
prevLeftKey = curLeftKey;
- return new Result(POStatus.STATUS_EOP, null);
+ return new Result(endOfRecordMark, null);
}
if(doingJoin){
@@ -363,7 +374,7 @@ public class POMergeJoin extends Physica
log.error("Received exception while trying to close right side file: " + e.getMessage());
}
}
- return new Result(POStatus.STATUS_EOP, null);
+ return new Result(endOfRecordMark, null);
}
else{ // At this point right side can't be behind.
int errCode = 1102;
@@ -381,17 +392,17 @@ public class POMergeJoin extends Physica
case POStatus.STATUS_OK:
curLeftKey = extractKeysFromTuple(curLeftInp, 0);
if(null == curLeftKey) // We drop the tuples which have null keys.
- return new Result(POStatus.STATUS_EOP, null);
-
+ return new Result(endOfRecordMark, null);
+
int cmpVal = ((Comparable)curLeftKey).compareTo(prevLeftKey);
if(cmpVal == 0){
// Keep on accumulating.
leftTuples.add((Tuple)curLeftInp.result);
- return new Result(POStatus.STATUS_EOP, null);
+ return new Result(endOfRecordMark, null);
}
else if(cmpVal > 0){ // Filled with left bag. Move on.
curJoinKey = prevLeftKey;
- break;
+ break;
}
else{ // Current key < Prev Key
int errCode = 1102;
@@ -423,7 +434,7 @@ public class POMergeJoin extends Physica
leftTuples.add((Tuple)curLeftInp.result);
prevLeftInp = curLeftInp;
prevLeftKey = curLeftKey;
- return new Result(POStatus.STATUS_EOP, null);
+ return new Result(endOfRecordMark, null);
}
// Accumulated tuples with same key on left side.
@@ -504,14 +515,14 @@ public class POMergeJoin extends Physica
log.error("Received exception while trying to close right side file: " + e.getMessage());
}
}
- return new Result(POStatus.STATUS_EOP, null);
+ return new Result(endOfRecordMark, null);
}
}
}
-
+
private void seekInRightStream(Object firstLeftKey) throws IOException{
rightLoader = (LoadFunc)PigContext.instantiateFuncFromSpec(rightLoaderFuncSpec);
-
+
// check if hadoop distributed cache is used
if (indexFile != null && rightLoader instanceof DefaultIndexableLoader) {
DefaultIndexableLoader loader = (DefaultIndexableLoader)rightLoader;
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java?rev=1556961&r1=1556960&r2=1556961&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java Thu Jan 9 21:50:46 2014
@@ -18,6 +18,7 @@
package org.apache.pig.backend.hadoop.executionengine.tez;
import java.io.IOException;
+import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
@@ -30,10 +31,16 @@ import java.util.Stack;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.FuncSpec;
+import org.apache.pig.IndexableLoadFunc;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.OrderedLoadFunc;
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompilerException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MergeJoinIndexer;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.ScalarPhyFinder;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.UDFFinder;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
@@ -69,6 +76,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.tez.POLocalRearrangeTezFactory.LocalRearrangeType;
import org.apache.pig.data.DataType;
import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.builtin.DefaultIndexableLoader;
import org.apache.pig.impl.builtin.FindQuantiles;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.io.FileSpec;
@@ -79,8 +87,10 @@ import org.apache.pig.impl.plan.Operator
import org.apache.pig.impl.plan.OperatorPlan;
import org.apache.pig.impl.plan.PlanException;
import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.Pair;
import org.apache.pig.impl.util.Utils;
+import org.apache.pig.newplan.logical.relational.LOJoin;
import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
import org.apache.tez.runtime.library.input.ShuffledUnorderedKVInput;
import org.apache.tez.runtime.library.output.OnFileUnorderedKVOutput;
@@ -380,13 +390,6 @@ public class TezCompiler extends PhyPlan
curTezOp = tezOp;
}
- private void connect(TezOperPlan plan, TezOperator from, TezOperator to) throws PlanException {
- plan.connect(from, to);
- // Add edge descriptors to old and new operators
- to.inEdges.put(from.getOperatorKey(), new TezEdgeDescriptor());
- from.outEdges.put(to.getOperatorKey(), new TezEdgeDescriptor());
- }
-
private void blocking() throws IOException, PlanException {
TezOperator newTezOp = getTezOp();
tezPlan.add(newTezOp);
@@ -416,9 +419,9 @@ public class TezCompiler extends PhyPlan
POSplit split = findPOSplit(splitOp, from.getSplitOperatorKey());
split.addPlan(from.plan);
addSubPlanPropertiesToParent(splitOp, curTezOp);
- connect(tezPlan, splitOp, to);
+ TezCompilerUtil.connect(tezPlan, splitOp, to);
} else {
- connect(tezPlan, from, to);
+ TezCompilerUtil.connect(tezPlan, from, to);
}
}
@@ -672,7 +675,7 @@ public class TezCompiler extends PhyPlan
project.setColumn(0);
project.setOverloaded(false);
- POForEach forEach = getForEach(project, rp);
+ POForEach forEach = TezCompilerUtil.getForEach(project, rp, scope, nig);
plan.addAsLeaf(forEach);
}
@@ -732,7 +735,7 @@ public class TezCompiler extends PhyPlan
lr.setOutputKey(curTezOp.getOperatorKey().toString());
tezOp.plan.addAsLeaf(lr);
- connect(tezPlan, tezOp, curTezOp);
+ TezCompilerUtil.connect(tezPlan, tezOp, curTezOp);
inputKeys.add(tezOp.getOperatorKey().toString());
// Configure broadcast edges for replicated tables
@@ -799,7 +802,7 @@ public class TezCompiler extends PhyPlan
// Then add a POPackage and a POForEach to the start of the new tezOp.
POPackage pkg = getPackage(1, DataType.TUPLE);
- POForEach forEach = getForEachPlain();
+ POForEach forEach = TezCompilerUtil.getForEachPlain(scope, nig);
curTezOp.plan.add(pkg);
curTezOp.plan.addAsLeaf(forEach);
@@ -869,11 +872,221 @@ public class TezCompiler extends PhyPlan
throw new TezCompilerException(msg, errCode, PigException.BUG);
}
+ /** Since merge-join works on two inputs, there are exactly two TezOper predecessors, identified as left and right.
+ * The right input generates the index on-the-fly. This consists of two Tez vertexes: the first vertex generates
+ * the index, and the second vertex sorts it.
+ * The left input contains the POMergeJoin, which does the actual join.
+ * The first right Tez oper is identified as rightTezOpr, the second as rightTezOprAggr.
+ * The left Tez oper is identified as curTezOp.
+
+ * 1) rightTezOpr: it can be preceded only by a POLoad. If there is anything else
+ * in its physical plan, that is yanked and set up as the right pipeline of joinOp.
+ * 2) curTezOp: the join operator is added to it.
+ *
+ * We also need to segment the DAG into two, because POMergeJoin depends on the index file, which is loaded with
+ * DefaultIndexableLoader. It is possible to convert the index into a broadcast input, but that is costly
+ * because a lot of logic is built into DefaultIndexableLoader. We can revisit it later.
+ */
@Override
- public void visitMergeJoin(POMergeJoin op) throws VisitorException {
- int errCode = 2034;
- String msg = "Cannot compile " + op.getClass().getSimpleName();
- throw new TezCompilerException(msg, errCode, PigException.BUG);
+ public void visitMergeJoin(POMergeJoin joinOp) throws VisitorException {
+
+ try{
+ joinOp.setEndOfRecordMark(POStatus.STATUS_NULL); // Tez uses one global EOP, so end-of-record is marked with NULL
+ if(compiledInputs.length != 2 || joinOp.getInputs().size() != 2){
+ int errCode=1101;
+ throw new MRCompilerException("Merge Join must have exactly two inputs. Found : "+compiledInputs.length, errCode);
+ }
+
+ curTezOp = phyToTezOpMap.get(joinOp.getInputs().get(0)); // the left side is the Tez operator of the join's first input
+
+ TezOperator rightTezOpr = null;
+ TezOperator rightTezOprAggr = null;
+ if(curTezOp.equals(compiledInputs[0])) // whichever compiled input is not the left one is the right one
+ rightTezOpr = compiledInputs[1];
+ else
+ rightTezOpr = compiledInputs[0];
+
+ // We first operate on the right side, which is the indexer job.
+ // First yank the plan of the compiled right input and set it up as the right pipeline of the join operator.
+ PhysicalPlan rightPipelinePlan;
+ if(!rightTezOpr.closed){
+ PhysicalPlan rightPlan = rightTezOpr.plan;
+ if(rightPlan.getRoots().size() != 1){
+ int errCode = 2171;
+ String errMsg = "Expected one but found more then one root physical operator in physical plan.";
+ throw new MRCompilerException(errMsg,errCode,PigException.BUG);
+ }
+
+ PhysicalOperator rightLoader = rightPlan.getRoots().get(0);
+ if(! (rightLoader instanceof POLoad)){
+ int errCode = 2172;
+ String errMsg = "Expected physical operator at root to be POLoad. Found : "+rightLoader.getClass().getCanonicalName();
+ throw new MRCompilerException(errMsg,errCode);
+ }
+
+ if (rightPlan.getSuccessors(rightLoader) == null || rightPlan.getSuccessors(rightLoader).isEmpty())
+ // Load - Join case: nothing sits between the loader and the join.
+ rightPipelinePlan = null;
+
+ else{ // We got something on the right side. Yank it and set it up as the right pipeline of the join.
+ rightPipelinePlan = rightPlan.clone(); // work on a copy so the loader can be cut out of it
+ PhysicalOperator root = rightPipelinePlan.getRoots().get(0);
+ rightPipelinePlan.disconnect(root, rightPipelinePlan.getSuccessors(root).get(0));
+ rightPipelinePlan.remove(root);
+ rightPlan.trimBelow(rightLoader); // the original right plan keeps only the POLoad
+ }
+ }
+ else{
+ int errCode = 2022;
+ String msg = "Right input plan have been closed. This is unexpected while compiling.";
+ throw new PlanException(msg, errCode, PigException.BUG);
+ }
+
+ joinOp.setupRightPipeline(rightPipelinePlan);
+
+ // At this point, we must be operating on the input plan of the right input, and it would contain nothing other than a POLoad.
+ POLoad rightLoader = (POLoad)rightTezOpr.plan.getRoots().get(0);
+ joinOp.setSignature(rightLoader.getSignature());
+ LoadFunc rightLoadFunc = rightLoader.getLoadFunc();
+ List<String> udfs = new ArrayList<String>(); // loader func specs, added to curTezOp.UDFs at the end
+ if(IndexableLoadFunc.class.isAssignableFrom(rightLoadFunc.getClass())) {
+ joinOp.setRightLoaderFuncSpec(rightLoader.getLFile().getFuncSpec());
+ joinOp.setRightInputFileName(rightLoader.getLFile().getFileName());
+ udfs.add(rightLoader.getLFile().getFuncSpec().toString());
+
+ // we don't need the right TezOper since
+ // the right loader is an IndexableLoadFunc which can handle the index
+ // itself
+ tezPlan.remove(rightTezOpr);
+ if(rightTezOpr == compiledInputs[0]) {
+ compiledInputs[0] = null;
+ } else if(rightTezOpr == compiledInputs[1]) {
+ compiledInputs[1] = null;
+ }
+ rightTezOpr = null;
+
+ // Validate that the join keys in the merge join are only
+ // simple column projections or '*' and not expressions - expressions
+ // cannot be handled when the index is built by the storage layer on the sorted
+ // data at the time the sorted data (and corresponding index) is written.
+ // So merge join is restricted to not having expressions as
+ // join keys.
+ int numInputs = mPlan.getPredecessors(joinOp).size(); // should be 2
+ for(int i = 0; i < numInputs; i++) {
+ List<PhysicalPlan> keyPlans = joinOp.getInnerPlansOf(i);
+ for (PhysicalPlan keyPlan : keyPlans) {
+ for(PhysicalOperator op : keyPlan) {
+ if(!(op instanceof POProject)) {
+ int errCode = 1106;
+ String errMsg = "Merge join is possible only for simple column or '*' join keys when using " +
+ rightLoader.getLFile().getFuncSpec() + " as the loader";
+ throw new MRCompilerException(errMsg, errCode, PigException.INPUT);
+ }
+ }
+ }
+ }
+ } else {
+ LoadFunc loadFunc = rightLoader.getLoadFunc();
+ //Replacing POLoad with the indexer is disabled for 'merge-sparse' joins. While
+ //this feature would be useful, the current implementation of DefaultIndexableLoader
+ //is not designed to handle multiple calls to seekNear. Specifically, it rereads the entire index
+ //for each call. Some refactoring of this class is required - and then the check below could be removed.
+ if (joinOp.getJoinType() == LOJoin.JOINTYPE.MERGESPARSE) {
+ int errCode = 1104;
+ String errMsg = "Right input of merge-join must implement IndexableLoadFunc. " +
+ "The specified loader " + loadFunc + " doesn't implement it";
+ throw new MRCompilerException(errMsg,errCode);
+ }
+
+ // Replace POLoad with the indexer.
+
+ if (! (OrderedLoadFunc.class.isAssignableFrom(loadFunc.getClass()))){
+ int errCode = 1104;
+ String errMsg = "Right input of merge-join must implement " +
+ "OrderedLoadFunc interface. The specified loader "
+ + loadFunc + " doesn't implement it";
+ throw new MRCompilerException(errMsg,errCode);
+ }
+
+ String[] indexerArgs = new String[6]; // constructor arguments handed to MergeJoinIndexer through its FuncSpec
+ List<PhysicalPlan> rightInpPlans = joinOp.getInnerPlansOf(1);
+ FileSpec origRightLoaderFileSpec = rightLoader.getLFile(); // remembered before the loader's file spec is replaced below
+
+ indexerArgs[0] = origRightLoaderFileSpec.getFuncSpec().toString();
+ indexerArgs[1] = ObjectSerializer.serialize((Serializable)rightInpPlans);
+ indexerArgs[2] = ObjectSerializer.serialize(rightPipelinePlan);
+ indexerArgs[3] = rightLoader.getSignature();
+ indexerArgs[4] = rightLoader.getOperatorKey().scope;
+ indexerArgs[5] = Boolean.toString(true);
+
+ FileSpec lFile = new FileSpec(rightLoader.getLFile().getFileName(),new FuncSpec(MergeJoinIndexer.class.getName(), indexerArgs));
+ rightLoader.setLFile(lFile);
+
+ // The loader of this operator will return a tuple of the form -
+ // (keyFirst1, keyFirst2, .. , position, splitIndex) See MergeJoinIndexer
+
+ rightTezOprAggr = getTezOp();
+ tezPlan.add(rightTezOprAggr);
+ TezCompilerUtil.simpleConnectTwoVertex(tezPlan, rightTezOpr, rightTezOprAggr, scope, nig);
+ rightTezOprAggr.requestedParallelism = 1; // we need exactly one task for indexing job.
+
+ POStore st = TezCompilerUtil.getStore(scope, nig);
+ FileSpec strFile = getTempFileSpec(); // the index is written to this temporary file
+ st.setSFile(strFile);
+ rightTezOprAggr.plan.addAsLeaf(st);
+ rightTezOprAggr.setClosed(true);
+ rightTezOprAggr.segmentBelow = true; // presumably what segments the DAG after the index is stored (see method comment)
+
+ // set up the DefaultIndexableLoader for the join operator
+ String[] defaultIndexableLoaderArgs = new String[5];
+ defaultIndexableLoaderArgs[0] = origRightLoaderFileSpec.getFuncSpec().toString();
+ defaultIndexableLoaderArgs[1] = strFile.getFileName();
+ defaultIndexableLoaderArgs[2] = strFile.getFuncSpec().toString();
+ defaultIndexableLoaderArgs[3] = joinOp.getOperatorKey().scope;
+ defaultIndexableLoaderArgs[4] = origRightLoaderFileSpec.getFileName();
+ joinOp.setRightLoaderFuncSpec((new FuncSpec(DefaultIndexableLoader.class.getName(), defaultIndexableLoaderArgs)));
+ joinOp.setRightInputFileName(origRightLoaderFileSpec.getFileName());
+
+ joinOp.setIndexFile(strFile.getFileName());
+ udfs.add(origRightLoaderFileSpec.getFuncSpec().toString());
+ }
+
+ // We are done with the right side. Let's work on the left now.
+ // The join will be materialized in the left Tez operator (curTezOp).
+ if(!curTezOp.isClosed()) // Life is easy
+ curTezOp.plan.addAsLeaf(joinOp);
+
+ else{
+ int errCode = 2022;
+ String msg = "Input plan has been closed. This is unexpected while compiling.";
+ throw new PlanException(msg, errCode, PigException.BUG);
+ }
+ if(rightTezOprAggr != null) { // only set when an index has to be built by a separate indexer
+ rightTezOprAggr.markIndexer();
+ // We want to ensure the indexing job runs prior to the actual join job. So, connect them in order.
+ TezCompilerUtil.connect(tezPlan, rightTezOprAggr, curTezOp);
+ }
+ phyToTezOpMap.put(joinOp, curTezOp);
+ // no combination of small splits, as there is currently no way to guarantee the sortedness
+ // of the combined splits.
+ curTezOp.noCombineSmallSplits();
+ curTezOp.UDFs.addAll(udfs);
+ }
+ catch(PlanException e){
+ int errCode = 2034;
+ String msg = "Error compiling operator " + joinOp.getClass().getCanonicalName();
+ throw new MRCompilerException(msg, errCode, PigException.BUG, e);
+ }
+ catch (IOException e){
+ int errCode = 3000;
+ String errMsg = "IOException caught while compiling POMergeJoin";
+ throw new MRCompilerException(errMsg, errCode,e);
+ }
+ catch(CloneNotSupportedException e){
+ int errCode = 2127;
+ String errMsg = "Cloning exception caught while compiling POMergeJoin";
+ throw new MRCompilerException(errMsg, errCode, PigException.BUG, e);
+ }
}
@Override
@@ -1284,7 +1497,7 @@ public class TezCompiler extends PhyPlan
project.setResultType(DataType.BAG);
project.setStar(false);
project.setColumn(1);
- POForEach forEach = getForEach(project, sort.getRequestedParallelism());
+ POForEach forEach = TezCompilerUtil.getForEach(project, sort.getRequestedParallelism(), scope, nig);
oper1.plan.addAsLeaf(forEach);
boolean[] sortOrder;
@@ -1442,7 +1655,7 @@ public class TezCompiler extends PhyPlan
lr.setOutputKey(sortOpers[0].getOperatorKey().toString());
lrSample.setOutputKey(quantJobParallelismPair.first.getOperatorKey().toString());
- connect(tezPlan, prevOper, sortOpers[0]);
+ TezCompilerUtil.connect(tezPlan, prevOper, sortOpers[0]);
TezEdgeDescriptor edge = sortOpers[0].inEdges.get(prevOper.getOperatorKey());
// TODO: Convert to unsorted shuffle after TEZ-661
@@ -1450,9 +1663,9 @@ public class TezCompiler extends PhyPlan
// edge.inputClassName = ShuffledUnorderedKVInput.class.getName();
edge.partitionerClass = RoundRobinPartitioner.class;
- connect(tezPlan, prevOper, quantJobParallelismPair.first);
+ TezCompilerUtil.connect(tezPlan, prevOper, quantJobParallelismPair.first);
- connect(tezPlan, quantJobParallelismPair.first, sortOpers[0]);
+ TezCompilerUtil.connect(tezPlan, quantJobParallelismPair.first, sortOpers[0]);
edge = sortOpers[0].inEdges.get(quantJobParallelismPair.first.getOperatorKey());
edge.dataMovementType = DataMovementType.BROADCAST;
edge.outputClassName = OnFileUnorderedKVOutput.class.getName();
@@ -1461,7 +1674,7 @@ public class TezCompiler extends PhyPlan
lr.setOutputKey(sortOpers[0].getOperatorKey().toString());
sortOpers[0].sampleOperator = quantJobParallelismPair.first;
- connect(tezPlan, sortOpers[0], sortOpers[1]);
+ TezCompilerUtil.connect(tezPlan, sortOpers[0], sortOpers[1]);
edge = sortOpers[1].inEdges.get(sortOpers[0].getOperatorKey());
edge.partitionerClass = WeightedRangePartitionerTez.class;
@@ -1574,30 +1787,7 @@ public class TezCompiler extends PhyPlan
}
}
- private POForEach getForEach(POProject project, int rp) {
- PhysicalPlan forEachPlan = new PhysicalPlan();
- forEachPlan.add(project);
- List<PhysicalPlan> forEachPlans = Lists.newArrayList();
- forEachPlans.add(forEachPlan);
-
- List<Boolean> flatten = Lists.newArrayList();
- flatten.add(true);
-
- POForEach forEach = new POForEach(new OperatorKey(scope, nig.getNextNodeId(scope)), rp, forEachPlans, flatten);
- forEach.setResultType(DataType.BAG);
- return forEach;
- }
-
- // Get a plain POForEach: ForEach X generate flatten($1)
- private POForEach getForEachPlain() {
- POProject project = new POProject(new OperatorKey(scope, nig.getNextNodeId(scope)));
- project.setResultType(DataType.TUPLE);
- project.setStar(false);
- project.setColumn(1);
- project.setOverloaded(true);
- return getForEach(project, -1);
- }
/**
* Returns a POPackage with default packager. This method shouldn't be used
Added: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompilerUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompilerUtil.java?rev=1556961&view=auto
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompilerUtil.java (added)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompilerUtil.java Thu Jan 9 21:50:46 2014
@@ -0,0 +1,106 @@
+package org.apache.pig.backend.hadoop.executionengine.tez;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.pig.PigException;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.data.DataType;
+import org.apache.pig.impl.plan.NodeIdGenerator;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.PlanException;
+
+import com.google.common.collect.Lists;
+
+public class TezCompilerUtil {
+
+ // simpleConnectTwoVertex is a utility that ends a vertex equivalent to a map and starts a vertex equivalent to
+ // a reduce: it appends a POLocalRearrangeTez to op1, seeds op2 with POPackage + POForEach, and connects the two.
+ // 1. op1 is expected to be open; the local rearrange (keyed on the whole tuple via project *) becomes its leaf
+ // 2. op2 is expected to be blank; it receives a
+ // POPackage to start the reduce-like vertex, followed by a plain POForEach (flatten($1))
+ // 3. POLocalRearrange/POPackage are trivial (index 0, TUPLE key, a single non-inner input)
+ // 4. The op1 -> op2 edge is created here through connect(); callers do not need to connect them again
+ static public void simpleConnectTwoVertex(TezOperPlan tezPlan, TezOperator op1, TezOperator op2, String scope, NodeIdGenerator nig) throws PlanException
+ {
+ PhysicalPlan ep = new PhysicalPlan(); // key plan for the local rearrange: project *
+ POProject prjStar = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
+ prjStar.setResultType(DataType.TUPLE);
+ prjStar.setStar(true);
+ ep.add(prjStar);
+
+ List<PhysicalPlan> eps = new ArrayList<PhysicalPlan>();
+ eps.add(ep);
+
+ POLocalRearrangeTez lr = new POLocalRearrangeTez(new OperatorKey(scope,nig.getNextNodeId(scope)));
+ try {
+ lr.setIndex(0);
+ } catch (ExecException e) {
+ int errCode = 2058;
+ String msg = "Unable to set index on the newly created POLocalRearrange.";
+ throw new PlanException(msg, errCode, PigException.BUG, e);
+ }
+ lr.setKeyType(DataType.TUPLE);
+ lr.setPlans(eps);
+ lr.setResultType(DataType.TUPLE);
+ lr.setOutputKey(op2.getOperatorKey().toString()); // the rearrange output is addressed to op2
+
+ op1.plan.addAsLeaf(lr);
+
+ POPackage pkg = new POPackage(new OperatorKey(scope,nig.getNextNodeId(scope)));
+ pkg.getPkgr().setKeyType(DataType.TUPLE);
+ pkg.setNumInps(1);
+ boolean[] inner = {false};
+ pkg.getPkgr().setInner(inner);
+ op2.plan.add(pkg);
+
+ op2.plan.addAsLeaf(getForEachPlain(scope, nig));
+
+ connect(tezPlan, op1, op2);
+ }
+
+ static public void connect(TezOperPlan plan, TezOperator from, TezOperator to) throws PlanException { // connects from -> to in the plan and registers edge descriptors on both operators
+ plan.connect(from, to);
+ // Add edge descriptors to old and new operators
+ to.inEdges.put(from.getOperatorKey(), new TezEdgeDescriptor()); // inbound edge, keyed by the source operator
+ from.outEdges.put(to.getOperatorKey(), new TezEdgeDescriptor()); // outbound edge, keyed by the target; a separate descriptor instance
+ }
+
+ static public POForEach getForEach(POProject project, int rp, String scope, NodeIdGenerator nig) { // builds a POForEach whose single inner plan is the given projection, flattened
+ PhysicalPlan forEachPlan = new PhysicalPlan();
+ forEachPlan.add(project);
+
+ List<PhysicalPlan> forEachPlans = Lists.newArrayList();
+ forEachPlans.add(forEachPlan);
+
+ List<Boolean> flatten = Lists.newArrayList();
+ flatten.add(true); // flatten the one and only inner plan
+
+ POForEach forEach = new POForEach(new OperatorKey(scope, nig.getNextNodeId(scope)), rp, forEachPlans, flatten);
+ forEach.setResultType(DataType.BAG);
+ return forEach;
+ }
+
+ // Get a plain POForEach: ForEach X generate flatten($1). It is built by getForEach with rp = -1.
+ static public POForEach getForEachPlain(String scope, NodeIdGenerator nig) {
+ POProject project = new POProject(new OperatorKey(scope, nig.getNextNodeId(scope)));
+ project.setResultType(DataType.TUPLE);
+ project.setStar(false);
+ project.setColumn(1); // $1
+ project.setOverloaded(true);
+ return getForEach(project, -1, scope, nig);
+ }
+
+ static public POStore getStore(String scope, NodeIdGenerator nig) { // creates a POStoreTez flagged as a temporary store
+ POStore st = new POStoreTez(new OperatorKey(scope, nig.getNextNodeId(scope)));
+ // Mark the store as a tmp store. Such stores could be removed by the
+ // optimizer, because it wasn't the user who requested them.
+ st.setIsTmpStore(true);
+ return st;
+ }
+}
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1556961&r1=1556960&r2=1556961&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Thu Jan 9 21:50:46 2014
@@ -45,6 +45,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.pig.LoadFunc;
+import org.apache.pig.PigConfiguration;
import org.apache.pig.PigException;
import org.apache.pig.StoreFuncInterface;
import org.apache.pig.backend.executionengine.ExecException;
@@ -266,11 +267,25 @@ public class TezDagBuilder extends TezOp
// Pass physical plans to vertex as user payload.
Configuration payloadConf = job.getConfiguration();
-
+
if (tezOp.sampleOperator != null) {
payloadConf.set("pig.sampleVertex", tezOp.sampleOperator.getOperatorKey().toString());
}
+ String tmp; // raw value of the max-combined-split-size property, when it is set
+ long maxCombinedSplitSize = 0; // stays 0 unless a valid value is parsed; 0 means nothing is written to the payload
+ if (!tezOp.combineSmallSplits() || pc.getProperties().getProperty(PigConfiguration.PIG_SPLIT_COMBINATION, "true").equals("false"))
+ payloadConf.setBoolean(PigConfiguration.PIG_NO_SPLIT_COMBINATION, true); // disabled by the operator (e.g. merge join) or by the user property
+ else if ((tmp = pc.getProperties().getProperty(PigConfiguration.PIG_MAX_COMBINED_SPLIT_SIZE, null)) != null) {
+ try {
+ maxCombinedSplitSize = Long.parseLong(tmp);
+ } catch (NumberFormatException e) {
+ log.warn("Invalid numeric format for pig.maxCombinedSplitSize; use the default maximum combined split size");
+ }
+ }
+ if (maxCombinedSplitSize > 0)
+ payloadConf.setLong(PigConfiguration.PIG_MAX_COMBINED_SPLIT_SIZE, maxCombinedSplitSize); // same key as the former literal, now taken from the shared constant
+
List<POLoad> loads = processLoads(tezOp, payloadConf, job);
LinkedList<POStore> stores = processStores(tezOp, payloadConf, job);
@@ -278,6 +293,7 @@ public class TezDagBuilder extends TezOp
payloadConf.set("udf.import.list",
ObjectSerializer.serialize(PigContext.getPackageImportList()));
+ payloadConf.set("exectype", "TEZ");
payloadConf.setBoolean("mapred.mapper.new-api", true);
payloadConf.setClass("mapreduce.inputformat.class",
PigInputFormat.class, InputFormat.class);
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java?rev=1556961&r1=1556960&r2=1556961&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java Thu Jan 9 21:50:46 2014
@@ -89,7 +89,12 @@ public class TezOperator extends Operato
// Last POLimit value in this map reduce operator, needed by LimitAdjuster
// to add additional map reduce operator with 1 reducer after this
long limit = -1;
-
+
+ // Flag to indicate if the small input splits need to be combined to form a larger
+ // one in order to reduce the number of mappers. For merge join, both tables
+ // are NOT combinable for correctness.
+ private boolean combineSmallSplits = true;
+
// If not null, need to collect sample sent from predecessor
TezOperator sampleOperator = null;
@@ -98,6 +103,8 @@ public class TezOperator extends Operato
NONE,
// Indicate if this job is a union job
UNION,
+ // Indicate if this job is a merge indexer
+ INDEXER,
// Indicate if this job is a sampling job
SAMPLER,
// Indicate if this job is a group by job
@@ -186,6 +193,10 @@ public class TezOperator extends Operato
return (feature == OPER_FEATURE.UNION);
}
+ public void markIndexer() { // flags this operator as a merge indexer job
+ feature = OPER_FEATURE.INDEXER;
+ }
+
public void markUnion() {
feature = OPER_FEATURE.UNION;
}
@@ -279,5 +290,13 @@ public class TezOperator extends Operato
public boolean isLimitAfterSort() {
return limitAfterSort;
}
+
+ protected void noCombineSmallSplits() { // turns off combining of small input splits for this operator
+ combineSmallSplits = false;
+ }
+
+ public boolean combineSmallSplits() { // true (the field's initial value) unless noCombineSmallSplits() was called
+ return combineSmallSplits;
+ }
}
Modified: pig/branches/tez/test/e2e/pig/tests/tez.conf
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/e2e/pig/tests/tez.conf?rev=1556961&r1=1556960&r2=1556961&view=diff
==============================================================================
--- pig/branches/tez/test/e2e/pig/tests/tez.conf (original)
+++ pig/branches/tez/test/e2e/pig/tests/tez.conf Thu Jan 9 21:50:46 2014
@@ -190,7 +190,58 @@ c = filter a by age < 20;
d = filter b by age < 20;
e = join c by name LEFT OUTER, d by name using 'replicated';
store e into ':OUTPATH:';\,
- }
+ },
+ # Simplest merge-join.
+ {
+ 'num' => 4,
+ 'pig' => q\a = load ':INPATH:/singlefile/studenttab10k';
+ b = load ':INPATH:/singlefile/votertab10k';
+ c = order a by $0;
+ d = order b by $0;
+ store c into ':OUTPATH:.intermediate1';
+ store d into ':OUTPATH:.intermediate2';
+ exec;
+ e = load ':OUTPATH:.intermediate1';
+ f = load ':OUTPATH:.intermediate2';
+ g = join e by $0, f by $0 using 'merge';
+ store g into ':OUTPATH:';\,
+ 'notmq' => 1,
+ },
+ # Merge-join with right-side filter
+ {
+ 'num' => 5,
+ 'pig' => q\a = load ':INPATH:/singlefile/studenttab10k';
+ b = load ':INPATH:/singlefile/votertab10k';
+ c = order a by $0;
+ d = order b by $0;
+ store c into ':OUTPATH:.intermediate1';
+ store d into ':OUTPATH:.intermediate2';
+ exec;
+ e = load ':OUTPATH:.intermediate1';
+ f = load ':OUTPATH:.intermediate2';
+ i = filter f by $2 != 'democrat';
+ g = join e by $0, i by $0 using 'merge';
+ store g into ':OUTPATH:';\,
+ 'notmq' => 1,
+ },
+ # Merge-join with schemas
+ {
+ 'num' => 6,
+ 'floatpostprocess' => 1,
+ 'delimiter' => ' ',
+ 'pig' => q\a = load ':INPATH:/singlefile/studenttab10k';
+ b = load ':INPATH:/singlefile/votertab10k';
+ c = order a by $0;
+ d = order b by $0;
+ store c into ':OUTPATH:.intermediate1';
+ store d into ':OUTPATH:.intermediate2';
+ exec;
+ e = load ':OUTPATH:.intermediate1' as (name:chararray, age:int, gpa:float);
+ f = load ':OUTPATH:.intermediate2' as (name:chararray, age:int, reg:chararray, contrib:float);
+ g = join e by $0, f by $0 using 'merge';
+ store g into ':OUTPATH:';\,
+ 'notmq' => 1,
+ },
]
},
{