You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2014/01/09 22:50:46 UTC

svn commit: r1556961 - in /pig/branches/tez: src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/ src/org/apache/pig/backend/hadoop/executionengine/tez/ test/e2e/pig/tests/

Author: daijy
Date: Thu Jan  9 21:50:46 2014
New Revision: 1556961

URL: http://svn.apache.org/r1556961
Log:
PIG-3647: Implement merge join in Tez

Added:
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompilerUtil.java
Modified:
    pig/branches/tez/src/org/apache/pig/PigConfiguration.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java
    pig/branches/tez/test/e2e/pig/tests/tez.conf

Modified: pig/branches/tez/src/org/apache/pig/PigConfiguration.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/PigConfiguration.java?rev=1556961&r1=1556960&r2=1556961&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/PigConfiguration.java (original)
+++ pig/branches/tez/src/org/apache/pig/PigConfiguration.java Thu Jan  9 21:50:46 2014
@@ -153,5 +153,21 @@ public class PigConfiguration {
       * the distributed cache when doing fragment-replicated join
       */
     public static final String PIG_JOIN_REPLICATED_MAX_BYTES = "pig.join.replicated.max.bytes";
+
+    /**
+     * Turns combine split files on or off
+     */
+    public static final String PIG_SPLIT_COMBINATION = "pig.splitCombination";
+
+    /**
+     * Whether to turn combining of split files off. This is for internal use only.
+     */
+    public static final String PIG_NO_SPLIT_COMBINATION = "pig.noSplitCombination";
+
+    /**
+     * Specifies the size, in bytes, of data to be processed by a single map.
+     * Smaller files are combined until this size is reached.
+     */
+    public static final String PIG_MAX_COMBINED_SPLIT_SIZE = "pig.maxCombinedSplitSize";
 }
 

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java?rev=1556961&r1=1556960&r2=1556961&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java Thu Jan  9 21:50:46 2014
@@ -123,6 +123,8 @@ public class POMergeJoin extends Physica
 
     private String signature;
 
+    private byte endOfRecordMark;
+
     // This serves as the default TupleFactory
     private transient TupleFactory mTupleFactory;
 
@@ -154,14 +156,23 @@ public class POMergeJoin extends Physica
         LRs = new POLocalRearrange[2];
         this.createJoinPlans(inpPlans,keyTypes);
         this.indexFile = null;
-        this.joinType = joinType;  
+        this.joinType = joinType;
         this.leftInputSchema = leftInputSchema;
         this.mergedInputSchema = mergedInputSchema;
+        this.endOfRecordMark = POStatus.STATUS_EOP;
+    }
+
+    // Set to POStatus.STATUS_EOP (default) for MR and POStatus.STATUS_NULL for Tez.
+    // This is because:
+    // For MR, we send EOP at the end of every record
+    // For Tez, we only use a global EOP, so send NULL for end of record
+    public void setEndOfRecordMark(byte endOfRecordMark) {
+        this.endOfRecordMark = endOfRecordMark;
     }
 
     /**
      * Configures the Local Rearrange operators to get keys out of tuple.
-     * @throws ExecException 
+     * @throws ExecException
      */
     private void createJoinPlans(MultiMap<PhysicalOperator, PhysicalPlan> inpPlans, List<List<Byte>> keyTypes) throws PlanException{
 
@@ -285,8 +296,8 @@ public class POMergeJoin extends Physica
 
             curLeftKey = extractKeysFromTuple(curLeftInp, 0);
             if(null == curLeftKey) // We drop the tuples which have null keys.
-                return new Result(POStatus.STATUS_EOP, null);
-            
+                return new Result(endOfRecordMark, null);
+
             try {
                 seekInRightStream(curLeftKey);
             } catch (IOException e) {
@@ -297,7 +308,7 @@ public class POMergeJoin extends Physica
             leftTuples.add((Tuple)curLeftInp.result);
             firstTime = false;
             prevLeftKey = curLeftKey;
-            return new Result(POStatus.STATUS_EOP, null);
+            return new Result(endOfRecordMark, null);
         }
 
         if(doingJoin){
@@ -363,7 +374,7 @@ public class POMergeJoin extends Physica
                                 log.error("Received exception while trying to close right side file: " + e.getMessage());
                             }
                         }
