You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@pig.apache.org by ol...@apache.org on 2008/09/10 20:55:55 UTC

svn commit: r693927 - in /incubator/pig/branches/types: ./ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ test/org/apache/pig/test/

Author: olga
Date: Wed Sep 10 11:55:53 2008
New Revision: 693927

URL: http://svn.apache.org/viewvc?rev=693927&view=rev
Log:
PIG-402, PIG-415: order related bugs

Modified:
    incubator/pig/branches/types/CHANGES.txt
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBytesRawComparator.java
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigDoubleRawComparator.java
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigFloatRawComparator.java
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigIntRawComparator.java
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigLongRawComparator.java
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTextRawComparator.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java

Modified: incubator/pig/branches/types/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/CHANGES.txt?rev=693927&r1=693926&r2=693927&view=diff
==============================================================================
--- incubator/pig/branches/types/CHANGES.txt (original)
+++ incubator/pig/branches/types/CHANGES.txt Wed Sep 10 11:55:53 2008
@@ -183,3 +183,9 @@
     PIG-398: Expressions not allowed inside foreach (sms via olgan)
 
     PIG-418: divide by 0 problem
+
+    PIG-402: order by with user comparator (shravanmn via olgan)
+
+    PIG-415: problem with comparators (shravanmn via olgan)
+
+

Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=693927&r1=693926&r2=693927&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Wed Sep 10 11:55:53 2008
@@ -287,7 +287,7 @@
                     op.setParentPlan(plans[i]);                
                 }    
             }
-            
+            POPackage pack = null;
             if(mro.reducePlan.isEmpty()){
                 //MapOnly Job
                 jobConf.setMapperClass(PigMapOnly.Map.class);
@@ -310,7 +310,7 @@
                     jobConf.set("pig.combinePlan", ObjectSerializer.serialize(mro.combinePlan));
                     jobConf.set("pig.combine.package", ObjectSerializer.serialize(combPack));
                 }
-                POPackage pack = (POPackage)mro.reducePlan.getRoots().get(0);
+                pack = (POPackage)mro.reducePlan.getRoots().get(0);
                 mro.reducePlan.remove(pack);
                 jobConf.setMapperClass(PigMapReduce.Map.class);
                 jobConf.setReducerClass(PigMapReduce.Reduce.class);
