You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2013/08/28 02:23:32 UTC
svn commit: r1518039 - in /pig/trunk: ./
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/
src/org/apache/pig/newplan/logical/relational/ test/org/apac...
Author: daijy
Date: Wed Aug 28 00:23:32 2013
New Revision: 1518039
URL: http://svn.apache.org/r1518039
Log:
PIG-3385: DISTINCT no longer uses custom partitioner
Added:
pig/trunk/test/org/apache/pig/test/utils/SimpleCustomPartitioner3.java
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java
pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java
pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1518039&r1=1518038&r2=1518039&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Wed Aug 28 00:23:32 2013
@@ -214,6 +214,8 @@ PIG-3013: BinInterSedes improve chararra
BUG FIXES
+PIG-3385: DISTINCT no longer uses custom partitioner (knoguchi via daijy)
+
PIG-3379: Alias reuse in nested foreach causes PIG script to fail (xuefuz via daijy)
PIG-3432: typo in log message in SchemaTupleFrontend (epishkin via cheolsoo)
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=1518039&r1=1518038&r2=1518039&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Wed Aug 28 00:23:32 2013
@@ -1762,6 +1762,7 @@ public class MRCompiler extends PhyPlanV
addToMap(lr);
blocking(op);
+ curMROp.customPartitioner = op.getCustomPartitioner();
POPackage pkg = new POPackage(new OperatorKey(scope,nig.getNextNodeId(scope)));
pkg.setKeyType(DataType.TUPLE);
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java?rev=1518039&r1=1518038&r2=1518039&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java Wed Aug 28 00:23:32 2013
@@ -60,6 +60,17 @@ public class PODistinct extends Physical
private DataBag distinctBag = null;
transient Iterator<Tuple> it;
+ // PIG-3385: Since GlobalRearrange is not used by PODistinct, the custom
+ // partitioner is passed through this operator instead.
+ // NOTE(review): presumably the partitioner class name from PARTITION BY - confirm.
+ protected String customPartitioner;
+
+ /** Returns the custom partitioner recorded on this operator. */
+ public String getCustomPartitioner() {
+ return customPartitioner;
+ }
+ /** Records the custom partitioner on this operator. */
+ public void setCustomPartitioner(String customPartitioner) {
+ this.customPartitioner = customPartitioner;
+ }
+
public PODistinct(OperatorKey k, int rp, List<PhysicalOperator> inp) {
super(k, rp, inp);
}
Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java?rev=1518039&r1=1518038&r2=1518039&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java Wed Aug 28 00:23:32 2013
@@ -1559,6 +1559,7 @@ public class LogToPhyTranslationVisitor
public void visit(LODistinct loDistinct) throws FrontendException {
String scope = DEFAULT_SCOPE;
PODistinct physOp = new PODistinct(new OperatorKey(scope,nodeGen.getNextNodeId(scope)), loDistinct.getRequestedParallelism());
+ physOp.setCustomPartitioner(loDistinct.getCustomPartitioner());
physOp.addOriginalLocation(loDistinct.getAlias(), loDistinct.getLocation());
currentPlan.add(physOp);
physOp.setResultType(DataType.BAG);
Modified: pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java?rev=1518039&r1=1518038&r2=1518039&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java Wed Aug 28 00:23:32 2013
@@ -555,7 +555,7 @@ public class TestEvalPipeline2 {
// See PIG-282
@Test
public void testCustomPartitionerGroups() throws Exception{
- String[] input = {
+ String[] input = {
"1\t1",
"2\t1",
"3\t1",
@@ -567,37 +567,76 @@ public class TestEvalPipeline2 {
// It should be noted that for a map reduce job, the total number of partitions
// is the same as the number of reduce tasks for the job. Hence we need to find a case wherein
- // we will get more than one reduce job so that we can use the partitioner.
+ // we will get more than one reduce job so that we can use the partitioner.
// The following logic assumes that we get 2 reduce jobs, so that we can hard-code the logic.
+ // SimpleCustomPartitioner3 simply returns '1' (second reducer) for all inputs when
+ // partition number is bigger than 1.
//
- pigServer.registerQuery("B = group A by $0 PARTITION BY org.apache.pig.test.utils.SimpleCustomPartitioner parallel 2;");
+ pigServer.registerQuery("B = group A by $0 PARTITION BY org.apache.pig.test.utils.SimpleCustomPartitioner3 parallel 2;");
pigServer.store("B", "tmp_testCustomPartitionerGroups");
new File("tmp_testCustomPartitionerGroups").mkdir();
- // SimpleCustomPartitioner partitions as per the parity of the key
- // Need to change this in SimpleCustomPartitioner is changed
Util.copyFromClusterToLocal(cluster, "tmp_testCustomPartitionerGroups/part-r-00000", "tmp_testCustomPartitionerGroups/part-r-00000");
BufferedReader reader = new BufferedReader(new FileReader("tmp_testCustomPartitionerGroups/part-r-00000"));
- String line = null;
- while((line = reader.readLine()) != null) {
- String[] cols = line.split("\t");
- int value = Integer.parseInt(cols[0]) % 2;
- Assert.assertEquals(0, value);
- }
- Util.copyFromClusterToLocal(cluster, "tmp_testCustomPartitionerGroups/part-r-00001", "tmp_testCustomPartitionerGroups/part-r-00001");
+ String line = null;
+ while((line = reader.readLine()) != null) {
+ Assert.fail("Partition 0 should be empty. Most likely Custom Partitioner was not used.");
+ }
+ Util.copyFromClusterToLocal(cluster, "tmp_testCustomPartitionerGroups/part-r-00001", "tmp_testCustomPartitionerGroups/part-r-00001");
reader = new BufferedReader(new FileReader("tmp_testCustomPartitionerGroups/part-r-00001"));
- line = null;
- while((line = reader.readLine()) != null) {
- String[] cols = line.split("\t");
- int value = Integer.parseInt(cols[0]) % 2;
- Assert.assertEquals(1, value);
- }
+ line = null;
+ int count=0;
+ while((line = reader.readLine()) != null) {
+ //all outputs should come to partition 1 (with SimpleCustomPartitioner3)
+ count++;
+ }
+ Assert.assertEquals(4, count);
Util.deleteDirectory(new File("tmp_testCustomPartitionerGroups"));
+ Util.deleteFile(cluster, "tmp_testCustomPartitionerGroups");
Util.deleteFile(cluster, "table_testCustomPartitionerGroups");
}
-
+
+    // See PIG-3385
+    /**
+     * Verifies that DISTINCT honors a custom partitioner given via PARTITION BY.
+     * SimpleCustomPartitioner3 sends every record to partition 1 when there is more
+     * than one partition, so with "parallel 2" the file part-r-00000 must be empty
+     * and part-r-00001 must contain all four distinct rows.
+     */
+    @Test
+    public void testCustomPartitionerDistinct() throws Exception{
+        String[] input = {
+                "1\t1",
+                "2\t1",
+                "1\t1",
+                "3\t1",
+                "4\t1",
+        };
+        Util.createInputFile(cluster, "table_testCustomPartitionerDistinct", input);
+
+        pigServer.registerQuery("A = LOAD 'table_testCustomPartitionerDistinct' as (a0:int, a1:int);");
+        pigServer.registerQuery("B = distinct A PARTITION BY org.apache.pig.test.utils.SimpleCustomPartitioner3 parallel 2;");
+        pigServer.store("B", "tmp_testCustomPartitionerDistinct");
+
+        new File("tmp_testCustomPartitionerDistinct").mkdir();
+
+        // SimpleCustomPartitioner3 sends all inputs to the *second* reducer,
+        // so the first reducer's output must contain no lines at all.
+        Util.copyFromClusterToLocal(cluster, "tmp_testCustomPartitionerDistinct/part-r-00000", "tmp_testCustomPartitionerDistinct/part-r-00000");
+        BufferedReader reader = new BufferedReader(new FileReader("tmp_testCustomPartitionerDistinct/part-r-00000"));
+        try {
+            if (reader.readLine() != null) {
+                Assert.fail("Partition 0 should be empty. Most likely Custom Partitioner was not used.");
+            }
+        } finally {
+            // Close before the local directory is deleted below.
+            reader.close();
+        }
+        Util.copyFromClusterToLocal(cluster, "tmp_testCustomPartitionerDistinct/part-r-00001", "tmp_testCustomPartitionerDistinct/part-r-00001");
+        reader = new BufferedReader(new FileReader("tmp_testCustomPartitionerDistinct/part-r-00001"));
+        int count = 0;
+        try {
+            while (reader.readLine() != null) {
+                // all outputs should land in partition 1 (with SimpleCustomPartitioner3)
+                count++;
+            }
+        } finally {
+            reader.close();
+        }
+        // Five input rows with one duplicate leave four distinct rows.
+        Assert.assertEquals(4, count);
+        Util.deleteDirectory(new File("tmp_testCustomPartitionerDistinct"));
+        Util.deleteFile(cluster, "tmp_testCustomPartitionerDistinct");
+        Util.deleteFile(cluster, "table_testCustomPartitionerDistinct");
+    }
+
// See PIG-282
@Test
public void testCustomPartitionerCross() throws Exception{
Added: pig/trunk/test/org/apache/pig/test/utils/SimpleCustomPartitioner3.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/utils/SimpleCustomPartitioner3.java?rev=1518039&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/test/utils/SimpleCustomPartitioner3.java (added)
+++ pig/trunk/test/org/apache/pig/test/utils/SimpleCustomPartitioner3.java Wed Aug 28 00:23:32 2013
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test.utils;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.pig.impl.io.PigNullableWritable;
+
+/**
+ * Test-only partitioner that routes every record to partition 1 (the second
+ * reducer) whenever more than one partition exists, and to partition 0 otherwise.
+ * Tests use it to detect whether a custom partitioner was actually applied:
+ * if it was, the first reducer's output is empty.
+ */
+public class SimpleCustomPartitioner3 extends Partitioner<PigNullableWritable, Writable> {
+    /**
+     * @param key ignored
+     * @param value ignored
+     * @param numPartitions total number of partitions
+     * @return 1 when {@code numPartitions} is greater than 1, otherwise 0
+     */
+    @Override
+    public int getPartition(PigNullableWritable key, Writable value, int numPartitions) {
+        // The result must stay within [0, numPartitions); partition 1 only
+        // exists when there are at least two partitions.
+        return numPartitions > 1 ? 1 : 0;
+    }
+}