You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by pr...@apache.org on 2009/10/30 21:53:47 UTC
svn commit: r831452 - in /hadoop/pig/trunk: CHANGES.txt
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
test/org/apache/pig/test/TestMultiQuery.java
Author: pradeepkth
Date: Fri Oct 30 20:53:46 2009
New Revision: 831452
URL: http://svn.apache.org/viewvc?rev=831452&view=rev
Log:
PIG-920: optimizing diamond queries (rding via pradeepkth)
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java
Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=831452&r1=831451&r2=831452&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Fri Oct 30 20:53:46 2009
@@ -26,6 +26,8 @@
IMPROVEMENTS
+PIG-920: optimizing diamond queries (rding via pradeepkth)
+
PIG-1040: FINDBUGS: MS_SHOULD_BE_FINAL: Field isn't final but should be (olgan)
PIG-1059: FINDBUGS: remaining Bad practice + Multithreaded correctness Warning (olgan)
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java?rev=831452&r1=831451&r2=831452&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java Fri Oct 30 20:53:46 2009
@@ -24,12 +24,14 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROpPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODemux;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMultiQueryPackage;
@@ -43,9 +45,8 @@
import org.apache.pig.impl.plan.PlanException;
import org.apache.pig.impl.plan.ReverseDependencyOrderWalker;
import org.apache.pig.impl.plan.VisitorException;
-import org.apache.pig.impl.util.Pair;
import org.apache.pig.impl.plan.optimizer.OptimizerException;
-import org.apache.pig.PigException;
+import org.apache.pig.impl.util.Pair;
/**
@@ -204,10 +205,121 @@
}
}
+ // case 6: special diamond case with trivial MR operator at the head
+ if (numMerges == 0 && isDiamondMROper(mr)) {
+ int merged = mergeDiamondMROper(mr, getPlan().getSuccessors(mr));
+ log.info("Merged " + merged + " diamond splitter.");
+ numMerges += merged;
+ }
+
log.info("Merged " + numMerges + " out of total "
- + numSplittees + " splittees.");
+ + (numSplittees +1) + " MR operators.");
}
+ private boolean isDiamondMROper(MapReduceOper mr) {
+
+ // We'll remove this mr as part of diamond query optimization
+ // only if this mr is a trivial one, that is, it's plan
+ // has either two operators (load followed by store) or three operators
+ // (the operator between the load and store must be a foreach,
+ // introduced by casting operation).
+ //
+ // We won't optimize in other cases where there're more operators
+ // in the plan. Otherwise those operators would run multiple times
+ // in the successor MR operators which may not give better
+ // performance.
+ boolean rtn = false;
+ if (isMapOnly(mr)) {
+ PhysicalPlan pl = mr.mapPlan;
+ if (pl.size() == 2 || pl.size() == 3) {
+ PhysicalOperator root = pl.getRoots().get(0);
+ PhysicalOperator leaf = pl.getLeaves().get(0);
+ if (root instanceof POLoad && leaf instanceof POStore) {
+ if (pl.size() == 3) {
+ PhysicalOperator mid = pl.getSuccessors(root).get(0);
+ if (mid instanceof POForEach) {
+ rtn = true;
+ }
+ } else {
+ rtn = true;
+ }
+ }
+ }
+ }
+ return rtn;
+ }
+
+ private int mergeDiamondMROper(MapReduceOper mr, List<MapReduceOper> succs)
+ throws VisitorException {
+
+ // Only consider the cases where all inputs of the splittees are
+ // from the splitter
+ for (MapReduceOper succ : succs) {
+ List<MapReduceOper> preds = getPlan().getPredecessors(succ);
+ if (preds.size() != 1) {
+ return 0;
+ }
+ }
+
+ // first, remove the store operator from the splitter
+ PhysicalPlan pl = mr.mapPlan;
+ PhysicalOperator leaf = mr.mapPlan.getLeaves().get(0);
+ pl.remove(leaf);
+
+ // then connect the remaining map plan to the successor of
+ // each root (load) operator of the splittee
+ for (MapReduceOper succ : succs) {
+ List<PhysicalOperator> roots = succ.mapPlan.getRoots();
+ ArrayList<PhysicalOperator> rootsCopy =
+ new ArrayList<PhysicalOperator>(roots);
+ for (PhysicalOperator op : rootsCopy) {
+ PhysicalOperator opSucc = succ.mapPlan.getSuccessors(op).get(0);
+ PhysicalPlan clone = null;
+ try {
+ clone = pl.clone();
+ } catch (CloneNotSupportedException e) {
+ int errCode = 2127;
+ String msg = "Internal Error: Cloning of plan failed for optimization.";
+ throw new OptimizerException(msg, errCode, PigException.BUG, e);
+ }
+ succ.mapPlan.remove(op);
+ while (!clone.isEmpty()) {
+ PhysicalOperator oper = clone.getLeaves().get(0);
+ clone.remove(oper);
+ succ.mapPlan.add(oper);
+ try {
+ succ.mapPlan.connect(oper, opSucc);
+ opSucc = oper;
+ } catch (PlanException e) {
+ int errCode = 2131;
+ String msg = "Internal Error. Unable to connect split plan for optimization.";
+ throw new OptimizerException(msg, errCode, PigException.BUG, e);
+ }
+ }
+ }
+ }
+
+ // finally, remove the splitter from the MR plan
+ List<MapReduceOper> mrPreds = getPlan().getPredecessors(mr);
+ if (mrPreds != null) {
+ for (MapReduceOper pred : mrPreds) {
+ for (MapReduceOper succ : succs) {
+ try {
+ getPlan().connect(pred, succ);
+ } catch (PlanException e) {
+ int errCode = 2131;
+ String msg = "Internal Error. Unable to connect split plan for optimization.";
+ throw new OptimizerException(msg, errCode, PigException.BUG, e);
+ }
+ }
+ }
+ }
+
+ getPlan().remove(mr);
+
+ return 1;
+ }
+
private void mergeOneMapPart(MapReduceOper mapper, MapReduceOper splitter)
throws VisitorException {
PhysicalPlan splitterPl = isMapOnly(splitter) ?
@@ -1091,5 +1203,5 @@
private POMultiQueryPackage getMultiQueryPackage(){
return new POMultiQueryPackage(new OperatorKey(scope, nig.getNextNodeId(scope)));
- }
+ }
}
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java?rev=831452&r1=831451&r2=831452&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java Fri Oct 30 20:53:46 2009
@@ -89,6 +89,154 @@
public void tearDown() throws Exception {
myPig = null;
}
+
+ @Test
+ public void testMultiQueryJiraPig920() {
+
+ // test case: a simple diamond query
+ try {
+ myPig.setBatchOn();
+
+ myPig.registerQuery("a = load 'file:test/org/apache/pig/test/data/passwd' " +
+ "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);");
+ myPig.registerQuery("b = filter a by uid < 5;");
+ myPig.registerQuery("c = filter a by gid >= 5;");
+ myPig.registerQuery("d = cogroup c by $0, b by $0;");
+ myPig.registerQuery("e = foreach d generate group, COUNT(c), COUNT(b);");
+ myPig.registerQuery("store e into '/tmp/output1';");
+
+ LogicalPlan lp = checkLogicalPlan(1, 1, 10);
+
+ PhysicalPlan pp = checkPhysicalPlan(lp, 1, 1, 13);
+
+ checkMRPlan(pp, 1, 1, 1);
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail();
+ }
+ }
+
+ @Test
+ public void testMultiQueryJiraPig920_1() {
+
+ // test case: a query with two diamonds
+ try {
+ myPig.setBatchOn();
+
+ myPig.registerQuery("a = load 'file:test/org/apache/pig/test/data/passwd' " +
+ "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);");
+ myPig.registerQuery("b = filter a by uid < 5;");
+ myPig.registerQuery("c = filter a by gid >= 5;");
+ myPig.registerQuery("d = filter a by uid >= 5;");
+ myPig.registerQuery("e = filter a by gid < 5;");
+ myPig.registerQuery("f = cogroup c by $0, b by $0;");
+ myPig.registerQuery("f1 = foreach f generate group, COUNT(c), COUNT(b);");
+ myPig.registerQuery("store f1 into '/tmp/output1';");
+ myPig.registerQuery("g = cogroup d by $0, e by $0;");
+ myPig.registerQuery("g1 = foreach g generate group, COUNT(d), COUNT(e);");
+ myPig.registerQuery("store g1 into '/tmp/output2';");
+
+ LogicalPlan lp = checkLogicalPlan(1, 2, 17);
+
+ PhysicalPlan pp = checkPhysicalPlan(lp, 1, 2, 23);
+
+ checkMRPlan(pp, 2, 2, 2);
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail();
+ }
+ }
+
+ @Test
+ public void testMultiQueryJiraPig920_2() {
+
+ // test case: execution of a query with two diamonds
+ try {
+ myPig.setBatchOn();
+
+ myPig.registerQuery("a = load 'file:test/org/apache/pig/test/data/passwd' " +
+ "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);");
+ myPig.registerQuery("b = filter a by uid < 5;");
+ myPig.registerQuery("c = filter a by gid >= 5;");
+ myPig.registerQuery("d = filter a by uid >= 5;");
+ myPig.registerQuery("e = filter a by gid < 5;");
+ myPig.registerQuery("f = cogroup c by $0, b by $0;");
+ myPig.registerQuery("f1 = foreach f generate group, COUNT(c), COUNT(b);");
+ myPig.registerQuery("store f1 into '/tmp/output1';");
+ myPig.registerQuery("g = cogroup d by $0, e by $0;");
+ myPig.registerQuery("g1 = foreach g generate group, COUNT(d), COUNT(e);");
+ myPig.registerQuery("store g1 into '/tmp/output2';");
+
+ List<ExecJob> jobs = myPig.executeBatch();
+ for (ExecJob job : jobs) {
+ assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
+ }
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail();
+ }
+ }
+
+ @Test
+ public void testMultiQueryJiraPig920_3() {
+
+ // test case: execution of a simple diamond query
+
+ String INPUT_FILE = "pig-920.txt";
+
+ try {
+
+ PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE));
+ w.println("apple\tapple\t100\t10");
+ w.println("apple\tapple\t200\t20");
+ w.println("orange\torange\t100\t10");
+ w.println("orange\torange\t300\t20");
+
+ w.close();
+
+ Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE);
+
+ myPig.setBatchOn();
+
+ myPig.registerQuery("a = load '" + INPUT_FILE +
+ "' as (uname:chararray, passwd:chararray, uid:int, gid:int);");
+ myPig.registerQuery("b = filter a by uid < 300;");
+ myPig.registerQuery("c = filter a by gid > 10;");
+ myPig.registerQuery("d = cogroup c by $0, b by $0;");
+ myPig.registerQuery("e = foreach d generate group, COUNT(c), COUNT(b);");
+
+ Iterator<Tuple> iter = myPig.openIterator("e");
+
+ List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
+ new String[] {
+ "('apple',1L,2L)",
+ "('orange',1L,1L)"
+ });
+
+ int counter = 0;
+ while (iter.hasNext()) {
+ assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());
+ }
+
+ assertEquals(expectedResults.size(), counter);
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail();
+ } finally {
+ new File(INPUT_FILE).delete();
+ try {
+ Util.deleteFile(cluster, INPUT_FILE);
+ } catch (IOException e) {
+ e.printStackTrace();
+ Assert.fail();
+ }
+
+ }
+ }
@Test
public void testMultiQueryWithDemoCase() {
@@ -491,7 +639,7 @@
PhysicalPlan pp = checkPhysicalPlan(lp, 1, 2, 25);
- checkMRPlan(pp, 1, 1, 3);
+ checkMRPlan(pp, 1, 1, 2);
} catch (Exception e) {
e.printStackTrace();
@@ -519,8 +667,13 @@
myPig.registerQuery("f1 = foreach f generate group, SUM(d.c::uid);");
myPig.registerQuery("store f1 into '/tmp/output2';");
- myPig.executeBatch();
+ List<ExecJob> jobs = myPig.executeBatch();
+ assertTrue(jobs.size() == 2);
+
+ for (ExecJob job : jobs) {
+ assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
+ }
} catch (Exception e) {
e.printStackTrace();
Assert.fail();