You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2010/06/07 06:58:30 UTC
svn commit: r952098 - in /hadoop/pig/trunk: ./
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relational...
Author: daijy
Date: Mon Jun 7 04:58:30 2010
New Revision: 952098
URL: http://svn.apache.org/viewvc?rev=952098&view=rev
Log:
PIG-282: Custom Partitioner
Added:
hadoop/pig/trunk/test/org/apache/pig/test/utils/SimpleCustomPartitioner.java
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POGlobalRearrange.java
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
hadoop/pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java
Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=952098&r1=952097&r2=952098&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Mon Jun 7 04:58:30 2010
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
IMPROVEMENTS
+PIG-282: Custom Partitioner (aniket486 via daijy)
+
PIG-283: Allow to set arbitrary jobconf key-value pairs inside pig program (hashutosh)
PIG-1373: We need to add jdiff output to docs on the website (daijy)
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=952098&r1=952097&r2=952098&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Mon Jun 7 04:58:30 2010
@@ -376,7 +376,7 @@ public class JobControlCompiler{
}
}
- //Create the jar of all functions reuired
+ //Create the jar of all functions and classes required
File submitJarFile = File.createTempFile("Job", ".jar");
// ensure the job jar is deleted on exit
submitJarFile.deleteOnExit();
@@ -530,6 +530,8 @@ public class JobControlCompiler{
nwJob.setReducerClass(PigMapReduce.Reduce.class);
if (mro.requestedParallelism>0)
nwJob.setNumReduceTasks(mro.requestedParallelism);
+ if (mro.customPartitioner != null)
+ nwJob.setPartitionerClass(PigContext.resolveClassName(mro.customPartitioner));
conf.set("pig.mapPlan", ObjectSerializer.serialize(mro.mapPlan));
if(mro.isEndOfAllInputSetInMap()) {
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=952098&r1=952097&r2=952098&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Mon Jun 7 04:58:30 2010
@@ -30,7 +30,6 @@ import java.util.Random;
import java.util.Set;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.mapred.JobConf;
import org.apache.pig.CollectableLoadFunc;
import org.apache.pig.ExecType;
import org.apache.pig.FuncSpec;
@@ -986,6 +985,7 @@ public class MRCompiler extends PhyPlanV
public void visitGlobalRearrange(POGlobalRearrange op) throws VisitorException{
try{
blocking(op);
+ curMROp.customPartitioner = op.getCustomPartitioner();
phyToMROpMap.put(op, curMROp);
}catch(Exception e){
int errCode = 2034;
@@ -1673,7 +1673,8 @@ public class MRCompiler extends PhyPlanV
// create POGlobalRearrange
- POGlobalRearrange gr = new POGlobalRearrange(new OperatorKey(scope,nig.getNextNodeId(scope)), rp);
+ POGlobalRearrange gr = new POGlobalRearrange(new OperatorKey(scope,nig.getNextNodeId(scope)), rp);
+ // Skewed join has its own special partitioner
gr.setResultType(DataType.TUPLE);
gr.visit(this);
if(gr.getRequestedParallelism() > curMROp.requestedParallelism)
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java?rev=952098&r1=952097&r2=952098&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java Mon Jun 7 04:58:30 2010
@@ -112,6 +112,9 @@ public class MapReduceOper extends Opera
int requestedParallelism = -1;
+ /* Name of the Custom Partitioner used */
+ String customPartitioner = null;
+
// Last POLimit value in this map reduce operator, needed by LimitAdjuster
// to add additional map reduce operator with 1 reducer after this
long limit = -1;
@@ -367,6 +370,10 @@ public class MapReduceOper extends Opera
public int getRequestedParallelism() {
return requestedParallelism;
}
+
+ public String getCustomPartitioner() {
+ return customPartitioner;
+ }
public void setSplitter(boolean spl) {
splitter = spl;
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java?rev=952098&r1=952097&r2=952098&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java Mon Jun 7 04:58:30 2010
@@ -530,6 +530,7 @@ public class LogToPhyTranslationVisitor
POGlobalRearrange poGlobal = new POGlobalRearrange(new OperatorKey(
scope, nodeGen.getNextNodeId(scope)), cs
.getRequestedParallelism());
+ poGlobal.setCustomPartitioner(cs.getCustomPartitioner());
poGlobal.setAlias(cs.getAlias());
POPackage poPackage = new POPackage(new OperatorKey(scope, nodeGen
.getNextNodeId(scope)), cs.getRequestedParallelism());
@@ -667,9 +668,9 @@ public class LogToPhyTranslationVisitor
case REGULAR:
POPackage poPackage = compileToLR_GR_PackTrio(cg.getOperatorKey().scope,
- cg.getInputs(),cg.getRequestedParallelism(),cg.getAlias(),
- cg.getInner(),cg.getGroupByPlans());
-
+ cg.getInputs(), cg.getRequestedParallelism(), cg.getCustomPartitioner(),
+ cg.getAlias(), cg.getInner(),cg.getGroupByPlans());
+
logToPhyMap.put(cg, poPackage);
break;
@@ -769,12 +770,13 @@ public class LogToPhyTranslationVisitor
}
private POPackage compileToLR_GR_PackTrio(String scope,List<LogicalOperator> inputs,
- int parallel, String alias, boolean[] innerFlags, MultiMap<LogicalOperator,
+ int parallel, String customPartitioner, String alias, boolean[] innerFlags, MultiMap<LogicalOperator,
LogicalPlan> innerPlans) throws VisitorException {
POGlobalRearrange poGlobal = new POGlobalRearrange(new OperatorKey(
scope, nodeGen.getNextNodeId(scope)), parallel);
poGlobal.setAlias(alias);
+ poGlobal.setCustomPartitioner(customPartitioner);
POPackage poPackage = new POPackage(new OperatorKey(scope, nodeGen
.getNextNodeId(scope)), parallel);
poPackage.setAlias(alias);
@@ -1118,8 +1120,7 @@ public class LogToPhyTranslationVisitor
return;
}
else if (loj.getJoinType() == LOJoin.JOINTYPE.HASH){
-
- POPackage poPackage = compileToLR_GR_PackTrio(scope, inputs, parallel, alias, innerFlags, loj.getJoinPlans());
+ POPackage poPackage = compileToLR_GR_PackTrio(scope, inputs, parallel, loj.getCustomPartitioner(), alias, innerFlags, loj.getJoinPlans());
POForEach fe = compileFE4Flattening(innerFlags, scope, parallel, alias, inputs);
currentPlan.add(fe);
try {
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POGlobalRearrange.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POGlobalRearrange.java?rev=952098&r1=952097&r2=952098&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POGlobalRearrange.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POGlobalRearrange.java Mon Jun 7 04:58:30 2010
@@ -43,6 +43,19 @@ public class POGlobalRearrange extends P
*
*/
private static final long serialVersionUID = 1L;
+
+ /* Since GlobalRearrange decides the map-reduce boundary, the custom
+ * partitioner is recorded here
+ */
+ protected String customPartitioner;
+
+ public String getCustomPartitioner() {
+ return customPartitioner;
+ }
+
+ public void setCustomPartitioner(String customPartitioner) {
+ this.customPartitioner = customPartitioner;
+ }
public POGlobalRearrange(OperatorKey k) {
this(k, -1, null);
@@ -51,7 +64,7 @@ public class POGlobalRearrange extends P
public POGlobalRearrange(OperatorKey k, int rp) {
this(k, rp, null);
}
-
+
public POGlobalRearrange(OperatorKey k, List inp) {
this(k, -1, null);
}
@@ -59,7 +72,7 @@ public class POGlobalRearrange extends P
public POGlobalRearrange(OperatorKey k, int rp, List inp) {
super(k, rp, inp);
}
-
+
@Override
public void visit(PhyPlanVisitor v) throws VisitorException {
v.visitGlobalRearrange(this);
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java?rev=952098&r1=952097&r2=952098&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java Mon Jun 7 04:58:30 2010
@@ -89,9 +89,22 @@ abstract public class LogicalOperator ex
* by the user or can be chosen at runtime by the optimizer.
*/
protected HashSet<Integer> mPinnedOptions = new HashSet<Integer>();
+
+ /**
+ * Name of the custom partitioner if one is used; this is set to null otherwise.
+ */
+ protected String mCustomPartitioner = null;
- private static Log log = LogFactory.getLog(LogicalOperator.class);
+ public String getCustomPartitioner() {
+ return mCustomPartitioner;
+ }
+
+ public void setCustomPartitioner(String customPartitioner) {
+ this.mCustomPartitioner = customPartitioner;
+ }
+
+ private static Log log = LogFactory.getLog(LogicalOperator.class);
/**
* Equivalent to LogicalOperator(k, 0).
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=952098&r1=952097&r2=952098&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Mon Jun 7 04:58:30 2010
@@ -33,6 +33,7 @@ package org.apache.pig.impl.logicalLayer
import java.io.*;
import java.util.*;
import java.net.URI;
+import java.lang.Class;
import java.net.URISyntaxException;
import java.lang.reflect.Type;
import org.apache.pig.impl.logicalLayer.*;
@@ -71,7 +72,9 @@ import org.apache.pig.backend.datastorag
import org.apache.pig.backend.datastorage.ContainerDescriptor;
import org.apache.pig.backend.datastorage.ElementDescriptor;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.backend.executionengine.ExecException;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.pig.impl.util.LinkedMultiMap;
public class QueryParser {
@@ -900,6 +903,23 @@ class FunctionType {
throw new Exception("Received an unknown function type: " + funcType);
}
}
+};
+
+class ClassType {
+ public static final byte UNKNOWNCLASS = 0;
+ public static final byte PARTITIONER = 2;
+
+ public static void checkClassType(Class cs, byte classType) throws Exception {
+ switch(classType) {
+ case ClassType.PARTITIONER:
+ if(!(cs.newInstance() instanceof Partitioner)) {
+ throw new Exception("Not a class of org.apache.hadoop.mapreduce.Partitioner");
+ }
+ break;
+ default:
+ throw new Exception("Received an unknown class type: " + classType);
+ }
+ }
}
PARSER_END(QueryParser)
@@ -942,6 +962,7 @@ TOKEN : { <INNER : "inner"> }
TOKEN : { <OUTER : "outer"> }
TOKEN : { <STAR : "*"> }
TOKEN : { <PARALLEL : "parallel"> }
+TOKEN : { <PARTITION : "partition by"> }
TOKEN : { <GROUP : "group"> }
TOKEN : { <AND : "and"> }
TOKEN : { <OR : "or"> }
@@ -1124,7 +1145,6 @@ LogicalOperator SplitClause(LogicalPlan
{log.trace("Exiting SplitClause"); return splitOp;}
}
-
LogicalOperator Expr(LogicalPlan lp) :
{
LogicalOperator op;
@@ -1164,7 +1184,7 @@ Token IdentifierOrReserved() :
}
{
(
- ( t1 = <DEFINE> )
+ ( t1 = <DEFINE> )
| (t1 = <LOAD> )
| (t1 =<FILTER> )
| (t1 =<FOREACH> )
@@ -1187,6 +1207,7 @@ Token IdentifierOrReserved() :
| (t1 =<INNER> )
| (t1 =<OUTER> )
| (t1 =<PARALLEL> )
+| (t1 =<PARTITION>)
| (t1 =<GROUP> )
| (t1 =<AND> )
| (t1 =<OR> )
@@ -1260,6 +1281,7 @@ LogicalOperator BaseExpr(LogicalPlan lp)
Token t1, t2;
Schema.FieldSchema fs;
log.trace("Entering BaseExpr");
+ String partitioner = null;
}
{
(
@@ -1289,9 +1311,11 @@ LogicalOperator BaseExpr(LogicalPlan lp)
| (<SAMPLE> op = SampleClause(lp))
| (<ORDER> op = OrderClause(lp))
| (<DISTINCT> op = NestedExpr(lp)
+ ([<PARTITION> (partitioner = EvalClass(ClassType.PARTITIONER))])
{
LogicalOperator distinct = new LODistinct(lp, new OperatorKey(scope, getNextId()));
lp.add(distinct);
+ distinct.setCustomPartitioner(partitioner);
log.debug("Added operator: " + distinct.getClass().getName() + " to the logical plan");
lp.connect(op, distinct);
log.debug("Connected alias: " + op.getAlias() + " operator " + op.getClass().getName() + " to operator " + distinct.getClass().getName());
@@ -1798,6 +1822,7 @@ LogicalOperator CogroupClause(LogicalPla
LogicalOperator cogroup = null;
log.trace("Entering CoGroupClause");
Token t;
+ String partitioner = null;
}
{
(gi = GroupItem(lp) { gis.add(gi); }
@@ -1817,15 +1842,18 @@ LogicalOperator CogroupClause(LogicalPla
cogroup = parseUsingForGroupBy("merge", gis, lp);
}
)])
+ ([<PARTITION> (partitioner = EvalClass(ClassType.PARTITIONER))])
)
-
{
if (cogroup != null) {
+ cogroup.setCustomPartitioner(partitioner);
log.trace("Exiting CoGroupClause");
return cogroup;
}
-
cogroup = parseCogroup(gis, lp, LOCogroup.GROUPTYPE.REGULAR);
+ if(cogroup != null) {
+ cogroup.setCustomPartitioner(partitioner);
+ }
log.trace("Exiting CoGroupClause");
return cogroup;
}
@@ -2079,7 +2107,8 @@ int ColNameOrNum(Schema over) :
LogicalOperator CrossClause(LogicalPlan lp) :
{
LogicalOperator op;
- ArrayList<LogicalOperator> inputs = new ArrayList<LogicalOperator>();
+ ArrayList<LogicalOperator> inputs = new ArrayList<LogicalOperator>();
+ String partitioner = null;
log.trace("Entering CrossClause");
}
{
@@ -2087,9 +2116,11 @@ LogicalOperator CrossClause(LogicalPlan
op = NestedExpr(lp) { inputs.add(op); }
("," op = NestedExpr(lp) { inputs.add(op); })+
)
+ ([<PARTITION> (partitioner = EvalClass(ClassType.PARTITIONER))])
{
LogicalOperator cross = new LOCross(lp, new OperatorKey(scope, getNextId()));
lp.add(cross);
+ cross.setCustomPartitioner(partitioner);
log.debug("Added operator " + cross.getClass().getName() + " to the logical plan");
for (LogicalOperator lop: inputs) {
@@ -2115,6 +2146,7 @@ LogicalOperator JoinClause(LogicalPlan l
boolean isFullOuter = false;
boolean isOuter = false;
Token t;
+ String partitioner = null;
}
{
(gi = JoinItem(lp) { gis.add(gi); }
@@ -2126,7 +2158,6 @@ LogicalOperator JoinClause(LogicalPlan l
(<FULL> [<OUTER>] {isFullOuter = true;})
]
("," gi = JoinItem(lp) { gis.add(gi); })+
-
{
// in the case of outer joins, only two
// inputs are allowed
@@ -2182,15 +2213,23 @@ LogicalOperator JoinClause(LogicalPlan l
joinOp = parseUsingForJoin("hash", gis, lp, isFullOuter, isRightOuter, isOuter);
}
)]))
-
- {log.trace("Exiting JoinClause");
- if (joinOp!=null) {
+ ([<PARTITION> (partitioner = EvalClass(ClassType.PARTITIONER))])
+
+ {
+ log.trace("Exiting JoinClause");
+ if (joinOp == null) {
+ joinOp = parseJoin(gis, lp, LOJoin.JOINTYPE.HASH);
+ }
+ if(partitioner != null) {
+ if(((LOJoin)joinOp).getJoinType() == LOJoin.JOINTYPE.SKEWED) {
+ throw new ParseException("Custom Partitioner is not supported for skewed join");
+ }
+ else {
+ joinOp.setCustomPartitioner(partitioner);
+ }
+ }
return joinOp;
}
- else {
- return parseJoin(gis, lp, LOJoin.JOINTYPE.HASH);
- }}
-
}
LogicalOperator UnionClause(LogicalPlan lp) :
@@ -3703,6 +3742,35 @@ Object EvalFunction(byte funcType) :
}
}
+String EvalClass(byte classType) :
+{
+ String className;
+ Class cs;
+ log.trace("Entering EvalClass");
+}
+{
+ className = QualifiedFunction()
+ {
+ cs = PigContext.resolveClassName(className);
+ try {
+ ClassType.checkClassType(cs, classType);
+ }
+ catch (ExecException e) {
+ ParseException pe = new ParseException("Class " + className + " not found");
+ pe.initCause(e);
+ throw pe;
+ }
+ catch (Exception e){
+ ParseException pe = new ParseException(e.getMessage());
+ pe.initCause(e);
+ throw pe;
+ }
+ log.trace("Exiting EvalClass");
+
+ return className;
+ }
+}
+
/**
* Bug 831620 - '$' support
*/
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java?rev=952098&r1=952097&r2=952098&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java Mon Jun 7 04:58:30 2010
@@ -17,11 +17,15 @@
*/
package org.apache.pig.test;
+import java.io.BufferedReader;
import java.io.File;
import java.io.FileOutputStream;
+import java.io.FileReader;
import java.io.IOException;
import java.io.PrintStream;
+import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.Random;
@@ -36,6 +40,7 @@ import org.apache.pig.data.DataByteArray
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.test.utils.Identity;
import org.junit.AfterClass;
import org.junit.Before;
@@ -489,5 +494,154 @@ public class TestEvalPipeline2 extends T
Util.deleteFile(cluster, "table_testNestedDescSort");
}
-
+
+ // See PIG-282
+ @Test
+ public void testCustomPartitionerParseJoins() throws Exception{
+ String[] input = {
+ "1\t3",
+ "1\t2"
+ };
+ Util.createInputFile(cluster, "table_testCustomPartitionerParseJoins", input);
+
+ pigServer.registerQuery("A = LOAD 'table_testCustomPartitionerParseJoins' as (a0:int, a1:int);");
+
+ pigServer.registerQuery("B = ORDER A by $0;");
+
+ // Custom Partitioner is not allowed for skewed joins, will throw a FrontendException
+ try {
+ pigServer.registerQuery("skewed = JOIN A by $0, B by $0 USING 'skewed' PARTITION BY org.apache.pig.test.utils.SimpleCustomPartitioner;");
+ //control should not reach here
+ fail("Skewed join cannot accept a custom partitioner");
+ }
+ catch (FrontendException e) {
+ assertTrue(e.getErrorCode() == 1000);
+ }
+
+ pigServer.registerQuery("hash = JOIN A by $0, B by $0 USING 'hash' PARTITION BY org.apache.pig.test.utils.SimpleCustomPartitioner;");
+ Iterator<Tuple> iter = pigServer.openIterator("hash");
+ Tuple t;
+
+ Collection<String> results = new HashSet<String>();
+ results.add("(1,3,1,2)");
+ results.add("(1,3,1,3)");
+ results.add("(1,2,1,2)");
+ results.add("(1,2,1,3)");
+
+ assertTrue(iter.hasNext());
+ t = iter.next();
+ assertTrue(t.size()==4);
+ assertTrue(results.contains(t.toString()));
+
+ assertTrue(iter.hasNext());
+ t = iter.next();
+ assertTrue(t.size()==4);
+ assertTrue(results.contains(t.toString()));
+
+ assertTrue(iter.hasNext());
+ t = iter.next();
+ assertTrue(t.size()==4);
+ assertTrue(results.contains(t.toString()));
+
+ assertTrue(iter.hasNext());
+ t = iter.next();
+ assertTrue(t.size()==4);
+ assertTrue(results.contains(t.toString()));
+
+ // No checks are made for merged and replicated joins as they are compiled to a map only job
+ // No frontend error checking has been added for these jobs, hence not adding any test cases
+ // Manually tested the sanity once. Above test should cover the basic sanity of the scenario
+
+ Util.deleteFile(cluster, "table_testCustomPartitionerParseJoins");
+ }
+
+ // See PIG-282
+ @Test
+ public void testCustomPartitionerGroups() throws Exception{
+ String[] input = {
+ "1\t1",
+ "2\t1",
+ "3\t1",
+ "4\t1"
+ };
+ Util.createInputFile(cluster, "table_testCustomPartitionerGroups", input);
+
+ pigServer.registerQuery("A = LOAD 'table_testCustomPartitionerGroups' as (a0:int, a1:int);");
+
+ // It should be noted that for a map reduce job, the total number of partitions
+ // is the same as the number of reduce tasks for the job. Hence we need to find a case wherein
+ // we will get more than one reduce task so that we can use the partitioner.
+ // The following logic assumes that we get 2 reduce tasks, so that we can hard-code the logic.
+ //
+ pigServer.registerQuery("B = group A by $0 PARTITION BY org.apache.pig.test.utils.SimpleCustomPartitioner parallel 2;");
+
+ pigServer.store("B", "tmp_testCustomPartitionerGroups");
+
+ new File("tmp_testCustomPartitionerGroups").mkdir();
+
+ // SimpleCustomPartitioner partitions as per the parity of the key
+ // Need to change this if SimpleCustomPartitioner is changed
+ Util.copyFromClusterToLocal(cluster, "tmp_testCustomPartitionerGroups/part-r-00000", "tmp_testCustomPartitionerGroups/part-r-00000");
+ BufferedReader reader = new BufferedReader(new FileReader("tmp_testCustomPartitionerGroups/part-r-00000"));
+ String line = null;
+ while((line = reader.readLine()) != null) {
+ String[] cols = line.split("\t");
+ int value = Integer.parseInt(cols[0]) % 2;
+ assertEquals(0, value);
+ }
+ Util.copyFromClusterToLocal(cluster, "tmp_testCustomPartitionerGroups/part-r-00001", "tmp_testCustomPartitionerGroups/part-r-00001");
+ reader = new BufferedReader(new FileReader("tmp_testCustomPartitionerGroups/part-r-00001"));
+ line = null;
+ while((line = reader.readLine()) != null) {
+ String[] cols = line.split("\t");
+ int value = Integer.parseInt(cols[0]) % 2;
+ assertEquals(1, value);
+ }
+ Util.deleteDirectory(new File("tmp_testCustomPartitionerGroups"));
+ Util.deleteFile(cluster, "table_testCustomPartitionerGroups");
+ }
+
+ // See PIG-282
+ @Test
+ public void testCustomPartitionerCross() throws Exception{
+ String[] input = {
+ "1\t3",
+ "1\t2",
+ };
+
+ Util.createInputFile(cluster, "table_testCustomPartitionerCross", input);
+ pigServer.registerQuery("A = LOAD 'table_testCustomPartitionerCross' as (a0:int, a1:int);");
+ pigServer.registerQuery("B = ORDER A by $0;");
+ pigServer.registerQuery("C = cross A , B PARTITION BY org.apache.pig.test.utils.SimpleCustomPartitioner;");
+ Iterator<Tuple> iter = pigServer.openIterator("C");
+ Tuple t;
+
+ Collection<String> results = new HashSet<String>();
+ results.add("(1,3,1,2)");
+ results.add("(1,3,1,3)");
+ results.add("(1,2,1,2)");
+ results.add("(1,2,1,3)");
+
+ assertTrue(iter.hasNext());
+ t = iter.next();
+ assertTrue(t.size()==4);
+ assertTrue(results.contains(t.toString()));
+
+ assertTrue(iter.hasNext());
+ t = iter.next();
+ assertTrue(t.size()==4);
+ assertTrue(results.contains(t.toString()));
+
+ assertTrue(iter.hasNext());
+ t = iter.next();
+ assertTrue(t.size()==4);
+ assertTrue(results.contains(t.toString()));
+
+ assertTrue(iter.hasNext());
+ t = iter.next();
+ assertTrue(t.size()==4);
+ assertTrue(results.contains(t.toString()));
+
+ Util.deleteFile(cluster, "table_testCustomPartitionerCross");
+ }
}
Added: hadoop/pig/trunk/test/org/apache/pig/test/utils/SimpleCustomPartitioner.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/utils/SimpleCustomPartitioner.java?rev=952098&view=auto
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/utils/SimpleCustomPartitioner.java (added)
+++ hadoop/pig/trunk/test/org/apache/pig/test/utils/SimpleCustomPartitioner.java Mon Jun 7 04:58:30 2010
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test.utils;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.pig.impl.io.PigNullableWritable;
+
+public class SimpleCustomPartitioner extends Partitioner<PigNullableWritable, Writable> {
+
+ @Override
+ public int getPartition(PigNullableWritable key, Writable value, int numPartitions) {
+ if(key.getValueAsPigType() instanceof Integer) {
+ int ret = (((Integer)key.getValueAsPigType()).intValue() % numPartitions);
+ return ret;
+ }
+ else {
+ return (key.hashCode()) % numPartitions;
+ }
+ }
+}