You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by pr...@apache.org on 2016/07/04 06:49:09 UTC

svn commit: r1751216 - in /pig/branches/spark/src/org/apache/pig: backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java backend/hadoop/executionengine/spark/optimizer/MultiQueryOptimizerSpark.java impl/plan/OperatorPlan.java

Author: praveen
Date: Mon Jul  4 06:49:09 2016
New Revision: 1751216

URL: http://svn.apache.org/viewvc?rev=1751216&view=rev
Log:
PIG-4871: Not use OperatorPlan#forceConnect in MultiQueryOptimizationSpark (Liyun via Praveen)

Modified:
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/MultiQueryOptimizerSpark.java
    pig/branches/spark/src/org/apache/pig/impl/plan/OperatorPlan.java

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java?rev=1751216&r1=1751215&r2=1751216&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java Mon Jul  4 06:49:09 2016
@@ -143,25 +143,6 @@ public class PhysicalPlan extends Operat
         to.setInputs(getPredecessors(to));
     }
 
-    /**
-     * connect from and to and ignore some judgements: like ignoring judge whether from operator supports multiOutputs
-     * and whether to operator supports multiInputs
-     *
-     * @param from
-     * @param to
-     */
-    public void forceConnect(PhysicalOperator from, PhysicalOperator to) throws PlanException {
-        super.forceConnect(from, to);
-        to.setInputs(getPredecessors(to));
-    }
-
-    /*public void connect(List<PhysicalOperator> from, PhysicalOperator to) throws IOException{
-        if(!to.supportsMultipleInputs()){
-            throw new IOException("Invalid Operation on " + to.name() + ". It doesn't support multiple inputs.");
-        }
-
-    }*/
-
     @Override
     public void remove(PhysicalOperator op) {
         op.setInputs(null);

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/MultiQueryOptimizerSpark.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/MultiQueryOptimizerSpark.java?rev=1751216&r1=1751215&r2=1751216&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/MultiQueryOptimizerSpark.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/MultiQueryOptimizerSpark.java Mon Jul  4 06:49:09 2016
@@ -18,11 +18,14 @@
 package org.apache.pig.backend.hadoop.executionengine.spark.optimizer;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.pig.PigException;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
@@ -34,6 +37,7 @@ import org.apache.pig.impl.plan.Operator
 import org.apache.pig.impl.plan.PlanException;
 import org.apache.pig.impl.plan.ReverseDependencyOrderWalker;
 import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.plan.optimizer.OptimizerException;
 
 
 /**
@@ -77,30 +81,35 @@ public class MultiQueryOptimizerSpark ex
 
             if (splittees.size() == 1) {
                 // We don't need a POSplit here, we can merge the splittee into spliter
+                SparkOperator spliter = sparkOp;
                 SparkOperator singleSplitee = splittees.get(0);
-                POStore poStore = null;
-                PhysicalOperator firstNodeLeaf = sparkOp.physicalPlan.getLeaves().get(0);
-                if (firstNodeLeaf instanceof POStore) {
-                    poStore = (POStore) firstNodeLeaf;
-                }
-                PhysicalOperator firstNodeLeafPred = sparkOp.physicalPlan.getPredecessors(firstNodeLeaf).get(0);
-                sparkOp.physicalPlan.remove(poStore);  // remove  unnecessary store
-                List<PhysicalOperator> firstNodeRoots = singleSplitee.physicalPlan.getRoots();
-                sparkOp.physicalPlan.merge(singleSplitee.physicalPlan);
-                for (int j = 0; j < firstNodeRoots.size(); j++) {
-                    PhysicalOperator firstNodeRoot = firstNodeRoots.get(j);
-                    POLoad poLoad = null;
-                    if (firstNodeRoot instanceof POLoad && poStore != null) {
-                        poLoad = (POLoad) firstNodeRoot;
-                        if (poLoad.getLFile().getFileName().equals(poStore.getSFile().getFileName())) {
-                            PhysicalOperator firstNodeRootSucc = sparkOp.physicalPlan.getSuccessors(firstNodeRoot).get(0);
-                            sparkOp.physicalPlan.remove(poLoad); // remove unnecessary load
-                            sparkOp.physicalPlan.forceConnect(firstNodeLeafPred, firstNodeRootSucc);
+                List<PhysicalOperator> roots = singleSplitee.physicalPlan.getRoots();
+                List<PhysicalOperator> rootCopys = new ArrayList<PhysicalOperator>(roots);
+                //sort the roots by OperatorKey
+                //for the first element of roots, merge the physical plan of spliter and splittee
+                //for the other elements of roots,merge the clone physical plan of spliter and splittee
+                //the clone physical plan will have same type of physical operators but have more bigger OperatorKey
+                //thus physical operator with bigger OperatorKey will be executed later than those have small OperatorKey(see JobGraphBuilder.sortPredecessorRDDs())
+                Collections.sort(rootCopys);
+                List<PhysicalPlan> spliterPhysicalPlan = getPhysicalPlans(spliter.physicalPlan, rootCopys.size());
+                int i = 0;
+                for (PhysicalOperator root : rootCopys) {
+                    if (root instanceof POLoad) {
+                        POLoad load = (POLoad) root;
+                        PhysicalPlan plClone = spliterPhysicalPlan.get(i);
+                        POStore store = (POStore) plClone.getLeaves().get(0);
+                        if (load.getLFile().getFileName().equals(store.getSFile().getFileName())) {
+                            plClone.remove(store);
+                            PhysicalOperator succOfload = singleSplitee.physicalPlan.getSuccessors(load).get(0);
+                            singleSplitee.physicalPlan.remove(load);
+                            mergePlanAWithPlanB(singleSplitee.physicalPlan, plClone, succOfload);
+                            i++;
                         }
                     }
                 }
-                addSubPlanPropertiesToParent(sparkOp, singleSplitee);
-                removeSplittee(getPlan(), sparkOp, singleSplitee);
+
+                addSubPlanPropertiesToParent(singleSplitee, spliter);
+                removeSpliter(getPlan(), spliter, singleSplitee);
             } else {
                 //If the size of splittee is more than 1, we need create a split which type is POSplit, merge all the physical plans
                 // of splittees to the physical plan of split and remove the splittees.
@@ -115,7 +124,7 @@ public class MultiQueryOptimizerSpark ex
                     ArrayList<SparkOperator> spliteesCopy = new ArrayList
                             <SparkOperator>(splittees);
                     for (SparkOperator splitee : spliteesCopy) {
-                        List<PhysicalOperator> rootsOfSplitee = splitee.physicalPlan.getRoots();
+                        List<PhysicalOperator> rootsOfSplitee = new ArrayList(splitee.physicalPlan.getRoots());
                         for (int i = 0; i < rootsOfSplitee.size(); i++) {
                             if (rootsOfSplitee.get(i) instanceof POLoad) {
                                 POLoad poLoad = (POLoad) rootsOfSplitee.get(i);
@@ -129,14 +138,13 @@ public class MultiQueryOptimizerSpark ex
                                         LOG.debug(String.format("add multiQueryOptimize connection item: to:%s, from:%s for %s",
                                                 successorOfPoLoad.toString(), predOfPoStore.getOperatorKey().toString(), splitee.getOperatorKey()));
                                     }
-                                    poSplit.addPlan(splitee.physicalPlan);
-                                    addSubPlanPropertiesToParent(sparkOp, splitee);
-                                    removeSplittee(getPlan(), sparkOp, splitee);
                                 }
                             }
                         }
+                        poSplit.addPlan(splitee.physicalPlan);
+                        addSubPlanPropertiesToParent(sparkOp, splitee);
+                        removeSplittee(getPlan(), sparkOp, splitee);
                     }
-
                     sparkOp.physicalPlan.addAsLeaf(poSplit);
                 }
             }
@@ -145,6 +153,40 @@ public class MultiQueryOptimizerSpark ex
         }
     }
 
+    private List<PhysicalPlan> getPhysicalPlans(PhysicalPlan physicalPlan, int size) throws OptimizerException { // returns max(size, 1) plans: the original first, then clones
+        List<PhysicalPlan> ppList = new ArrayList<PhysicalPlan>();
+        try {
+            ppList.add(physicalPlan); // element 0 is the original plan itself, not a copy
+            for (int i = 1; i < size; i++) {
+                ppList.add(physicalPlan.clone()); // elements 1..size-1 are clones of the original plan
+            }
+        } catch (CloneNotSupportedException e) { // report a clone failure as an OptimizerException, keeping the cause
+            int errCode = 2127;
+            String msg = "Internal Error: Cloning of plan failed for optimization.";
+            throw new OptimizerException(msg, errCode, PigException.BUG, e);
+        }
+        return ppList;
+    }
+
+    // Merge all operators of planB into planA, then connect planB's first leaf to operator "to" in planA.
+    private void mergePlanAWithPlanB(PhysicalPlan planA, PhysicalPlan planB, PhysicalOperator to) throws PlanException {
+        PhysicalOperator predOfStore = planB.getLeaves().get(0); // NOTE(review): assumes planB has a single leaf and its POStore was already removed by the caller -- confirm
+        planA.merge(planB);
+        planA.connect(predOfStore, to);
+    }
+
+    private void removeSpliter(SparkOperPlan plan, SparkOperator spliter, SparkOperator splittee) throws PlanException { // removes spliter from the plan, reattaching its predecessors to splittee
+        if (plan.getPredecessors(spliter) != null) {
+            List<SparkOperator> preds = new ArrayList(plan.getPredecessors(spliter)); // copy first because edges are modified below (NOTE(review): raw ArrayList type)
+            plan.disconnect(spliter, splittee);
+            for (SparkOperator pred : preds) {
+                plan.disconnect(pred, spliter);
+                plan.connect(pred, splittee); // rewire each former predecessor of spliter directly to splittee
+            }
+        }
+        plan.remove(spliter);
+    }
+
     private void removeSplittee(SparkOperPlan plan, SparkOperator splitter,
                                 SparkOperator splittee) throws PlanException {
         if (plan.getSuccessors(splittee) != null) {

Modified: pig/branches/spark/src/org/apache/pig/impl/plan/OperatorPlan.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/plan/OperatorPlan.java?rev=1751216&r1=1751215&r2=1751216&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/plan/OperatorPlan.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/plan/OperatorPlan.java Mon Jul  4 06:49:09 2016
@@ -163,25 +163,6 @@ public abstract class OperatorPlan<E ext
     }
 
     /**
-     * connect from and to and ignore some judgements: like ignoring judge whether from operator supports multiOutputs
-     * and whether to operator supports multiInputs
-     *
-     * @param from Operator data will flow from.
-     * @param to   Operator data will flow to.
-     * @throws PlanException if connect from or to which is not in the plan
-     */
-    public void forceConnect(E from, E to) throws PlanException {
-        markDirty();
-
-        // Check that both nodes are in the plan.
-        checkInPlan(from);
-        checkInPlan(to);
-        mFromEdges.put(from, to);
-        mToEdges.put(to, from);
-    }
-
-
-    /**
      * Create an edge between two nodes.  The direction of the edge implies data
      * flow.
      * @param from Operator data will flow from.