You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2013/08/28 22:38:59 UTC

svn commit: r1518377 - in /pig/branches/branch-0.11: ./ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/ src/org/apache/pig/newplan/logical/relational...

Author: daijy
Date: Wed Aug 28 20:38:59 2013
New Revision: 1518377

URL: http://svn.apache.org/r1518377
Log:
PIG-3385: DISTINCT no longer uses custom partitioner

Added:
    pig/branches/branch-0.11/test/org/apache/pig/test/utils/SimpleCustomPartitioner3.java
Modified:
    pig/branches/branch-0.11/CHANGES.txt
    pig/branches/branch-0.11/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
    pig/branches/branch-0.11/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java
    pig/branches/branch-0.11/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java
    pig/branches/branch-0.11/test/org/apache/pig/test/TestEvalPipeline2.java

Modified: pig/branches/branch-0.11/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.11/CHANGES.txt?rev=1518377&r1=1518376&r2=1518377&view=diff
==============================================================================
--- pig/branches/branch-0.11/CHANGES.txt (original)
+++ pig/branches/branch-0.11/CHANGES.txt Wed Aug 28 20:38:59 2013
@@ -32,6 +32,8 @@ PIG-2769: a simple logic causes very lon
 
 BUG FIXES
 
+PIG-3385: DISTINCT no longer uses custom partitioner (knoguchi via daijy)
+
 PIG-2507: Semicolon in paramenters for UDF results in parsing error (tnachen via daijy)
  
 PIG-3341: Strict datetime parsing and improve performance of loading datetime values (rohini)

Modified: pig/branches/branch-0.11/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.11/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=1518377&r1=1518376&r2=1518377&view=diff
==============================================================================
--- pig/branches/branch-0.11/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ pig/branches/branch-0.11/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Wed Aug 28 20:38:59 2013
@@ -1761,6 +1761,7 @@ public class MRCompiler extends PhyPlanV
             addToMap(lr);
             
             blocking(op);
+            curMROp.customPartitioner = op.getCustomPartitioner();
             
             POPackage pkg = new POPackage(new OperatorKey(scope,nig.getNextNodeId(scope)));
             pkg.setKeyType(DataType.TUPLE);

Modified: pig/branches/branch-0.11/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.11/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java?rev=1518377&r1=1518376&r2=1518377&view=diff
==============================================================================
--- pig/branches/branch-0.11/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java (original)
+++ pig/branches/branch-0.11/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java Wed Aug 28 20:38:59 2013
@@ -60,6 +60,17 @@ public class PODistinct extends Physical
     private DataBag distinctBag = null;
     transient Iterator<Tuple> it;
 
+    // PIG-3385: PODistinct does not go through a GlobalRearrange, so the
+    // custom partitioner is passed through this operator instead.
+    protected String customPartitioner;
+
+    public String getCustomPartitioner() {
+        return this.customPartitioner; // partitioner name, or null if none was assigned
+    }
+    public void setCustomPartitioner(String partitionerClassName) {
+        customPartitioner = partitionerClassName; // copied from the logical DISTINCT by LogToPhyTranslationVisitor
+    }
+
     public PODistinct(OperatorKey k, int rp, List<PhysicalOperator> inp) {
         super(k, rp, inp);
     }

