Posted to commits@pig.apache.org by pr...@apache.org on 2016/07/04 06:46:13 UTC

svn commit: r1751215 - in /pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine: physicalLayer/relationalOperators/ spark/ spark/converter/ spark/plan/

Author: praveen
Date: Mon Jul  4 06:46:13 2016
New Revision: 1751215

URL: http://svn.apache.org/viewvc?rev=1751215&view=rev
Log:
PIG-4810: Implement Merge join for spark engine (Xianda via Praveen)

Modified:
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeJoinConverter.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java?rev=1751215&r1=1751214&r2=1751215&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java Mon Jul  4 06:46:13 2016
@@ -131,6 +131,24 @@ public class POMergeJoin extends Physica
 
     private byte endOfRecordMark = POStatus.STATUS_NULL;
 
+    // Only for Spark.
+    // When the current operator reaches the end of its input, the endOfInput flag is set to true.
+    // The old flag parentPlan.endOfAllInput doesn't work in Spark mode because it is shared
+    // between operators in the same plan, so it could be set by a preceding operator even
+    // though the current operator has not yet reached its end. (see PIG-4876)
+    private boolean endOfInput = false;
+    public boolean isEndOfInput() {
+        return endOfInput;
+    }
+    public void setEndOfInput(boolean isEndOfInput) {
+        endOfInput = isEndOfInput;
+    }
+
+    // Only for Spark.
+    // Set when the current operator has reached its end and the last left input has been
+    // added to 'leftTuples', ready for the join.
+    private boolean leftInputConsumedInSpark = false;
+
     // This serves as the default TupleFactory
     private transient TupleFactory mTupleFactory;
 
@@ -353,7 +371,7 @@ public class POMergeJoin extends Physica
 
                     }
                     else if(cmpval > 0){    // We got ahead on right side. Store currently read right tuple.
-                        if(!this.parentPlan.endOfAllInput){
+                        if(!(this.parentPlan.endOfAllInput || leftInputConsumedInSpark)){
                             prevRightKey = rightKey;
                             prevRightInp = rightInp;
                             // There cant be any more join on this key.
@@ -414,11 +432,14 @@ public class POMergeJoin extends Physica
             }
  
         case POStatus.STATUS_EOP:
-            if(this.parentPlan.endOfAllInput){
+            if(this.parentPlan.endOfAllInput || isEndOfInput()){
                 // We hit the end on left input. 
                 // Tuples in bag may still possibly join with right side.
                 curJoinKey = prevLeftKey;
                 curLeftKey = null;
+                if (isEndOfInput()) {
+                    leftInputConsumedInSpark = true;
+                }
                 break;                
             }
             else    // Fetch next left input.
@@ -428,7 +449,9 @@ public class POMergeJoin extends Physica
             return curLeftInp;
         }
 
-        if((null != prevRightKey) && !this.parentPlan.endOfAllInput && ((Comparable)prevRightKey).compareTo(curLeftKey) >= 0){
+        if((null != prevRightKey)
+                && !(this.parentPlan.endOfAllInput || leftInputConsumedInSpark)
+                && ((Comparable)prevRightKey).compareTo(curLeftKey) >= 0){
 
             // This will happen when we accumulated inputs on left side and moved on, but are still behind the right side
             // In that case, throw away the tuples accumulated till now and add the one we read in this function call.
@@ -510,7 +533,7 @@ public class POMergeJoin extends Physica
                 leftTuples.add((Tuple)curLeftInp.result);
                 prevLeftInp = curLeftInp;
                 prevLeftKey = curLeftKey;
-                if(this.parentPlan.endOfAllInput){  // This is end of all input and this is last time we will read right input.
+                if(this.parentPlan.endOfAllInput || leftInputConsumedInSpark){  // This is end of all input and this is last time we will read right input.
                     // Right loader in this case wouldn't get a chance to close input stream. So, we close it ourself.
                     try {
                         ((IndexableLoadFunc)rightLoader).close();

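As a minimal sketch of how the new per-operator flag is meant to be driven from the Spark side (illustrative only; 'leftInput' and 'emit' are hypothetical, and the real wiring, including the final flush after end-of-input, is the OutputConsumerIterator used by MergeJoinConverter below):

    // drain one Spark partition of the left input through the operator
    while (leftInput.hasNext()) {
        poMergeJoin.attachInput(leftInput.next());
        for (Result r = poMergeJoin.getNextTuple();
                r.returnStatus == POStatus.STATUS_OK;
                r = poMergeJoin.getNextTuple()) {
            emit((Tuple) r.result);
        }
    }
    // only this operator has exhausted its input; parentPlan.endOfAllInput is left
    // untouched because it is shared by every operator in the plan (PIG-4876)
    poMergeJoin.setEndOfInput(true);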
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java?rev=1751215&r1=1751214&r2=1751215&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java Mon Jul  4 06:46:13 2016
@@ -41,6 +41,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
@@ -92,6 +93,7 @@ public class JobGraphBuilder extends Spa
         new PhyPlanSetter(sparkOp.physicalPlan).visit();
         try {
             setReplicationForFRJoin(sparkOp.physicalPlan);
+            setReplicationForMergeJoin(sparkOp.physicalPlan);
             sparkOperToRDD(sparkOp);
             finishUDFs(sparkOp.physicalPlan);
         } catch (InterruptedException e) {
@@ -104,21 +106,45 @@ public class JobGraphBuilder extends Spa
     }
 
     private void setReplicationForFRJoin(PhysicalPlan plan) throws IOException {
+        List<Path> filesForMoreReplication = new ArrayList<Path>();
         List<POFRJoin> pofrJoins = PlanHelper.getPhysicalOperators(plan, POFRJoin.class);
         if (pofrJoins.size() > 0) {
-            FileSystem fs = FileSystem.get(this.jobConf);
-            short replication = (short) jobConf.getInt(MRConfiguration.SUMIT_REPLICATION, 10);
             for (POFRJoin pofrJoin : pofrJoins) {
                 FileSpec[] fileSpecs = pofrJoin.getReplFiles();
                 if (fileSpecs != null) {
                     for (int i = 0; i < fileSpecs.length; i++) {
                         if (i != pofrJoin.getFragment()) {
-                            fs.setReplication(new Path(fileSpecs[i].getFileName()), replication);
+                            filesForMoreReplication.add(new Path(fileSpecs[i].getFileName()));
                         }
                     }
                 }
             }
         }
+        setReplicationForFiles(filesForMoreReplication);
+    }
+
+    private void setReplicationForMergeJoin(PhysicalPlan plan) throws IOException {
+        List<Path> filesForMoreReplication = new ArrayList<Path>();
+        List<POMergeJoin> poMergeJoins = PlanHelper.getPhysicalOperators(plan, POMergeJoin.class);
+        if (poMergeJoins.size() > 0) {
+            for (POMergeJoin poMergeJoin : poMergeJoins) {
+                String idxFileName = poMergeJoin.getIndexFile();
+                filesForMoreReplication.add(new Path(idxFileName));
+                // in Spark mode, set it to null so that POMergeJoin won't use the Hadoop distributed cache
+                // see POMergeJoin.seekInRightStream()
+                poMergeJoin.setIndexFile(null);
+            }
+        }
+
+        setReplicationForFiles(filesForMoreReplication);
+    }
+
+    private void setReplicationForFiles(List<Path> files) throws IOException {
+        FileSystem fs = FileSystem.get(this.jobConf);
+        short replication = (short) jobConf.getInt(MRConfiguration.SUMIT_REPLICATION, 10);
+        for (int i = 0; i < files.size(); i++) {
+            fs.setReplication(files.get(i), replication);
+        }
     }
 
     // Calling EvalFunc.finish()

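For each POMergeJoin the new code path amounts to the following sketch (names taken from the patch above; the replication factor comes from jobConf.getInt(MRConfiguration.SUMIT_REPLICATION, 10)):

    // raise the replication of the merge-join index file so every executor can read it,
    // then clear it on the operator so POMergeJoin.seekInRightStream() does not try to
    // resolve the index through the Hadoop distributed cache in Spark mode
    FileSystem fs = FileSystem.get(jobConf);
    fs.setReplication(new Path(poMergeJoin.getIndexFile()), replication);
    poMergeJoin.setIndexFile(null);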
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1751215&r1=1751214&r2=1751215&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java Mon Jul  4 06:46:13 2016
@@ -202,7 +202,7 @@ public class SparkLauncher extends Launc
         convertMap.put(POSort.class, new SortConverter());
         convertMap.put(POSplit.class, new SplitConverter());
         convertMap.put(POSkewedJoin.class, new SkewedJoinConverter());
-        convertMap.put(POMergeJoin.class, new MergeJoinConverter());
+        convertMap.put(POMergeJoin.class, new MergeJoinConverter(confBytes));
         convertMap.put(POCollectedGroup.class, new CollectedGroupConverter());
         convertMap.put(POCounter.class, new CounterConverter());
         convertMap.put(PORank.class, new RankConverter());

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeJoinConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeJoinConverter.java?rev=1751215&r1=1751214&r2=1751215&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeJoinConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeJoinConverter.java Mon Jul  4 06:46:13 2016
@@ -22,151 +22,85 @@ import java.io.Serializable;
 import java.util.Iterator;
 import java.util.List;
 
-import scala.Tuple2;
-import scala.runtime.AbstractFunction1;
-
-import org.apache.pig.PigException;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
+import org.apache.pig.backend.hadoop.executionengine.spark.KryoSerializer;
 import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.function.FlatMapFunction;
 import org.apache.spark.rdd.RDD;
 
+
 @SuppressWarnings("serial")
 public class MergeJoinConverter implements
         RDDConverter<Tuple, Tuple, POMergeJoin> {
 
+    private byte[] confBytes;
+    public MergeJoinConverter(byte[] confBytes) {
+        this.confBytes = confBytes;
+    }
+
     @Override
     public RDD<Tuple> convert(List<RDD<Tuple>> predecessors,
                               POMergeJoin poMergeJoin) throws IOException {
 
-        SparkUtil.assertPredecessorSize(predecessors, poMergeJoin, 2);
-
-        RDD<Tuple> rdd1 = predecessors.get(0);
-        RDD<Tuple> rdd2 = predecessors.get(1);
-
-        // make (key, value) pairs, key has type IndexedKey, value has type Tuple
-        RDD<Tuple2<IndexedKey, Tuple>> rdd1Pair = rdd1.map(new ExtractKeyFunction(
-                poMergeJoin, 0), SparkUtil.<IndexedKey, Tuple>getTuple2Manifest());
-        RDD<Tuple2<IndexedKey, Tuple>> rdd2Pair = rdd2.map(new ExtractKeyFunction(
-                poMergeJoin, 1), SparkUtil.<IndexedKey, Tuple>getTuple2Manifest());
-
-        JavaPairRDD<IndexedKey, Tuple> prdd1 = new JavaPairRDD<IndexedKey, Tuple>(
-                rdd1Pair, SparkUtil.getManifest(IndexedKey.class),
-                SparkUtil.getManifest(Tuple.class));
-        JavaPairRDD<IndexedKey, Tuple> prdd2 = new JavaPairRDD<IndexedKey, Tuple>(
-                rdd2Pair, SparkUtil.getManifest(IndexedKey.class),
-                SparkUtil.getManifest(Tuple.class));
-
-        JavaPairRDD<IndexedKey, Tuple2<Tuple, Tuple>> jrdd = prdd1
-                .join(prdd2);
-
-        // map to get JavaRDD<Tuple> from join() output, which is
-        // JavaPairRDD<IndexedKey, Tuple2<Tuple, Tuple>> by
-        // ignoring the key (of type IndexedKey) and appending the values (the
-        // Tuples)
-        JavaRDD<Tuple> result = jrdd
-                .mapPartitions(new ToValueFunction());
+        SparkUtil.assertPredecessorSize(predecessors, poMergeJoin, 1);
+        RDD<Tuple> rdd = predecessors.get(0);
+        MergeJoinFunction mergeJoinFunction = new MergeJoinFunction(poMergeJoin, confBytes);
 
-        return result.rdd();
+        return rdd.toJavaRDD().mapPartitions(mergeJoinFunction, true).rdd();
     }
 
-    private static class ExtractKeyFunction extends
-            AbstractFunction1<Tuple, Tuple2<IndexedKey, Tuple>> implements
-            Serializable {
+    private static class MergeJoinFunction implements
+            FlatMapFunction<Iterator<Tuple>, Tuple>, Serializable {
 
-        private final POMergeJoin poMergeJoin;
-        private final int LR_index; // 0 for left table, 1 for right table
+        private POMergeJoin poMergeJoin;
+        private byte[] confBytes;
 
-        public ExtractKeyFunction(POMergeJoin poMergeJoin, int LR_index) {
+        private transient JobConf jobConf;
+
+        private MergeJoinFunction(POMergeJoin poMergeJoin, byte[] confBytes) {
             this.poMergeJoin = poMergeJoin;
-            this.LR_index = LR_index;
+            this.confBytes = confBytes;
         }
 
-        @Override
-        public Tuple2<IndexedKey, Tuple> apply(Tuple tuple) {
-            poMergeJoin.getLRs()[LR_index].attachInput(tuple);
-
-            try {
-                Result lrOut = poMergeJoin.getLRs()[LR_index].getNextTuple();
-                if(lrOut.returnStatus!= POStatus.STATUS_OK){
-                    int errCode = 2167;
-                    String errMsg = "LocalRearrange used to extract keys from " +
-                            "tuple is not configured correctly";
-                    throw new ExecException(errMsg,errCode, PigException.BUG);
-                }
-
-                // If tuple is (AA, 5) and key index is $1, then it lrOut is 0 5 (AA),
-                // so get(1) returns key
-                Object index =  ((Tuple) lrOut.result).get(0);
-                Object key = ((Tuple) lrOut.result).get(1);
-                Tuple value = tuple;
-                Tuple2<IndexedKey, Tuple> tuple_KeyValue = new Tuple2<IndexedKey, Tuple>(new IndexedKey((Byte)index,key),
-                        value);
-
-                return tuple_KeyValue;
-
-            } catch (Exception e) {
-                throw new RuntimeException(e);
+        void initializeJobConf() {
+            if (this.jobConf == null) {
+                this.jobConf = KryoSerializer.deserializeJobConf(this.confBytes);
+                PigMapReduce.sJobConfInternal.set(jobConf);
             }
         }
-    }
-
-    private static class ToValueFunction
-            implements FlatMapFunction<Iterator<Tuple2<IndexedKey, Tuple2<Tuple, Tuple>>>, Tuple>, Serializable {
-
-        private class Tuple2TransformIterable implements Iterable<Tuple> {
-
-            Iterator<Tuple2<IndexedKey, Tuple2<Tuple, Tuple>>> in;
-
-            Tuple2TransformIterable(
-                    Iterator<Tuple2<IndexedKey, Tuple2<Tuple, Tuple>>> input) {
-                in = input;
-            }
 
-            public Iterator<Tuple> iterator() {
-                return new IteratorTransform<Tuple2<IndexedKey, Tuple2<Tuple, Tuple>>, Tuple>(
-                        in) {
-                    @Override
-                    protected Tuple transform(
-                            Tuple2<IndexedKey, Tuple2<Tuple, Tuple>> next) {
-                        try {
-
-                            Tuple leftTuple = next._2()._1();
-                            Tuple rightTuple = next._2()._2();
-
-                            TupleFactory tf = TupleFactory.getInstance();
-                            Tuple result = tf.newTuple(leftTuple.size()
-                                    + rightTuple.size());
-
-                            // concatenate the two tuples together to make a
-                            // resulting tuple
-                            for (int i = 0; i < leftTuple.size(); i++)
-                                result.set(i, leftTuple.get(i));
-                            for (int i = 0; i < rightTuple.size(); i++)
-                                result.set(i + leftTuple.size(),
-                                        rightTuple.get(i));
+        public Iterable<Tuple> call(final Iterator<Tuple> input) {
+            initializeJobConf();
 
-                            return result;
+            return new Iterable<Tuple>() {
+                @Override
+                public Iterator<Tuple> iterator() {
+                    return new OutputConsumerIterator(input) {
+
+                        @Override
+                        protected void attach(Tuple tuple) {
+                            poMergeJoin.setInputs(null);
+                            poMergeJoin.attachInput(tuple);
+                        }
 
-                        } catch (Exception e) {
-                            throw new RuntimeException(e);
+                        @Override
+                        protected Result getNextResult() throws ExecException {
+                            return poMergeJoin.getNextTuple();
                         }
-                    }
-                };
-            }
-        }
 
-        @Override
-        public Iterable<Tuple> call(
-                Iterator<Tuple2<IndexedKey, Tuple2<Tuple, Tuple>>> input) {
-            return new Tuple2TransformIterable(input);
+                        @Override
+                        protected void endOfInput() {
+                            poMergeJoin.setEndOfInput(true);
+                        }
+                    };
+                }
+            };
         }
     }
 }

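A rough usage sketch of the reworked converter (variable names such as 'leftRdd' are hypothetical): unlike the old two-predecessor implementation, only the sorted left relation arrives as an RDD, and POMergeJoin reads the right side itself through its right loader, so no shuffle or RDD-level join is needed:

    MergeJoinConverter converter = new MergeJoinConverter(confBytes);
    // exactly one predecessor now - the (sorted) left input
    RDD<Tuple> joined = converter.convert(Collections.singletonList(leftRdd), poMergeJoin);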
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java?rev=1751215&r1=1751214&r2=1751215&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java Mon Jul  4 06:46:13 2016
@@ -42,6 +42,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.UDFFinder;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
@@ -72,6 +73,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.spark.operator.NativeSparkOperator;
 import org.apache.pig.backend.hadoop.executionengine.spark.operator.POGlobalRearrangeSpark;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.builtin.DefaultIndexableLoader;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.plan.DepthFirstWalker;
@@ -83,6 +85,8 @@ import org.apache.pig.impl.plan.PlanExce
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.Utils;
+import org.apache.pig.newplan.logical.relational.LOJoin;
+
 
 /**
  * The compiler that compiles a given physical physicalPlan into a DAG of Spark
@@ -732,16 +736,156 @@ public class SparkCompiler extends PhyPl
 
 	@Override
 	public void visitMergeJoin(POMergeJoin joinOp) throws VisitorException {
-        try {
-            addToPlan(joinOp);
-            phyToSparkOpMap.put(joinOp, curSparkOp);
-        } catch (Exception e) {
+		try {
+			if (compiledInputs.length != 2 || joinOp.getInputs().size() != 2){
+				int errCode=1101;
+				throw new SparkCompilerException("Merge Join must have exactly two inputs. Found : "+compiledInputs.length, errCode);
+			}
 
-            int errCode = 2034;
-            String msg = "Error compiling operator "
-                    + joinOp.getClass().getSimpleName();
-            throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
-        }
+			curSparkOp = phyToSparkOpMap.get(joinOp.getInputs().get(0));
+			SparkOperator rightSparkOp;
+			if(curSparkOp.equals(compiledInputs[0])) {
+				rightSparkOp = compiledInputs[1];
+			} else {
+				rightSparkOp = compiledInputs[0];
+			}
+
+			PhysicalPlan rightPipelinePlan;
+			PhysicalPlan rightPhyPlan = rightSparkOp.physicalPlan;
+			if (rightPhyPlan.getRoots().size() != 1) {
+				int errCode = 2171;
+				String errMsg = "Expected one but found more than one root physical operator in physical plan.";
+				throw new SparkCompilerException(errMsg,errCode);
+			}
+			PhysicalOperator rightPhyLoader = rightPhyPlan.getRoots().get(0);
+			if (!(rightPhyLoader instanceof POLoad)) {
+				int errCode = 2172;
+				String errMsg = "Expected physical operator at root to be POLoad. Found : "+rightPhyLoader.getClass().getCanonicalName();
+				throw new SparkCompilerException(errMsg,errCode);
+			}
+			if (rightPhyPlan.getSuccessors(rightPhyLoader) == null || rightPhyPlan.getSuccessors(rightPhyLoader).isEmpty()) {
+				// Load - Join case.
+				rightPipelinePlan = null;
+			} else{ // We got something on right side. Yank it and set it as inner plan of right input.
+				rightPipelinePlan = rightPhyPlan.clone();
+				PhysicalOperator root = rightPipelinePlan.getRoots().get(0);
+				rightPipelinePlan.disconnect(root, rightPipelinePlan.getSuccessors(root).get(0));
+				rightPipelinePlan.remove(root);
+				rightPhyPlan.trimBelow(rightPhyLoader);
+			}
+
+			joinOp.setupRightPipeline(rightPipelinePlan);
+			rightSparkOp.setRequestedParallelism(1); // for indexing job
+
+			POLoad rightLoader = (POLoad)rightSparkOp.physicalPlan.getRoots().get(0);
+			joinOp.setSignature(rightLoader.getSignature());
+			LoadFunc rightLoadFunc = rightLoader.getLoadFunc();
+
+			if(IndexableLoadFunc.class.isAssignableFrom(rightLoadFunc.getClass())) {
+				joinOp.setRightLoaderFuncSpec(rightLoader.getLFile().getFuncSpec());
+				joinOp.setRightInputFileName(rightLoader.getLFile().getFileName());
+				curSparkOp.UDFs.add(rightLoader.getLFile().getFuncSpec().toString());
+
+				// we don't need rightSparkOp since
+				// the right loader is an IndexableLoadFunc which can handle the index itself
+				sparkPlan.remove(rightSparkOp);
+				if(rightSparkOp == compiledInputs[0]) {
+					compiledInputs[0] = null;
+				} else if(rightSparkOp == compiledInputs[1]) {
+					compiledInputs[1] = null;
+				}
+
+				// validate that the join keys in merge join are only
+				// simple column projections or '*' and not expressions - expressions
+				// cannot be handled when the index is built by the storage layer on the sorted
+				// data as the sorted data (and corresponding index) is written.
+				// So merge join is restricted to not having expressions as join keys.
+				int numInputs = mPlan.getPredecessors(joinOp).size(); // should be 2
+				for(int i = 0; i < numInputs; i++) {
+					List<PhysicalPlan> keyPlans = joinOp.getInnerPlansOf(i);
+					for (PhysicalPlan keyPlan : keyPlans) {
+						for(PhysicalOperator op : keyPlan) {
+							if(!(op instanceof POProject)) {
+								int errCode = 1106;
+								String errMsg = "Merge join is possible only for simple column or '*' join keys when using " +
+										rightLoader.getLFile().getFuncSpec() + " as the loader";
+								throw new SparkCompilerException(errMsg, errCode, PigException.INPUT);
+							}
+						}
+					}
+				}
+
+			} else {
+				//Replacing POLoad with indexer is disabled for 'merge-sparse' joins.  While
+				//this feature would be useful, the current implementation of DefaultIndexableLoader
+				//is not designed to handle multiple calls to seekNear.  Specifically, it rereads the entire index
+				//for each call.  Some refactoring of this class is required - and then the check below could be removed.
+				if (joinOp.getJoinType() == LOJoin.JOINTYPE.MERGESPARSE) {
+					int errCode = 1104;
+					String errMsg = "Right input of merge-join must implement IndexableLoadFunc. " +
+							"The specified loader " + rightLoadFunc + " doesn't implement it";
+					throw new SparkCompilerException(errMsg,errCode);
+				}
+
+				// Replace POLoad with the indexer.
+				if (! (OrderedLoadFunc.class.isAssignableFrom(rightLoadFunc.getClass()))){
+					int errCode = 1104;
+					String errMsg = "Right input of merge-join must implement " +
+							"OrderedLoadFunc interface. The specified loader "
+							+ rightLoadFunc + " doesn't implement it";
+					throw new SparkCompilerException(errMsg,errCode);
+				}
+
+				String[] indexerArgs = new String[6];
+				List<PhysicalPlan> rightInpPlans = joinOp.getInnerPlansOf(1);
+				FileSpec origRightLoaderFileSpec = rightLoader.getLFile();
+
+				indexerArgs[0] = origRightLoaderFileSpec.getFuncSpec().toString();
+				indexerArgs[1] = ObjectSerializer.serialize((Serializable)rightInpPlans);
+				indexerArgs[2] = ObjectSerializer.serialize(rightPipelinePlan);
+				indexerArgs[3] = rightLoader.getSignature();
+				indexerArgs[4] = rightLoader.getOperatorKey().scope;
+				indexerArgs[5] = Boolean.toString(true);
+
+				FileSpec lFile = new FileSpec(rightLoader.getLFile().getFileName(),new FuncSpec(MergeJoinIndexer.class.getName(), indexerArgs));
+				rightLoader.setLFile(lFile);
+
+				// (keyFirst1, keyFirst2, .. , position, splitIndex) See MergeJoinIndexer
+				rightSparkOp.useTypedComparator(true);
+				POStore idxStore = getStore();
+				FileSpec idxStrFile = getTempFileSpec();
+				idxStore.setSFile(idxStrFile);
+				rightSparkOp.physicalPlan.addAsLeaf(idxStore);
+				rightSparkOp.markIndexer();
+
+				curSparkOp.UDFs.add(origRightLoaderFileSpec.getFuncSpec().toString());
+
+				// We want to ensure the indexing job runs prior to the actual join job.
+				// So, connect them in order.
+				sparkPlan.connect(rightSparkOp, curSparkOp);
+
+				// set up the DefaultIndexableLoader for the join operator
+				String[] defaultIndexableLoaderArgs = new String[5];
+				defaultIndexableLoaderArgs[0] = origRightLoaderFileSpec.getFuncSpec().toString();
+				defaultIndexableLoaderArgs[1] = idxStrFile.getFileName();
+				defaultIndexableLoaderArgs[2] = idxStrFile.getFuncSpec().toString();
+				defaultIndexableLoaderArgs[3] = joinOp.getOperatorKey().scope;
+				defaultIndexableLoaderArgs[4] = origRightLoaderFileSpec.getFileName();
+				joinOp.setRightLoaderFuncSpec((new FuncSpec(DefaultIndexableLoader.class.getName(), defaultIndexableLoaderArgs)));
+				joinOp.setRightInputFileName(origRightLoaderFileSpec.getFileName());
+
+				joinOp.setIndexFile(idxStrFile.getFileName());
+			}
+
+			curSparkOp.physicalPlan.addAsLeaf(joinOp);
+			phyToSparkOpMap.put(joinOp, curSparkOp);
+
+		} catch (Exception e) {
+			int errCode = 2034;
+			String msg = "Error compiling operator "
+					+ joinOp.getClass().getSimpleName();
+			throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
+		}
 	}
 
 	private void processUDFs(PhysicalPlan plan) throws VisitorException {