-                        return new Result(POStatus.STATUS_EOP, null);
+                        return new Result(endOfRecordMark, null);
                     }
                     else{   // At this point right side can't be behind.
                         int errCode = 1102;
@@ -381,17 +392,17 @@ public class POMergeJoin extends Physica
         case POStatus.STATUS_OK:
             curLeftKey = extractKeysFromTuple(curLeftInp, 0);
             if(null == curLeftKey) // We drop the tuples which have null keys.
-                return new Result(POStatus.STATUS_EOP, null);
-            
+                return new Result(endOfRecordMark, null);
+
             int cmpVal = ((Comparable)curLeftKey).compareTo(prevLeftKey);
             if(cmpVal == 0){
                 // Keep on accumulating.
                 leftTuples.add((Tuple)curLeftInp.result);
-                return new Result(POStatus.STATUS_EOP, null);
+                return new Result(endOfRecordMark, null);
             }
             else if(cmpVal > 0){ // Filled with left bag. Move on.
                 curJoinKey = prevLeftKey;
-                break;   
+                break;
             }
             else{   // Current key < Prev Key
                 int errCode = 1102;
@@ -423,7 +434,7 @@ public class POMergeJoin extends Physica
             leftTuples.add((Tuple)curLeftInp.result);
             prevLeftInp = curLeftInp;
             prevLeftKey = curLeftKey;
-            return new Result(POStatus.STATUS_EOP, null);
+            return new Result(endOfRecordMark, null);
         }
 
         // Accumulated tuples with same key on left side.
@@ -504,14 +515,14 @@ public class POMergeJoin extends Physica
                         log.error("Received exception while trying to close right side file: " + e.getMessage());
                     }
                 }
-                return new Result(POStatus.STATUS_EOP, null);
+                return new Result(endOfRecordMark, null);
             }
         }
     }
-    
+
     private void seekInRightStream(Object firstLeftKey) throws IOException{
         rightLoader = (LoadFunc)PigContext.instantiateFuncFromSpec(rightLoaderFuncSpec);
-        
+
         // check if hadoop distributed cache is used
         if (indexFile != null && rightLoader instanceof DefaultIndexableLoader) {
             DefaultIndexableLoader loader = (DefaultIndexableLoader)rightLoader;

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java?rev=1556961&r1=1556960&r2=1556961&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java Thu Jan  9 21:50:46 2014
@@ -18,6 +18,7 @@
 package org.apache.pig.backend.hadoop.executionengine.tez;
 
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
@@ -30,10 +31,16 @@ import java.util.Stack;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.pig.FuncSpec;
+import org.apache.pig.IndexableLoadFunc;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.OrderedLoadFunc;
 import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompilerException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MergeJoinIndexer;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.ScalarPhyFinder;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.UDFFinder;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
@@ -69,6 +76,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.tez.POLocalRearrangeTezFactory.LocalRearrangeType;
 import org.apache.pig.data.DataType;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.builtin.DefaultIndexableLoader;
 import org.apache.pig.impl.builtin.FindQuantiles;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.FileSpec;
@@ -79,8 +87,10 @@ import org.apache.pig.impl.plan.Operator
 import org.apache.pig.impl.plan.OperatorPlan;
 import org.apache.pig.impl.plan.PlanException;
 import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.Pair;
 import org.apache.pig.impl.util.Utils;
+import org.apache.pig.newplan.logical.relational.LOJoin;
 import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
 import org.apache.tez.runtime.library.input.ShuffledUnorderedKVInput;
 import org.apache.tez.runtime.library.output.OnFileUnorderedKVOutput;
@@ -380,13 +390,6 @@ public class TezCompiler extends PhyPlan
         curTezOp = tezOp;
     }
 
-    private void connect(TezOperPlan plan, TezOperator from, TezOperator to) throws PlanException {
-        plan.connect(from, to);
-        // Add edge descriptors to old and new operators
-        to.inEdges.put(from.getOperatorKey(), new TezEdgeDescriptor());
-        from.outEdges.put(to.getOperatorKey(), new TezEdgeDescriptor());
-    }
-
     private void blocking() throws IOException, PlanException {
         TezOperator newTezOp = getTezOp();
         tezPlan.add(newTezOp);
@@ -416,9 +419,9 @@ public class TezCompiler extends PhyPlan
             POSplit split = findPOSplit(splitOp, from.getSplitOperatorKey());
             split.addPlan(from.plan);
             addSubPlanPropertiesToParent(splitOp, curTezOp);
-            connect(tezPlan, splitOp, to);
+            TezCompilerUtil.connect(tezPlan, splitOp, to);
         } else {
-            connect(tezPlan, from, to);
+            TezCompilerUtil.connect(tezPlan, from, to);
         }
     }
 
@@ -672,7 +675,7 @@ public class TezCompiler extends PhyPlan
         project.setColumn(0);
         project.setOverloaded(false);
 
-        POForEach forEach = getForEach(project, rp);
+        POForEach forEach = TezCompilerUtil.getForEach(project, rp, scope, nig);
         plan.addAsLeaf(forEach);
     }
 
