You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by zl...@apache.org on 2017/02/09 01:34:12 UTC
svn commit: r1782286 - in /pig/branches/spark: ./
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/
src/org/apache/pig/backend/hadoop/executionengine/spark/
src/org/apache/pig/backend/hadoop/executionengine/spark/conv...
Author: zly
Date: Thu Feb 9 01:34:12 2017
New Revision: 1782286
URL: http://svn.apache.org/viewvc?rev=1782286&view=rev
Log:
PIG-4891: Implement FR join by broadcasting small rdd not making more copies of data (Nandor via Liyun)
Added:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoinSpark.java
Modified:
pig/branches/spark/build.xml
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/FRJoinConverter.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java
Modified: pig/branches/spark/build.xml
URL: http://svn.apache.org/viewvc/pig/branches/spark/build.xml?rev=1782286&r1=1782285&r2=1782286&view=diff
==============================================================================
--- pig/branches/spark/build.xml (original)
+++ pig/branches/spark/build.xml Thu Feb 9 01:34:12 2017
@@ -1049,7 +1049,7 @@
<ant dir="${test.e2e.dir}" target="test-tez"/>
</target>
- <target name="test-e2e-spark" depends="jar, piggybank" description="run end-to-end tests in tez mode">
+ <target name="test-e2e-spark" depends="jar, piggybank" description="run end-to-end tests in spark mode">
<ant dir="${test.e2e.dir}" target="test-spark"/>
</target>
Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoinSpark.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoinSpark.java?rev=1782286&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoinSpark.java (added)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoinSpark.java Thu Feb 9 01:34:12 2017
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.SchemaTupleBackend;
+import org.apache.pig.data.SchemaTupleClassGenerator;
+import org.apache.pig.data.SchemaTupleFactory;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+import java.util.List;
+import java.util.Map;
+
+public class POFRJoinSpark extends POFRJoin {
+ private static final Log log = LogFactory.getLog(POFRJoinSpark.class);
+
+ private Map<String, List<Tuple>> broadcasts;
+
+ public POFRJoinSpark(POFRJoin copy) throws ExecException {
+ super(copy);
+ }
+
+ @Override
+ protected void setUpHashMap() throws ExecException {
+ log.info("Building replication hash table");
+
+ SchemaTupleFactory[] inputSchemaTupleFactories = new SchemaTupleFactory[inputSchemas.length];
+ SchemaTupleFactory[] keySchemaTupleFactories = new SchemaTupleFactory[inputSchemas.length];
+ for (int i = 0; i < inputSchemas.length; i++) {
+ addSchemaToFactories(inputSchemas[i], inputSchemaTupleFactories, i);
+ addSchemaToFactories(keySchemas[i], keySchemaTupleFactories, i);
+ }
+
+ replicates[fragment] = null;
+ int i = -1;
+ long start = System.currentTimeMillis();
+ for (int k = 0; k < inputSchemas.length; ++k) {
+ ++i;
+
+ SchemaTupleFactory inputSchemaTupleFactory = inputSchemaTupleFactories[i];
+ SchemaTupleFactory keySchemaTupleFactory = keySchemaTupleFactories[i];
+
+ if (i == fragment) {
+ replicates[i] = null;
+ continue;
+ }
+
+ TupleToMapKey replicate = new TupleToMapKey(1000, keySchemaTupleFactory);
+
+ log.debug("Completed setup. Trying to build replication hash table");
+ List<Tuple> tuples = broadcasts.get(parentPlan.getPredecessors(this).get(i).getOperatorKey().toString());
+
+ POLocalRearrange localRearrange = LRs[i];
+
+ for (Tuple t : tuples) {
+ localRearrange.attachInput(t);
+ Result res = localRearrange.getNextTuple();
+ if (getReporter() != null) {
+ getReporter().progress();
+ }
+ Tuple tuple = (Tuple) res.result;
+ if (isKeyNull(tuple.get(1))) continue;
+ Tuple key = mTupleFactory.newTuple(1);
+ key.set(0, tuple.get(1));
+ Tuple value = getValueTuple(localRearrange, tuple);
+
+ if (replicate.get(key) == null) {
+ replicate.put(key, new POMergeJoin.TuplesToSchemaTupleList(1, inputSchemaTupleFactory));
+ }
+
+ replicate.get(key).add(value);
+
+ }
+ replicates[i] = replicate;
+ }
+ long end = System.currentTimeMillis();
+ log.debug("Hash Table built. Time taken: " + (end - start));
+ }
+
+ @Override
+ public String name() {
+ return getAliasString() + "FRJoinSpark[" + DataType.findTypeName(resultType)
+ + "]" + " - " + mKey.toString();
+ }
+
+ private void addSchemaToFactories(Schema schema, SchemaTupleFactory[] schemaTupleFactories, int index) {
+ if (schema != null) {
+ log.debug("Using SchemaTuple for FR Join Schema: " + schema);
+ schemaTupleFactories[index] = SchemaTupleBackend.newSchemaTupleFactory(schema, false, SchemaTupleClassGenerator.GenContext.FR_JOIN);
+ }
+ }
+
+ public void attachInputs(Map<String, List<Tuple>> broadcasts) {
+ this.broadcasts = broadcasts;
+ }
+}
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java?rev=1782286&r1=1782285&r2=1782286&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java Thu Feb 9 01:34:12 2017
@@ -23,7 +23,6 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
@@ -43,11 +42,12 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.UDFFinishVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POBroadcastSpark;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
+import org.apache.pig.backend.hadoop.executionengine.spark.converter.FRJoinConverter;
import org.apache.pig.backend.hadoop.executionengine.spark.converter.RDDConverter;
import org.apache.pig.backend.hadoop.executionengine.spark.operator.NativeSparkOperator;
import org.apache.pig.backend.hadoop.executionengine.spark.operator.POJoinGroupSpark;
@@ -56,14 +56,12 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.plan.DependencyOrderWalker;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.tools.pigstats.spark.SparkPigStats;
import org.apache.pig.tools.pigstats.spark.SparkStatsUtil;
import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.rdd.RDD;
import com.google.common.collect.Lists;
@@ -102,7 +100,6 @@ public class JobGraphBuilder extends Spa
public void visitSparkOp(SparkOperator sparkOp) throws VisitorException {
new PhyPlanSetter(sparkOp.physicalPlan).visit();
try {
- setReplicationForFRJoin(sparkOp.physicalPlan);
setReplicationForMergeJoin(sparkOp.physicalPlan);
sparkOperToRDD(sparkOp);
finishUDFs(sparkOp.physicalPlan);
@@ -117,26 +114,8 @@ public class JobGraphBuilder extends Spa
}
}
- private void setReplicationForFRJoin(PhysicalPlan plan) throws IOException {
- List<Path> filesForMoreReplication = new ArrayList<Path>();
- List<POFRJoin> pofrJoins = PlanHelper.getPhysicalOperators(plan, POFRJoin.class);
- if (pofrJoins.size() > 0) {
- for (POFRJoin pofrJoin : pofrJoins) {
- FileSpec[] fileSpecs = pofrJoin.getReplFiles();
- if (fileSpecs != null) {
- for (int i = 0; i < fileSpecs.length; i++) {
- if (i != pofrJoin.getFragment()) {
- filesForMoreReplication.add(new Path(fileSpecs[i].getFileName()));
- }
- }
- }
- }
- }
- setReplicationForFiles(filesForMoreReplication);
- }
-
private void setReplicationForMergeJoin(PhysicalPlan plan) throws IOException {
- List<Path> filesForMoreReplication = new ArrayList<Path>();
+ List<Path> filesForMoreReplication = new ArrayList<>();
List<POMergeJoin> poMergeJoins = PlanHelper.getPhysicalOperators(plan, POMergeJoin.class);
if (poMergeJoins.size() > 0) {
for (POMergeJoin poMergeJoin : poMergeJoins) {
@@ -260,7 +239,6 @@ public class JobGraphBuilder extends Spa
}
}
-
if (physicalOperator instanceof POSplit) {
List<PhysicalPlan> successorPlans = ((POSplit) physicalOperator).getPlans();
for (PhysicalPlan successPlan : successorPlans) {
@@ -283,6 +261,11 @@ public class JobGraphBuilder extends Spa
+ physicalOperator.getClass().getSimpleName() + " "
+ physicalOperator);
List<RDD<Tuple>> allPredRDDs = sortPredecessorRDDs(operatorKeysOfAllPreds);
+
+ if (converter instanceof FRJoinConverter) {
+ setReplicatedInputs(physicalOperator, (FRJoinConverter) converter);
+ }
+
nextRDD = converter.convert(allPredRDDs, physicalOperator);
if (nextRDD == null) {
@@ -295,6 +278,16 @@ public class JobGraphBuilder extends Spa
}
}
+ private void setReplicatedInputs(PhysicalOperator physicalOperator, FRJoinConverter converter) {
+ Set<String> replicatedInputs = new HashSet<>();
+ for (PhysicalOperator operator : physicalOperator.getInputs()) {
+ if (operator instanceof POBroadcastSpark) {
+ replicatedInputs.add(((POBroadcastSpark) operator).getBroadcastedVariableName());
+ }
+ }
+ converter.setReplicatedInputs(replicatedInputs);
+ }
+
private List<PhysicalOperator> getPredecessors(PhysicalPlan plan, PhysicalOperator op) {
List preds = null;
if (!(op instanceof POJoinGroupSpark)) {
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1782286&r1=1782285&r2=1782286&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java Thu Feb 9 01:34:12 2017
@@ -51,10 +51,11 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POBroadcastSpark;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCounter;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoinSpark;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
@@ -71,6 +72,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
+import org.apache.pig.backend.hadoop.executionengine.spark.converter.BroadcastConverter;
import org.apache.pig.backend.hadoop.executionengine.spark.converter.CollectedGroupConverter;
import org.apache.pig.backend.hadoop.executionengine.spark.converter.CounterConverter;
import org.apache.pig.backend.hadoop.executionengine.spark.converter.DistinctConverter;
@@ -205,10 +207,11 @@ public class SparkLauncher extends Launc
convertMap.put(POCounter.class, new CounterConverter());
convertMap.put(PORank.class, new RankConverter());
convertMap.put(POStream.class, new StreamConverter());
- convertMap.put(POFRJoin.class, new FRJoinConverter());
+ convertMap.put(POFRJoinSpark.class, new FRJoinConverter());
convertMap.put(POMergeCogroup.class, new MergeCogroupConverter());
convertMap.put(POReduceBySpark.class, new ReduceByConverter());
convertMap.put(POPreCombinerLocalRearrange.class, new LocalRearrangeConverter());
+ convertMap.put(POBroadcastSpark.class, new BroadcastConverter(sparkContext));
uploadResources(sparkplan);
new JobGraphBuilder(sparkplan, convertMap, sparkStats, sparkContext, jobMetricsListener, jobGroupID, jobConf, pigContext).visit();
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/FRJoinConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/FRJoinConverter.java?rev=1782286&r1=1782285&r2=1782286&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/FRJoinConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/FRJoinConverter.java Thu Feb 9 01:34:12 2017
@@ -19,48 +19,56 @@ package org.apache.pig.backend.hadoop.ex
import java.io.IOException;
import java.io.Serializable;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
-import scala.Tuple2;
-import scala.runtime.AbstractFunction1;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoinSpark;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin;
import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
-import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.rdd.RDD;
-import com.google.common.base.Optional;
-
@SuppressWarnings("serial")
public class FRJoinConverter implements
RDDConverter<Tuple, Tuple, POFRJoin> {
private static final Log LOG = LogFactory.getLog(FRJoinConverter.class);
+ private Set<String> replicatedInputs;
+
public RDD<Tuple> convert(List<RDD<Tuple>> predecessors,
POFRJoin poFRJoin) throws IOException {
- SparkUtil.assertPredecessorSize(predecessors, poFRJoin, 1);
+ SparkUtil.assertPredecessorSizeGreaterThan(predecessors, poFRJoin, 1);
RDD<Tuple> rdd = predecessors.get(0);
+
+ attachReplicatedInputs((POFRJoinSpark) poFRJoin);
+
FRJoinFunction frJoinFunction = new FRJoinFunction(poFRJoin);
return rdd.toJavaRDD().mapPartitions(frJoinFunction, true).rdd();
}
+ private void attachReplicatedInputs(POFRJoinSpark poFRJoin) {
+ Map<String, List<Tuple>> replicatedInputMap = new HashMap<>();
+
+ for (String replicatedInput : replicatedInputs) {
+ replicatedInputMap.put(replicatedInput, SparkUtil.getBroadcastedVars().get(replicatedInput).value());
+ }
+
+ poFRJoin.attachInputs(replicatedInputMap);
+ }
private static class FRJoinFunction implements
FlatMapFunction<Iterator<Tuple>, Tuple>, Serializable {
- private POFRJoin poFRJoin;
+ private POFRJoin poFRJoin;
private FRJoinFunction(POFRJoin poFRJoin) {
this.poFRJoin = poFRJoin;
}
@@ -92,5 +100,10 @@ public class FRJoinConverter implements
}
};
}
+
+ }
+
+ public void setReplicatedInputs(Set<String> replicatedInputs) {
+ this.replicatedInputs = replicatedInputs;
}
}
\ No newline at end of file
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java?rev=1782286&r1=1782285&r2=1782286&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java Thu Feb 9 01:34:12 2017
@@ -45,10 +45,12 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POBroadcastSpark;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCross;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoinSpark;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POGlobalRearrange;
@@ -691,47 +693,29 @@ public class SparkCompiler extends PhyPl
@Override
public void visitFRJoin(POFRJoin op) throws VisitorException {
- try {
- FileSpec[] replFiles = new FileSpec[op.getInputs().size()];
- for (int i = 0; i < replFiles.length; i++) {
- if (i == op.getFragment()) continue;
- replFiles[i] = getTempFileSpec();
- }
- op.setReplFiles(replFiles);
- curSparkOp = phyToSparkOpMap.get(op.getInputs().get(op.getFragment()));
+ try {
+ curSparkOp = phyToSparkOpMap.get(op.getInputs().get(op.getFragment()));
+ for (int i = 0; i < compiledInputs.length; i++) {
+ SparkOperator sparkOperator = compiledInputs[i];
+ if (curSparkOp.equals(sparkOperator)) {
+ continue;
+ }
- //We create a sparkOperator to save the result of replicated file to the hdfs
- //temporary file. We load the temporary file in POFRJoin#setUpHashMap
- //More detail see PIG-4771
- for (int i = 0; i < compiledInputs.length; i++) {
- SparkOperator sparkOp = compiledInputs[i];
- if (curSparkOp.equals(sparkOp)) {
- continue;
- }
- POStore store = getStore();
- store.setSFile(replFiles[i]);
- sparkOp.physicalPlan.addAsLeaf(store);
- sparkPlan.connect(sparkOp, curSparkOp);
- }
+ OperatorKey broadcastKey = new OperatorKey(scope, nig.getNextNodeId(scope));
+ POBroadcastSpark poBroadcastSpark = new POBroadcastSpark(broadcastKey);
+ poBroadcastSpark.setBroadcastedVariableName(broadcastKey.toString());
- curSparkOp.physicalPlan.addAsLeaf(op);
+ sparkOperator.physicalPlan.addAsLeaf(poBroadcastSpark);
+ }
- List<List<PhysicalPlan>> joinPlans = op.getJoinPlans();
- if (joinPlans != null) {
- for (List<PhysicalPlan> joinPlan : joinPlans) {
- if (joinPlan != null) {
- for (PhysicalPlan plan : joinPlan) {
- processUDFs(plan);
- }
- }
- }
- }
- phyToSparkOpMap.put(op, curSparkOp);
- } catch (Exception e) {
- int errCode = 2034;
- String msg = "Error compiling operator " + op.getClass().getSimpleName();
- throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
- }
+ POFRJoinSpark poFRJoinSpark = new POFRJoinSpark(op);
+ addToPlan(poFRJoinSpark);
+ phyToSparkOpMap.put(op, curSparkOp);
+ } catch (Exception e) {
+ int errCode = 2034;
+ String msg = "Error compiling operator " + op.getClass().getSimpleName();
+ throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
+ }
}
@Override