You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by zl...@apache.org on 2017/03/29 21:42:49 UTC
svn commit: r1789416 - in /pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark: SparkLauncher.java optimizer/JoinGroupOptimizerSpark.java
Author: zly
Date: Wed Mar 29 21:42:48 2017
New Revision: 1789416
URL: http://svn.apache.org/viewvc?rev=1789416&view=rev
Log:
PIG-5163: MultiQuery_Streaming_1 is failing with spark exec type (Nandor via Liyun)
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/JoinGroupOptimizerSpark.java
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1789416&r1=1789415&r2=1789416&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java Wed Mar 29 21:42:48 2017
@@ -257,7 +257,7 @@ public class SparkLauncher extends Launc
skOptimizer.visit();
}
- boolean isAccum = conf.getBoolean("opt.accumulator", true);
+ boolean isAccum = conf.getBoolean(PigConfiguration.PIG_OPT_ACCUMULATOR, true);
if (isAccum) {
AccumulatorOptimizer accum = new AccumulatorOptimizer(plan);
accum.visit();
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/JoinGroupOptimizerSpark.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/JoinGroupOptimizerSpark.java?rev=1789416&r1=1789415&r2=1789416&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/JoinGroupOptimizerSpark.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/JoinGroupOptimizerSpark.java Wed Mar 29 21:42:48 2017
@@ -37,8 +37,10 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
import org.apache.pig.impl.plan.DependencyOrderWalker;
import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.PlanException;
import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.MultiMap;
/**
* Collapse LocalRearrange,GlobalRearrange,Package to POJoinGroupSpark to reduce unnecessary
@@ -57,18 +59,18 @@ public class JoinGroupOptimizerSpark ext
GlobalRearrangeDiscover glrDiscover = new GlobalRearrangeDiscover(sparkOp.physicalPlan);
glrDiscover.visit();
List<PhysicalPlan> plans = glrDiscover.getPlansWithJoinAndGroup();
- handlePlans(plans);
+ handlePlans(plans, sparkOp);
}
}
- private void handlePlans(List<PhysicalPlan> plans) throws VisitorException {
+ private void handlePlans(List<PhysicalPlan> plans, SparkOperator sparkOp) throws VisitorException {
for(int i=0;i<plans.size();i++){
PhysicalPlan planWithJoinAndGroup = plans.get(i);
POGlobalRearrangeSpark glrSpark = PlanHelper.getPhysicalOperators(planWithJoinAndGroup,POGlobalRearrangeSpark.class).get(0);
if (verifyJoinOrGroupCase(plans.get(i), glrSpark)) {
try {
- restructSparkOp(planWithJoinAndGroup, glrSpark);
+ restructSparkOp(planWithJoinAndGroup, glrSpark, sparkOp);
} catch (PlanException e) {
throw new RuntimeException("GlobalRearrangeDiscover#visitSparkOp fails: ", e);
}
@@ -100,7 +102,7 @@ public class JoinGroupOptimizerSpark ext
}
//collapse LRA,GRA,PKG to POJoinGroupSpark
- private void restructSparkOp(PhysicalPlan plan,POGlobalRearrangeSpark glaOp) throws PlanException {
+ private void restructSparkOp(PhysicalPlan plan,POGlobalRearrangeSpark glaOp, SparkOperator sparkOp) throws PlanException {
List<PhysicalOperator> predes = plan.getPredecessors(glaOp);
if (predes != null) {
@@ -158,6 +160,7 @@ public class JoinGroupOptimizerSpark ext
plan.disconnect(pkgOp, pkgSuccessor);
plan.connect(joinSpark, pkgSuccessor);
for (POLocalRearrange lra : lraOps) {
+ replaceMultiqueryMapping(sparkOp, lra, joinSpark);
plan.remove(lra);
}
plan.remove(glaOp);
@@ -165,6 +168,15 @@ public class JoinGroupOptimizerSpark ext
}
}
+ private void replaceMultiqueryMapping(SparkOperator sparkOperator, PhysicalOperator from, PhysicalOperator to) {
+ MultiMap<OperatorKey, OperatorKey> multiQueryOptimizeConnectionItems = sparkOperator.getMultiQueryOptimizeConnectionItem();
+ if (multiQueryOptimizeConnectionItems.containsKey(from.getOperatorKey())) {
+ List<OperatorKey> value = multiQueryOptimizeConnectionItems.get(from.getOperatorKey());
+ multiQueryOptimizeConnectionItems.removeKey(from.getOperatorKey());
+ multiQueryOptimizeConnectionItems.put(to.getOperatorKey(), value);
+ }
+ }
+
private boolean verifyJoinOrGroupCase(PhysicalPlan plan, POGlobalRearrangeSpark glaOp) {
List<PhysicalOperator> lraOps = plan.getPredecessors(glaOp);
List<PhysicalOperator> pkgOps = plan.getSuccessors(glaOp);