@@ -732,7 +735,7 @@ public class TezCompiler extends PhyPlan
                     lr.setOutputKey(curTezOp.getOperatorKey().toString());
 
                     tezOp.plan.addAsLeaf(lr);
-                    connect(tezPlan, tezOp, curTezOp);
+                    TezCompilerUtil.connect(tezPlan, tezOp, curTezOp);
                     inputKeys.add(tezOp.getOperatorKey().toString());
 
                     // Configure broadcast edges for replicated tables
@@ -799,7 +802,7 @@ public class TezCompiler extends PhyPlan
 
             // Then add a POPackage and a POForEach to the start of the new tezOp.
             POPackage pkg = getPackage(1, DataType.TUPLE);
-            POForEach forEach = getForEachPlain();
+            POForEach forEach = TezCompilerUtil.getForEachPlain(scope, nig);
             curTezOp.plan.add(pkg);
             curTezOp.plan.addAsLeaf(forEach);
 
@@ -869,11 +872,221 @@ public class TezCompiler extends PhyPlan
         throw new TezCompilerException(msg, errCode, PigException.BUG);
     }
 
+    /** Since merge-join works on two inputs, there are exactly two TezOper predecessors, identified as left and right.
+     *  The right input generates the index on-the-fly. This consists of two Tez vertexes: the first vertex generates
+     *  the index entries, and the second vertex sorts them.
+     *  The left input contains the POMergeJoin, which does the actual join.
+     *  The first right Tez oper is identified as rightTezOpr, the second as rightTezOprAggr.
+     *  The left Tez oper is identified as curTezOp.
+
+     *  1) RightTezOpr: It can be preceded only by POLoad. If there is anything else
+     *                  in physical plan, that is yanked and set as inner plans of joinOp.
+     *  2) LeftTezOper:  add the Join operator in it.
+     *
+     *  We also need to segment the DAG into two, because POMergeJoin depends on the index file, which is loaded with
+     *  DefaultIndexableLoader. It is possible to convert the index into a broadcast input, but that is costly
+     *  because a lot of logic is built into DefaultIndexableLoader. We can revisit it later.
+     */
     @Override
