Posted to commits@pig.apache.org by pr...@apache.org on 2016/07/04 06:46:13 UTC
svn commit: r1751215 - in
/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine:
physicalLayer/relationalOperators/ spark/ spark/converter/ spark/plan/
Author: praveen
Date: Mon Jul 4 06:46:13 2016
New Revision: 1751215
URL: http://svn.apache.org/viewvc?rev=1751215&view=rev
Log:
PIG-4810: Implement Merge join for spark engine (Xianda via Praveen)
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeJoinConverter.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java?rev=1751215&r1=1751214&r2=1751215&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java Mon Jul 4 06:46:13 2016
@@ -131,6 +131,24 @@ public class POMergeJoin extends Physica
private byte endOfRecordMark = POStatus.STATUS_NULL;
+ // Only for Spark
+ // When the current operator reaches the end of its input, the endOfInput flag is set to true.
+ // The old flag parentPlan.endOfAllInput doesn't work in Spark mode, because it is shared
+ // between operators in the same plan, so it could be set by preceding operators even though
+ // the current operator has not yet reached the end of its input. (see PIG-4876)
+ private boolean endOfInput = false;
+ public boolean isEndOfInput() {
+ return endOfInput;
+ }
+ public void setEndOfInput(boolean isEndOfInput) {
+ endOfInput = isEndOfInput;
+ }
+
+ // Only for Spark.
+ // Set when the current operator has reached the end of its input and the last left
+ // input has been added into 'leftTuples', ready for join.
+ private boolean leftInputConsumedInSpark = false;
+
// This serves as the default TupleFactory
private transient TupleFactory mTupleFactory;
@@ -353,7 +371,7 @@ public class POMergeJoin extends Physica
}
else if(cmpval > 0){ // We got ahead on right side. Store currently read right tuple.
- if(!this.parentPlan.endOfAllInput){
+ if(!(this.parentPlan.endOfAllInput || leftInputConsumedInSpark)){
prevRightKey = rightKey;
prevRightInp = rightInp;
// There can't be any more joins on this key.
@@ -414,11 +432,14 @@ public class POMergeJoin extends Physica
}
case POStatus.STATUS_EOP:
- if(this.parentPlan.endOfAllInput){
+ if(this.parentPlan.endOfAllInput || isEndOfInput()){
// We hit the end on left input.
// Tuples in bag may still possibly join with right side.
curJoinKey = prevLeftKey;
curLeftKey = null;
+ if (isEndOfInput()) {
+ leftInputConsumedInSpark = true;
+ }
break;
}
else // Fetch next left input.
@@ -428,7 +449,9 @@ public class POMergeJoin extends Physica
return curLeftInp;
}
- if((null != prevRightKey) && !this.parentPlan.endOfAllInput && ((Comparable)prevRightKey).compareTo(curLeftKey) >= 0){
+ if((null != prevRightKey)
+ && !(this.parentPlan.endOfAllInput || leftInputConsumedInSpark)
+ && ((Comparable)prevRightKey).compareTo(curLeftKey) >= 0){
// This will happen when we accumulated inputs on left side and moved on, but are still behind the right side
// In that case, throw away the tuples accumulated till now and add the one we read in this function call.
@@ -510,7 +533,7 @@ public class POMergeJoin extends Physica
leftTuples.add((Tuple)curLeftInp.result);
prevLeftInp = curLeftInp;
prevLeftKey = curLeftKey;
- if(this.parentPlan.endOfAllInput){ // This is end of all input and this is last time we will read right input.
+ if(this.parentPlan.endOfAllInput || leftInputConsumedInSpark){ // This is end of all input and this is last time we will read right input.
// Right loader in this case wouldn't get a chance to close its input stream, so we close it ourselves.
try {
((IndexableLoadFunc)rightLoader).close();
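
For context on the flag above: a hedged sketch of how the Spark side is expected to drive it (the actual caller is the rewritten MergeJoinConverter later in this commit; the method name here is illustrative):

    // Minimal sketch, assuming a deserialized POMergeJoin instance per task.
    static void closeLeftSide(POMergeJoin join) {
        // parentPlan.endOfAllInput is shared by every operator in the plan,
        // so any upstream input finishing flipped it for all of them (PIG-4876).
        // The new setter scopes end-of-input to this one operator:
        join.setEndOfInput(true);
        // POMergeJoin then records leftInputConsumedInSpark internally and
        // joins the buffered 'leftTuples' against the remaining right input.
    }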
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java?rev=1751215&r1=1751214&r2=1751215&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java Mon Jul 4 06:46:13 2016
@@ -41,6 +41,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
@@ -92,6 +93,7 @@ public class JobGraphBuilder extends Spa
new PhyPlanSetter(sparkOp.physicalPlan).visit();
try {
setReplicationForFRJoin(sparkOp.physicalPlan);
+ setReplicationForMergeJoin(sparkOp.physicalPlan);
sparkOperToRDD(sparkOp);
finishUDFs(sparkOp.physicalPlan);
} catch (InterruptedException e) {
@@ -104,21 +106,45 @@ public class JobGraphBuilder extends Spa
}
private void setReplicationForFRJoin(PhysicalPlan plan) throws IOException {
+ List<Path> filesForMoreReplication = new ArrayList<Path>();
List<POFRJoin> pofrJoins = PlanHelper.getPhysicalOperators(plan, POFRJoin.class);
if (pofrJoins.size() > 0) {
- FileSystem fs = FileSystem.get(this.jobConf);
- short replication = (short) jobConf.getInt(MRConfiguration.SUMIT_REPLICATION, 10);
for (POFRJoin pofrJoin : pofrJoins) {
FileSpec[] fileSpecs = pofrJoin.getReplFiles();
if (fileSpecs != null) {
for (int i = 0; i < fileSpecs.length; i++) {
if (i != pofrJoin.getFragment()) {
- fs.setReplication(new Path(fileSpecs[i].getFileName()), replication);
+ filesForMoreReplication.add(new Path(fileSpecs[i].getFileName()));
}
}
}
}
}
+ setReplicationForFiles(filesForMoreReplication);
+ }
+
+ private void setReplicationForMergeJoin(PhysicalPlan plan) throws IOException {
+ List<Path> filesForMoreReplication = new ArrayList<Path>();
+ List<POMergeJoin> poMergeJoins = PlanHelper.getPhysicalOperators(plan, POMergeJoin.class);
+ if (poMergeJoins.size() > 0) {
+ for (POMergeJoin poMergeJoin : poMergeJoins) {
+ String idxFileName = poMergeJoin.getIndexFile();
+ filesForMoreReplication.add(new Path(idxFileName));
+ // In Spark mode, set it to null so that POMergeJoin won't use the Hadoop distributed cache
+ // see POMergeJoin.seekInRightStream()
+ poMergeJoin.setIndexFile(null);
+ }
+ }
+
+ setReplicationForFiles(filesForMoreReplication);
+ }
+
+ private void setReplicationForFiles(List<Path> files) throws IOException {
+ FileSystem fs = FileSystem.get(this.jobConf);
+ short replication = (short) jobConf.getInt(MRConfiguration.SUMIT_REPLICATION, 10);
+ for (int i = 0; i < files.size(); i++) {
+ fs.setReplication(files.get(i), replication);
+ }
}
// Calling EvalFunc.finish()
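
The replication helper consolidates what the FR-join path already did: small side files that every task opens (replicated inputs, and now the merge-join index) get a higher replication factor so reads stay local instead of converging on the default three replicas. A minimal standalone sketch of the same idea (hypothetical method name and path; "mapred.submit.replication" is what MRConfiguration.SUMIT_REPLICATION resolves to, and that constant's spelling is as it appears in Pig's source):

    static void raiseReplication(org.apache.hadoop.conf.Configuration conf,
                                 org.apache.hadoop.fs.Path indexFile)
            throws java.io.IOException {
        org.apache.hadoop.fs.FileSystem fs =
                org.apache.hadoop.fs.FileSystem.get(conf);
        short replication = (short) conf.getInt("mapred.submit.replication", 10);
        // Widely replicating this small file keeps each task's read local.
        fs.setReplication(indexFile, replication);
    }

Note also that setIndexFile(null) above is what steers POMergeJoin.seekInRightStream() away from the distributed-cache path, which Spark does not provide; the index is instead read back from its HDFS location.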
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1751215&r1=1751214&r2=1751215&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java Mon Jul 4 06:46:13 2016
@@ -202,7 +202,7 @@ public class SparkLauncher extends Launc
convertMap.put(POSort.class, new SortConverter());
convertMap.put(POSplit.class, new SplitConverter());
convertMap.put(POSkewedJoin.class, new SkewedJoinConverter());
- convertMap.put(POMergeJoin.class, new MergeJoinConverter());
+ convertMap.put(POMergeJoin.class, new MergeJoinConverter(confBytes));
convertMap.put(POCollectedGroup.class, new CollectedGroupConverter());
convertMap.put(POCounter.class, new CounterConverter());
convertMap.put(PORank.class, new RankConverter());
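
The extra constructor argument exists because Hadoop's JobConf cannot ride along in a Spark closure as-is; the launcher ships it as Kryo-serialized bytes and each task rebuilds it lazily (see initializeJobConf() in the converter below). A hedged sketch of the round trip; deserializeJobConf appears in this commit, while the driver-side serialize counterpart is assumed:

    static byte[] onDriver(JobConf jobConf) {
        // Serialize once on the driver; the byte[] travels with the closure.
        return KryoSerializer.serializeJobConf(jobConf);  // assumed API
    }

    static void onExecutor(byte[] confBytes) {
        JobConf rebuilt = KryoSerializer.deserializeJobConf(confBytes);
        // Pig loaders and UDFs locate the job configuration via this holder:
        PigMapReduce.sJobConfInternal.set(rebuilt);
    }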
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeJoinConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeJoinConverter.java?rev=1751215&r1=1751214&r2=1751215&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeJoinConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeJoinConverter.java Mon Jul 4 06:46:13 2016
@@ -22,151 +22,85 @@ import java.io.Serializable;
import java.util.Iterator;
import java.util.List;
-import scala.Tuple2;
-import scala.runtime.AbstractFunction1;
-
-import org.apache.pig.PigException;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
+import org.apache.pig.backend.hadoop.executionengine.spark.KryoSerializer;
import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.rdd.RDD;
+
@SuppressWarnings("serial")
public class MergeJoinConverter implements
RDDConverter<Tuple, Tuple, POMergeJoin> {
+ private byte[] confBytes;
+ public MergeJoinConverter(byte[] confBytes) {
+ this.confBytes = confBytes;
+ }
+
@Override
public RDD<Tuple> convert(List<RDD<Tuple>> predecessors,
POMergeJoin poMergeJoin) throws IOException {
- SparkUtil.assertPredecessorSize(predecessors, poMergeJoin, 2);
-
- RDD<Tuple> rdd1 = predecessors.get(0);
- RDD<Tuple> rdd2 = predecessors.get(1);
-
- // make (key, value) pairs, key has type IndexedKey, value has type Tuple
- RDD<Tuple2<IndexedKey, Tuple>> rdd1Pair = rdd1.map(new ExtractKeyFunction(
- poMergeJoin, 0), SparkUtil.<IndexedKey, Tuple>getTuple2Manifest());
- RDD<Tuple2<IndexedKey, Tuple>> rdd2Pair = rdd2.map(new ExtractKeyFunction(
- poMergeJoin, 1), SparkUtil.<IndexedKey, Tuple>getTuple2Manifest());
-
- JavaPairRDD<IndexedKey, Tuple> prdd1 = new JavaPairRDD<IndexedKey, Tuple>(
- rdd1Pair, SparkUtil.getManifest(IndexedKey.class),
- SparkUtil.getManifest(Tuple.class));
- JavaPairRDD<IndexedKey, Tuple> prdd2 = new JavaPairRDD<IndexedKey, Tuple>(
- rdd2Pair, SparkUtil.getManifest(IndexedKey.class),
- SparkUtil.getManifest(Tuple.class));
-
- JavaPairRDD<IndexedKey, Tuple2<Tuple, Tuple>> jrdd = prdd1
- .join(prdd2);
-
- // map to get JavaRDD<Tuple> from join() output, which is
- // JavaPairRDD<IndexedKey, Tuple2<Tuple, Tuple>> by
- // ignoring the key (of type IndexedKey) and appending the values (the
- // Tuples)
- JavaRDD<Tuple> result = jrdd
- .mapPartitions(new ToValueFunction());
+ SparkUtil.assertPredecessorSize(predecessors, poMergeJoin, 1);
+ RDD<Tuple> rdd = predecessors.get(0);
+ MergeJoinFunction mergeJoinFunction = new MergeJoinFunction(poMergeJoin, confBytes);
- return result.rdd();
+ return rdd.toJavaRDD().mapPartitions(mergeJoinFunction, true).rdd();
}
- private static class ExtractKeyFunction extends
- AbstractFunction1<Tuple, Tuple2<IndexedKey, Tuple>> implements
- Serializable {
+ private static class MergeJoinFunction implements
+ FlatMapFunction<Iterator<Tuple>, Tuple>, Serializable {
- private final POMergeJoin poMergeJoin;
- private final int LR_index; // 0 for left table, 1 for right table
+ private POMergeJoin poMergeJoin;
+ private byte[] confBytes;
- public ExtractKeyFunction(POMergeJoin poMergeJoin, int LR_index) {
+ private transient JobConf jobConf;
+
+ private MergeJoinFunction(POMergeJoin poMergeJoin, byte[] confBytes) {
this.poMergeJoin = poMergeJoin;
- this.LR_index = LR_index;
+ this.confBytes = confBytes;
}
- @Override
- public Tuple2<IndexedKey, Tuple> apply(Tuple tuple) {
- poMergeJoin.getLRs()[LR_index].attachInput(tuple);
-
- try {
- Result lrOut = poMergeJoin.getLRs()[LR_index].getNextTuple();
- if(lrOut.returnStatus!= POStatus.STATUS_OK){
- int errCode = 2167;
- String errMsg = "LocalRearrange used to extract keys from " +
- "tuple is not configured correctly";
- throw new ExecException(errMsg,errCode, PigException.BUG);
- }
-
- // If tuple is (AA, 5) and key index is $1, then it lrOut is 0 5 (AA),
- // so get(1) returns key
- Object index = ((Tuple) lrOut.result).get(0);
- Object key = ((Tuple) lrOut.result).get(1);
- Tuple value = tuple;
- Tuple2<IndexedKey, Tuple> tuple_KeyValue = new Tuple2<IndexedKey, Tuple>(new IndexedKey((Byte)index,key),
- value);
-
- return tuple_KeyValue;
-
- } catch (Exception e) {
- throw new RuntimeException(e);
+ void initializeJobConf() {
+ if (this.jobConf == null) {
+ this.jobConf = KryoSerializer.deserializeJobConf(this.confBytes);
+ PigMapReduce.sJobConfInternal.set(jobConf);
}
}
- }
-
- private static class ToValueFunction
- implements FlatMapFunction<Iterator<Tuple2<IndexedKey, Tuple2<Tuple, Tuple>>>, Tuple>, Serializable {
-
- private class Tuple2TransformIterable implements Iterable<Tuple> {
-
- Iterator<Tuple2<IndexedKey, Tuple2<Tuple, Tuple>>> in;
-
- Tuple2TransformIterable(
- Iterator<Tuple2<IndexedKey, Tuple2<Tuple, Tuple>>> input) {
- in = input;
- }
- public Iterator<Tuple> iterator() {
- return new IteratorTransform<Tuple2<IndexedKey, Tuple2<Tuple, Tuple>>, Tuple>(
- in) {
- @Override
- protected Tuple transform(
- Tuple2<IndexedKey, Tuple2<Tuple, Tuple>> next) {
- try {
-
- Tuple leftTuple = next._2()._1();
- Tuple rightTuple = next._2()._2();
-
- TupleFactory tf = TupleFactory.getInstance();
- Tuple result = tf.newTuple(leftTuple.size()
- + rightTuple.size());
-
- // concatenate the two tuples together to make a
- // resulting tuple
- for (int i = 0; i < leftTuple.size(); i++)
- result.set(i, leftTuple.get(i));
- for (int i = 0; i < rightTuple.size(); i++)
- result.set(i + leftTuple.size(),
- rightTuple.get(i));
+ public Iterable<Tuple> call(final Iterator<Tuple> input) {
+ initializeJobConf();
- return result;
+ return new Iterable<Tuple>() {
+ @Override
+ public Iterator<Tuple> iterator() {
+ return new OutputConsumerIterator(input) {
+
+ @Override
+ protected void attach(Tuple tuple) {
+ poMergeJoin.setInputs(null);
+ poMergeJoin.attachInput(tuple);
+ }
- } catch (Exception e) {
- throw new RuntimeException(e);
+ @Override
+ protected Result getNextResult() throws ExecException {
+ return poMergeJoin.getNextTuple();
}
- }
- };
- }
- }
- @Override
- public Iterable<Tuple> call(
- Iterator<Tuple2<IndexedKey, Tuple2<Tuple, Tuple>>> input) {
- return new Tuple2TransformIterable(input);
+ @Override
+ protected void endOfInput() {
+ poMergeJoin.setEndOfInput(true);
+ }
+ };
+ }
+ };
}
}
}
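
To summarize the rewrite: the converter no longer turns both sides into (IndexedKey, Tuple) pairs and shuffles them through join(); it now runs POMergeJoin itself over each left partition via mapPartitions, and the operator pulls the sorted right side through its own loader. A condensed sketch of what the OutputConsumerIterator contract above amounts to (illustrative names, error handling simplified; not the real class):

    static Iterator<Tuple> drain(final Iterator<Tuple> input, final POMergeJoin op) {
        return new Iterator<Tuple>() {
            private Tuple buffered;
            private boolean eofSignalled = false;

            private void advance() {
                try {
                    while (buffered == null) {
                        Result r = op.getNextTuple();
                        if (r.returnStatus == POStatus.STATUS_OK) {
                            buffered = (Tuple) r.result;       // a joined tuple
                        } else if (r.returnStatus == POStatus.STATUS_EOP) {
                            if (input.hasNext()) {
                                op.setInputs(null);            // as in attach()
                                op.attachInput(input.next());  // next left tuple
                            } else if (!eofSignalled) {
                                op.setEndOfInput(true);        // endOfInput() hook
                                eofSignalled = true;           // keep draining
                            } else {
                                return;                        // fully drained
                            }
                        } else {
                            return;                            // simplified
                        }
                    }
                } catch (ExecException e) {
                    throw new RuntimeException(e);
                }
            }

            @Override public boolean hasNext() { advance(); return buffered != null; }
            @Override public Tuple next() { advance(); Tuple t = buffered; buffered = null; return t; }
            @Override public void remove() { throw new UnsupportedOperationException(); }
        };
    }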
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java?rev=1751215&r1=1751214&r2=1751215&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java Mon Jul 4 06:46:13 2016
@@ -42,6 +42,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.UDFFinder;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
@@ -72,6 +73,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.spark.operator.NativeSparkOperator;
import org.apache.pig.backend.hadoop.executionengine.spark.operator.POGlobalRearrangeSpark;
import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.builtin.DefaultIndexableLoader;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.plan.DepthFirstWalker;
@@ -83,6 +85,8 @@ import org.apache.pig.impl.plan.PlanExce
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.Utils;
+import org.apache.pig.newplan.logical.relational.LOJoin;
+
/**
* The compiler that compiles a given physical physicalPlan into a DAG of Spark
@@ -732,16 +736,156 @@ public class SparkCompiler extends PhyPl
@Override
public void visitMergeJoin(POMergeJoin joinOp) throws VisitorException {
- try {
- addToPlan(joinOp);
- phyToSparkOpMap.put(joinOp, curSparkOp);
- } catch (Exception e) {
+ try {
+ if (compiledInputs.length != 2 || joinOp.getInputs().size() != 2){
+ int errCode=1101;
+ throw new SparkCompilerException("Merge Join must have exactly two inputs. Found : "+compiledInputs.length, errCode);
+ }
- int errCode = 2034;
- String msg = "Error compiling operator "
- + joinOp.getClass().getSimpleName();
- throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
- }
+ curSparkOp = phyToSparkOpMap.get(joinOp.getInputs().get(0));
+ SparkOperator rightSparkOp;
+ if(curSparkOp.equals(compiledInputs[0])) {
+ rightSparkOp = compiledInputs[1];
+ } else {
+ rightSparkOp = compiledInputs[0];
+ }
+
+ PhysicalPlan rightPipelinePlan;
+ PhysicalPlan rightPhyPlan = rightSparkOp.physicalPlan;
+ if (rightPhyPlan.getRoots().size() != 1) {
+ int errCode = 2171;
+ String errMsg = "Expected one but found more then one root physical operator in physical plan.";
+ throw new SparkCompilerException(errMsg,errCode);
+ }
+ PhysicalOperator rightPhyLoader = rightPhyPlan.getRoots().get(0);
+ if (!(rightPhyLoader instanceof POLoad)) {
+ int errCode = 2172;
+ String errMsg = "Expected physical operator at root to be POLoad. Found : "+rightPhyLoader.getClass().getCanonicalName();
+ throw new SparkCompilerException(errMsg,errCode);
+ }
+ if (rightPhyPlan.getSuccessors(rightPhyLoader) == null || rightPhyPlan.getSuccessors(rightPhyLoader).isEmpty()) {
+ // Load - Join case.
+ rightPipelinePlan = null;
+ } else{ // We got something on right side. Yank it and set it as inner plan of right input.
+ rightPipelinePlan = rightPhyPlan.clone();
+ PhysicalOperator root = rightPipelinePlan.getRoots().get(0);
+ rightPipelinePlan.disconnect(root, rightPipelinePlan.getSuccessors(root).get(0));
+ rightPipelinePlan.remove(root);
+ rightPhyPlan.trimBelow(rightPhyLoader);
+ }
+
+ joinOp.setupRightPipeline(rightPipelinePlan);
+ rightSparkOp.setRequestedParallelism(1); // for indexing job
+
+ POLoad rightLoader = (POLoad)rightSparkOp.physicalPlan.getRoots().get(0);
+ joinOp.setSignature(rightLoader.getSignature());
+ LoadFunc rightLoadFunc = rightLoader.getLoadFunc();
+
+ if(IndexableLoadFunc.class.isAssignableFrom(rightLoadFunc.getClass())) {
+ joinOp.setRightLoaderFuncSpec(rightLoader.getLFile().getFuncSpec());
+ joinOp.setRightInputFileName(rightLoader.getLFile().getFileName());
+ curSparkOp.UDFs.add(rightLoader.getLFile().getFuncSpec().toString());
+
+ // we don't need rightSparkOp since the right loader is an
+ // IndexableLoadFunc which can handle the index itself
+ sparkPlan.remove(rightSparkOp);
+ if(rightSparkOp == compiledInputs[0]) {
+ compiledInputs[0] = null;
+ } else if(rightSparkOp == compiledInputs[1]) {
+ compiledInputs[1] = null;
+ }
+
+ // validate that the join keys in merge join are only
+ // simple column projections or '*' and not expressions - expressions
+ // cannot be handled when the index is built by the storage layer as the
+ // sorted data (and corresponding index) is written.
+ // So merge join is restricted to not have expressions as join keys.
+ int numInputs = mPlan.getPredecessors(joinOp).size(); // should be 2
+ for(int i = 0; i < numInputs; i++) {
+ List<PhysicalPlan> keyPlans = joinOp.getInnerPlansOf(i);
+ for (PhysicalPlan keyPlan : keyPlans) {
+ for(PhysicalOperator op : keyPlan) {
+ if(!(op instanceof POProject)) {
+ int errCode = 1106;
+ String errMsg = "Merge join is possible only for simple column or '*' join keys when using " +
+ rightLoader.getLFile().getFuncSpec() + " as the loader";
+ throw new SparkCompilerException(errMsg, errCode, PigException.INPUT);
+ }
+ }
+ }
+ }
+
+ } else {
+ //Replacing POLoad with indexer is disabled for 'merge-sparse' joins. While
+ //this feature would be useful, the current implementation of DefaultIndexableLoader
+ //is not designed to handle multiple calls to seekNear. Specifically, it rereads the entire index
+ //for each call. Some refactoring of this class is required - and then the check below could be removed.
+ if (joinOp.getJoinType() == LOJoin.JOINTYPE.MERGESPARSE) {
+ int errCode = 1104;
+ String errMsg = "Right input of merge-join must implement IndexableLoadFunc. " +
+ "The specified loader " + rightLoadFunc + " doesn't implement it";
+ throw new SparkCompilerException(errMsg,errCode);
+ }
+
+ // Replace POLoad with indexer.
+ if (! (OrderedLoadFunc.class.isAssignableFrom(rightLoadFunc.getClass()))){
+ int errCode = 1104;
+ String errMsg = "Right input of merge-join must implement " +
+ "OrderedLoadFunc interface. The specified loader "
+ + rightLoadFunc + " doesn't implement it";
+ throw new SparkCompilerException(errMsg,errCode);
+ }
+
+ String[] indexerArgs = new String[6];
+ List<PhysicalPlan> rightInpPlans = joinOp.getInnerPlansOf(1);
+ FileSpec origRightLoaderFileSpec = rightLoader.getLFile();
+
+ indexerArgs[0] = origRightLoaderFileSpec.getFuncSpec().toString();
+ indexerArgs[1] = ObjectSerializer.serialize((Serializable)rightInpPlans);
+ indexerArgs[2] = ObjectSerializer.serialize(rightPipelinePlan);
+ indexerArgs[3] = rightLoader.getSignature();
+ indexerArgs[4] = rightLoader.getOperatorKey().scope;
+ indexerArgs[5] = Boolean.toString(true);
+
+ FileSpec lFile = new FileSpec(rightLoader.getLFile().getFileName(),new FuncSpec(MergeJoinIndexer.class.getName(), indexerArgs));
+ rightLoader.setLFile(lFile);
+
+ // (keyFirst1, keyFirst2, .. , position, splitIndex) See MergeJoinIndexer
+ rightSparkOp.useTypedComparator(true);
+ POStore idxStore = getStore();
+ FileSpec idxStrFile = getTempFileSpec();
+ idxStore.setSFile(idxStrFile);
+ rightSparkOp.physicalPlan.addAsLeaf(idxStore);
+ rightSparkOp.markIndexer();
+
+ curSparkOp.UDFs.add(origRightLoaderFileSpec.getFuncSpec().toString());
+
+ // We want to ensure indexing job runs prior to actual join job.
+ // So, connect them in order.
+ sparkPlan.connect(rightSparkOp, curSparkOp);
+
+ // set up the DefaultIndexableLoader for the join operator
+ String[] defaultIndexableLoaderArgs = new String[5];
+ defaultIndexableLoaderArgs[0] = origRightLoaderFileSpec.getFuncSpec().toString();
+ defaultIndexableLoaderArgs[1] = idxStrFile.getFileName();
+ defaultIndexableLoaderArgs[2] = idxStrFile.getFuncSpec().toString();
+ defaultIndexableLoaderArgs[3] = joinOp.getOperatorKey().scope;
+ defaultIndexableLoaderArgs[4] = origRightLoaderFileSpec.getFileName();
+ joinOp.setRightLoaderFuncSpec((new FuncSpec(DefaultIndexableLoader.class.getName(), defaultIndexableLoaderArgs)));
+ joinOp.setRightInputFileName(origRightLoaderFileSpec.getFileName());
+
+ joinOp.setIndexFile(idxStrFile.getFileName());
+ }
+
+ curSparkOp.physicalPlan.addAsLeaf(joinOp);
+ phyToSparkOpMap.put(joinOp, curSparkOp);
+
+ } catch (Exception e) {
+ int errCode = 2034;
+ String msg = "Error compiling operator "
+ + joinOp.getClass().getSimpleName();
+ throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
+ }
}
private void processUDFs(PhysicalPlan plan) throws VisitorException {
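
Taken together, the DefaultIndexableLoader branch of visitMergeJoin() builds a two-job pipeline. An illustrative summary (the script below is an example, not part of this commit; merge join requires both inputs pre-sorted on the join key):

    A = LOAD 'left'  AS (k:int, v:chararray);   -- sorted by k
    B = LOAD 'right' AS (k:int, w:chararray);   -- sorted by k
    C = JOIN A BY k, B BY k USING 'merge';

    Resulting Spark operator graph:

      rightSparkOp (parallelism 1, marked as indexer):
        POLoad(right) -> MergeJoinIndexer -> POStore(temp index file)
        emits (keyFirst1, keyFirst2, ..., position, splitIndex) tuples

      curSparkOp (join job):
        POLoad(left) -> ... -> POMergeJoin
        probes the right side through DefaultIndexableLoader(temp index file)

    sparkPlan.connect(rightSparkOp, curSparkOp) guarantees the indexing job
    completes before the join job starts.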