You are viewing a plain-text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain-text version.
Posted to commits@pig.apache.org by rd...@apache.org on 2010/06/10 20:09:55 UTC
svn commit: r953414 - in /hadoop/pig/branches/branch-0.7: CHANGES.txt
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
test/org/apache/pig/test/TestMultiQuery.java
Author: rding
Date: Thu Jun 10 18:09:54 2010
New Revision: 953414
URL: http://svn.apache.org/viewvc?rev=953414&view=rev
Log:
PIG-1438: [Performance] MultiQueryOptimizer should also merge DISTINCT jobs
Modified:
hadoop/pig/branches/branch-0.7/CHANGES.txt
hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
hadoop/pig/branches/branch-0.7/test/org/apache/pig/test/TestMultiQuery.java
Modified: hadoop/pig/branches/branch-0.7/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/CHANGES.txt?rev=953414&r1=953413&r2=953414&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.7/CHANGES.txt (original)
+++ hadoop/pig/branches/branch-0.7/CHANGES.txt Thu Jun 10 18:09:54 2010
@@ -22,6 +22,9 @@ Release 0.7.0 - 2010-05-03
INCOMPATIBLE CHANGES
+PIG-1438: [Performance] MultiQueryOptimizer should also merge DISTINCT jobs
+(rding)
+
PIG-1292: Interface Refinements (hashutosh)
PIG-1259: ResourceFieldSchema.setSchema should not allow a bag field without a
Modified: hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java?rev=953414&r1=953413&r2=953414&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java (original)
+++ hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java Thu Jun 10 18:09:54 2010
@@ -455,10 +455,12 @@ class MultiQueryOptimizer extends MROpPl
return true;
}
- private List<MapReduceOper> getMergeList(List<MapReduceOper> mapReducers) {
+ private List<MapReduceOper> getMergeList(MapReduceOper splitter,
+ List<MapReduceOper> mapReducers) {
List<MapReduceOper> mergeNoCmbList = new ArrayList<MapReduceOper>();
List<MapReduceOper> mergeCmbList = new ArrayList<MapReduceOper>();
-
+ List<MapReduceOper> mergeDistList = new ArrayList<MapReduceOper>();
+
for (MapReduceOper mrOp : mapReducers) {
if (isSplitteeMergeable(mrOp)) {
if (mrOp.combinePlan.isEmpty()) {
@@ -466,16 +468,26 @@ class MultiQueryOptimizer extends MROpPl
} else {
mergeCmbList.add(mrOp);
}
- }
- }
- return (mergeNoCmbList.size() > mergeCmbList.size()) ?
- mergeNoCmbList : mergeCmbList;
+ } else if (splitter.reducePlan.isEmpty()
+ || splitter.needsDistinctCombiner()) {
+ if (mrOp.needsDistinctCombiner()) {
+ mergeDistList.add(mrOp);
+ }
+ }
+ }
+
+ int max = Math.max(mergeNoCmbList.size(), mergeCmbList.size());
+ max = Math.max(max, mergeDistList.size());
+
+ if (max == mergeDistList.size()) return mergeDistList;
+ else if (max == mergeNoCmbList.size()) return mergeNoCmbList;
+ else return mergeCmbList;
}
private int mergeMapReduceSplittees(List<MapReduceOper> mapReducers,
MapReduceOper splitter, POSplit splitOp) throws VisitorException {
- List<MapReduceOper> mergeList = getMergeList(mapReducers);
+ List<MapReduceOper> mergeList = getMergeList(splitter, mapReducers);
if (mergeList.size() <= 1) {
@@ -507,7 +519,7 @@ class MultiQueryOptimizer extends MROpPl
// MR splittees into the splitter. What we'll do is to merge multiple
// splittees (if exists) into a new MR operator and connect it to the splitter.
- List<MapReduceOper> mergeList = getMergeList(mapReducers);
+ List<MapReduceOper> mergeList = getMergeList(splitter, mapReducers);
if (mergeList.size() <= 1) {
// nothing to merge, just return
Modified: hadoop/pig/branches/branch-0.7/test/org/apache/pig/test/TestMultiQuery.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/test/org/apache/pig/test/TestMultiQuery.java?rev=953414&r1=953413&r2=953414&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.7/test/org/apache/pig/test/TestMultiQuery.java (original)
+++ hadoop/pig/branches/branch-0.7/test/org/apache/pig/test/TestMultiQuery.java Thu Jun 10 18:09:54 2010
@@ -112,6 +112,122 @@ public class TestMultiQuery {
}
@Test
+ public void testMultiQueryJiraPig1438() {
+
+ // test case: merge multiple distinct jobs
+
+ String INPUT_FILE = "abc";
+
+ try {
+
+ PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE));
+ w.println("1\t2\t3");
+ w.println("2\t3\t4");
+ w.println("1\t2\t3");
+ w.println("2\t3\t4");
+ w.println("1\t2\t3");
+ w.close();
+
+ Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE);
+
+ myPig.setBatchOn();
+
+ myPig.registerQuery("A = load '" + INPUT_FILE + "' as (col1:int, col2:int, col3:int);");
+ myPig.registerQuery("B1 = foreach A generate col1, col2;");
+ myPig.registerQuery("B2 = foreach A generate col2, col3;");
+ myPig.registerQuery("C1 = distinct B1;");
+ myPig.registerQuery("C2 = distinct B2;");
+ myPig.registerQuery("D1 = foreach C1 generate col1, col2;");
+ myPig.registerQuery("D2 = foreach C2 generate col2, col3;");
+ myPig.registerQuery("store D1 into '/tmp/output1';");
+ myPig.registerQuery("store D2 into '/tmp/output2';");
+
+ LogicalPlan lp = checkLogicalPlan(1, 2, 13);
+
+ PhysicalPlan pp = checkPhysicalPlan(lp, 1, 2, 13);
+
+ checkMRPlan(pp, 1, 1, 1);
+
+ myPig.executeBatch();
+
+ myPig.registerQuery("E = load '/tmp/output1' as (a:int, b:int);");
+ Iterator<Tuple> iter = myPig.openIterator("E");
+
+ List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
+ new String[] {
+ "(1,2)",
+ "(2,3)"
+ });
+
+ int counter = 0;
+ while (iter.hasNext()) {
+ assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());
+ }
+ assertEquals(expectedResults.size(), counter);
+
+ myPig.registerQuery("E = load '/tmp/output2' as (a:int, b:int);");
+ iter = myPig.openIterator("E");
+
+ expectedResults = Util.getTuplesFromConstantTupleStrings(
+ new String[] {
+ "(2,3)",
+ "(3,4)"
+ });
+
+ counter = 0;
+ while (iter.hasNext()) {
+ assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());
+ }
+
+ assertEquals(expectedResults.size(), counter);
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail();
+ } finally {
+ new File(INPUT_FILE).delete();
+ try {
+ Util.deleteFile(cluster, INPUT_FILE);
+ } catch (IOException e) {
+ e.printStackTrace();
+ Assert.fail();
+ }
+ }
+ }
+
+ @Test
+ public void testMultiQueryJiraPig1438_2() {
+
+ // test case: merge multiple distinct jobs -- one group by job, one distinct job
+
+ String INPUT_FILE = "abc";
+
+ try {
+ myPig.setBatchOn();
+
+ myPig.registerQuery("A = load '" + INPUT_FILE + "' as (col1:int, col2:int, col3:int);");
+ myPig.registerQuery("B1 = foreach A generate col1, col2;");
+ myPig.registerQuery("B2 = foreach A generate col2, col3;");
+ myPig.registerQuery("C1 = distinct B1;");
+ myPig.registerQuery("C2 = group B2 by (col2, col3);");
+ myPig.registerQuery("D1 = foreach C1 generate col1, col2;");
+ myPig.registerQuery("D2 = foreach C2 generate B2.col2, B2.col3;");
+ myPig.registerQuery("store D1 into '/tmp/output1';");
+ myPig.registerQuery("store D2 into '/tmp/output2';");
+
+ LogicalPlan lp = checkLogicalPlan(1, 2, 13);
+
+ PhysicalPlan pp = checkPhysicalPlan(lp, 1, 2, 15);
+
+ checkMRPlan(pp, 1, 1, 2);
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail();
+ }
+ }
+
+ @Test
public void testMultiQueryJiraPig1252() {
// test case: Problems with secondary key optimization and multiquery