-    public void visitMergeJoin(POMergeJoin op) throws VisitorException {
-        int errCode = 2034;
-        String msg = "Cannot compile " + op.getClass().getSimpleName();
-        throw new TezCompilerException(msg, errCode, PigException.BUG);
+    public void visitMergeJoin(POMergeJoin joinOp) throws VisitorException {
+
+        try{
+            joinOp.setEndOfRecordMark(POStatus.STATUS_NULL);
+            if(compiledInputs.length != 2 || joinOp.getInputs().size() != 2){
+                int errCode=1101;
+                throw new MRCompilerException("Merge Join must have exactly two inputs. Found : "+compiledInputs.length, errCode);
+            }
+
+            curTezOp = phyToTezOpMap.get(joinOp.getInputs().get(0));
+
+            TezOperator rightTezOpr = null;
+            TezOperator rightTezOprAggr = null;
+            if(curTezOp.equals(compiledInputs[0]))
+                rightTezOpr = compiledInputs[1];
+            else
+                rightTezOpr = compiledInputs[0];
+
+            // We will first operate on right side which is indexer job.
+            // First yank plan of the compiled right input and set that as an inner plan of right operator.
+            PhysicalPlan rightPipelinePlan;
+            if(!rightTezOpr.closed){
+                PhysicalPlan rightPlan = rightTezOpr.plan;
+                if(rightPlan.getRoots().size() != 1){
+                    int errCode = 2171;
+                    String errMsg = "Expected one but found more then one root physical operator in physical plan.";
+                    throw new MRCompilerException(errMsg,errCode,PigException.BUG);
+                }
+
+                PhysicalOperator rightLoader = rightPlan.getRoots().get(0);
+                if(! (rightLoader instanceof POLoad)){
+                    int errCode = 2172;
+                    String errMsg = "Expected physical operator at root to be POLoad. Found : "+rightLoader.getClass().getCanonicalName();
+                    throw new MRCompilerException(errMsg,errCode);
+                }
+
+                if (rightPlan.getSuccessors(rightLoader) == null || rightPlan.getSuccessors(rightLoader).isEmpty())
+                    // Load - Join case.
+                    rightPipelinePlan = null;
+
+                else{ // We got something on right side. Yank it and set it as inner plan of right input.
+                    rightPipelinePlan = rightPlan.clone();
+                    PhysicalOperator root = rightPipelinePlan.getRoots().get(0);
+                    rightPipelinePlan.disconnect(root, rightPipelinePlan.getSuccessors(root).get(0));
+                    rightPipelinePlan.remove(root);
+                    rightPlan.trimBelow(rightLoader);
+                }
+            }
+            else{
+                int errCode = 2022;
+                String msg = "Right input plan have been closed. This is unexpected while compiling.";
+                throw new PlanException(msg, errCode, PigException.BUG);
+            }
+
+            joinOp.setupRightPipeline(rightPipelinePlan);
+
+            // At this point, we must be operating on the input plan of the right input, and it would contain nothing other than a POLoad.
+            POLoad rightLoader = (POLoad)rightTezOpr.plan.getRoots().get(0);
+            joinOp.setSignature(rightLoader.getSignature());
+            LoadFunc rightLoadFunc = rightLoader.getLoadFunc();
+            List<String> udfs = new ArrayList<String>();
+            if(IndexableLoadFunc.class.isAssignableFrom(rightLoadFunc.getClass())) {
+                joinOp.setRightLoaderFuncSpec(rightLoader.getLFile().getFuncSpec());
+                joinOp.setRightInputFileName(rightLoader.getLFile().getFileName());
+                udfs.add(rightLoader.getLFile().getFuncSpec().toString());
+
+                // we don't need the right TezOper since
+                // the right loader is an IndexableLoadFunc which can handle the index
+                // itself
+                tezPlan.remove(rightTezOpr);
+                if(rightTezOpr == compiledInputs[0]) {
+                    compiledInputs[0] = null;
+                } else if(rightTezOpr == compiledInputs[1]) {
+                    compiledInputs[1] = null;
+                }
+                rightTezOpr = null;
+
+                // validate that the join keys in merge join are only
+                // simple column projections or '*' and not expression - expressions
+                // cannot be handled when the index is built by the storage layer on the sorted
+                // data when the sorted data (and corresponding index) is written.
+                // So merge join will be restricted not have expressions as
+                // join keys
+                int numInputs = mPlan.getPredecessors(joinOp).size(); // should be 2
+                for(int i = 0; i < numInputs; i++) {
+                    List<PhysicalPlan> keyPlans = joinOp.getInnerPlansOf(i);
+                    for (PhysicalPlan keyPlan : keyPlans) {
+                        for(PhysicalOperator op : keyPlan) {
+                            if(!(op instanceof POProject)) {
+                                int errCode = 1106;
+                                String errMsg = "Merge join is possible only for simple column or '*' join keys when using " +
+                                rightLoader.getLFile().getFuncSpec() + " as the loader";
+                                throw new MRCompilerException(errMsg, errCode, PigException.INPUT);
+                            }
+                        }
+                    }
+                }
+            } else {
+                LoadFunc loadFunc = rightLoader.getLoadFunc();
+                //Replacing POLoad with indexer is disabled for 'merge-sparse' joins.  While
+                //this feature would be useful, the current implementation of DefaultIndexableLoader
+                //is not designed to handle multiple calls to seekNear.  Specifically, it rereads the entire index
+                //for each call.  Some refactoring of this class is required - and then the check below could be removed.
+                if (joinOp.getJoinType() == LOJoin.JOINTYPE.MERGESPARSE) {
+                    int errCode = 1104;
+                    String errMsg = "Right input of merge-join must implement IndexableLoadFunc. " +
+                    "The specified loader " + loadFunc + " doesn't implement it";
+                    throw new MRCompilerException(errMsg,errCode);
+                }
+
+                // Replace POLoad with  indexer.
+
+                if (! (OrderedLoadFunc.class.isAssignableFrom(loadFunc.getClass()))){
+                    int errCode = 1104;
+                    String errMsg = "Right input of merge-join must implement " +
+                    "OrderedLoadFunc interface. The specified loader "
+                    + loadFunc + " doesn't implement it";
+                    throw new MRCompilerException(errMsg,errCode);
+                }
+
+                String[] indexerArgs = new String[6];
+                List<PhysicalPlan> rightInpPlans = joinOp.getInnerPlansOf(1);
+                FileSpec origRightLoaderFileSpec = rightLoader.getLFile();
+
+                indexerArgs[0] = origRightLoaderFileSpec.getFuncSpec().toString();
+                indexerArgs[1] = ObjectSerializer.serialize((Serializable)rightInpPlans);
+                indexerArgs[2] = ObjectSerializer.serialize(rightPipelinePlan);
+                indexerArgs[3] = rightLoader.getSignature();
+                indexerArgs[4] = rightLoader.getOperatorKey().scope;
+                indexerArgs[5] = Boolean.toString(true);
+
+                FileSpec lFile = new FileSpec(rightLoader.getLFile().getFileName(),new FuncSpec(MergeJoinIndexer.class.getName(), indexerArgs));
+                rightLoader.setLFile(lFile);
+
+                // Loader of operator will return a tuple of form -
+                // (keyFirst1, keyFirst2, .. , position, splitIndex) See MergeJoinIndexer
+
+                rightTezOprAggr = getTezOp();
+                tezPlan.add(rightTezOprAggr);
+                TezCompilerUtil.simpleConnectTwoVertex(tezPlan, rightTezOpr, rightTezOprAggr, scope, nig);
+                rightTezOprAggr.requestedParallelism = 1; // we need exactly one task for indexing job.
+
+                POStore st = TezCompilerUtil.getStore(scope, nig);
+                FileSpec strFile = getTempFileSpec();
+                st.setSFile(strFile);
+                rightTezOprAggr.plan.addAsLeaf(st);
+                rightTezOprAggr.setClosed(true);
+                rightTezOprAggr.segmentBelow = true;
+
+                // set up the DefaultIndexableLoader for the join operator
+                String[] defaultIndexableLoaderArgs = new String[5];
+                defaultIndexableLoaderArgs[0] = origRightLoaderFileSpec.getFuncSpec().toString();
+                defaultIndexableLoaderArgs[1] = strFile.getFileName();
+                defaultIndexableLoaderArgs[2] = strFile.getFuncSpec().toString();
+                defaultIndexableLoaderArgs[3] = joinOp.getOperatorKey().scope;
+                defaultIndexableLoaderArgs[4] = origRightLoaderFileSpec.getFileName();
+                joinOp.setRightLoaderFuncSpec((new FuncSpec(DefaultIndexableLoader.class.getName(), defaultIndexableLoaderArgs)));
+                joinOp.setRightInputFileName(origRightLoaderFileSpec.getFileName());
+
+                joinOp.setIndexFile(strFile.getFileName());
+                udfs.add(origRightLoaderFileSpec.getFuncSpec().toString());
+            }
+
+            // We are done with right side. Lets work on left now.
+            // Join will be materialized in leftTezOper.
+            if(!curTezOp.isClosed()) // Life is easy
+                curTezOp.plan.addAsLeaf(joinOp);
+
+            else{
+                int errCode = 2022;
+                String msg = "Input plan has been closed. This is unexpected while compiling.";
+                throw new PlanException(msg, errCode, PigException.BUG);
+            }
+            if(rightTezOprAggr != null) {
+                rightTezOprAggr.markIndexer();
+                // We want to ensure indexing job runs prior to actual join job. So, connect them in order.
+                TezCompilerUtil.connect(tezPlan, rightTezOprAggr, curTezOp);
+            }
+            phyToTezOpMap.put(joinOp, curTezOp);
+            // no combination of small splits as there is currently no way to guarantee the sortedness
+            // of the combined splits.
+            curTezOp.noCombineSmallSplits();
+            curTezOp.UDFs.addAll(udfs);
+        }
+        catch(PlanException e){
+            int errCode = 2034;
+            String msg = "Error compiling operator " + joinOp.getClass().getCanonicalName();
+            throw new MRCompilerException(msg, errCode, PigException.BUG, e);
+        }
+       catch (IOException e){
+           int errCode = 3000;
+           String errMsg = "IOException caught while compiling POMergeJoin";
+            throw new MRCompilerException(errMsg, errCode,e);
+        }
+       catch(CloneNotSupportedException e){
+           int errCode = 2127;
+           String errMsg = "Cloning exception caught while compiling POMergeJoin";
+           throw new MRCompilerException(errMsg, errCode, PigException.BUG, e);
+       }
     }
 
     @Override
@@ -1284,7 +1497,7 @@ public class TezCompiler extends PhyPlan
         project.setResultType(DataType.BAG);
         project.setStar(false);
         project.setColumn(1);
-        POForEach forEach = getForEach(project, sort.getRequestedParallelism());
+        POForEach forEach = TezCompilerUtil.getForEach(project, sort.getRequestedParallelism(), scope, nig);
         oper1.plan.addAsLeaf(forEach);
 
         boolean[] sortOrder;
@@ -1442,7 +1655,7 @@ public class TezCompiler extends PhyPlan
             lr.setOutputKey(sortOpers[0].getOperatorKey().toString());
             lrSample.setOutputKey(quantJobParallelismPair.first.getOperatorKey().toString());
 
-            connect(tezPlan, prevOper, sortOpers[0]);
+            TezCompilerUtil.connect(tezPlan, prevOper, sortOpers[0]);
             TezEdgeDescriptor edge = sortOpers[0].inEdges.get(prevOper.getOperatorKey());
 
             // TODO: Convert to unsorted shuffle after TEZ-661
@@ -1450,9 +1663,9 @@ public class TezCompiler extends PhyPlan
             // edge.inputClassName = ShuffledUnorderedKVInput.class.getName();
             edge.partitionerClass = RoundRobinPartitioner.class;
 
-            connect(tezPlan, prevOper, quantJobParallelismPair.first);
+            TezCompilerUtil.connect(tezPlan, prevOper, quantJobParallelismPair.first);
 
-            connect(tezPlan, quantJobParallelismPair.first, sortOpers[0]);
+            TezCompilerUtil.connect(tezPlan, quantJobParallelismPair.first, sortOpers[0]);
             edge = sortOpers[0].inEdges.get(quantJobParallelismPair.first.getOperatorKey());
             edge.dataMovementType = DataMovementType.BROADCAST;
             edge.outputClassName = OnFileUnorderedKVOutput.class.getName();
@@ -1461,7 +1674,7 @@ public class TezCompiler extends PhyPlan
             lr.setOutputKey(sortOpers[0].getOperatorKey().toString());
             sortOpers[0].sampleOperator = quantJobParallelismPair.first;
 
-            connect(tezPlan, sortOpers[0], sortOpers[1]);
+            TezCompilerUtil.connect(tezPlan, sortOpers[0], sortOpers[1]);
             edge = sortOpers[1].inEdges.get(sortOpers[0].getOperatorKey());
             edge.partitionerClass = WeightedRangePartitionerTez.class;
 
@@ -1574,30 +1787,7 @@ public class TezCompiler extends PhyPlan
         }
     }
 
