You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by pr...@apache.org on 2016/07/04 06:49:09 UTC
svn commit: r1751216 - in /pig/branches/spark/src/org/apache/pig:
backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java
backend/hadoop/executionengine/spark/optimizer/MultiQueryOptimizerSpark.java
impl/plan/OperatorPlan.java
Author: praveen
Date: Mon Jul 4 06:49:09 2016
New Revision: 1751216
URL: http://svn.apache.org/viewvc?rev=1751216&view=rev
Log:
PIG-4871: Not use OperatorPlan#forceConnect in MultiQueryOptimizationSpark (Liyun via Praveen)
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/MultiQueryOptimizerSpark.java
pig/branches/spark/src/org/apache/pig/impl/plan/OperatorPlan.java
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java?rev=1751216&r1=1751215&r2=1751216&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java Mon Jul 4 06:49:09 2016
@@ -143,25 +143,6 @@ public class PhysicalPlan extends Operat
to.setInputs(getPredecessors(to));
}
- /**
- * connect from and to and ignore some judgements: like ignoring judge whether from operator supports multiOutputs
- * and whether to operator supports multiInputs
- *
- * @param from
- * @param to
- */
- public void forceConnect(PhysicalOperator from, PhysicalOperator to) throws PlanException {
- super.forceConnect(from, to);
- to.setInputs(getPredecessors(to));
- }
-
- /*public void connect(List<PhysicalOperator> from, PhysicalOperator to) throws IOException{
- if(!to.supportsMultipleInputs()){
- throw new IOException("Invalid Operation on " + to.name() + ". It doesn't support multiple inputs.");
- }
-
- }*/
-
@Override
public void remove(PhysicalOperator op) {
op.setInputs(null);
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/MultiQueryOptimizerSpark.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/MultiQueryOptimizerSpark.java?rev=1751216&r1=1751215&r2=1751216&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/MultiQueryOptimizerSpark.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/MultiQueryOptimizerSpark.java Mon Jul 4 06:49:09 2016
@@ -18,11 +18,14 @@
package org.apache.pig.backend.hadoop.executionengine.spark.optimizer;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.pig.PigException;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
@@ -34,6 +37,7 @@ import org.apache.pig.impl.plan.Operator
import org.apache.pig.impl.plan.PlanException;
import org.apache.pig.impl.plan.ReverseDependencyOrderWalker;
import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.plan.optimizer.OptimizerException;
/**
@@ -77,30 +81,35 @@ public class MultiQueryOptimizerSpark ex
if (splittees.size() == 1) {
// We don't need a POSplit here, we can merge the splittee into spliter
+ SparkOperator spliter = sparkOp;
SparkOperator singleSplitee = splittees.get(0);
- POStore poStore = null;
- PhysicalOperator firstNodeLeaf = sparkOp.physicalPlan.getLeaves().get(0);
- if (firstNodeLeaf instanceof POStore) {
- poStore = (POStore) firstNodeLeaf;
- }
- PhysicalOperator firstNodeLeafPred = sparkOp.physicalPlan.getPredecessors(firstNodeLeaf).get(0);
- sparkOp.physicalPlan.remove(poStore); // remove unnecessary store
- List<PhysicalOperator> firstNodeRoots = singleSplitee.physicalPlan.getRoots();
- sparkOp.physicalPlan.merge(singleSplitee.physicalPlan);
- for (int j = 0; j < firstNodeRoots.size(); j++) {
- PhysicalOperator firstNodeRoot = firstNodeRoots.get(j);
- POLoad poLoad = null;
- if (firstNodeRoot instanceof POLoad && poStore != null) {
- poLoad = (POLoad) firstNodeRoot;
- if (poLoad.getLFile().getFileName().equals(poStore.getSFile().getFileName())) {
- PhysicalOperator firstNodeRootSucc = sparkOp.physicalPlan.getSuccessors(firstNodeRoot).get(0);
- sparkOp.physicalPlan.remove(poLoad); // remove unnecessary load
- sparkOp.physicalPlan.forceConnect(firstNodeLeafPred, firstNodeRootSucc);
+ List<PhysicalOperator> roots = singleSplitee.physicalPlan.getRoots();
+ List<PhysicalOperator> rootCopys = new ArrayList<PhysicalOperator>(roots);
+ //sort the roots by OperatorKey
+ //for the first element of roots, merge the physical plan of spliter and splittee
+ //for the other elements of roots,merge the clone physical plan of spliter and splittee
+ //the clone physical plan will have same type of physical operators but have more bigger OperatorKey
+ //thus physical operator with bigger OperatorKey will be executed later than those have small OperatorKey(see JobGraphBuilder.sortPredecessorRDDs())
+ Collections.sort(rootCopys);
+ List<PhysicalPlan> spliterPhysicalPlan = getPhysicalPlans(spliter.physicalPlan, rootCopys.size());
+ int i = 0;
+ for (PhysicalOperator root : rootCopys) {
+ if (root instanceof POLoad) {
+ POLoad load = (POLoad) root;
+ PhysicalPlan plClone = spliterPhysicalPlan.get(i);
+ POStore store = (POStore) plClone.getLeaves().get(0);
+ if (load.getLFile().getFileName().equals(store.getSFile().getFileName())) {
+ plClone.remove(store);
+ PhysicalOperator succOfload = singleSplitee.physicalPlan.getSuccessors(load).get(0);
+ singleSplitee.physicalPlan.remove(load);
+ mergePlanAWithPlanB(singleSplitee.physicalPlan, plClone, succOfload);
+ i++;
}
}
}
- addSubPlanPropertiesToParent(sparkOp, singleSplitee);
- removeSplittee(getPlan(), sparkOp, singleSplitee);
+
+ addSubPlanPropertiesToParent(singleSplitee, spliter);
+ removeSpliter(getPlan(), spliter, singleSplitee);
} else {
//If the size of splittee is more than 1, we need create a split which type is POSplit, merge all the physical plans
// of splittees to the physical plan of split and remove the splittees.
@@ -115,7 +124,7 @@ public class MultiQueryOptimizerSpark ex
ArrayList<SparkOperator> spliteesCopy = new ArrayList
<SparkOperator>(splittees);
for (SparkOperator splitee : spliteesCopy) {
- List<PhysicalOperator> rootsOfSplitee = splitee.physicalPlan.getRoots();
+ List<PhysicalOperator> rootsOfSplitee = new ArrayList(splitee.physicalPlan.getRoots());
for (int i = 0; i < rootsOfSplitee.size(); i++) {
if (rootsOfSplitee.get(i) instanceof POLoad) {
POLoad poLoad = (POLoad) rootsOfSplitee.get(i);
@@ -129,14 +138,13 @@ public class MultiQueryOptimizerSpark ex
LOG.debug(String.format("add multiQueryOptimize connection item: to:%s, from:%s for %s",
successorOfPoLoad.toString(), predOfPoStore.getOperatorKey().toString(), splitee.getOperatorKey()));
}
- poSplit.addPlan(splitee.physicalPlan);
- addSubPlanPropertiesToParent(sparkOp, splitee);
- removeSplittee(getPlan(), sparkOp, splitee);
}
}
}
+ poSplit.addPlan(splitee.physicalPlan);
+ addSubPlanPropertiesToParent(sparkOp, splitee);
+ removeSplittee(getPlan(), sparkOp, splitee);
}
-
sparkOp.physicalPlan.addAsLeaf(poSplit);
}
}
@@ -145,6 +153,40 @@ public class MultiQueryOptimizerSpark ex
}
}
+ /**
+ * Builds the list of spliter plans used when merging into a single splittee.
+ * The first element is {@code physicalPlan} itself; each further element is a
+ * fresh clone of it, until the list holds {@code size} plans. When {@code size}
+ * is 1 or less, the list contains only the original plan.
+ *
+ * @param physicalPlan the plan to hand out first and to clone for the rest
+ * @param size the total number of plans requested
+ * @return the original plan followed by {@code size - 1} clones (if any)
+ * @throws OptimizerException if the plan cannot be cloned
+ */
+ private List<PhysicalPlan> getPhysicalPlans(PhysicalPlan physicalPlan, int size) throws OptimizerException {
+ List<PhysicalPlan> plans = new ArrayList<PhysicalPlan>();
+ plans.add(physicalPlan);
+ try {
+ while (plans.size() < size) {
+ plans.add(physicalPlan.clone());
+ }
+ } catch (CloneNotSupportedException e) {
+ int errCode = 2127;
+ String msg = "Internal Error: Cloning of plan failed for optimization.";
+ throw new OptimizerException(msg, errCode, PigException.BUG, e);
+ }
+ return plans;
+ }
+
+ /**
+ * Merges all operators of {@code planB} into {@code planA}, then connects the
+ * first leaf of {@code planB} to the operator {@code to} (which must already be
+ * in {@code planA}).
+ *
+ * @param planA the plan that receives planB's operators
+ * @param planB the plan being merged; it must have at least one leaf
+ * @param to the operator in planA that planB's first leaf will feed into
+ * @throws PlanException if planB has no leaves, or if merging or connecting fails
+ */
+ private void mergePlanAWithPlanB(PhysicalPlan planA, PhysicalPlan planB, PhysicalOperator to) throws PlanException {
+ List<PhysicalOperator> leavesOfB = planB.getLeaves();
+ if (leavesOfB == null || leavesOfB.isEmpty()) {
+ throw new PlanException("Cannot merge a physical plan without leaves into " + to);
+ }
+ // Read the leaf before merging so it is taken from planB's own structure.
+ PhysicalOperator leafOfB = leavesOfB.get(0);
+ planA.merge(planB);
+ planA.connect(leafOfB, to);
+ }
+
+ /**
+ * Removes {@code spliter} from the Spark operator plan once its physical plan
+ * has been merged into {@code splittee}. If the spliter has predecessors, the
+ * spliter-to-splittee edge is dropped and each predecessor is rewired straight
+ * to the splittee. The spliter is then removed from the plan.
+ *
+ * @param plan the Spark operator plan containing both operators
+ * @param spliter the operator being folded away
+ * @param splittee the operator that absorbs the spliter's work
+ * @throws PlanException if disconnecting, connecting or removing fails
+ */
+ private void removeSpliter(SparkOperPlan plan, SparkOperator spliter, SparkOperator splittee) throws PlanException {
+ List<SparkOperator> preds = plan.getPredecessors(spliter);
+ if (preds != null) {
+ // Iterate over a copy so the loop is unaffected if disconnect() mutates the returned list.
+ List<SparkOperator> predsCopy = new ArrayList<SparkOperator>(preds);
+ plan.disconnect(spliter, splittee);
+ for (SparkOperator pred : predsCopy) {
+ plan.disconnect(pred, spliter);
+ plan.connect(pred, splittee);
+ }
+ }
+ plan.remove(spliter);
+ }
+
private void removeSplittee(SparkOperPlan plan, SparkOperator splitter,
SparkOperator splittee) throws PlanException {
if (plan.getSuccessors(splittee) != null) {
Modified: pig/branches/spark/src/org/apache/pig/impl/plan/OperatorPlan.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/plan/OperatorPlan.java?rev=1751216&r1=1751215&r2=1751216&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/plan/OperatorPlan.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/plan/OperatorPlan.java Mon Jul 4 06:49:09 2016
@@ -163,25 +163,6 @@ public abstract class OperatorPlan<E ext
}
/**
- * connect from and to and ignore some judgements: like ignoring judge whether from operator supports multiOutputs
- * and whether to operator supports multiInputs
- *
- * @param from Operator data will flow from.
- * @param to Operator data will flow to.
- * @throws PlanException if connect from or to which is not in the plan
- */
- public void forceConnect(E from, E to) throws PlanException {
- markDirty();
-
- // Check that both nodes are in the plan.
- checkInPlan(from);
- checkInPlan(to);
- mFromEdges.put(from, to);
- mToEdges.put(to, from);
- }
-
-
- /**
* Create an edge between two nodes. The direction of the edge implies data
* flow.
* @param from Operator data will flow from.