You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by zl...@apache.org on 2017/03/29 21:42:49 UTC

svn commit: r1789416 - in /pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark: SparkLauncher.java optimizer/JoinGroupOptimizerSpark.java

Author: zly
Date: Wed Mar 29 21:42:48 2017
New Revision: 1789416

URL: http://svn.apache.org/viewvc?rev=1789416&view=rev
Log:
PIG-5163: MultiQuery_Streaming_1 is failing with spark exec type (Nandor via Liyun)

Modified:
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/JoinGroupOptimizerSpark.java

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1789416&r1=1789415&r2=1789416&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java Wed Mar 29 21:42:48 2017
@@ -257,7 +257,7 @@ public class SparkLauncher extends Launc
             skOptimizer.visit();
         }
 
-        boolean isAccum = conf.getBoolean("opt.accumulator", true);
+        boolean isAccum = conf.getBoolean(PigConfiguration.PIG_OPT_ACCUMULATOR, true);
         if (isAccum) {
             AccumulatorOptimizer accum = new AccumulatorOptimizer(plan);
             accum.visit();

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/JoinGroupOptimizerSpark.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/JoinGroupOptimizerSpark.java?rev=1789416&r1=1789415&r2=1789416&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/JoinGroupOptimizerSpark.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/JoinGroupOptimizerSpark.java Wed Mar 29 21:42:48 2017
@@ -37,8 +37,10 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
 import org.apache.pig.impl.plan.DependencyOrderWalker;
 import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.PlanException;
 import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.MultiMap;
 
 /**
  * Collapse LocalRearrange,GlobalRearrange,Package to POJoinGroupSpark to reduce unnecessary
@@ -57,18 +59,18 @@ public class JoinGroupOptimizerSpark ext
             GlobalRearrangeDiscover glrDiscover = new GlobalRearrangeDiscover(sparkOp.physicalPlan);
             glrDiscover.visit();
             List<PhysicalPlan> plans = glrDiscover.getPlansWithJoinAndGroup();
-            handlePlans(plans);
+            handlePlans(plans, sparkOp);
         }
 
     }
 
-    private void handlePlans(List<PhysicalPlan> plans) throws VisitorException {
+    private void handlePlans(List<PhysicalPlan> plans, SparkOperator sparkOp) throws VisitorException {
         for(int i=0;i<plans.size();i++){
             PhysicalPlan planWithJoinAndGroup = plans.get(i);
             POGlobalRearrangeSpark glrSpark = PlanHelper.getPhysicalOperators(planWithJoinAndGroup,POGlobalRearrangeSpark.class).get(0);
             if (verifyJoinOrGroupCase(plans.get(i), glrSpark)) {
                 try {
-                    restructSparkOp(planWithJoinAndGroup, glrSpark);
+                    restructSparkOp(planWithJoinAndGroup, glrSpark, sparkOp);
                 } catch (PlanException e) {
                     throw new RuntimeException("GlobalRearrangeDiscover#visitSparkOp fails: ", e);
                 }
@@ -100,7 +102,7 @@ public class JoinGroupOptimizerSpark ext
     }
 
     //collapse LRA,GRA,PKG to POJoinGroupSpark
-    private void restructSparkOp(PhysicalPlan plan,POGlobalRearrangeSpark glaOp) throws PlanException {
+    private void restructSparkOp(PhysicalPlan plan,POGlobalRearrangeSpark glaOp, SparkOperator sparkOp) throws PlanException {
 
         List<PhysicalOperator> predes = plan.getPredecessors(glaOp);
         if (predes != null) {
@@ -158,6 +160,7 @@ public class JoinGroupOptimizerSpark ext
             plan.disconnect(pkgOp, pkgSuccessor);
             plan.connect(joinSpark, pkgSuccessor);
             for (POLocalRearrange lra : lraOps) {
+                replaceMultiqueryMapping(sparkOp, lra, joinSpark);
                 plan.remove(lra);
             }
             plan.remove(glaOp);
@@ -165,6 +168,15 @@ public class JoinGroupOptimizerSpark ext
         }
     }
 
+    private void replaceMultiqueryMapping(SparkOperator sparkOperator, PhysicalOperator from, PhysicalOperator to) {
+        MultiMap<OperatorKey, OperatorKey> multiQueryOptimizeConnectionItems = sparkOperator.getMultiQueryOptimizeConnectionItem();
+        if (multiQueryOptimizeConnectionItems.containsKey(from.getOperatorKey())) {
+            List<OperatorKey> value = multiQueryOptimizeConnectionItems.get(from.getOperatorKey());
+            multiQueryOptimizeConnectionItems.removeKey(from.getOperatorKey());
+            multiQueryOptimizeConnectionItems.put(to.getOperatorKey(), value);
+        }
+    }
+
     private boolean verifyJoinOrGroupCase(PhysicalPlan plan, POGlobalRearrangeSpark glaOp) {
         List<PhysicalOperator> lraOps = plan.getPredecessors(glaOp);
         List<PhysicalOperator> pkgOps = plan.getSuccessors(glaOp);