Modified: pig/branches/branch-0.11/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.11/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java?rev=1518377&r1=1518376&r2=1518377&view=diff
==============================================================================
--- pig/branches/branch-0.11/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java (original)
+++ pig/branches/branch-0.11/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java Wed Aug 28 20:38:59 2013
@@ -1559,6 +1559,7 @@ public class LogToPhyTranslationVisitor 
     public void visit(LODistinct loDistinct) throws FrontendException {
         String scope = DEFAULT_SCOPE;
         PODistinct physOp = new PODistinct(new OperatorKey(scope,nodeGen.getNextNodeId(scope)), loDistinct.getRequestedParallelism());
+        physOp.setCustomPartitioner(loDistinct.getCustomPartitioner());
         physOp.addOriginalLocation(loDistinct.getAlias(), loDistinct.getLocation());
         currentPlan.add(physOp);
         physOp.setResultType(DataType.BAG);

Modified: pig/branches/branch-0.11/test/org/apache/pig/test/TestEvalPipeline2.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.11/test/org/apache/pig/test/TestEvalPipeline2.java?rev=1518377&r1=1518376&r2=1518377&view=diff
==============================================================================
--- pig/branches/branch-0.11/test/org/apache/pig/test/TestEvalPipeline2.java (original)
+++ pig/branches/branch-0.11/test/org/apache/pig/test/TestEvalPipeline2.java Wed Aug 28 20:38:59 2013
@@ -555,7 +555,7 @@ public class TestEvalPipeline2 {
     // See PIG-282
     @Test
     public void testCustomPartitionerGroups() throws Exception{
-    	String[] input = {
+        String[] input = {
                 "1\t1",
                 "2\t1",
                 "3\t1",
@@ -567,37 +567,76 @@ public class TestEvalPipeline2 {
         
         // It should be noted that for a map reduce job, the total number of partitions 
         // is the same as the number of reduce tasks for the job. Hence we need to find a case wherein 
-        // we will get more than one reduce job so that we can use the partitioner. 	
+        // we will get more than one reduce job so that we can use the partitioner.
         // The following logic assumes that we get 2 reduce jobs, so that we can hard-code the logic.
+        // SimpleCustomPartitioner3 simply returns '1' (second reducer) for all inputs when
+        // the number of partitions is greater than 1.
         //
-        pigServer.registerQuery("B = group A by $0 PARTITION BY org.apache.pig.test.utils.SimpleCustomPartitioner parallel 2;");
+        pigServer.registerQuery("B = group A by $0 PARTITION BY org.apache.pig.test.utils.SimpleCustomPartitioner3 parallel 2;");
         
         pigServer.store("B", "tmp_testCustomPartitionerGroups");
         
         new File("tmp_testCustomPartitionerGroups").mkdir();
         
-        // SimpleCustomPartitioner partitions as per the parity of the key
-        // Need to change this in SimpleCustomPartitioner is changed
         Util.copyFromClusterToLocal(cluster, "tmp_testCustomPartitionerGroups/part-r-00000", "tmp_testCustomPartitionerGroups/part-r-00000");
         BufferedReader reader = new BufferedReader(new FileReader("tmp_testCustomPartitionerGroups/part-r-00000"));
- 	    String line = null;      	     
- 	    while((line = reader.readLine()) != null) {
- 	        String[] cols = line.split("\t");
- 	        int value = Integer.parseInt(cols[0]) % 2;
- 	        Assert.assertEquals(0, value);
- 	    }
- 	    Util.copyFromClusterToLocal(cluster, "tmp_testCustomPartitionerGroups/part-r-00001", "tmp_testCustomPartitionerGroups/part-r-00001");
+        String line = null;
+        while((line = reader.readLine()) != null) {
+            Assert.fail("Partition 0 should be empty.  Most likely Custom Partitioner was not used.");
+        }
+        Util.copyFromClusterToLocal(cluster, "tmp_testCustomPartitionerGroups/part-r-00001", "tmp_testCustomPartitionerGroups/part-r-00001");
         reader = new BufferedReader(new FileReader("tmp_testCustomPartitionerGroups/part-r-00001"));
- 	    line = null;      	     
- 	    while((line = reader.readLine()) != null) {
- 	        String[] cols = line.split("\t");
- 	        int value = Integer.parseInt(cols[0]) % 2;
- 	        Assert.assertEquals(1, value);
- 	    } 
+        line = null;
+        int count=0;
+        while((line = reader.readLine()) != null) {
+            // all outputs should go to partition 1 (with SimpleCustomPartitioner3)
+            count++;
+        }
+        Assert.assertEquals(4, count);
         Util.deleteDirectory(new File("tmp_testCustomPartitionerGroups"));
+        Util.deleteFile(cluster, "tmp_testCustomPartitionerGroups");
         Util.deleteFile(cluster, "table_testCustomPartitionerGroups");
     }
-    
+
+    // See PIG-3385: DISTINCT must honor PARTITION BY. The input has one duplicate row, so 4 rows are expected.
+    @Test
+    public void testCustomPartitionerDistinct() throws Exception{
+        String[] input = {
+                "1\t1",
+                "2\t1",
+                "1\t1",
+                "3\t1",
+                "4\t1",
+        };
+        Util.createInputFile(cluster, "table_testCustomPartitionerDistinct", input);
+        pigServer.registerQuery("A = LOAD 'table_testCustomPartitionerDistinct' as (a0:int, a1:int);");
+        pigServer.registerQuery("B = distinct A PARTITION BY org.apache.pig.test.utils.SimpleCustomPartitioner3 parallel 2;");
+        pigServer.store("B", "tmp_testCustomPartitionerDistinct");
+        new File("tmp_testCustomPartitionerDistinct").mkdir();
+        // SimpleCustomPartitioner3 sends every row to the *second* reducer, so partition 0 must be empty.
+        Util.copyFromClusterToLocal(cluster, "tmp_testCustomPartitionerDistinct/part-r-00000", "tmp_testCustomPartitionerDistinct/part-r-00000");
+        BufferedReader reader = new BufferedReader(new FileReader("tmp_testCustomPartitionerDistinct/part-r-00000"));
+        try {
+            Assert.assertNull("Partition 0 should be empty.  Most likely Custom Partitioner was not used.", reader.readLine());
+        } finally {
+            reader.close(); // release the handle before reopening and before deleteDirectory below
+        }
+        Util.copyFromClusterToLocal(cluster, "tmp_testCustomPartitionerDistinct/part-r-00001", "tmp_testCustomPartitionerDistinct/part-r-00001");
+        reader = new BufferedReader(new FileReader("tmp_testCustomPartitionerDistinct/part-r-00001"));
+        int count = 0;
+        try {
+            while (reader.readLine() != null) {
+                count++; // all distinct rows should land in partition 1
+            }
+        } finally {
+            reader.close();
+        }
+        Assert.assertEquals(4, count);
+        Util.deleteDirectory(new File("tmp_testCustomPartitionerDistinct"));
+        Util.deleteFile(cluster, "tmp_testCustomPartitionerDistinct");
+        Util.deleteFile(cluster, "table_testCustomPartitionerDistinct");
+    }
+
     // See PIG-282
     @Test
     public void testCustomPartitionerCross() throws Exception{

Added: pig/branches/branch-0.11/test/org/apache/pig/test/utils/SimpleCustomPartitioner3.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.11/test/org/apache/pig/test/utils/SimpleCustomPartitioner3.java?rev=1518377&view=auto
==============================================================================
--- pig/branches/branch-0.11/test/org/apache/pig/test/utils/SimpleCustomPartitioner3.java (added)
+++ pig/branches/branch-0.11/test/org/apache/pig/test/utils/SimpleCustomPartitioner3.java Wed Aug 28 20:38:59 2013
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test.utils;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.pig.impl.io.PigNullableWritable;
+
+/**
+ * Test partitioner that sends every key to the second reducer (partition 1)
+ * when the job has more than one reducer, and to partition 0 otherwise.
+ * With "parallel 2", part-r-00000 stays empty only if this partitioner was
+ * actually used, which is what the PARTITION BY tests in TestEvalPipeline2 check.
+ */
+public class SimpleCustomPartitioner3 extends Partitioner<PigNullableWritable, Writable> {
+    @Override
+    public int getPartition(PigNullableWritable key, Writable value, int numPartitions) {
+        // The result must lie in [0, numPartitions): partition 1 exists only when numPartitions > 1.
+        return numPartitions > 1 ? 1 : 0;
+    }
+}