You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2010/06/07 06:58:30 UTC

svn commit: r952098 - in /hadoop/pig/trunk: ./ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relational...

Author: daijy
Date: Mon Jun  7 04:58:30 2010
New Revision: 952098

URL: http://svn.apache.org/viewvc?rev=952098&view=rev
Log:
PIG-282: Custom Partitioner

Added:
    hadoop/pig/trunk/test/org/apache/pig/test/utils/SimpleCustomPartitioner.java
Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POGlobalRearrange.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
    hadoop/pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=952098&r1=952097&r2=952098&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Mon Jun  7 04:58:30 2010
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
 
 IMPROVEMENTS
 
+PIG-282: Custom Partitioner (aniket486 via daijy)
+
 PIG-283: Allow to set arbitrary jobconf key-value pairs inside pig program (hashutosh)
 
 PIG-1373: We need to add jdiff output to docs on the website (daijy)

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=952098&r1=952097&r2=952098&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Mon Jun  7 04:58:30 2010
@@ -376,7 +376,7 @@ public class JobControlCompiler{
                 }
             }
 
-            //Create the jar of all functions reuired
+            //Create the jar of all functions and classes required
             File submitJarFile = File.createTempFile("Job", ".jar");
             // ensure the job jar is deleted on exit
             submitJarFile.deleteOnExit();