-    private POForEach getForEach(POProject project, int rp) {
-        PhysicalPlan forEachPlan = new PhysicalPlan();
-        forEachPlan.add(project);
 
-        List<PhysicalPlan> forEachPlans = Lists.newArrayList();
-        forEachPlans.add(forEachPlan);
-
-        List<Boolean> flatten = Lists.newArrayList();
-        flatten.add(true);
-
-        POForEach forEach = new POForEach(new OperatorKey(scope, nig.getNextNodeId(scope)), rp, forEachPlans, flatten);
-        forEach.setResultType(DataType.BAG);
-        return forEach;
-    }
-
-    // Get a plain POForEach: ForEach X generate flatten($1)
-    private POForEach getForEachPlain() {
-        POProject project = new POProject(new OperatorKey(scope, nig.getNextNodeId(scope)));
-        project.setResultType(DataType.TUPLE);
-        project.setStar(false);
-        project.setColumn(1);
-        project.setOverloaded(true);
-        return getForEach(project, -1);
-    }
 
     /**
      * Returns a POPackage with default packager. This method shouldn't be used

Added: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompilerUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompilerUtil.java?rev=1556961&view=auto
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompilerUtil.java (added)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompilerUtil.java Thu Jan  9 21:50:46 2014
@@ -0,0 +1,106 @@
+package org.apache.pig.backend.hadoop.executionengine.tez;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.pig.PigException;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.data.DataType;
+import org.apache.pig.impl.plan.NodeIdGenerator;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.PlanException;
+
+import com.google.common.collect.Lists;
+
+public class TezCompilerUtil {
+
+    // simpleConnectTwoVertex ends op1 (the vertex equivalent to map) and starts op2 (the vertex equivalent
+    // to reduce), then connects the two operators in tezPlan:
+    // 1. op1 is open: a POLocalRearrangeTez whose output key is op2's operator key is added as its leaf
+    // 2. op2 is blank: a POPackage is added to start the reduce vertex, followed by a plain
+    //    POForEach (see getForEachPlain) as its leaf
+    // 3. POLocalRearrange/POPackage are trivial (project-star plan, TUPLE key type, a single input)
+    // 4. op1 is connected to op2 by this method (via connect); callers do not need to connect them again
+    static public void simpleConnectTwoVertex(TezOperPlan tezPlan, TezOperator op1, TezOperator op2, String scope, NodeIdGenerator nig) throws PlanException
+    {
+        PhysicalPlan ep = new PhysicalPlan();
+        POProject prjStar = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
+        prjStar.setResultType(DataType.TUPLE);
+        prjStar.setStar(true);
+        ep.add(prjStar);
+
+        List<PhysicalPlan> eps = new ArrayList<PhysicalPlan>();
+        eps.add(ep);
+
+        POLocalRearrangeTez lr = new POLocalRearrangeTez(new OperatorKey(scope,nig.getNextNodeId(scope)));
+        try {
+            lr.setIndex(0);
+        } catch (ExecException e) {
+            int errCode = 2058;
+            String msg = "Unable to set index on the newly created POLocalRearrange.";
+            throw new PlanException(msg, errCode, PigException.BUG, e);
+        }
+        lr.setKeyType(DataType.TUPLE);
+        lr.setPlans(eps);
+        lr.setResultType(DataType.TUPLE);
+        lr.setOutputKey(op2.getOperatorKey().toString());
+
+        op1.plan.addAsLeaf(lr);
+
+        POPackage pkg = new POPackage(new OperatorKey(scope,nig.getNextNodeId(scope)));
+        pkg.getPkgr().setKeyType(DataType.TUPLE);
+        pkg.setNumInps(1);
+        boolean[] inner = {false};
+        pkg.getPkgr().setInner(inner);
+        op2.plan.add(pkg);
+
+        op2.plan.addAsLeaf(getForEachPlain(scope, nig));
+
+        connect(tezPlan, op1, op2);
+    }
+
+    static public void connect(TezOperPlan plan, TezOperator from, TezOperator to) throws PlanException {
+        plan.connect(from, to);
+        // Register a separate new edge descriptor on each end: to.inEdges keyed by from, from.outEdges keyed by to
+        to.inEdges.put(from.getOperatorKey(), new TezEdgeDescriptor());
+        from.outEdges.put(to.getOperatorKey(), new TezEdgeDescriptor());
+    }
+
+    static public POForEach getForEach(POProject project, int rp, String scope, NodeIdGenerator nig) {
+        PhysicalPlan forEachPlan = new PhysicalPlan();
+        forEachPlan.add(project);
+
+        List<PhysicalPlan> forEachPlans = Lists.newArrayList();
+        forEachPlans.add(forEachPlan);
+
+        List<Boolean> flatten = Lists.newArrayList();
+        flatten.add(true);
+
+        POForEach forEach = new POForEach(new OperatorKey(scope, nig.getNextNodeId(scope)), rp, forEachPlans, flatten);
+        forEach.setResultType(DataType.BAG);
+        return forEach;
+    }
+
+    // Get a plain POForEach: ForEach X generate flatten($1)
+    static public POForEach getForEachPlain(String scope, NodeIdGenerator nig) {
+        POProject project = new POProject(new OperatorKey(scope, nig.getNextNodeId(scope)));
+        project.setResultType(DataType.TUPLE);
+        project.setStar(false);
+        project.setColumn(1);
+        project.setOverloaded(true);
+        return getForEach(project, -1, scope, nig);
+    }
+
+    static public POStore getStore(String scope, NodeIdGenerator nig) {
+        POStore st = new POStoreTez(new OperatorKey(scope, nig.getNextNodeId(scope)));
+        // Mark the store as a temporary store. Such stores could be removed by the
+        // optimizer, because the user did not request them.
+        st.setIsTmpStore(true);
+        return st;
+    }
+}

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1556961&r1=1556960&r2=1556961&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Thu Jan  9 21:50:46 2014
@@ -45,6 +45,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.pig.LoadFunc;
+import org.apache.pig.PigConfiguration;
 import org.apache.pig.PigException;
 import org.apache.pig.StoreFuncInterface;
 import org.apache.pig.backend.executionengine.ExecException;
@@ -266,11 +267,25 @@ public class TezDagBuilder extends TezOp
 
         // Pass physical plans to vertex as user payload.
         Configuration payloadConf = job.getConfiguration();
-        
+
         if (tezOp.sampleOperator != null) {
             payloadConf.set("pig.sampleVertex", tezOp.sampleOperator.getOperatorKey().toString());
         }
 
+        String tmp;
+        long maxCombinedSplitSize = 0;
+        if (!tezOp.combineSmallSplits() || pc.getProperties().getProperty(PigConfiguration.PIG_SPLIT_COMBINATION, "true").equals("false"))
+            payloadConf.setBoolean(PigConfiguration.PIG_NO_SPLIT_COMBINATION, true);
+        else if ((tmp = pc.getProperties().getProperty(PigConfiguration.PIG_MAX_COMBINED_SPLIT_SIZE, null)) != null) {
+            try {
+                maxCombinedSplitSize = Long.parseLong(tmp);
+            } catch (NumberFormatException e) {
+                log.warn("Invalid numeric format for pig.maxCombinedSplitSize; use the default maximum combined split size");
+            }
+        }
+        if (maxCombinedSplitSize > 0)
+            payloadConf.setLong("pig.maxCombinedSplitSize", maxCombinedSplitSize);
+
         List<POLoad> loads = processLoads(tezOp, payloadConf, job);
         LinkedList<POStore> stores = processStores(tezOp, payloadConf, job);
 
@@ -278,6 +293,7 @@ public class TezDagBuilder extends TezOp
 
         payloadConf.set("udf.import.list",
                 ObjectSerializer.serialize(PigContext.getPackageImportList()));
+        payloadConf.set("exectype", "TEZ");
         payloadConf.setBoolean("mapred.mapper.new-api", true);
         payloadConf.setClass("mapreduce.inputformat.class",
                 PigInputFormat.class, InputFormat.class);

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java?rev=1556961&r1=1556960&r2=1556961&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java Thu Jan  9 21:50:46 2014
@@ -89,7 +89,12 @@ public class TezOperator extends Operato
     // Last POLimit value in this map reduce operator, needed by LimitAdjuster
     // to add additional map reduce operator with 1 reducer after this
     long limit = -1;
-    
+
+    // Flag indicating whether small input splits should be combined into larger
+    // ones in order to reduce the number of mappers. For merge join, the splits
+    // of neither table may be combined, for correctness.
+    private boolean combineSmallSplits = true;
+
     // If not null, need to collect sample sent from predecessor
     TezOperator sampleOperator = null;
 
@@ -98,6 +103,8 @@ public class TezOperator extends Operato
         NONE,
         // Indicate if this job is a union job
         UNION,
+        // Indicate if this job is a merge indexer
+        INDEXER,
         // Indicate if this job is a sampling job
         SAMPLER,
         // Indicate if this job is a group by job
@@ -186,6 +193,10 @@ public class TezOperator extends Operato
         return (feature == OPER_FEATURE.UNION);
     }
 
+    public void markIndexer() {
+        feature = OPER_FEATURE.INDEXER;
+    }
+
     public void markUnion() {
         feature = OPER_FEATURE.UNION;
     }
@@ -279,5 +290,13 @@ public class TezOperator extends Operato
     public boolean isLimitAfterSort() {
         return limitAfterSort;
     }
+
+    protected void noCombineSmallSplits() {
+        combineSmallSplits = false;
+    }
+
+    public boolean combineSmallSplits() {
+        return combineSmallSplits;
+    }
 }
 

Modified: pig/branches/tez/test/e2e/pig/tests/tez.conf
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/e2e/pig/tests/tez.conf?rev=1556961&r1=1556960&r2=1556961&view=diff
==============================================================================
--- pig/branches/tez/test/e2e/pig/tests/tez.conf (original)
+++ pig/branches/tez/test/e2e/pig/tests/tez.conf Thu Jan  9 21:50:46 2014
@@ -190,7 +190,58 @@ c = filter a by age < 20;
 d = filter b by age < 20;
 e = join c by name LEFT OUTER, d by name using 'replicated';
 store e into ':OUTPATH:';\,
-                        }
+                        },
+                        # Simplest merge-join.
+                        {
+                        'num' => 4,
+                        'pig' => q\a = load ':INPATH:/singlefile/studenttab10k';
+                                        b = load ':INPATH:/singlefile/votertab10k';
+                                        c = order a by $0;
+                                        d = order b by $0;
+                                        store c into ':OUTPATH:.intermediate1';
+                                        store d into ':OUTPATH:.intermediate2';
+                                        exec;
+                                        e = load ':OUTPATH:.intermediate1';
+                                        f = load ':OUTPATH:.intermediate2';
+                                        g = join e by $0, f by $0 using 'merge';
+                                        store g into ':OUTPATH:';\,
+                        'notmq' => 1,
+                        },
+                        # Merge-join with right-side filter
+                        {
+                        'num' => 5,
+                        'pig' => q\a = load ':INPATH:/singlefile/studenttab10k';
+                                        b = load ':INPATH:/singlefile/votertab10k';
+                                        c = order a by $0;
+                                        d = order b by $0;
+                                        store c into ':OUTPATH:.intermediate1';
+                                        store d into ':OUTPATH:.intermediate2';
+                                        exec;
+                                        e = load ':OUTPATH:.intermediate1';
+                                        f = load ':OUTPATH:.intermediate2';
+                                        i = filter f by $2 != 'democrat';
+                                        g = join e by $0, i by $0 using 'merge';
+                                        store g into ':OUTPATH:';\,
+                        'notmq' => 1,
+                        },
+                        # Merge-join with schemas
+                        {
+                        'num' => 6,
+                        'floatpostprocess' => 1,
+                        'delimiter' => '	',
+                        'pig' => q\a = load ':INPATH:/singlefile/studenttab10k';
+                                        b = load ':INPATH:/singlefile/votertab10k';
+                                        c = order a by $0;
+                                        d = order b by $0;
+                                        store c into ':OUTPATH:.intermediate1';
+                                        store d into ':OUTPATH:.intermediate2';
+                                        exec;
+                                        e = load ':OUTPATH:.intermediate1'  as (name:chararray, age:int, gpa:float);
+                                        f = load ':OUTPATH:.intermediate2'  as (name:chararray, age:int, reg:chararray, contrib:float);
+                                        g = join e by $0, f by $0 using 'merge';
+                                        store g into ':OUTPATH:';\,
+                        'notmq' => 1,
+                        },
                   ]
                 },
                 {