You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2013/08/28 02:32:20 UTC
svn commit: r1518040 - in /pig/trunk: CHANGES.txt
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
test/org/apache/pig/test/TestMultiQueryCompiler.java
Author: daijy
Date: Wed Aug 28 00:32:20 2013
New Revision: 1518040
URL: http://svn.apache.org/r1518040
Log:
PIG-3435: Custom Partitioner not working with MultiQueryOptimizer
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
pig/trunk/test/org/apache/pig/test/TestMultiQueryCompiler.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1518040&r1=1518039&r2=1518040&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Wed Aug 28 00:32:20 2013
@@ -214,6 +214,8 @@ PIG-3013: BinInterSedes improve chararra
BUG FIXES
+PIG-3435: Custom Partitioner not working with MultiQueryOptimizer (knoguchi via daijy)
+
PIG-3385: DISTINCT no longer uses custom partitioner (knoguchi via daijy)
PIG-3379: Alias reuse in nested foreach causes PIG script to fail (xuefuz via daijy)
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java?rev=1518040&r1=1518039&r2=1518040&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java Wed Aug 28 00:32:20 2013
@@ -115,6 +115,11 @@ class MultiQueryOptimizer extends MROpPl
+ " uses secondary key, do not merge it");
continue;
}
+ if (successor.getCustomPartitioner() != null) {
+ log.debug("Splittee " + successor.getOperatorKey().getId()
+ + " uses customPartitioner, do not merge it");
+ continue;
+ }
if (isMapOnly(successor)) {
if (isSingleLoadMapperPlan(successor.mapPlan)
&& isSinglePredecessor(successor)) {
Modified: pig/trunk/test/org/apache/pig/test/TestMultiQueryCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestMultiQueryCompiler.java?rev=1518040&r1=1518039&r2=1518040&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestMultiQueryCompiler.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestMultiQueryCompiler.java Wed Aug 28 00:32:20 2013
@@ -1387,9 +1387,13 @@ public class TestMultiQueryCompiler {
PhysicalPlan pp = checkPhysicalPlan(lp, 1, 2, 12);
- MROperPlan mrp = checkMRPlan(pp, 1, 1, 1);
-
- MapReduceOper mrop = mrp.getRoots().get(0);
+ //merging is skipped due to PIG-3435 for now
+ //MROperPlan mrp = checkMRPlan(pp, 1, 1, 1);
+ //MapReduceOper mrop = mrp.getRoots().get(0);
+
+ //Instead of 1 merged mapreduce job, there will be two.
+ MROperPlan mrp = checkMRPlan(pp, 1, 1, 2);
+ MapReduceOper mrop = mrp.getLeaves().get(0);
Assert.assertTrue(mrop.getCustomPartitioner().equals(SimpleCustomPartitioner.class.getName()));
} catch (Exception e) {
@@ -1397,6 +1401,45 @@ public class TestMultiQueryCompiler {
Assert.fail();
}
}
+
+ @Test
+ public void testMultiQueryDoNotMergeMRwithDifferentPartitioners() {
+
+ System.out.println("===== multi-query with intermediate stores =====");
+
+ try {
+ myPig.setBatchOn();
+
+ myPig.registerQuery("a = load 'passwd' " +
+ "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);");
+ myPig.registerQuery("b1 = FILTER a BY gid != 0;");
+ myPig.registerQuery("b2 = FILTER a BY uid > 100;");
+ myPig.registerQuery("c1 = GROUP b1 BY uname PARTITION BY " + SimpleCustomPartitioner.class.getName() + " PARALLEL 3;");
+ myPig.registerQuery("c2 = GROUP b2 BY uname PARALLEL 3;");
+ myPig.registerQuery("STORE c1 INTO 'output1';");
+ myPig.registerQuery("STORE c2 INTO 'output2';");
+
+ LogicalPlan lp = checkLogicalPlan(1, 2, 7);
+
+ PhysicalPlan pp = checkPhysicalPlan(lp, 1, 2, 15);
+
+ MROperPlan mrp = checkMRPlan(pp, 1, 1, 2);
+
+ // The root job, mrp.getRoots().get(0), is the splitter merged with the
+ // splittee that has no custom partitioner (c2 above), so the custom
+ // partitioner (c1) must appear on the second, unmerged job.
+ MapReduceOper mrop;
+ mrop = mrp.getRoots().get(0);
+ Assert.assertTrue(mrop.getCustomPartitioner() == null );
+ mrop = mrp.getLeaves().get(0);
+ Assert.assertTrue(mrop.getCustomPartitioner().equals(SimpleCustomPartitioner.class.getName()));
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail(e.toString());
+ }
+ }
+
// --------------------------------------------------------------------------
// Helper methods