@@ -345,6 +345,10 @@
                     String compFuncSpec = mro.UDFs.get(0);
                     Class comparator = PigContext.resolveClassName(compFuncSpec);
                     if(ComparisonFunc.class.isAssignableFrom(comparator)) {
+                        jobConf.setMapperClass(PigMapReduce.MapWithComparator.class);
+                        pack.setKeyType(DataType.TUPLE);
+                        jobConf.set("pig.reduce.package", ObjectSerializer.serialize(pack));
+                        jobConf.setOutputKeyClass(TupleFactory.getInstance().tupleClass());
                         jobConf.setOutputKeyComparatorClass(comparator);
                     }
                 } else {

Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBytesRawComparator.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBytesRawComparator.java?rev=693927&r1=693926&r2=693927&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBytesRawComparator.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBytesRawComparator.java Wed Sep 10 11:55:53 2008
@@ -66,7 +66,7 @@
         // If either are null, handle differently.
         if (b1[s1] == NullableBytesWritable.NOTNULL &&
                 b2[s2] == NullableBytesWritable.NOTNULL) {
-            rc = super.compare(b1, s1 + 1, l1, b2, s2 + 1, l2);
+            rc = super.compare(b1, s1 + 1, l1-1, b2, s2 + 1, l2-1);
         } else {
             // For sorting purposes two nulls are equal.
             if (b1[s1] == NullableBytesWritable.NULL &&

Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigDoubleRawComparator.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigDoubleRawComparator.java?rev=693927&r1=693926&r2=693927&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigDoubleRawComparator.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigDoubleRawComparator.java Wed Sep 10 11:55:53 2008
@@ -65,7 +65,7 @@
         // If either are null, handle differently.
         if (b1[s1] == NullableDoubleWritable.NOTNULL &&
                 b2[s2] == NullableDoubleWritable.NOTNULL) {
-            rc = super.compare(b1, s1 + 1, l1, b2, s2 + 1, l2);
+            rc = super.compare(b1, s1 + 1, l1-1, b2, s2 + 1, l2-1);
         } else {
             // For sorting purposes two nulls are equal.
             if (b1[s1] == NullableDoubleWritable.NULL &&

Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigFloatRawComparator.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigFloatRawComparator.java?rev=693927&r1=693926&r2=693927&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigFloatRawComparator.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigFloatRawComparator.java Wed Sep 10 11:55:53 2008
@@ -66,7 +66,7 @@
         // If either are null, handle differently.
         if (b1[s1] == NullableFloatWritable.NOTNULL &&
                 b2[s2] == NullableFloatWritable.NOTNULL) {
-            rc = super.compare(b1, s1 + 1, l1, b2, s2 + 1, l2);
+            rc = super.compare(b1, s1 + 1, l1-1, b2, s2 + 1, l2-1);
         } else {
             // For sorting purposes two nulls are equal.
             if (b1[s1] == NullableFloatWritable.NULL &&

Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigIntRawComparator.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigIntRawComparator.java?rev=693927&r1=693926&r2=693927&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigIntRawComparator.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigIntRawComparator.java Wed Sep 10 11:55:53 2008
@@ -66,7 +66,7 @@
         // If either are null, handle differently.
         if (b1[s1] == NullableIntWritable.NOTNULL &&
                 b2[s2] == NullableIntWritable.NOTNULL) {
-            rc = super.compare(b1, s1 + 1, l1, b2, s2 + 1, l2);
+            rc = super.compare(b1, s1 + 1, l1-1, b2, s2 + 1, l2-1);
         } else {
             // For sorting purposes two nulls are equal.
             if (b1[s1] == NullableIntWritable.NULL &&

Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigLongRawComparator.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigLongRawComparator.java?rev=693927&r1=693926&r2=693927&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigLongRawComparator.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigLongRawComparator.java Wed Sep 10 11:55:53 2008
@@ -66,7 +66,7 @@
         // If either are null, handle differently.
         if (b1[s1] == NullableLongWritable.NOTNULL &&
                 b2[s2] == NullableLongWritable.NOTNULL) {
-            rc = super.compare(b1, s1 + 1, l1, b2, s2 + 1, l2);
+            rc = super.compare(b1, s1 + 1, l1-1, b2, s2 + 1, l2-1);
         } else {
             // For sorting purposes two nulls are equal.
             if (b1[s1] == NullableLongWritable.NULL &&

Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java?rev=693927&r1=693926&r2=693927&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java Wed Sep 10 11:55:53 2008
@@ -16,6 +16,7 @@
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.data.TargetedTuple;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
@@ -27,12 +28,13 @@
 
 public abstract class PigMapBase extends MapReduceBase{
     private final Log log = LogFactory.getLog(getClass());
-
+    
     protected byte keyType;
     
     
     //Map Plan
     protected PhysicalPlan mp;
+    protected TupleFactory tf = TupleFactory.getInstance();
     
     OutputCollector<WritableComparable, Writable> outputCollector;
     

Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java?rev=693927&r1=693926&r2=693927&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java Wed Sep 10 11:55:53 2008
@@ -83,6 +83,20 @@
             oc.collect(wcKey, it);
         }
     }
+    
+    public static class MapWithComparator extends PigMapBase implements
+            Mapper<Text, TargetedTuple, WritableComparable, Writable> {
+
+        @Override
+        public void collect(OutputCollector<WritableComparable, Writable> oc,
+                Tuple tuple) throws ExecException, IOException {
+            Object key = tuple.get(0);
+            Tuple keyTuple = tf.newTuple(1);
+            keyTuple.set(0, key);
+            IndexedTuple it = (IndexedTuple) tuple.get(1);
+            oc.collect(keyTuple, it);
+        }
+    }
 
     public static class Reduce extends MapReduceBase
             implements

Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTextRawComparator.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTextRawComparator.java?rev=693927&r1=693926&r2=693927&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTextRawComparator.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTextRawComparator.java Wed Sep 10 11:55:53 2008
@@ -66,7 +66,7 @@
         // If either are null, handle differently.
         if (b1[s1] == NullableText.NOTNULL &&
                 b2[s2] == NullableText.NOTNULL) {
-            rc = super.compare(b1, s1 + 1, l1, b2, s2 + 1, l2);
+            rc = super.compare(b1, s1 + 1, l1-1, b2, s2 + 1, l2-1);
         } else {
             // For sorting purposes two nulls are equal.
             if (b1[s1] == NullableText.NULL &&

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java?rev=693927&r1=693926&r2=693927&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java Wed Sep 10 11:55:53 2008
@@ -34,6 +34,7 @@
 import org.junit.Before;
 import org.junit.Test;
 
+import org.apache.pig.ComparisonFunc;
 import org.apache.pig.EvalFunc;
 import org.apache.pig.ExecType;
 import org.apache.pig.PigServer;
@@ -61,6 +62,7 @@
     public void setUp() throws Exception{
         FileLocalizer.setR(new Random());
         pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+//        pigServer = new PigServer(ExecType.LOCAL);
     }
     
     static public class MyBagFunction extends EvalFunc<DataBag>{
@@ -306,16 +308,29 @@
     
     @Test
     public void testSort() throws Exception{
-        testSortDistinct(false);
+        testSortDistinct(false, false);
+    }
+    
+    @Test
+    public void testSortWithUDF() throws Exception{
+        testSortDistinct(false, true);
     }
     
 
     @Test
     public void testDistinct() throws Exception{
-        testSortDistinct(true);
+        testSortDistinct(true, false);
     }
+    
+    public static class TupComp extends ComparisonFunc {
 
-    private void testSortDistinct(boolean eliminateDuplicates) throws Exception{
+        @Override
+        public int compare(Tuple t1, Tuple t2) {
+            return t1.compareTo(t2);
+        }
+    }
+
+    private void testSortDistinct(boolean eliminateDuplicates, boolean useUDF) throws Exception{
         int LOOP_SIZE = 1024*16;
         File tmpFile = File.createTempFile("test", "txt");
         PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
@@ -330,7 +345,10 @@
         if (eliminateDuplicates){
             pigServer.registerQuery("B = DISTINCT (FOREACH A GENERATE $0) PARALLEL 10;");
         }else{
-            pigServer.registerQuery("B = ORDER A BY $0 PARALLEL 10;");
+            if(!useUDF)
+                pigServer.registerQuery("B = ORDER A BY $0 PARALLEL 10;");
+            else
+                pigServer.registerQuery("B = ORDER A BY $0 using " + TupComp.class.getName() + ";");
         }
         pigServer.store("B", tmpOutputFile);
         
@@ -355,7 +373,7 @@
         
     }
     
-    public void testNestedPlan() throws Exception{
+    /*public void testNestedPlan() throws Exception{
         int LOOP_COUNT = 10;
         File tmpFile = File.createTempFile("test", "txt");
         PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
@@ -464,7 +482,7 @@
             ++numIdentity;
         }
         assertEquals(5, numIdentity);
-    }
+    }*/
     
 
 }