Posted to commits@pig.apache.org by zl...@apache.org on 2017/02/09 01:34:12 UTC

svn commit: r1782286 - in /pig/branches/spark: ./ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/ src/org/apache/pig/backend/hadoop/executionengine/spark/ src/org/apache/pig/backend/hadoop/executionengine/spark/conv...

Author: zly
Date: Thu Feb  9 01:34:12 2017
New Revision: 1782286

URL: http://svn.apache.org/viewvc?rev=1782286&view=rev
Log:
PIG-4891: Implement FR join by broadcasting the small RDD instead of making more copies of the data (Nandor via Liyun)
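
The gist of the change: the Spark FR (fragment-replicate) join previously materialized every replicated input to an HDFS temp file with a raised replication factor, and each task re-read that file; now each small input is shipped once per executor as a Spark broadcast variable and probed map-side. A minimal, self-contained sketch of that pattern in plain Spark Java API terms (the data layout, key positions, and class name are illustrative assumptions, not Pig's Tuple format; it also assumes the Spark 2.x API, where the mapPartitions function returns an Iterator rather than the Iterable of Spark 1.x):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;

import scala.Tuple2;

public class BroadcastJoinSketch {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext(
                new SparkConf().setAppName("broadcast-join-sketch").setMaster("local[2]"));

        // The large ("fragment") input stays distributed.
        JavaRDD<Tuple2<Integer, String>> big = sc.parallelize(Arrays.asList(
                new Tuple2<>(1, "a"), new Tuple2<>(2, "b"), new Tuple2<>(3, "c")));

        // The small ("replicated") input is shipped once to each executor as a
        // broadcast variable instead of being written to HDFS with extra replicas.
        Map<Integer, String> small = new HashMap<>();
        small.put(1, "x");
        small.put(3, "y");
        Broadcast<Map<Integer, String>> smallBc = sc.broadcast(small);

        // Map-side join: each partition probes the broadcast hash map locally,
        // so the large side is never shuffled.
        JavaRDD<String> joined = big.mapPartitions(it -> {
            List<String> out = new ArrayList<>();
            Map<Integer, String> lookup = smallBc.value();
            while (it.hasNext()) {
                Tuple2<Integer, String> t = it.next();
                String match = lookup.get(t._1());
                if (match != null) {
                    out.add(t._1() + "," + t._2() + "," + match);
                }
            }
            return out.iterator();
        });

        joined.collect().forEach(System.out::println);
        sc.stop();
    }
}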

Added:
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoinSpark.java
Modified:
    pig/branches/spark/build.xml
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/FRJoinConverter.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java

Modified: pig/branches/spark/build.xml
URL: http://svn.apache.org/viewvc/pig/branches/spark/build.xml?rev=1782286&r1=1782285&r2=1782286&view=diff
==============================================================================
--- pig/branches/spark/build.xml (original)
+++ pig/branches/spark/build.xml Thu Feb  9 01:34:12 2017
@@ -1049,7 +1049,7 @@
         <ant dir="${test.e2e.dir}" target="test-tez"/>
     </target>
 
-	<target name="test-e2e-spark" depends="jar, piggybank" description="run end-to-end tests in tez mode">
+	<target name="test-e2e-spark" depends="jar, piggybank" description="run end-to-end tests in spark mode">
 	        <ant dir="${test.e2e.dir}" target="test-spark"/>
 	</target>
 

Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoinSpark.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoinSpark.java?rev=1782286&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoinSpark.java (added)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoinSpark.java Thu Feb  9 01:34:12 2017
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.SchemaTupleBackend;
+import org.apache.pig.data.SchemaTupleClassGenerator;
+import org.apache.pig.data.SchemaTupleFactory;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+import java.util.List;
+import java.util.Map;
+
+public class POFRJoinSpark extends POFRJoin {
+    private static final Log log = LogFactory.getLog(POFRJoinSpark.class);
+
+    private Map<String, List<Tuple>> broadcasts;
+
+    public POFRJoinSpark(POFRJoin copy) throws ExecException {
+        super(copy);
+    }
+
+    @Override
+    protected void setUpHashMap() throws ExecException {
+        log.info("Building replication hash table");
+
+        SchemaTupleFactory[] inputSchemaTupleFactories = new SchemaTupleFactory[inputSchemas.length];
+        SchemaTupleFactory[] keySchemaTupleFactories = new SchemaTupleFactory[inputSchemas.length];
+        for (int i = 0; i < inputSchemas.length; i++) {
+            addSchemaToFactories(inputSchemas[i], inputSchemaTupleFactories, i);
+            addSchemaToFactories(keySchemas[i], keySchemaTupleFactories, i);
+        }
+
+        replicates[fragment] = null;
+        int i = -1;
+        long start = System.currentTimeMillis();
+        for (int k = 0; k < inputSchemas.length; ++k) {
+            ++i;
+
+            SchemaTupleFactory inputSchemaTupleFactory = inputSchemaTupleFactories[i];
+            SchemaTupleFactory keySchemaTupleFactory = keySchemaTupleFactories[i];
+
+            if (i == fragment) {
+                replicates[i] = null;
+                continue;
+            }
+
+            TupleToMapKey replicate = new TupleToMapKey(1000, keySchemaTupleFactory);
+
+            log.debug("Completed setup. Trying to build replication hash table");
+            List<Tuple> tuples = broadcasts.get(parentPlan.getPredecessors(this).get(i).getOperatorKey().toString());
+
+            POLocalRearrange localRearrange = LRs[i];
+
+            for (Tuple t : tuples) {
+                localRearrange.attachInput(t);
+                Result res = localRearrange.getNextTuple();
+                if (getReporter() != null) {
+                    getReporter().progress();
+                }
+                Tuple tuple = (Tuple) res.result;
+                if (isKeyNull(tuple.get(1))) continue;
+                Tuple key = mTupleFactory.newTuple(1);
+                key.set(0, tuple.get(1));
+                Tuple value = getValueTuple(localRearrange, tuple);
+
+                if (replicate.get(key) == null) {
+                    replicate.put(key, new POMergeJoin.TuplesToSchemaTupleList(1, inputSchemaTupleFactory));
+                }
+
+                replicate.get(key).add(value);
+
+            }
+            replicates[i] = replicate;
+        }
+        long end = System.currentTimeMillis();
+        log.debug("Hash Table built. Time taken: " + (end - start));
+    }
+
+    @Override
+    public String name() {
+        return getAliasString() + "FRJoinSpark[" + DataType.findTypeName(resultType)
+                + "]" + " - " + mKey.toString();
+    }
+
+    private void addSchemaToFactories(Schema schema, SchemaTupleFactory[] schemaTupleFactories, int index) {
+        if (schema != null) {
+            log.debug("Using SchemaTuple for FR Join Schema: " + schema);
+            schemaTupleFactories[index] = SchemaTupleBackend.newSchemaTupleFactory(schema, false, SchemaTupleClassGenerator.GenContext.FR_JOIN);
+        }
+    }
+
+    public void attachInputs(Map<String, List<Tuple>> broadcasts) {
+        this.broadcasts = broadcasts;
+    }
+}
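
The heart of the new operator is setUpHashMap(): for each non-fragment input it looks up the broadcast tuple list by the predecessor's operator key, pushes each tuple through that input's POLocalRearrange to extract the join key, and groups the values into a probe table. Stripped of Pig's operator machinery, a sketch of the grouping step (keyIndex is a stand-in assumption for the local-rearrange key extraction, and plain lists stand in for Pig tuples):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class ReplicateTableSketch {
    // Group a replicated input's tuples by join key, skipping null keys the
    // same way setUpHashMap() skips tuples where isKeyNull() is true.
    static Map<Object, List<List<Object>>> buildTable(List<List<Object>> tuples,
                                                      int keyIndex) {
        Map<Object, List<List<Object>>> table = new HashMap<>();
        for (List<Object> tuple : tuples) {
            Object key = tuple.get(keyIndex); // stand-in for the LR key extraction
            if (key == null) {
                continue;
            }
            table.computeIfAbsent(key, k -> new ArrayList<>()).add(tuple);
        }
        return table;
    }
}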

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java?rev=1782286&r1=1782285&r2=1782286&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java Thu Feb  9 01:34:12 2017
@@ -23,7 +23,6 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
@@ -43,11 +42,12 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.UDFFinishVisitor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POBroadcastSpark;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
+import org.apache.pig.backend.hadoop.executionengine.spark.converter.FRJoinConverter;
 import org.apache.pig.backend.hadoop.executionengine.spark.converter.RDDConverter;
 import org.apache.pig.backend.hadoop.executionengine.spark.operator.NativeSparkOperator;
 import org.apache.pig.backend.hadoop.executionengine.spark.operator.POJoinGroupSpark;
@@ -56,14 +56,12 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.plan.DependencyOrderWalker;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.tools.pigstats.spark.SparkPigStats;
 import org.apache.pig.tools.pigstats.spark.SparkStatsUtil;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.broadcast.Broadcast;
 import org.apache.spark.rdd.RDD;
 
 import com.google.common.collect.Lists;
@@ -102,7 +100,6 @@ public class JobGraphBuilder extends Spa
     public void visitSparkOp(SparkOperator sparkOp) throws VisitorException {
         new PhyPlanSetter(sparkOp.physicalPlan).visit();
         try {
-            setReplicationForFRJoin(sparkOp.physicalPlan);
             setReplicationForMergeJoin(sparkOp.physicalPlan);
             sparkOperToRDD(sparkOp);
             finishUDFs(sparkOp.physicalPlan);
@@ -117,26 +114,8 @@ public class JobGraphBuilder extends Spa
         }
     }
 
-    private void setReplicationForFRJoin(PhysicalPlan plan) throws IOException {
-        List<Path> filesForMoreReplication = new ArrayList<Path>();
-        List<POFRJoin> pofrJoins = PlanHelper.getPhysicalOperators(plan, POFRJoin.class);
-        if (pofrJoins.size() > 0) {
-            for (POFRJoin pofrJoin : pofrJoins) {
-                FileSpec[] fileSpecs = pofrJoin.getReplFiles();
-                if (fileSpecs != null) {
-                    for (int i = 0; i < fileSpecs.length; i++) {
-                        if (i != pofrJoin.getFragment()) {
-                            filesForMoreReplication.add(new Path(fileSpecs[i].getFileName()));
-                        }
-                    }
-                }
-            }
-        }
-        setReplicationForFiles(filesForMoreReplication);
-    }
-
     private void setReplicationForMergeJoin(PhysicalPlan plan) throws IOException {
-        List<Path> filesForMoreReplication = new ArrayList<Path>();
+        List<Path> filesForMoreReplication = new ArrayList<>();
         List<POMergeJoin> poMergeJoins = PlanHelper.getPhysicalOperators(plan, POMergeJoin.class);
         if (poMergeJoins.size() > 0) {
             for (POMergeJoin poMergeJoin : poMergeJoins) {
@@ -260,7 +239,6 @@ public class JobGraphBuilder extends Spa
             }
         }
 
-
         if (physicalOperator instanceof POSplit) {
             List<PhysicalPlan> successorPlans = ((POSplit) physicalOperator).getPlans();
             for (PhysicalPlan successPlan : successorPlans) {
@@ -283,6 +261,11 @@ public class JobGraphBuilder extends Spa
                     + physicalOperator.getClass().getSimpleName() + " "
                     + physicalOperator);
             List<RDD<Tuple>> allPredRDDs = sortPredecessorRDDs(operatorKeysOfAllPreds);
+
+            if (converter instanceof FRJoinConverter) {
+                setReplicatedInputs(physicalOperator, (FRJoinConverter) converter);
+            }
+
             nextRDD = converter.convert(allPredRDDs, physicalOperator);
 
             if (nextRDD == null) {
@@ -295,6 +278,16 @@ public class JobGraphBuilder extends Spa
         }
     }
 
+    private void setReplicatedInputs(PhysicalOperator physicalOperator, FRJoinConverter converter) {
+        Set<String> replicatedInputs = new HashSet<>();
+        for (PhysicalOperator operator : physicalOperator.getInputs()) {
+            if (operator instanceof POBroadcastSpark) {
+                replicatedInputs.add(((POBroadcastSpark) operator).getBroadcastedVariableName());
+            }
+        }
+        converter.setReplicatedInputs(replicatedInputs);
+    }
+
     private List<PhysicalOperator> getPredecessors(PhysicalPlan plan, PhysicalOperator op) {
         List preds = null;
         if (!(op instanceof POJoinGroupSpark)) {

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1782286&r1=1782285&r2=1782286&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java Thu Feb  9 01:34:12 2017
@@ -51,10 +51,11 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POBroadcastSpark;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCounter;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoinSpark;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
@@ -71,6 +72,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
+import org.apache.pig.backend.hadoop.executionengine.spark.converter.BroadcastConverter;
 import org.apache.pig.backend.hadoop.executionengine.spark.converter.CollectedGroupConverter;
 import org.apache.pig.backend.hadoop.executionengine.spark.converter.CounterConverter;
 import org.apache.pig.backend.hadoop.executionengine.spark.converter.DistinctConverter;
@@ -205,10 +207,11 @@ public class SparkLauncher extends Launc
         convertMap.put(POCounter.class, new CounterConverter());
         convertMap.put(PORank.class, new RankConverter());
         convertMap.put(POStream.class, new StreamConverter());
-        convertMap.put(POFRJoin.class, new FRJoinConverter());
+        convertMap.put(POFRJoinSpark.class, new FRJoinConverter());
         convertMap.put(POMergeCogroup.class, new MergeCogroupConverter());
         convertMap.put(POReduceBySpark.class, new ReduceByConverter());
         convertMap.put(POPreCombinerLocalRearrange.class, new LocalRearrangeConverter());
+        convertMap.put(POBroadcastSpark.class, new BroadcastConverter(sparkContext));
 
         uploadResources(sparkplan);
         new JobGraphBuilder(sparkplan, convertMap, sparkStats, sparkContext, jobMetricsListener, jobGroupID, jobConf, pigContext).visit();
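
With these registrations, JobGraphBuilder dispatches each physical operator to a converter keyed by its concrete class, which is why the join must now appear in the plan as POFRJoinSpark rather than POFRJoin, and why POBroadcastSpark needs its own entry. A sketch of that class-keyed dispatch pattern with simplified stand-in types (RDDConverter's real signature lives in the Pig source tree):

import java.util.HashMap;
import java.util.Map;

final class ConverterDispatchSketch {
    interface Converter<T> {
        String convert(T op);
    }

    private final Map<Class<?>, Converter<?>> convertMap = new HashMap<>();

    // Mirrors the convertMap.put(...) registrations above.
    <T> void register(Class<T> opClass, Converter<T> converter) {
        convertMap.put(opClass, converter);
    }

    @SuppressWarnings("unchecked")
    <T> String convert(T op) {
        Converter<T> c = (Converter<T>) convertMap.get(op.getClass());
        if (c == null) {
            throw new IllegalArgumentException("No converter for " + op.getClass());
        }
        return c.convert(op);
    }
}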

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/FRJoinConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/FRJoinConverter.java?rev=1782286&r1=1782285&r2=1782286&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/FRJoinConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/FRJoinConverter.java Thu Feb  9 01:34:12 2017
@@ -19,48 +19,56 @@ package org.apache.pig.backend.hadoop.ex
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
-import scala.Tuple2;
-import scala.runtime.AbstractFunction1;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoinSpark;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin;
 import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.function.FlatMapFunction;
-import org.apache.spark.api.java.function.PairFunction;
 import org.apache.spark.rdd.RDD;
 
-import com.google.common.base.Optional;
-
 @SuppressWarnings("serial")
 public class FRJoinConverter implements
         RDDConverter<Tuple, Tuple, POFRJoin> {
     private static final Log LOG = LogFactory.getLog(FRJoinConverter.class);
 
+    private Set<String> replicatedInputs;
+
     public RDD<Tuple> convert(List<RDD<Tuple>> predecessors,
                               POFRJoin poFRJoin) throws IOException {
-        SparkUtil.assertPredecessorSize(predecessors, poFRJoin, 1);
+        SparkUtil.assertPredecessorSizeGreaterThan(predecessors, poFRJoin, 1);
         RDD<Tuple> rdd = predecessors.get(0);
+
+        attachReplicatedInputs((POFRJoinSpark) poFRJoin);
+
         FRJoinFunction frJoinFunction = new FRJoinFunction(poFRJoin);
         return rdd.toJavaRDD().mapPartitions(frJoinFunction, true).rdd();
     }
 
+    private void attachReplicatedInputs(POFRJoinSpark poFRJoin) {
+        Map<String, List<Tuple>> replicatedInputMap = new HashMap<>();
+
+        for (String replicatedInput : replicatedInputs) {
+            replicatedInputMap.put(replicatedInput, SparkUtil.getBroadcastedVars().get(replicatedInput).value());
+        }
+
+        poFRJoin.attachInputs(replicatedInputMap);
+    }
 
     private static class FRJoinFunction implements
             FlatMapFunction<Iterator<Tuple>, Tuple>, Serializable {
-        private POFRJoin poFRJoin;
 
+        private POFRJoin poFRJoin;
         private FRJoinFunction(POFRJoin poFRJoin) {
             this.poFRJoin = poFRJoin;
         }
@@ -92,5 +100,10 @@ public class FRJoinConverter implements
                 }
             };
         }
+
+    }
+
+    public void setReplicatedInputs(Set<String> replicatedInputs) {
+        this.replicatedInputs = replicatedInputs;
     }
 }
\ No newline at end of file
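
attachReplicatedInputs() resolves each replicated input's broadcast variable name to its value and hands the resulting map to POFRJoinSpark#attachInputs, which setUpHashMap() later reads. A minimal model of that resolution step (the registry parameter is an assumption standing in for SparkUtil.getBroadcastedVars()):

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.spark.broadcast.Broadcast;

final class BroadcastLookupSketch {
    // Resolve named broadcast variables to their local values, mirroring
    // FRJoinConverter#attachReplicatedInputs above.
    static <T> Map<String, List<T>> resolve(Set<String> names,
                                            Map<String, Broadcast<List<T>>> registry) {
        Map<String, List<T>> resolved = new HashMap<>();
        for (String name : names) {
            resolved.put(name, registry.get(name).value());
        }
        return resolved;
    }
}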

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java?rev=1782286&r1=1782285&r2=1782286&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java Thu Feb  9 01:34:12 2017
@@ -45,10 +45,12 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POBroadcastSpark;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCross;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoinSpark;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POGlobalRearrange;
@@ -691,47 +693,29 @@ public class SparkCompiler extends PhyPl
 
     @Override
     public void visitFRJoin(POFRJoin op) throws VisitorException {
-        try {
-            FileSpec[] replFiles = new FileSpec[op.getInputs().size()];
-            for (int i = 0; i < replFiles.length; i++) {
-                if (i == op.getFragment()) continue;
-                replFiles[i] = getTempFileSpec();
-            }
-            op.setReplFiles(replFiles);
-            curSparkOp = phyToSparkOpMap.get(op.getInputs().get(op.getFragment()));
+		try {
+			curSparkOp = phyToSparkOpMap.get(op.getInputs().get(op.getFragment()));
+			for (int i = 0; i < compiledInputs.length; i++) {
+				SparkOperator sparkOperator = compiledInputs[i];
+				if (curSparkOp.equals(sparkOperator)) {
+					continue;
+				}
 
-            //We create a sparkOperator to save the result of replicated file to the hdfs
-            //temporary file. We load the temporary file in POFRJoin#setUpHashMap
-            //More detail see PIG-4771
-            for (int i = 0; i < compiledInputs.length; i++) {
-                SparkOperator sparkOp = compiledInputs[i];
-                if (curSparkOp.equals(sparkOp)) {
-                    continue;
-                }
-                POStore store = getStore();
-                store.setSFile(replFiles[i]);
-                sparkOp.physicalPlan.addAsLeaf(store);
-                sparkPlan.connect(sparkOp, curSparkOp);
-            }
+				OperatorKey broadcastKey = new OperatorKey(scope, nig.getNextNodeId(scope));
+				POBroadcastSpark poBroadcastSpark = new POBroadcastSpark(broadcastKey);
+				poBroadcastSpark.setBroadcastedVariableName(broadcastKey.toString());
 
-            curSparkOp.physicalPlan.addAsLeaf(op);
+				sparkOperator.physicalPlan.addAsLeaf(poBroadcastSpark);
+			}
 
-            List<List<PhysicalPlan>> joinPlans = op.getJoinPlans();
-            if (joinPlans != null) {
-                for (List<PhysicalPlan> joinPlan : joinPlans) {
-                    if (joinPlan != null) {
-                        for (PhysicalPlan plan : joinPlan) {
-                            processUDFs(plan);
-                        }
-                    }
-                }
-            }
-            phyToSparkOpMap.put(op, curSparkOp);
-        } catch (Exception e) {
-            int errCode = 2034;
-            String msg = "Error compiling operator " + op.getClass().getSimpleName();
-            throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
-        }
+			POFRJoinSpark poFRJoinSpark = new POFRJoinSpark(op);
+			addToPlan(poFRJoinSpark);
+			phyToSparkOpMap.put(op, curSparkOp);
+		} catch (Exception e) {
+			int errCode = 2034;
+			String msg = "Error compiling operator " + op.getClass().getSimpleName();
+			throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
+		}
     }
 
 	@Override
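
The compiler-side effect of this last hunk: instead of storing each non-fragment input to a temp file via POStore (see the removed PIG-4771 comment), visitFRJoin now appends a POBroadcastSpark leaf to each compiled input's plan other than the fragment's, then swaps the join for POFRJoinSpark. A toy model of that plan rewrite, with plans reduced to lists of operator names (all stand-ins, not Pig's plan classes):

import java.util.ArrayList;
import java.util.List;

final class PlanRewriteSketch {
    // Append a "broadcast" leaf to every input plan except the fragment's,
    // echoing the physicalPlan.addAsLeaf(poBroadcastSpark) call above.
    static List<List<String>> rewrite(List<List<String>> inputPlans, int fragment) {
        List<List<String>> rewritten = new ArrayList<>();
        for (int i = 0; i < inputPlans.size(); i++) {
            List<String> plan = new ArrayList<>(inputPlans.get(i));
            if (i != fragment) {
                plan.add("POBroadcastSpark"); // replaces the old POStore-to-HDFS leaf
            }
            rewritten.add(plan);
        }
        return rewritten;
    }
}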