@@ -530,6 +530,8 @@ public class JobControlCompiler{
                 nwJob.setReducerClass(PigMapReduce.Reduce.class);
                 if (mro.requestedParallelism>0)
                     nwJob.setNumReduceTasks(mro.requestedParallelism);
+                if (mro.customPartitioner != null)
+                	nwJob.setPartitionerClass(PigContext.resolveClassName(mro.customPartitioner));
 
                 conf.set("pig.mapPlan", ObjectSerializer.serialize(mro.mapPlan));
                 if(mro.isEndOfAllInputSetInMap()) {

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=952098&r1=952097&r2=952098&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Mon Jun  7 04:58:30 2010
@@ -30,7 +30,6 @@ import java.util.Random;
 import java.util.Set;
 
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.mapred.JobConf;
 import org.apache.pig.CollectableLoadFunc;
 import org.apache.pig.ExecType;
 import org.apache.pig.FuncSpec;
@@ -986,6 +985,7 @@ public class MRCompiler extends PhyPlanV
     public void visitGlobalRearrange(POGlobalRearrange op) throws VisitorException{
         try{
             blocking(op);
+            curMROp.customPartitioner = op.getCustomPartitioner();
             phyToMROpMap.put(op, curMROp);
         }catch(Exception e){
             int errCode = 2034;
@@ -1673,7 +1673,8 @@ public class MRCompiler extends PhyPlanV
 					   
 			
 			// create POGlobalRearrange
-			POGlobalRearrange gr = new POGlobalRearrange(new OperatorKey(scope,nig.getNextNodeId(scope)), rp);                          
+			POGlobalRearrange gr = new POGlobalRearrange(new OperatorKey(scope,nig.getNextNodeId(scope)), rp);
+			// Skewed join has its own special partitioner, so no custom partitioner is set on this rearrange
 			gr.setResultType(DataType.TUPLE);
 			gr.visit(this);
 			if(gr.getRequestedParallelism() > curMROp.requestedParallelism)

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java?rev=952098&r1=952097&r2=952098&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java Mon Jun  7 04:58:30 2010
@@ -112,6 +112,9 @@ public class MapReduceOper extends Opera
     
     int requestedParallelism = -1;
     
+    /* Name of the Custom Partitioner used */ 
+    String customPartitioner = null;
+    
     // Last POLimit value in this map reduce operator, needed by LimitAdjuster
     // to add additional map reduce operator with 1 reducer after this
     long limit = -1;
@@ -367,6 +370,10 @@ public class MapReduceOper extends Opera
     public int getRequestedParallelism() {
         return requestedParallelism;
     }
+    
+    public String getCustomPartitioner() {
+    	return customPartitioner;
+    }
 
     public void setSplitter(boolean spl) {
         splitter = spl;

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java?rev=952098&r1=952097&r2=952098&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java Mon Jun  7 04:58:30 2010
@@ -530,6 +530,7 @@ public class LogToPhyTranslationVisitor 
         POGlobalRearrange poGlobal = new POGlobalRearrange(new OperatorKey(
                 scope, nodeGen.getNextNodeId(scope)), cs
                 .getRequestedParallelism());
+        poGlobal.setCustomPartitioner(cs.getCustomPartitioner());
         poGlobal.setAlias(cs.getAlias());
         POPackage poPackage = new POPackage(new OperatorKey(scope, nodeGen
                 .getNextNodeId(scope)), cs.getRequestedParallelism());
@@ -667,9 +668,9 @@ public class LogToPhyTranslationVisitor 
 
         case REGULAR:
             POPackage poPackage = compileToLR_GR_PackTrio(cg.getOperatorKey().scope,
-                    cg.getInputs(),cg.getRequestedParallelism(),cg.getAlias(),
-                    cg.getInner(),cg.getGroupByPlans());
-
+                    cg.getInputs(), cg.getRequestedParallelism(), cg.getCustomPartitioner(),
+                    cg.getAlias(), cg.getInner(),cg.getGroupByPlans());
+            
             logToPhyMap.put(cg, poPackage);
             break;
             
@@ -769,12 +770,13 @@ public class LogToPhyTranslationVisitor 
     }
     
     private POPackage compileToLR_GR_PackTrio(String scope,List<LogicalOperator> inputs,
-            int parallel, String alias, boolean[] innerFlags, MultiMap<LogicalOperator, 
+            int parallel, String customPartitioner, String alias, boolean[] innerFlags, MultiMap<LogicalOperator, 
             LogicalPlan> innerPlans) throws VisitorException {
 
         POGlobalRearrange poGlobal = new POGlobalRearrange(new OperatorKey(
                 scope, nodeGen.getNextNodeId(scope)), parallel);
         poGlobal.setAlias(alias);
+        poGlobal.setCustomPartitioner(customPartitioner);
         POPackage poPackage = new POPackage(new OperatorKey(scope, nodeGen
                 .getNextNodeId(scope)), parallel);
         poPackage.setAlias(alias);
@@ -1118,8 +1120,7 @@ public class LogToPhyTranslationVisitor 
             return;
         }
 		else if (loj.getJoinType() == LOJoin.JOINTYPE.HASH){
-		    
-		    POPackage poPackage = compileToLR_GR_PackTrio(scope, inputs, parallel, alias, innerFlags, loj.getJoinPlans());
+		    POPackage poPackage = compileToLR_GR_PackTrio(scope, inputs, parallel, loj.getCustomPartitioner(), alias, innerFlags, loj.getJoinPlans());
 	        POForEach fe = compileFE4Flattening(innerFlags,  scope, parallel, alias, inputs);
             currentPlan.add(fe);
             try {

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POGlobalRearrange.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POGlobalRearrange.java?rev=952098&r1=952097&r2=952098&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POGlobalRearrange.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POGlobalRearrange.java Mon Jun  7 04:58:30 2010
@@ -43,6 +43,19 @@ public class POGlobalRearrange extends P
      * 
      */
     private static final long serialVersionUID = 1L;
+    
+    /* Since GlobalRearrange decides the map reduce boundary, the custom
+     * partitioner is recorded here.
+     */
+    protected String customPartitioner;
+
+    public String getCustomPartitioner() {
+		return customPartitioner;
+	}
+
+	public void setCustomPartitioner(String customPartitioner) {
+		this.customPartitioner = customPartitioner;
+	}
 
     public POGlobalRearrange(OperatorKey k) {
         this(k, -1, null);
@@ -51,7 +64,7 @@ public class POGlobalRearrange extends P
     public POGlobalRearrange(OperatorKey k, int rp) {
         this(k, rp, null);
     }
-
+    
     public POGlobalRearrange(OperatorKey k, List inp) {
         this(k, -1, null);
     }
@@ -59,7 +72,7 @@ public class POGlobalRearrange extends P
     public POGlobalRearrange(OperatorKey k, int rp, List inp) {
         super(k, rp, inp);
     }
-
+    
     @Override
     public void visit(PhyPlanVisitor v) throws VisitorException {
         v.visitGlobalRearrange(this);

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java?rev=952098&r1=952097&r2=952098&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java Mon Jun  7 04:58:30 2010
@@ -89,9 +89,22 @@ abstract public class LogicalOperator ex
      * by the user or can be chosen at runtime by the optimizer.
      */
     protected HashSet<Integer> mPinnedOptions = new HashSet<Integer>();
+    
+    /**
+     * Name of the custom partitioner class if one is used; null otherwise.
+     */
+    protected String mCustomPartitioner = null;
 
     
-    private static Log log = LogFactory.getLog(LogicalOperator.class);
+    public String getCustomPartitioner() {
+		return mCustomPartitioner;
+	}
+
+	public void setCustomPartitioner(String customPartitioner) {
+		this.mCustomPartitioner = customPartitioner;
+	}
+
+	private static Log log = LogFactory.getLog(LogicalOperator.class);
 
     /**
      * Equivalent to LogicalOperator(k, 0).

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=952098&r1=952097&r2=952098&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Mon Jun  7 04:58:30 2010
@@ -33,6 +33,7 @@ package org.apache.pig.impl.logicalLayer
 import java.io.*;
 import java.util.*;
 import java.net.URI;
+import java.lang.Class;
 import java.net.URISyntaxException;
 import java.lang.reflect.Type;
 import org.apache.pig.impl.logicalLayer.*;
@@ -71,7 +72,9 @@ import org.apache.pig.backend.datastorag
 import org.apache.pig.backend.datastorage.ContainerDescriptor;
 import org.apache.pig.backend.datastorage.ElementDescriptor;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Partitioner;
 import org.apache.pig.impl.util.LinkedMultiMap;
 
 public class QueryParser {
@@ -900,6 +903,23 @@ class FunctionType {
             throw new Exception("Received an unknown function type: " + funcType);
         }
     }
+};
+// Kinds of user-supplied classes the parser can validate (only PARTITIONER is checked so far).
+class ClassType {
+	public static final byte UNKNOWNCLASS = 0;
+	public static final byte PARTITIONER = 2;
+	// Throws Exception unless cs is of the kind named by classType; cs is never instantiated.
+	public static void checkClassType(Class cs, byte classType) throws Exception {
+		switch(classType) {
+		case ClassType.PARTITIONER:
+			if(!Partitioner.class.isAssignableFrom(cs)) { // type check only: no constructor call
+				throw new Exception("Not a class of org.apache.hadoop.mapreduce.Partitioner");
+			}
+			break;
+		default:
+			throw new Exception("Received an unknown class type: " + classType);
+		}
+	}
 }
 
 PARSER_END(QueryParser)
@@ -942,6 +962,7 @@ TOKEN : { <INNER : "inner"> }
 TOKEN : { <OUTER : "outer"> }
 TOKEN : { <STAR : "*"> 		}
 TOKEN : { <PARALLEL : "parallel"> }
+TOKEN : { <PARTITION : "partition by"> }
 TOKEN : { <GROUP : "group"> }
 TOKEN : { <AND : "and"> }
 TOKEN : { <OR : "or"> }
@@ -1124,7 +1145,6 @@ LogicalOperator SplitClause(LogicalPlan 
 	{log.trace("Exiting SplitClause"); return splitOp;}
 } 
 
-
 LogicalOperator Expr(LogicalPlan lp) : 
 {
 	LogicalOperator op; 
@@ -1164,7 +1184,7 @@ Token IdentifierOrReserved() :
 }
 {
   (
-  ( t1 = <DEFINE> )
+ ( t1 = <DEFINE> )
 | (t1 = <LOAD> )
 | (t1 =<FILTER> )
 | (t1 =<FOREACH> )
@@ -1187,6 +1207,7 @@ Token IdentifierOrReserved() :
 | (t1 =<INNER> )
 | (t1 =<OUTER> )
 | (t1 =<PARALLEL> )
+| (t1 =<PARTITION>)
 | (t1 =<GROUP> )
 | (t1 =<AND> )
 | (t1 =<OR> )
@@ -1260,6 +1281,7 @@ LogicalOperator BaseExpr(LogicalPlan lp)
 	Token t1, t2; 
 	Schema.FieldSchema fs; 
 	log.trace("Entering BaseExpr");
+	String partitioner = null;
 }
 {
 	(
@@ -1289,9 +1311,11 @@ LogicalOperator BaseExpr(LogicalPlan lp)
 |   (<SAMPLE> op = SampleClause(lp))
 |   (<ORDER> op = OrderClause(lp))
 |	(<DISTINCT> op = NestedExpr(lp) 
+	([<PARTITION> (partitioner = EvalClass(ClassType.PARTITIONER))])
 	{
 		LogicalOperator distinct = new LODistinct(lp, new OperatorKey(scope, getNextId())); 
 		lp.add(distinct);
+		distinct.setCustomPartitioner(partitioner);
 		log.debug("Added operator: " + distinct.getClass().getName() + " to the logical plan"); 
 		lp.connect(op, distinct);
 		log.debug("Connected alias: " + op.getAlias() + " operator " + op.getClass().getName() + " to operator " + distinct.getClass().getName());
@@ -1798,6 +1822,7 @@ LogicalOperator CogroupClause(LogicalPla
     LogicalOperator cogroup = null; 
     log.trace("Entering CoGroupClause");
     Token t;
+    String partitioner = null;
 }
 {
     (gi = GroupItem(lp) { gis.add(gi); }
@@ -1817,15 +1842,18 @@ LogicalOperator CogroupClause(LogicalPla
             cogroup = parseUsingForGroupBy("merge", gis, lp);
             }
         )])
+        ([<PARTITION> (partitioner = EvalClass(ClassType.PARTITIONER))])
     )
-
     {
         if (cogroup != null) {
+        	cogroup.setCustomPartitioner(partitioner);
             log.trace("Exiting CoGroupClause");
             return cogroup;
         }
-
         cogroup = parseCogroup(gis, lp, LOCogroup.GROUPTYPE.REGULAR);
+        if(cogroup != null) {
+        	cogroup.setCustomPartitioner(partitioner);
+        }
         log.trace("Exiting CoGroupClause");
         return cogroup;		
     }
@@ -2079,7 +2107,8 @@ int ColNameOrNum(Schema over) : 
 LogicalOperator CrossClause(LogicalPlan lp) : 
 {
 	LogicalOperator op; 
-	ArrayList<LogicalOperator> inputs = new ArrayList<LogicalOperator>(); 
+	ArrayList<LogicalOperator> inputs = new ArrayList<LogicalOperator>();
+	String partitioner = null; 
 	log.trace("Entering CrossClause");
 }
 {
@@ -2087,9 +2116,11 @@ LogicalOperator CrossClause(LogicalPlan 
 	op = NestedExpr(lp) { inputs.add(op); }
 	("," op = NestedExpr(lp) { inputs.add(op); })+
 	)
+	([<PARTITION> (partitioner = EvalClass(ClassType.PARTITIONER))])
 	{
 		LogicalOperator cross = new LOCross(lp, new OperatorKey(scope, getNextId()));
 		lp.add(cross);
+		cross.setCustomPartitioner(partitioner);
 		log.debug("Added operator " + cross.getClass().getName() + " to the logical plan");
 		
 		for (LogicalOperator lop: inputs) {
@@ -2115,6 +2146,7 @@ LogicalOperator JoinClause(LogicalPlan l
 	boolean isFullOuter = false;
 	boolean isOuter = false;
 	Token t;
+	String partitioner = null;
 }
 {
 	(gi = JoinItem(lp) { gis.add(gi); }
@@ -2126,7 +2158,6 @@ LogicalOperator JoinClause(LogicalPlan l
         (<FULL> [<OUTER>] {isFullOuter = true;})
 	]
 	("," gi = JoinItem(lp) { gis.add(gi); })+
-	
 	{
 		// in the case of outer joins, only two
 		// inputs are allowed
@@ -2182,15 +2213,23 @@ LogicalOperator JoinClause(LogicalPlan l
             joinOp = parseUsingForJoin("hash", gis, lp, isFullOuter, isRightOuter, isOuter);
 		    	}
      )]))
-
-	{log.trace("Exiting JoinClause");
-	if (joinOp!=null) {
+     ([<PARTITION> (partitioner = EvalClass(ClassType.PARTITIONER))])
+	
+	{
+		log.trace("Exiting JoinClause");
+		if (joinOp == null) {
+			joinOp = parseJoin(gis, lp, LOJoin.JOINTYPE.HASH);
+		}
+		if(partitioner != null) {
+			if(((LOJoin)joinOp).getJoinType() == LOJoin.JOINTYPE.SKEWED) {
+				throw new ParseException("Custom Partitioner is not supported for skewed join");
+			}
+			else {
+				joinOp.setCustomPartitioner(partitioner);
+			}
+		}
 		return joinOp;
 	}
-	else {
-		return parseJoin(gis, lp, LOJoin.JOINTYPE.HASH);
-	}}
-	
 }
 
 LogicalOperator UnionClause(LogicalPlan lp) : 
@@ -3703,6 +3742,35 @@ Object  EvalFunction(byte funcType) : 
 	}
 }
 
+String EvalClass(byte classType) :
+{
+	String className;
+	Class cs;
+	log.trace("Entering EvalClass");
+}
+{
+	className = QualifiedFunction()
+	{
+		cs = PigContext.resolveClassName(className);
+		try {
+			ClassType.checkClassType(cs, classType);
+		}
+		catch (ExecException e) {
+			ParseException pe = new ParseException("Class " + className + " not found");
+			pe.initCause(e);
+			throw pe;
+		}
+		catch (Exception e){
+			ParseException pe = new ParseException(e.getMessage());
+			pe.initCause(e);
+			throw pe;
+		}
+		log.trace("Exiting EvalClass");
+		
+		return className;
+	}
+}
+
 /**
  * Bug 831620 - '$' support
  */

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java?rev=952098&r1=952097&r2=952098&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java Mon Jun  7 04:58:30 2010
@@ -17,11 +17,15 @@
  */
 package org.apache.pig.test;
 
+import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileOutputStream;
+import java.io.FileReader;
 import java.io.IOException;
 import java.io.PrintStream;
+import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Random;
 
@@ -36,6 +40,7 @@ import org.apache.pig.data.DataByteArray
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.test.utils.Identity;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -489,5 +494,154 @@ public class TestEvalPipeline2 extends T
         
         Util.deleteFile(cluster, "table_testNestedDescSort");
     }
-
+    
+    // See PIG-282
+    @Test
+    public void testCustomPartitionerParseJoins() throws Exception{
+    	String[] input = {
+                "1\t3",
+                "1\t2"
+        };
+        Util.createInputFile(cluster, "table_testCustomPartitionerParseJoins", input);
+        
+        pigServer.registerQuery("A = LOAD 'table_testCustomPartitionerParseJoins' as (a0:int, a1:int);");
+        
+        pigServer.registerQuery("B = ORDER A by $0;");
+        
+        // Custom Partitioner is not allowed for skewed joins; registerQuery is expected to throw a FrontendException (error code 1000)
+        try {
+        	pigServer.registerQuery("skewed = JOIN A by $0, B by $0 USING 'skewed' PARTITION BY org.apache.pig.test.utils.SimpleCustomPartitioner;");
+        	//control should not reach here
+        	fail("Skewed join cannot accept a custom partitioner");
+        }
+        catch (FrontendException e) {
+        	assertTrue(e.getErrorCode() == 1000);
+		}
+        
+        pigServer.registerQuery("hash = JOIN A by $0, B by $0 USING 'hash' PARTITION BY org.apache.pig.test.utils.SimpleCustomPartitioner;");
+        Iterator<Tuple> iter = pigServer.openIterator("hash");
+        Tuple t;
+        
+        Collection<String> results = new HashSet<String>();
+        results.add("(1,3,1,2)");
+        results.add("(1,3,1,3)");
+        results.add("(1,2,1,2)");
+        results.add("(1,2,1,3)");
+        
+        assertTrue(iter.hasNext());
+        t = iter.next();
+        assertTrue(t.size()==4);
+        assertTrue(results.contains(t.toString()));
+        
+        assertTrue(iter.hasNext());
+        t = iter.next();
+        assertTrue(t.size()==4);
+        assertTrue(results.contains(t.toString()));
+        
+        assertTrue(iter.hasNext());
+        t = iter.next();
+        assertTrue(t.size()==4);
+        assertTrue(results.contains(t.toString()));
+        
+        assertTrue(iter.hasNext());
+        t = iter.next();
+        assertTrue(t.size()==4);
+        assertTrue(results.contains(t.toString()));
+        
+        // No checks are made for merge and replicated joins, as they are compiled to map-only jobs.
+        // No frontend error checking has been added for those joins, hence no test cases are added here.
+        // Sanity was checked manually once; the test above should cover the basic scenario.
+        
+        Util.deleteFile(cluster, "table_testCustomPartitionerParseJoins");
+    }
+    
+    // See PIG-282
+    @Test
+    public void testCustomPartitionerGroups() throws Exception{
+    	String[] input = {
+                "1\t1",
+                "2\t1",
+                "3\t1",
+                "4\t1"
+        };
+        Util.createInputFile(cluster, "table_testCustomPartitionerGroups", input);
+        
+        pigServer.registerQuery("A = LOAD 'table_testCustomPartitionerGroups' as (a0:int, a1:int);");
+        
+        // In a map reduce job the number of partitions equals the number of reduce
+        // tasks, so the partitioner only matters when there is more than one reducer.
+        // "parallel 2" below requests two reduce tasks; the checks that follow are
+        // hard-coded for exactly two output files (part-r-00000 and part-r-00001).
+        //
+        pigServer.registerQuery("B = group A by $0 PARTITION BY org.apache.pig.test.utils.SimpleCustomPartitioner parallel 2;");
+        
+        pigServer.store("B", "tmp_testCustomPartitionerGroups");
+        
+        new File("tmp_testCustomPartitionerGroups").mkdir();
+        
+        // SimpleCustomPartitioner partitions by the parity of the key (key % 2 with two reducers).
+        // These checks need to change if SimpleCustomPartitioner is changed.
+        Util.copyFromClusterToLocal(cluster, "tmp_testCustomPartitionerGroups/part-r-00000", "tmp_testCustomPartitionerGroups/part-r-00000");
+        BufferedReader reader = new BufferedReader(new FileReader("tmp_testCustomPartitionerGroups/part-r-00000"));
+ 	    String line = null;      	     
+ 	    while((line = reader.readLine()) != null) {
+ 	        String[] cols = line.split("\t");
+ 	        int value = Integer.parseInt(cols[0]) % 2;
+ 	        assertEquals(0, value);
+ 	    }
+ 	    Util.copyFromClusterToLocal(cluster, "tmp_testCustomPartitionerGroups/part-r-00001", "tmp_testCustomPartitionerGroups/part-r-00001");
+        reader = new BufferedReader(new FileReader("tmp_testCustomPartitionerGroups/part-r-00001"));
+ 	    line = null;      	     
+ 	    while((line = reader.readLine()) != null) {
+ 	        String[] cols = line.split("\t");
+ 	        int value = Integer.parseInt(cols[0]) % 2;
+ 	        assertEquals(1, value);
+ 	    } 
+        Util.deleteDirectory(new File("tmp_testCustomPartitionerGroups"));
+        Util.deleteFile(cluster, "table_testCustomPartitionerGroups");
+    }
+    
+    // See PIG-282
+    @Test
+    public void testCustomPartitionerCross() throws Exception{
+    	String[] input = {
+                "1\t3",
+                "1\t2",
+        };
+    	
+        Util.createInputFile(cluster, "table_testCustomPartitionerCross", input);
+        pigServer.registerQuery("A = LOAD 'table_testCustomPartitionerCross' as (a0:int, a1:int);");
+        pigServer.registerQuery("B = ORDER A by $0;");
+        pigServer.registerQuery("C = cross A , B PARTITION BY org.apache.pig.test.utils.SimpleCustomPartitioner;");
+        Iterator<Tuple> iter = pigServer.openIterator("C");
+        Tuple t;
+        
+        Collection<String> results = new HashSet<String>();
+        results.add("(1,3,1,2)");
+        results.add("(1,3,1,3)");
+        results.add("(1,2,1,2)");
+        results.add("(1,2,1,3)");
+        
+        assertTrue(iter.hasNext());
+        t = iter.next();
+        assertTrue(t.size()==4);
+        assertTrue(results.contains(t.toString()));
+        
+        assertTrue(iter.hasNext());
+        t = iter.next();
+        assertTrue(t.size()==4);
+        assertTrue(results.contains(t.toString()));
+        
+        assertTrue(iter.hasNext());
+        t = iter.next();
+        assertTrue(t.size()==4);
+        assertTrue(results.contains(t.toString()));
+        
+        assertTrue(iter.hasNext());
+        t = iter.next();
+        assertTrue(t.size()==4);
+        assertTrue(results.contains(t.toString()));
+        
+        Util.deleteFile(cluster, "table_testCustomPartitionerCross");
+    }
 }

Added: hadoop/pig/trunk/test/org/apache/pig/test/utils/SimpleCustomPartitioner.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/utils/SimpleCustomPartitioner.java?rev=952098&view=auto
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/utils/SimpleCustomPartitioner.java (added)
+++ hadoop/pig/trunk/test/org/apache/pig/test/utils/SimpleCustomPartitioner.java Mon Jun  7 04:58:30 2010
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test.utils;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.pig.impl.io.PigNullableWritable;
+
+/** Test partitioner: routes Integer keys by (key mod numPartitions) and all other keys by hash code. */
+public class SimpleCustomPartitioner extends Partitioner<PigNullableWritable, Writable> {
+	/** Returns a partition index in [0, numPartitions); never negative, unlike a bare % on a negative key. */
+	@Override
+	public int getPartition(PigNullableWritable key, Writable value, int numPartitions) {
+		Object pigValue = key.getValueAsPigType();
+		// Integer keys partition on their value; everything else on the key's hash code.
+		int raw = (pigValue instanceof Integer) ? ((Integer) pigValue).intValue() : key.hashCode();
+		// Java's % keeps the sign of the dividend, so fold negative remainders back into range.
+		int remainder = raw % numPartitions;
+		return remainder < 0 ? remainder + numPartitions : remainder;
+	}
+}