You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2013/08/28 02:32:20 UTC

svn commit: r1518040 - in /pig/trunk: CHANGES.txt src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java test/org/apache/pig/test/TestMultiQueryCompiler.java

Author: daijy
Date: Wed Aug 28 00:32:20 2013
New Revision: 1518040

URL: http://svn.apache.org/r1518040
Log:
PIG-3435: Custom Partitioner not working with MultiQueryOptimizer

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
    pig/trunk/test/org/apache/pig/test/TestMultiQueryCompiler.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1518040&r1=1518039&r2=1518040&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Wed Aug 28 00:32:20 2013
@@ -214,6 +214,8 @@ PIG-3013: BinInterSedes improve chararra
 
 BUG FIXES
 
+PIG-3435: Custom Partitioner not working with MultiQueryOptimizer (knoguchi via daijy)
+
 PIG-3385: DISTINCT no longer uses custom partitioner (knoguchi via daijy)
 
 PIG-3379: Alias reuse in nested foreach causes PIG script to fail (xuefuz via daijy)

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java?rev=1518040&r1=1518039&r2=1518040&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java Wed Aug 28 00:32:20 2013
@@ -115,6 +115,11 @@ class MultiQueryOptimizer extends MROpPl
                         + " uses secondary key, do not merge it");
                 continue;
             }
+            if (successor.getCustomPartitioner() != null) {
+                log.debug("Splittee " + successor.getOperatorKey().getId()
+                        + " uses customPartitioner, do not merge it");
+                continue;
+            }
             if (isMapOnly(successor)) {
                 if (isSingleLoadMapperPlan(successor.mapPlan)
                         && isSinglePredecessor(successor)) {                    

Modified: pig/trunk/test/org/apache/pig/test/TestMultiQueryCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestMultiQueryCompiler.java?rev=1518040&r1=1518039&r2=1518040&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestMultiQueryCompiler.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestMultiQueryCompiler.java Wed Aug 28 00:32:20 2013
@@ -1387,9 +1387,13 @@ public class TestMultiQueryCompiler {
 
             PhysicalPlan pp = checkPhysicalPlan(lp, 1, 2, 12);
 
-            MROperPlan mrp = checkMRPlan(pp, 1, 1, 1); 
-            
-            MapReduceOper mrop = mrp.getRoots().get(0);
+            //merging is skipped due to PIG-3435 for now
+            //MROperPlan mrp = checkMRPlan(pp, 1, 1, 1);
+            //MapReduceOper mrop = mrp.getRoots().get(0);
+
+            //Instead of 1 merged mapreduce job, there will be two.
+            MROperPlan mrp = checkMRPlan(pp, 1, 1, 2);
+            MapReduceOper mrop = mrp.getLeaves().get(0);
             Assert.assertTrue(mrop.getCustomPartitioner().equals(SimpleCustomPartitioner.class.getName()));
 
         } catch (Exception e) {
@@ -1397,6 +1401,45 @@ public class TestMultiQueryCompiler {
             Assert.fail();
         } 
     }    
+
+    @Test
+    public void testMultiQueryDoNotMergeMRwithDifferentPartitioners() {
+
+        System.out.println("===== multi-query with intermediate stores =====");
+
+        try {
+            myPig.setBatchOn();
+
+            myPig.registerQuery("a = load 'passwd' " +
+                                "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);");
+            myPig.registerQuery("b1 = FILTER a BY gid != 0;");
+            myPig.registerQuery("b2 = FILTER a BY uid > 100;");
+            myPig.registerQuery("c1 = GROUP b1 BY uname PARTITION BY " + SimpleCustomPartitioner.class.getName() + " PARALLEL 3;");
+            myPig.registerQuery("c2 = GROUP b2 BY uname PARALLEL 3;");
+            myPig.registerQuery("STORE c1 INTO 'output1';");
+            myPig.registerQuery("STORE c2 INTO 'output2';");
+
+            LogicalPlan lp = checkLogicalPlan(1, 2, 7);
+
+            PhysicalPlan pp = checkPhysicalPlan(lp, 1, 2, 15);
+
+            MROperPlan mrp = checkMRPlan(pp, 1, 1, 2);
+
+            // since the first mapreduce job of mrp.getRoots().get(0)
+            // is the merge of splitter and splittee without custom partitioner (c2 above),
+            // second job should contain the custom partitioner
+            MapReduceOper mrop;
+            mrop = mrp.getRoots().get(0);
+            Assert.assertTrue(mrop.getCustomPartitioner() == null );
+            mrop = mrp.getLeaves().get(0);
+            Assert.assertTrue(mrop.getCustomPartitioner().equals(SimpleCustomPartitioner.class.getName()));
+
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail(e.toString());
+        }
+    }
+
     
     // --------------------------------------------------------------------------
     // Helper methods