You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ga...@apache.org on 2008/09/19 22:39:32 UTC

svn commit: r697229 [1/2] - in /incubator/pig/branches/types: src/org/apache/pig/ src/org/apache/pig/backend/hadoop/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relat...

Author: gates
Date: Fri Sep 19 13:39:31 2008
New Revision: 697229

URL: http://svn.apache.org/viewvc?rev=697229&view=rev
Log:
PIG-361.  Created a new type PigNullableWritable that extends WritableComparable.  In addition to a null byte this type includes an index byte, so that when
types are used in a join, this byte tracks which input the record came from.  Changed all NullableXWritable types to extend PigNullableWritable.  Created
new NullableTuple that also extends PigNullableWritable.  Wrote the comparator of PigNullableWritable so that it ignores the index byte except when the null
byte is true.  This results in the desired cogroup semantics where nulls of a given input coalesce but nulls across inputs do not.

Changed all map reduce jobs so that keys are always a PigNullableWritable.  This was necessary so that the index could be used in sorting.  In the future this
will allow join optimizations by having join tuples sorted by input source.  Also changed so that all values are NullableTuples.  This replaces the
IndexedTuple.

Fixed a bug in the SortPartitioner where it was always assuming that inputs were tuples, which is no longer the case.


Added:
    incubator/pig/branches/types/src/org/apache/pig/impl/io/NullableBag.java
    incubator/pig/branches/types/src/org/apache/pig/impl/io/NullableTuple.java
    incubator/pig/branches/types/src/org/apache/pig/impl/io/PigNullableWritable.java
Removed:
    incubator/pig/branches/types/src/org/apache/pig/data/IndexedTuple.java
Modified:
    incubator/pig/branches/types/src/org/apache/pig/ComparisonFunc.java
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/HDataType.java
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBytesRawComparator.java
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigDoubleRawComparator.java
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigFloatRawComparator.java
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigIntRawComparator.java
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigLongRawComparator.java
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapOnly.java
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTextRawComparator.java
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTupleRawComparator.java
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SortPartitioner.java
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPostCombinerPackage.java
    incubator/pig/branches/types/src/org/apache/pig/data/DataReaderWriter.java
    incubator/pig/branches/types/src/org/apache/pig/data/DataType.java
    incubator/pig/branches/types/src/org/apache/pig/impl/io/NullableBooleanWritable.java
    incubator/pig/branches/types/src/org/apache/pig/impl/io/NullableBytesWritable.java
    incubator/pig/branches/types/src/org/apache/pig/impl/io/NullableDoubleWritable.java
    incubator/pig/branches/types/src/org/apache/pig/impl/io/NullableFloatWritable.java
    incubator/pig/branches/types/src/org/apache/pig/impl/io/NullableIntWritable.java
    incubator/pig/branches/types/src/org/apache/pig/impl/io/NullableLongWritable.java
    incubator/pig/branches/types/src/org/apache/pig/impl/io/NullableText.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestAlgebraicEval.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestForEach.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestLocalRearrange.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestPackage.java

Modified: incubator/pig/branches/types/src/org/apache/pig/ComparisonFunc.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/ComparisonFunc.java?rev=697229&r1=697228&r2=697229&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/ComparisonFunc.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/ComparisonFunc.java Fri Sep 19 13:39:31 2008
@@ -24,6 +24,7 @@
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PigProgressable;
+import org.apache.pig.impl.io.NullableTuple;
 
 
 public abstract class ComparisonFunc extends WritableComparator {
@@ -32,11 +33,13 @@
     protected PigProgressable reporter;
     
     public ComparisonFunc() {
-        super(TupleFactory.getInstance().tupleClass());
+        super(NullableTuple.class);
     }
 
     public int compare(WritableComparable a, WritableComparable b) {
-        return compare((Tuple)a, (Tuple)b);
+        // The incoming key will be in a NullableTuple.  But the comparison
+        // function needs a tuple, so pull the tuple out.
+        return compare((Tuple)((NullableTuple)a).getValueAsPigType(), (Tuple)((NullableTuple)b).getValueAsPigType());
     }
 
     /**

Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/HDataType.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/HDataType.java?rev=697229&r1=697228&r2=697229&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/HDataType.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/HDataType.java Fri Sep 19 13:39:31 2008
@@ -36,6 +36,7 @@
 import org.apache.pig.data.DefaultTupleFactory;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.io.NullableBag;
 import org.apache.pig.impl.io.NullableBooleanWritable;
 import org.apache.pig.impl.io.NullableBytesWritable;
 import org.apache.pig.impl.io.NullableDoubleWritable;
@@ -43,6 +44,8 @@
 import org.apache.pig.impl.io.NullableIntWritable;
 import org.apache.pig.impl.io.NullableLongWritable;
 import org.apache.pig.impl.io.NullableText;
+import org.apache.pig.impl.io.NullableTuple;
+import org.apache.pig.impl.io.PigNullableWritable;
 
 /**
  * A class of helper methods for converting from pig data types to hadoop
@@ -56,15 +59,15 @@
     static NullableDoubleWritable doubleWrit = new NullableDoubleWritable();
     static NullableIntWritable intWrit = new NullableIntWritable();
     static NullableLongWritable longWrit = new NullableLongWritable();
-    static DataBag defDB = BagFactory.getInstance().newDefaultBag();
-    static Tuple defTup = TupleFactory.getInstance().newTuple();
+    static NullableBag defDB = new NullableBag();
+    static NullableTuple defTup = new NullableTuple();
     static Map<Byte, String> typeToName = null;
 
-    public static WritableComparable getWritableComparableTypes(Object o, byte keyType) throws ExecException{
+    public static PigNullableWritable getWritableComparableTypes(Object o, byte keyType) throws ExecException{
         byte type = DataType.findType(o);
         switch (type) {
         case DataType.BAG:
-            return (DataBag)o;
+            return new NullableBag((DataBag)o);
 
         case DataType.BOOLEAN:
             return new NullableBooleanWritable((Boolean)o);
@@ -88,19 +91,17 @@
             return new NullableLongWritable((Long)o);
           
         case DataType.TUPLE:
-            return (Tuple) o;
+            return new NullableTuple((Tuple)o);
          
-//        case DataType.MAP:
-            // Hmm, This is problematic
-            // Need a deep clone to convert a Map into
-            // MapWritable
-            // wcKey = new MapWritable();
-//            break;
+        case DataType.MAP:
+            throw new RuntimeException("Map not supported as a key type!");
+
         case DataType.NULL:
             switch (keyType) {
             case DataType.BAG:
-                //TODO: create a null data bag
-                break;
+                NullableBag nbag = new NullableBag();
+                nbag.setNull(true);
+                return nbag;
             case DataType.BOOLEAN:
                 NullableBooleanWritable nboolWrit = new NullableBooleanWritable();
                 nboolWrit.setNull(true);
@@ -130,15 +131,11 @@
                 nLongWrit.setNull(true);
                 return nLongWrit;
             case DataType.TUPLE:
-                Tuple t = DefaultTupleFactory.getInstance().newTuple();
-                t.setNull(true);
-                return t;
-//            case DataType.MAP:
-                // Hmm, This is problematic
-                // Need a deep clone to convert a Map into
-                // MapWritable
-                // wcKey = new MapWritable();
-//                break;
+                NullableTuple ntuple = new NullableTuple();
+                ntuple.setNull(true);
+                return ntuple;
+            case DataType.MAP:
+                throw new RuntimeException("Map not supported as a key type!");
             }
             break;
         default:
@@ -151,8 +148,8 @@
         return null;
     }
     
-    public static WritableComparable getWritableComparableTypes(byte type) throws ExecException{
-        WritableComparable wcKey = null;
+    public static PigNullableWritable getWritableComparableTypes(byte type) throws ExecException{
+        PigNullableWritable wcKey = null;
          switch (type) {
         case DataType.BAG:
             wcKey = defDB;
@@ -181,12 +178,8 @@
         case DataType.TUPLE:
             wcKey = defTup;
             break;
-//        case DataType.MAP:
-            // Hmm, This is problematic
-            // Need a deep clone to convert a Map into
-            // MapWritable
-            // wcKey = new MapWritable();
-//            break;
+        case DataType.MAP:
+            throw new RuntimeException("Map not supported as a key type!");
         default:
             if (typeToName == null) typeToName = DataType.genTypeToNameMap();
             throw new ExecException("The type "
@@ -195,40 +188,4 @@
         }
         return wcKey;
     }
-    
-    public static Object convertToPigType(WritableComparable key) {
-        if ((key instanceof DataBag) )
-            return key;
-        if(key instanceof Tuple)
-           return ((Tuple)key).isNull() ? null : key;     
-        if (key instanceof NullableBooleanWritable) {
-            NullableBooleanWritable bWrit = (NullableBooleanWritable)key;   
-            return bWrit.isNull() ? null :bWrit.get();
-        }
-        if (key instanceof NullableBytesWritable) {
-            NullableBytesWritable byWrit = (NullableBytesWritable) key; 
-            return byWrit.isNull() ? null : new DataByteArray(byWrit.get(), 0, byWrit.getSize());
-        }
-        if (key instanceof NullableText) {
-            NullableText tWrit =  (NullableText) key;
-            return tWrit.isNull() ? null :tWrit.toString();
-        }
-        if (key instanceof NullableFloatWritable) {
-            NullableFloatWritable fWrit = (NullableFloatWritable) key;
-            return fWrit == null ? null : fWrit.get();
-        }
-        if (key instanceof NullableDoubleWritable) {
-            NullableDoubleWritable dWrit = (NullableDoubleWritable) key;
-            return dWrit.isNull() ? null : dWrit.get();
-        }
-        if (key instanceof NullableIntWritable) {
-            NullableIntWritable iWrit = (NullableIntWritable) key;
-            return iWrit.isNull() ? null : iWrit.get();
-        }
-        if (key instanceof NullableLongWritable) {
-            NullableLongWritable lWrit = (NullableLongWritable) key;
-            return lWrit.isNull() ? null : lWrit.get();
-        }
-        return null;
-    }
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=697229&r1=697228&r2=697229&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Fri Sep 19 13:39:31 2008
@@ -36,12 +36,6 @@
 import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BooleanWritable;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.WritableComparator;
 import org.apache.hadoop.mapred.FileOutputFormat;
@@ -49,25 +43,36 @@
 import org.apache.hadoop.mapred.jobcontrol.Job;
 import org.apache.hadoop.mapred.jobcontrol.JobControl;
 
+import org.apache.pig.ComparisonFunc;
 import org.apache.pig.FuncSpec;
 import org.apache.pig.backend.hadoop.HDataType;
 import org.apache.pig.backend.hadoop.DoubleWritable;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataByteArray;
-import org.apache.pig.ComparisonFunc;
 import org.apache.pig.data.DataType;
-import org.apache.pig.data.IndexedTuple;
+import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.io.NullableBytesWritable;
+import org.apache.pig.impl.io.NullableDoubleWritable;
+import org.apache.pig.impl.io.NullableFloatWritable;
+import org.apache.pig.impl.io.NullableIntWritable;
+import org.apache.pig.impl.io.NullableLongWritable;
+import org.apache.pig.impl.io.NullableText;
+import org.apache.pig.impl.io.NullableTuple;
+import org.apache.pig.impl.plan.DepthFirstWalker;
 import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.util.JarManager;
 import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.Pair;
@@ -79,6 +84,22 @@
  * which has a JobConf. The MapReduceOper corresponds to a Job
  * and the getJobCong method returns the JobConf that is configured
  * as per the MapReduceOper
+ *
+ * <h2>Comparator Design</h2>
+ * <p>
+ * A few words on how comparators are chosen.  In almost all cases we use raw
+ * comparators (the one exception being when the user provides a comparison
+ * function for order by).  For order by queries the PigTYPERawComparator
+ * functions are used, where TYPE is Int, Long, etc.  These comparators are
+ * null aware and asc/desc aware.  The first byte of each of the
+ * NullableTYPEWritable classes contains info on whether the value is null.
+ * Asc/desc is written as an array into the JobConf with the key pig.sortOrder
+ * so that it can be read by each of the comparators as part of their 
+ * setConf call.
+ * <p>
+ * For non-order by queries, PigTYPEWritableComparator classes are used.
+ * These are all just type specific instances of WritableComparator.
+ *
  */
 public class JobControlCompiler{
     MROperPlan plan;
@@ -334,8 +355,9 @@
                 jobConf.set("pig.reduce.package", ObjectSerializer.serialize(pack));
                 Class<? extends WritableComparable> keyClass = HDataType.getWritableComparableTypes(pack.getKeyType()).getClass();
                 jobConf.setOutputKeyClass(keyClass);
+                jobConf.set("pig.reduce.key.type", Byte.toString(pack.getKeyType())); 
                 selectComparator(mro, pack.getKeyType(), jobConf);
-                jobConf.setOutputValueClass(IndexedTuple.class);
+                jobConf.setOutputValueClass(NullableTuple.class);
             }
         
             if(mro.isGlobalSort()){
@@ -348,7 +370,8 @@
                         jobConf.setMapperClass(PigMapReduce.MapWithComparator.class);
                         pack.setKeyType(DataType.TUPLE);
                         jobConf.set("pig.reduce.package", ObjectSerializer.serialize(pack));
-                        jobConf.setOutputKeyClass(TupleFactory.getInstance().tupleClass());
+                        jobConf.set("pig.usercomparator", "true");
+                        jobConf.setOutputKeyClass(NullableTuple.class);
                         jobConf.setOutputKeyComparatorClass(comparator);
                     }
                 } else {
@@ -370,7 +393,7 @@
         }
         return ret;
     }
-    
+
     public static class PigWritableComparator extends WritableComparator {
         protected PigWritableComparator(Class c) {
             super(c);
@@ -383,37 +406,37 @@
 
     public static class PigIntWritableComparator extends PigWritableComparator {
         public PigIntWritableComparator() {
-            super(IntWritable.class);
+            super(NullableIntWritable.class);
         }
     }
 
     public static class PigLongWritableComparator extends PigWritableComparator {
         public PigLongWritableComparator() {
-            super(LongWritable.class);
+            super(NullableLongWritable.class);
         }
     }
 
     public static class PigFloatWritableComparator extends PigWritableComparator {
         public PigFloatWritableComparator() {
-            super(FloatWritable.class);
+            super(NullableFloatWritable.class);
         }
     }
 
     public static class PigDoubleWritableComparator extends PigWritableComparator {
         public PigDoubleWritableComparator() {
-            super(DoubleWritable.class);
+            super(NullableDoubleWritable.class);
         }
     }
 
     public static class PigCharArrayWritableComparator extends PigWritableComparator {
         public PigCharArrayWritableComparator() {
-            super(Text.class);
+            super(NullableText.class);
         }
     }
 
     public static class PigDBAWritableComparator extends PigWritableComparator {
         public PigDBAWritableComparator() {
-            super(BytesWritable.class);
+            super(NullableBytesWritable.class);
         }
     }
 
@@ -429,25 +452,57 @@
         }
     }
 
+    /*
+    public static class PigRawGrouper extends WritableComparator {
+        protected PigRawGrouper(Class c) {
+            super(c);
+        }
+
+        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2){
+            // If it's null, then look at the index.  Else, ignore the index.
+            if (b1[s1] == 0 && b2[s2] == 0) {
+                return WritableComparator.compareBytes(b1, s1 + 1, l1 - 2, b2, s2 + 1, l2 - 2);
+            } else if (b1[s1] != 0 && b2[s2] != 0) {
+                if (b1[s1 + l1 - 1] < b2[s2 + l2 - 1]) return -1;
+                else if (b2[s2 + l2 - 1] > b1[s1 + l1 - 1]) return 1;
+                else return 0;
+            }
+            else if (b1[s1] != 0) return -1;
+            else return 1;
+        }
+    }
+
+    public static class PigCharArrayRawGrouper extends PigRawGrouper {
+        public PigCharArrayRawGrouper() {
+            super(NullableText.class);
+        }
+    }
+    */
+
+
     private void selectComparator(
             MapReduceOper mro,
             byte keyType,
             JobConf jobConf) throws JobCreationException {
-        // If this operator is involved in an order by, use the native
-        // comparators.  Otherwise use bytewise comparison.  Have to
-        // look at the next operator too because if we're the quantile
-        // operation we need to use the native comparators.
-        boolean involved = false;
+        // If this operator is involved in an order by, use the pig specific raw
+        // comparators.  If it has a cogroup, we need to set the comparator class
+        // to the raw comparator and the grouping comparator class to pig specific
+        // raw comparators (which skip the index).  Otherwise use the hadoop provided
+        // raw comparator.
+        
+        // An operator has an order by if global sort is set or if its successor has
+        // global sort set (because in that case it's the sampling job). 
+        boolean hasOrderBy = false;
         if (mro.isGlobalSort()) {
-            involved = true;
+            hasOrderBy = true;
         } else {
             List<MapReduceOper> succs = plan.getSuccessors(mro);
             if (succs != null) {
                 MapReduceOper succ = succs.get(0);
-                if (succ.isGlobalSort()) involved = true;
+                if (succ.isGlobalSort()) hasOrderBy = true;
             }
         }
-        if (involved) {
+        if (hasOrderBy) {
             switch (keyType) {
             case DataType.INTEGER:
                 jobConf.setOutputKeyComparatorClass(PigIntRawComparator.class);
@@ -481,52 +536,122 @@
                 jobConf.setOutputKeyComparatorClass(PigTupleRawComparator.class);
                 break;
 
-            default:
-                break;
-            }
-        } else {
-            switch (keyType) {
-            case DataType.INTEGER:
-                jobConf.setOutputKeyComparatorClass(PigIntWritableComparator.class);
-                break;
-
-            case DataType.LONG:
-                jobConf.setOutputKeyComparatorClass(PigLongWritableComparator.class);
-                break;
-
-            case DataType.FLOAT:
-                jobConf.setOutputKeyComparatorClass(PigFloatWritableComparator.class);
-                break;
-
-            case DataType.DOUBLE:
-                jobConf.setOutputKeyComparatorClass(PigDoubleWritableComparator.class);
-                break;
-
-            case DataType.CHARARRAY:
-                jobConf.setOutputKeyComparatorClass(PigCharArrayWritableComparator.class);
-                break;
-
-            case DataType.BYTEARRAY:
-                jobConf.setOutputKeyComparatorClass(PigDBAWritableComparator.class);
-                break;
-
-            case DataType.MAP:
-                log.error("Using Map as key not supported.");
-                throw new JobCreationException("Using Map as key not supported");
-
-            case DataType.TUPLE:
-                jobConf.setOutputKeyComparatorClass(PigTupleWritableComparator.class);
-                break;
-
             case DataType.BAG:
-                jobConf.setOutputKeyComparatorClass(PigBagWritableComparator.class);
-                break;
+                log.error("Using Bag as key not supported.");
+                throw new JobCreationException("Using Bag as key not supported");
 
             default:
-                throw new RuntimeException("Forgot case for type " +
-                    DataType.findTypeName(keyType));
+                break;
             }
+            return;
+        }
 
+            /*
+        try {
+            CogroupFinder cf = new CogroupFinder(mro.mapPlan);
+            cf.visit();
+            int mapRearranges = cf.rearrangeCounter;
+            cf = new CogroupFinder(mro.reducePlan);
+            cf.visit();
+            if (mapRearranges > 1 || cf.rearrangeCounter > 1) {
+                switch (keyType) {
+                case DataType.INTEGER:
+                    jobConf.setOutputKeyComparatorClass(PigIntWritableComparator.class);
+                    jobConf.setOutputValueGroupingComparator(PigIntRawComparator.class);
+                    break;
+
+                case DataType.LONG:
+                    jobConf.setOutputKeyComparatorClass(PigLongWritableComparator.class);
+                    jobConf.setOutputValueGroupingComparator(PigLongRawComparator.class);
+                    break;
+
+                case DataType.FLOAT:
+                    jobConf.setOutputKeyComparatorClass(PigFloatWritableComparator.class);
+                    jobConf.setOutputValueGroupingComparator(PigFloatRawComparator.class);
+                    break;
+
+                case DataType.DOUBLE:
+                    jobConf.setOutputKeyComparatorClass(PigDoubleWritableComparator.class);
+                    jobConf.setOutputValueGroupingComparator(PigDoubleRawComparator.class);
+                    break;
+
+                case DataType.CHARARRAY:
+                    jobConf.setOutputKeyComparatorClass(PigCharArrayWritableComparator.class);
+                    jobConf.setOutputValueGroupingComparator(PigCharArrayRawGrouper.class);
+                    break;
+
+                case DataType.BYTEARRAY:
+                    jobConf.setOutputKeyComparatorClass(PigDBAWritableComparator.class);
+                    jobConf.setOutputValueGroupingComparator(PigBytesRawComparator.class);
+                    break;
+
+                case DataType.MAP:
+                    log.error("Using Map as key not supported.");
+                    throw new JobCreationException("Using Map as key not supported");
+
+                case DataType.TUPLE:
+                    jobConf.setOutputKeyComparatorClass(PigTupleWritableComparator.class);
+                    jobConf.setOutputValueGroupingComparator(PigTupleRawComparator.class);
+                    break;
+
+                case DataType.BAG:
+                    log.error("Using Bag as key not supported.");
+                    throw new JobCreationException("Using Bag as key not supported");
+
+                default:
+                    throw new RuntimeException("Forgot case for type " +
+                        DataType.findTypeName(keyType));
+                }
+                */
+                jobConf.setPartitionerClass(org.apache.hadoop.mapred.lib.HashPartitioner.class);
+                /*
+                return;
+            }
+        } catch (VisitorException ve) {
+            throw new JobCreationException(ve);
+        }
+        */
+
+        switch (keyType) {
+        case DataType.INTEGER:
+            jobConf.setOutputKeyComparatorClass(PigIntWritableComparator.class);
+            break;
+
+        case DataType.LONG:
+            jobConf.setOutputKeyComparatorClass(PigLongWritableComparator.class);
+            break;
+
+        case DataType.FLOAT:
+            jobConf.setOutputKeyComparatorClass(PigFloatWritableComparator.class);
+            break;
+
+        case DataType.DOUBLE:
+            jobConf.setOutputKeyComparatorClass(PigDoubleWritableComparator.class);
+            break;
+
+        case DataType.CHARARRAY:
+            jobConf.setOutputKeyComparatorClass(PigCharArrayWritableComparator.class);
+            break;
+
+        case DataType.BYTEARRAY:
+            jobConf.setOutputKeyComparatorClass(PigDBAWritableComparator.class);
+            break;
+
+        case DataType.MAP:
+            log.error("Using Map as key not supported.");
+            throw new JobCreationException("Using Map as key not supported");
+
+        case DataType.TUPLE:
+            jobConf.setOutputKeyComparatorClass(PigTupleWritableComparator.class);
+            break;
+
+        case DataType.BAG:
+            log.error("Using Bag as key not supported.");
+            throw new JobCreationException("Using Bag as key not supported");
+
+        default:
+            throw new RuntimeException("Forgot case for type " +
+                DataType.findTypeName(keyType));
         }
     }
 
@@ -582,4 +707,19 @@
         }
     }
 
+    /*
+    private class CogroupFinder extends PhyPlanVisitor {
+        int rearrangeCounter = 0;
+
+        CogroupFinder(PhysicalPlan plan) {
+            super(plan,
+                new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan));
+        }
+
+        public void visitLocalRearrange(POLocalRearrange lr) {
+            rearrangeCounter++;
+        }
+    }
+    */
+
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBytesRawComparator.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBytesRawComparator.java?rev=697229&r1=697228&r2=697229&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBytesRawComparator.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBytesRawComparator.java Fri Sep 19 13:39:31 2008
@@ -29,6 +29,7 @@
 import org.apache.hadoop.mapred.JobConf;
 
 import org.apache.pig.impl.io.NullableBytesWritable;
+import org.apache.pig.impl.io.PigNullableWritable;
 import org.apache.pig.impl.util.ObjectSerializer;
 
 public class PigBytesRawComparator extends BytesWritable.Comparator implements Configurable {
@@ -61,17 +62,22 @@
         return null;
     }
 
+    /**
+     * Compare two NullableBytesWritables as raw bytes.  If neither is null,
+     * then BytesWritable.Comparator.compare() is used.  If both are null they
+     * are treated as equal.  Otherwise the null one is defined to be less.
+     */
     public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
         int rc = 0;
+
         // If either are null, handle differently.
-        if (b1[s1] == NullableBytesWritable.NOTNULL &&
-                b2[s2] == NullableBytesWritable.NOTNULL) {
-            rc = super.compare(b1, s1 + 1, l1-1, b2, s2 + 1, l2-1);
+        if (b1[s1] == 0 && b2[s2] == 0) {
+            // Subtract 2, one for null byte and one for index byte
+            rc = super.compare(b1, s1 + 1, l1 - 2, b2, s2 + 1, l2 - 2);
         } else {
             // For sorting purposes two nulls are equal.
-            if (b1[s1] == NullableBytesWritable.NULL &&
-                    b2[s2] == NullableBytesWritable.NULL) rc = 0;
-            else if (b1[s1] == NullableBytesWritable.NULL) rc = -1;
+            if (b1[s1] != 0 && b2[s2] != 0) rc = 0;
+            else if (b1[s1] != 0) rc = -1;
             else rc = 1;
         }
         if (!mAsc[0]) rc *= -1;

Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java?rev=697229&r1=697228&r2=697229&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java Fri Sep 19 13:39:31 2008
@@ -26,7 +26,6 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MapReduceBase;
 import org.apache.hadoop.mapred.Mapper;
@@ -42,39 +41,21 @@
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
 import org.apache.pig.data.DataType;
-import org.apache.pig.data.IndexedTuple;
 import org.apache.pig.data.TargetedTuple;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.PigNullableWritable;
+import org.apache.pig.impl.io.NullableTuple;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.WrappedIOException;
 
-/**
- * This class is the static Mapper &amp; Reducer classes that
- * are used by Pig to execute Pig Map Reduce jobs. Since
- * there is a reduce phase, the leaf is bound to be a 
- * POLocalRearrange. So the map phase has to separate the
- * key and indexed tuple and collect it into the output
- * collector.
- * 
- * The shuffle and sort phase sorts these key &amp; indexed tuples
- * and creates key, List&lt;IndexedTuple&gt; and passes the key and
- * iterator to the list. The deserialized POPackage operator
- * is used to package the key, List&lt;IndexedTuple&gt; into pigKey, 
- * Bag&lt;Tuple&gt; where pigKey is of the appropriate pig type and
- * then the result of the package is attached to the reduce
- * plan which is executed if its not empty. Either the result 
- * of the reduce plan or the package res is collected into
- * the output collector. 
- *
- */
 public class PigCombiner {
 
     public static JobConf sJobConf = null;
     
     public static class Combine extends MapReduceBase
             implements
-            Reducer<WritableComparable, IndexedTuple, WritableComparable, Writable> {
+            Reducer<PigNullableWritable, NullableTuple, PigNullableWritable, Writable> {
         private final Log log = LogFactory.getLog(getClass());
 
         private byte keyType;
@@ -125,20 +106,19 @@
         }
         
         /**
-         * The reduce function which packages the key and List<IndexedTuple>
-         * into key, Bag<Tuple> after converting Hadoop type key into Pig type.
+         * The reduce function which packages the key and List &lt;Tuple&gt;
+         * into key, Bag&lt;Tuple&gt; after converting Hadoop type key into Pig type.
          * The package result is either collected as is, if the reduce plan is
          * empty or after passing through the reduce plan.
          */
-        public void reduce(WritableComparable key,
-                Iterator<IndexedTuple> indInp,
-                OutputCollector<WritableComparable, Writable> oc,
+        public void reduce(PigNullableWritable key,
+                Iterator<NullableTuple> tupIter,
+                OutputCollector<PigNullableWritable, Writable> oc,
                 Reporter reporter) throws IOException {
             
             pigReporter.setRep(reporter);
             
-            Object k = HDataType.convertToPigType(key);
-            pack.attachInput(k, indInp);
+            pack.attachInput(key, tupIter);
             
             try {
                 Tuple t=null;
@@ -161,10 +141,18 @@
                         
                         if(redRes.returnStatus==POStatus.STATUS_OK){
                             Tuple tuple = (Tuple)redRes.result;
-                            Object combKey = tuple.get(0);
-                            IndexedTuple it = (IndexedTuple)tuple.get(1);
-                            WritableComparable wcKey = HDataType.getWritableComparableTypes(combKey, this.keyType);
-                            oc.collect(wcKey, it);
+                            Byte index = (Byte)tuple.get(0);
+                            PigNullableWritable outKey =
+                                HDataType.getWritableComparableTypes(tuple.get(1), this.keyType);
+                            NullableTuple val =
+                                new NullableTuple((Tuple)tuple.get(2));
+                            // Both the key and the value need the index.  The key needs it so
+                            // that it can be sorted on the index in addition to the key
+                            // value.  The value needs it so that POPackage can properly
+                            // assign the tuple to its slot in the projection.
+                            outKey.setIndex(index);
+                            val.setIndex(index);
+                            oc.collect(outKey, val);
                             continue;
                         }
                         

Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigDoubleRawComparator.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigDoubleRawComparator.java?rev=697229&r1=697228&r2=697229&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigDoubleRawComparator.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigDoubleRawComparator.java Fri Sep 19 13:39:31 2008
@@ -28,6 +28,7 @@
 
 import org.apache.pig.backend.hadoop.DoubleWritable;
 import org.apache.pig.impl.io.NullableDoubleWritable;
+import org.apache.pig.impl.io.PigNullableWritable;
 import org.apache.pig.impl.util.ObjectSerializer;
 
 public class PigDoubleRawComparator extends DoubleWritable.Comparator implements Configurable {
@@ -60,17 +61,21 @@
         return null;
     }
 
+    /**
+     * Compare two NullableDoubleWritables as raw bytes.  If neither is null,
+     * then DoubleWritable.Comparator.compare() is used.  If both are null they
+     * compare as equal.  Otherwise the null one is defined to be less.
+     */
     public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
         int rc = 0;
+
         // If either are null, handle differently.
-        if (b1[s1] == NullableDoubleWritable.NOTNULL &&
-                b2[s2] == NullableDoubleWritable.NOTNULL) {
-            rc = super.compare(b1, s1 + 1, l1-1, b2, s2 + 1, l2-1);
+        if (b1[s1] == 0 && b2[s2] == 0) {
+            rc = super.compare(b1, s1 + 1, l1 - 2, b2, s2 + 1, l2 - 2);
         } else {
             // For sorting purposes two nulls are equal.
-            if (b1[s1] == NullableDoubleWritable.NULL &&
-                    b2[s2] == NullableDoubleWritable.NULL) rc = 0;
-            else if (b1[s1] == NullableDoubleWritable.NULL) rc = -1;
+            if (b1[s1] != 0 && b2[s2] != 0) rc = 0;
+            else if (b1[s1] != 0) rc = -1;
             else rc = 1;
         }
         if (!mAsc[0]) rc *= -1;

Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigFloatRawComparator.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigFloatRawComparator.java?rev=697229&r1=697228&r2=697229&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigFloatRawComparator.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigFloatRawComparator.java Fri Sep 19 13:39:31 2008
@@ -29,6 +29,7 @@
 import org.apache.hadoop.mapred.JobConf;
 
 import org.apache.pig.impl.io.NullableFloatWritable;
+import org.apache.pig.impl.io.PigNullableWritable;
 import org.apache.pig.impl.util.ObjectSerializer;
 
 public class PigFloatRawComparator extends FloatWritable.Comparator implements Configurable {
@@ -61,17 +62,21 @@
         return null;
     }
 
+    /**
+     * Compare two NullableFloatWritables as raw bytes.  If neither is null,
+     * then FloatWritable.Comparator.compare() is used.  If both are null they
+     * compare as equal.  Otherwise the null one is defined to be less.
+     */
     public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
         int rc = 0;
+
         // If either are null, handle differently.
-        if (b1[s1] == NullableFloatWritable.NOTNULL &&
-                b2[s2] == NullableFloatWritable.NOTNULL) {
-            rc = super.compare(b1, s1 + 1, l1-1, b2, s2 + 1, l2-1);
+        if (b1[s1] == 0 && b2[s2] == 0) {
+            rc = super.compare(b1, s1 + 1, l1 - 2, b2, s2 + 1, l2 - 2);
         } else {
             // For sorting purposes two nulls are equal.
-            if (b1[s1] == NullableFloatWritable.NULL &&
-                    b2[s2] == NullableFloatWritable.NULL) rc = 0;
-            else if (b1[s1] == NullableFloatWritable.NULL) rc = -1;
+            if (b1[s1] != 0 && b2[s2] != 0) rc = 0;
+            else if (b1[s1] != 0) rc = -1;
             else rc = 1;
         }
         if (!mAsc[0]) rc *= -1;

Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigIntRawComparator.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigIntRawComparator.java?rev=697229&r1=697228&r2=697229&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigIntRawComparator.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigIntRawComparator.java Fri Sep 19 13:39:31 2008
@@ -28,7 +28,7 @@
 import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.mapred.JobConf;
 
-import org.apache.pig.impl.io.NullableIntWritable;
+import org.apache.pig.impl.io.PigNullableWritable;
 import org.apache.pig.impl.util.ObjectSerializer;
 
 public class PigIntRawComparator extends IntWritable.Comparator implements Configurable {
@@ -61,17 +61,21 @@
         return null;
     }
 
+    /**
+     * Compare two NullableIntWritables as raw bytes.  If neither is null,
+     * then IntWritable.Comparator.compare() is used.  If both are null they
+     * compare as equal.  Otherwise the null one is defined to be less.
+     */
     public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
         int rc = 0;
+
         // If either are null, handle differently.
-        if (b1[s1] == NullableIntWritable.NOTNULL &&
-                b2[s2] == NullableIntWritable.NOTNULL) {
-            rc = super.compare(b1, s1 + 1, l1-1, b2, s2 + 1, l2-1);
+        if (b1[s1] == 0 && b2[s2] == 0) {
+            rc = super.compare(b1, s1 + 1, l1 - 2, b2, s2 + 1, l2 - 2);
         } else {
             // For sorting purposes two nulls are equal.
-            if (b1[s1] == NullableIntWritable.NULL &&
-                    b2[s2] == NullableIntWritable.NULL) rc = 0;
-            else if (b1[s1] == NullableIntWritable.NULL) rc = -1;
+            if (b1[s1] != 0 && b2[s2] != 0) rc = 0;
+            else if (b1[s1] != 0) rc = -1;
             else rc = 1;
         }
         if (!mAsc[0]) rc *= -1;

Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigLongRawComparator.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigLongRawComparator.java?rev=697229&r1=697228&r2=697229&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigLongRawComparator.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigLongRawComparator.java Fri Sep 19 13:39:31 2008
@@ -29,6 +29,7 @@
 import org.apache.hadoop.mapred.JobConf;
 
 import org.apache.pig.impl.io.NullableLongWritable;
+import org.apache.pig.impl.io.PigNullableWritable;
 import org.apache.pig.impl.util.ObjectSerializer;
 
 public class PigLongRawComparator extends LongWritable.Comparator implements Configurable {
@@ -61,17 +62,21 @@
         return null;
     }
 
+    /**
+     * Compare two NullableLongWritables as raw bytes.  If neither is null,
+     * then LongWritable.Comparator.compare() is used.  If both are null they
+     * compare as equal.  Otherwise the null one is defined to be less.
+     */
     public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
         int rc = 0;
+
         // If either are null, handle differently.
-        if (b1[s1] == NullableLongWritable.NOTNULL &&
-                b2[s2] == NullableLongWritable.NOTNULL) {
-            rc = super.compare(b1, s1 + 1, l1-1, b2, s2 + 1, l2-1);
+        if (b1[s1] == 0 && b2[s2] == 0) {
+            rc = super.compare(b1, s1 + 1, l1 - 2, b2, s2 + 1, l2 - 2);
         } else {
             // For sorting purposes two nulls are equal.
-            if (b1[s1] == NullableLongWritable.NULL &&
-                    b2[s2] == NullableLongWritable.NULL) rc = 0;
-            else if (b1[s1] == NullableLongWritable.NULL) rc = -1;
+            if (b1[s1] != 0 && b2[s2] != 0) rc = 0;
+            else if (b1[s1] != 0) rc = -1;
             else rc = 1;
         }
         if (!mAsc[0]) rc *= -1;

Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java?rev=697229&r1=697228&r2=697229&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java Fri Sep 19 13:39:31 2008
@@ -8,7 +8,6 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MapReduceBase;
 import org.apache.hadoop.mapred.OutputCollector;
@@ -16,6 +15,7 @@
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.data.TargetedTuple;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.PigNullableWritable;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
@@ -36,7 +36,7 @@
     protected PhysicalPlan mp;
     protected TupleFactory tf = TupleFactory.getInstance();
     
-    OutputCollector<WritableComparable, Writable> outputCollector;
+    OutputCollector<PigNullableWritable, Writable> outputCollector;
     
     // Reporter that will be used by operators
     // to transmit heartbeat
@@ -123,7 +123,7 @@
      * the key and indexed tuple.
      */
     public void map(Text key, TargetedTuple inpTuple,
-            OutputCollector<WritableComparable, Writable> oc,
+            OutputCollector<PigNullableWritable, Writable> oc,
             Reporter reporter) throws IOException {
         
         // cache the collector for use in runPipeline() which
@@ -201,7 +201,7 @@
         
     }
 
-    abstract public void collect(OutputCollector<WritableComparable, Writable> oc, Tuple tuple) throws ExecException, IOException;
+    abstract public void collect(OutputCollector<PigNullableWritable, Writable> oc, Tuple tuple) throws ExecException, IOException;
 
     /**
      * @return the keyType

Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapOnly.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapOnly.java?rev=697229&r1=697228&r2=697229&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapOnly.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapOnly.java Fri Sep 19 13:39:31 2008
@@ -35,6 +35,7 @@
 import org.apache.pig.data.TargetedTuple;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.io.PigNullableWritable;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
@@ -62,10 +63,10 @@
 public class PigMapOnly {
 
     public static class Map extends PigMapBase implements
-            Mapper<Text, TargetedTuple, WritableComparable, Writable> {
+            Mapper<Text, TargetedTuple, PigNullableWritable, Writable> {
 
         @Override
-        public void collect(OutputCollector<WritableComparable, Writable> oc, Tuple tuple) throws ExecException, IOException {
+        public void collect(OutputCollector<PigNullableWritable, Writable> oc, Tuple tuple) throws ExecException, IOException {
             oc.collect(null, tuple);
         }
     }

Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java?rev=697229&r1=697228&r2=697229&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java Fri Sep 19 13:39:31 2008
@@ -26,7 +26,6 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MapReduceBase;
 import org.apache.hadoop.mapred.Mapper;
@@ -43,9 +42,10 @@
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
 import org.apache.pig.data.DataType;
-import org.apache.pig.data.IndexedTuple;
 import org.apache.pig.data.TargetedTuple;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.PigNullableWritable;
+import org.apache.pig.impl.io.NullableTuple;
 import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.SpillableMemoryManager;
 
@@ -54,53 +54,73 @@
  * are used by Pig to execute Pig Map Reduce jobs. Since
  * there is a reduce phase, the leaf is bound to be a 
  * POLocalRearrange. So the map phase has to separate the
- * key and indexed tuple and collect it into the output
+ * key and tuple and collect it into the output
  * collector.
  * 
- * The shuffle and sort phase sorts these key &amp; indexed tuples
- * and creates key, List&lt;IndexedTuple&gt; and passes the key and
+ * The shuffle and sort phase sorts these keys &amp; tuples
+ * and creates key, List&lt;Tuple&gt; and passes the key and
  * iterator to the list. The deserialized POPackage operator
- * is used to package the key, List&lt;IndexedTuple&gt; into pigKey, 
+ * is used to package the key, List&lt;Tuple&gt; into pigKey, 
  * Bag&lt;Tuple&gt; where pigKey is of the appropriate pig type and
  * then the result of the package is attached to the reduce
  * plan which is executed if its not empty. Either the result 
  * of the reduce plan or the package res is collected into
  * the output collector. 
  *
+ * The index of the tuple (that is, which bag it should be placed in by the
+ * package) is packed into the key.  This is done so that hadoop sorts the
+ * keys in order of index for join.
+ *
  */
 public class PigMapReduce {
 
     public static JobConf sJobConf = null;
     
     public static class Map extends PigMapBase implements
-            Mapper<Text, TargetedTuple, WritableComparable, Writable> {
+            Mapper<Text, TargetedTuple, PigNullableWritable, Writable> {
 
         @Override
-        public void collect(OutputCollector<WritableComparable, Writable> oc, Tuple tuple) throws ExecException, IOException {
-            Object key = tuple.get(0);
-            IndexedTuple it = (IndexedTuple)tuple.get(1);
-            WritableComparable wcKey = HDataType.getWritableComparableTypes(key, keyType);
-            oc.collect(wcKey, it);
+        public void collect(OutputCollector<PigNullableWritable, Writable> oc, Tuple tuple) throws ExecException, IOException {
+            Byte index = (Byte)tuple.get(0);
+            PigNullableWritable key =
+                HDataType.getWritableComparableTypes(tuple.get(1), keyType);
+            NullableTuple val = new NullableTuple((Tuple)tuple.get(2));
+            // Both the key and the value need the index.  The key needs it so
+            // that it can be sorted on the index in addition to the key
+            // value.  The value needs it so that POPackage can properly
+            // assign the tuple to its slot in the projection.
+            key.setIndex(index);
+            val.setIndex(index);
+            oc.collect(key, val);
         }
     }
     
     public static class MapWithComparator extends PigMapBase implements
-            Mapper<Text, TargetedTuple, WritableComparable, Writable> {
+            Mapper<Text, TargetedTuple, PigNullableWritable, Writable> {
 
         @Override
-        public void collect(OutputCollector<WritableComparable, Writable> oc,
+        public void collect(OutputCollector<PigNullableWritable, Writable> oc,
                 Tuple tuple) throws ExecException, IOException {
-            Object key = tuple.get(0);
-            Tuple keyTuple = tf.newTuple(1);
-            keyTuple.set(0, key);
-            IndexedTuple it = (IndexedTuple) tuple.get(1);
-            oc.collect(keyTuple, it);
+            Object k = tuple.get(1);
+            Tuple keyTuple = tf.newTuple(k);
+
+            Byte index = (Byte)tuple.get(0);
+            PigNullableWritable key =
+                HDataType.getWritableComparableTypes(keyTuple, DataType.TUPLE);
+            NullableTuple val = new NullableTuple((Tuple)tuple.get(2));
+            // Both the key and the value need the index.  The key needs it so
+            // that it can be sorted on the index in addition to the key
+            // value.  The value needs it so that POPackage can properly
+            // assign the tuple to its slot in the projection.
+            key.setIndex(index);
+            val.setIndex(index);
+            oc.collect(key, val);
         }
     }
 
     public static class Reduce extends MapReduceBase
             implements
-            Reducer<WritableComparable, IndexedTuple, WritableComparable, Writable> {
+            Reducer<PigNullableWritable, NullableTuple, PigNullableWritable, Writable> {
         private final Log log = LogFactory.getLog(getClass());
         
         //The reduce plan
@@ -115,7 +135,7 @@
         
         ProgressableReporter pigReporter;
 
-        private OutputCollector<WritableComparable, Writable> outputCollector;
+        private OutputCollector<PigNullableWritable, Writable> outputCollector;
 
         private boolean errorInReduce = false;
         
@@ -152,14 +172,14 @@
         }
         
         /**
-         * The reduce function which packages the key and List<IndexedTuple>
-         * into key, Bag<Tuple> after converting Hadoop type key into Pig type.
+         * The reduce function which packages the key and List&lt;Tuple&gt;
+         * into key, Bag&lt;Tuple&gt; after converting Hadoop type key into Pig type.
          * The package result is either collected as is, if the reduce plan is
          * empty or after passing through the reduce plan.
          */
-        public void reduce(WritableComparable key,
-                Iterator<IndexedTuple> indInp,
-                OutputCollector<WritableComparable, Writable> oc,
+        public void reduce(PigNullableWritable key,
+                Iterator<NullableTuple> tupIter,
+                OutputCollector<PigNullableWritable, Writable> oc,
                 Reporter reporter) throws IOException {
             
             // cache the collector for use in runPipeline()
@@ -167,8 +187,7 @@
             this.outputCollector = oc;
             pigReporter.setRep(reporter);
             
-            Object k = HDataType.convertToPigType(key);
-            pack.attachInput(k, indInp);
+            pack.attachInput(key, tupIter);
             
             try {
                 Tuple t=null;

Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTextRawComparator.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTextRawComparator.java?rev=697229&r1=697228&r2=697229&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTextRawComparator.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTextRawComparator.java Fri Sep 19 13:39:31 2008
@@ -29,6 +29,7 @@
 import org.apache.hadoop.mapred.JobConf;
 
 import org.apache.pig.impl.io.NullableText;
+import org.apache.pig.impl.io.PigNullableWritable;
 import org.apache.pig.impl.util.ObjectSerializer;
 
 public class PigTextRawComparator extends Text.Comparator implements Configurable {
@@ -61,17 +62,21 @@
         return null;
     }
 
+    /**
+     * Compare two NullableTexts as raw bytes.  If neither is null,
+     * then Text.Comparator.compare() is used.  If both are null they
+     * compare as equal.  Otherwise the null one is defined to be less.
+     */
     public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
         int rc = 0;
+
         // If either are null, handle differently.
-        if (b1[s1] == NullableText.NOTNULL &&
-                b2[s2] == NullableText.NOTNULL) {
-            rc = super.compare(b1, s1 + 1, l1-1, b2, s2 + 1, l2-1);
+        if (b1[s1] == 0 && b2[s2] == 0) {
+            rc = super.compare(b1, s1 + 1, l1 - 2, b2, s2 + 1, l2 - 2);
         } else {
             // For sorting purposes two nulls are equal.
-            if (b1[s1] == NullableText.NULL &&
-                    b2[s2] == NullableText.NULL) rc = 0;
-            else if (b1[s1] == NullableText.NULL) rc = -1;
+            if (b1[s1] != 0 && b2[s2] != 0) rc = 0;
+            else if (b1[s1] != 0) rc = -1;
             else rc = 1;
         }
         if (!mAsc[0]) rc *= -1;

Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTupleRawComparator.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTupleRawComparator.java?rev=697229&r1=697228&r2=697229&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTupleRawComparator.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTupleRawComparator.java Fri Sep 19 13:39:31 2008
@@ -33,6 +33,7 @@
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.io.PigNullableWritable;
 import org.apache.pig.impl.util.ObjectSerializer;
 
 public class PigTupleRawComparator extends WritableComparator implements Configurable {
@@ -75,29 +76,30 @@
         return null;
     }
 
+    /**
+     * Compare two NullableTuples as raw bytes.  If neither is null,
+     * then both are deserialized into tuples and compared.  If both are null
+     * they compare as equal.  Otherwise the null one is defined to be less.
+     */
     public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
-        // This can't be done on the raw data.  Users are allowed to
-        // implement their own versions of tuples, which means we have no
-        // idea what the underlying representation is.  So step one is to
-        // instantiate each object as a tuple.
-        Tuple t1 = mFact.newTuple();
-        Tuple t2 = mFact.newTuple();
-        try {
-            t1.readFields(new DataInputStream(new ByteArrayInputStream(b1, s1, l1)));
-            t2.readFields(new DataInputStream(new ByteArrayInputStream(b2, s2, l2)));
-        } catch (IOException ioe) {
-            mLog.error("Unable to instantiate tuples for comparison: " +
-                ioe.getMessage());
-            throw new RuntimeException(ioe.getMessage(), ioe);
-        }
+        int rc = 0;
+
+        if (b1[s1] == 0 && b2[s2] == 0) {
+            // This can't be done on the raw data.  Users are allowed to
+            // implement their own versions of tuples, which means we have no
+            // idea what the underlying representation is.  So step one is to
+            // instantiate each object as a tuple.
+            Tuple t1 = mFact.newTuple();
+            Tuple t2 = mFact.newTuple();
+            try {
+                t1.readFields(new DataInputStream(new ByteArrayInputStream(b1, s1 + 1, l1 - 1)));
+                t2.readFields(new DataInputStream(new ByteArrayInputStream(b2, s2 + 1, l2 - 1)));
+            } catch (IOException ioe) {
+                mLog.error("Unable to instantiate tuples for comparison: " +
+                    ioe.getMessage());
+                throw new RuntimeException(ioe.getMessage(), ioe);
+            }
 
-        int rc;
-        if (t1.isNull() || t2.isNull()) {
-            // For sorting purposes two nulls are equal.
-            if (t1.isNull() && t2.isNull()) rc = 0;
-            else if (t1.isNull()) rc = -1;
-            else rc = 1;
-        } else {
             int sz1 = t1.size();
             int sz2 = t2.size();
             if (sz2 < sz1) {
@@ -119,6 +121,11 @@
                 }
                 rc = 0;
             }
+        } else {
+            // For sorting purposes two nulls are equal.
+            if (b1[s1] != 0 && b2[s2] != 0) rc = 0;
+            else if (b1[s1] != 0) rc = -1;
+            else rc = 1;
         }
         if (mWholeTuple && !mAsc[0]) rc *= -1;
         return rc;

Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SortPartitioner.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SortPartitioner.java?rev=697229&r1=697228&r2=697229&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SortPartitioner.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SortPartitioner.java Fri Sep 19 13:39:31 2008
@@ -27,6 +27,7 @@
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Partitioner;
+import org.apache.pig.backend.hadoop.HDataType;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.builtin.BinStorage;
@@ -34,12 +35,20 @@
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.io.BufferedPositionedInputStream;
 import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.io.NullableTuple;
+import org.apache.pig.impl.io.NullableText;
+import org.apache.pig.impl.io.NullableIntWritable;
+import org.apache.pig.impl.io.NullableLongWritable;
+import org.apache.pig.impl.io.NullableFloatWritable;
+import org.apache.pig.impl.io.NullableDoubleWritable;
+import org.apache.pig.impl.io.NullableBytesWritable;
+import org.apache.pig.impl.io.PigNullableWritable;
 
-public class SortPartitioner implements Partitioner<WritableComparable, Writable> {
-    Tuple[] quantiles;
-    RawComparator<WritableComparable> comparator;
+public class SortPartitioner implements Partitioner<PigNullableWritable, Writable> {
+    PigNullableWritable[] quantiles;
+    RawComparator<PigNullableWritable> comparator;
     
-    public int getPartition(WritableComparable key, Writable value,
+    public int getPartition(PigNullableWritable key, Writable value,
             int numPartitions){
         int index = Arrays.binarySearch(quantiles, key, comparator);
         if (index < 0)
@@ -58,7 +67,7 @@
             loader.bindTo(quantilesFile, new BufferedPositionedInputStream(is), 0, Long.MAX_VALUE);
             
             Tuple t;
-            ArrayList<Tuple> quantiles = new ArrayList<Tuple>();
+            ArrayList<PigNullableWritable> quantiles = new ArrayList<PigNullableWritable>();
             
             while(true){
                 t = loader.getNext();
@@ -68,13 +77,14 @@
                 Object o = t.get(0);
                 if (o instanceof DataBag) {
                     for (Tuple it : (DataBag)o) {
-                        quantiles.add(it);
+                        addToQuantiles(job, quantiles, it);
                     }
                 } else {
-                    quantiles.add(t);
+                    addToQuantiles(job, quantiles, t);
                 }
             }
-            this.quantiles = quantiles.toArray(new Tuple[0]);
+            convertToArray(job, quantiles);
+            //this.quantiles = quantiles.toArray(new NullableTuple[0]);
         }catch (Exception e){
             throw new RuntimeException(e);
         }
@@ -82,4 +92,61 @@
         comparator = job.getOutputKeyComparator();
     }
 
+    /**
+     * Appends t to q as a NullableTuple when pig.usercomparator is "true" or
+     * t has more than one field; otherwise appends field 0, converted using
+     * the type code stored under pig.reduce.key.type.
+     */
+    private void addToQuantiles(
+            JobConf job,
+            ArrayList<PigNullableWritable> q,
+            Tuple t) {
+        try {
+            if ("true".equals(job.get("pig.usercomparator")) || t.size() > 1) {
+                q.add(new NullableTuple(t));
+            } else {
+                Object o = t.get(0);
+                String kts = job.get("pig.reduce.key.type");
+                if (kts == null) {
+                    throw new RuntimeException("Didn't get reduce key type "
+                        + "from config file.");
+                }
+                q.add(HDataType.getWritableComparableTypes(o,
+                    Byte.valueOf(kts)));
+            }
+        } catch (RuntimeException e) {
+            throw e; // already unchecked; re-wrapping would only bury the message
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Stores q in the quantiles field as an array typed after the class of
+     * its first entry (NullableTuple when pig.usercomparator is "true").
+     * @throws RuntimeException if the first entry's class is not handled
+     */
+    private void convertToArray(
+            JobConf job,
+            ArrayList<PigNullableWritable> q) {
+        if ("true".equals(job.get("pig.usercomparator")) ||
+                q.get(0).getClass().equals(NullableTuple.class)) {
+            quantiles = q.toArray(new NullableTuple[0]);
+        } else if (q.get(0).getClass().equals(NullableBytesWritable.class)) {
+            quantiles = q.toArray(new NullableBytesWritable[0]);
+        } else if (q.get(0).getClass().equals(NullableDoubleWritable.class)) {
+            quantiles = q.toArray(new NullableDoubleWritable[0]);
+        } else if (q.get(0).getClass().equals(NullableFloatWritable.class)) {
+            quantiles = q.toArray(new NullableFloatWritable[0]);
+        } else if (q.get(0).getClass().equals(NullableIntWritable.class)) {
+            quantiles = q.toArray(new NullableIntWritable[0]);
+        } else if (q.get(0).getClass().equals(NullableLongWritable.class)) {
+            quantiles = q.toArray(new NullableLongWritable[0]);
+        } else if (q.get(0).getClass().equals(NullableText.class)) {
+            quantiles = q.toArray(new NullableText[0]);
+        } else {
+            throw new RuntimeException("Unexpected class in SortPartitioner");
+        }
+    }
+
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java?rev=697229&r1=697228&r2=697229&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java Fri Sep 19 13:39:31 2008
@@ -24,7 +24,6 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.data.DataType;
-import org.apache.pig.data.IndexedTuple;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
@@ -59,7 +58,7 @@
     List<ExpressionOperator> leafOps;
 
     // The position of this LR in the package operator
-    int index;
+    byte index;
     
     byte keyType;
 
@@ -67,12 +66,12 @@
     
     private boolean isCross = false;
 
-    // A place holder IndexedTuple used in distinct case where we really don't
+    // A place holder Tuple used in distinct case where we really don't
     // have any value to pass through.  But hadoop gets cranky if we pass a
-    // null, so we'll just create one instance of this empty indexed tuple and
+    // null, so we'll just create one instance of this empty tuple and
     // pass it for every row.  We only get around to actually creating it if
     // mIsDistinct is set to true.
-    private IndexedTuple mFakeIndexedTuple = null;
+    private Tuple mFakeTuple = null;
 
     public POLocalRearrange(OperatorKey k) {
         this(k, -1, null);
@@ -114,12 +113,17 @@
         return false;
     }
 
-    public int getIndex() {
+    public byte getIndex() {
         return index;
     }
 
     public void setIndex(int index) {
-        this.index = index;
+        if (index > 0x7F) { // 0x7F == 127, matching the limit named in the message
+            throw new RuntimeException("Cogroups with more than 127 inputs "
+                + " not supported.");
+        } else {
+            this.index = (byte)index; // narrowing is safe: index <= 127 here
+        }
     }
 
     public boolean isDistinct() { 
@@ -129,7 +133,7 @@
     public void setDistinct(boolean isDistinct) {
         mIsDistinct = isDistinct;
         if (mIsDistinct) {
-            mFakeIndexedTuple = new IndexedTuple(mTupleFactory.newTuple(), 0);
+            mFakeTuple = mTupleFactory.newTuple();
         }
     }
     
@@ -221,28 +225,27 @@
             key = resLst.get(0).result;
         }
         
-        Tuple outPut = mTupleFactory.newTuple(2);
+        Tuple output = mTupleFactory.newTuple(3);
         if (mIsDistinct) {
 
             //Put the key and the indexed tuple
             //in a tuple and return
-            outPut.set(0,key);
-            outPut.set(1, mFakeIndexedTuple);
-            return outPut;
+            output.set(0, new Byte((byte)0));
+            output.set(1, key);
+            output.set(2, mFakeTuple);
+            return output;
         } else {
             if(isCross){
                 for(int i=0;i<plans.size();i++)
                     value.getAll().remove(0);
             }
-            //Create the indexed tuple out of the value
-            //that is remaining in the input tuple
-            IndexedTuple it = new IndexedTuple(value, index);
 
-            //Put the key and the indexed tuple
+            //Put the index, key, and value
             //in a tuple and return
-            outPut.set(0,key);
-            outPut.set(1,it);
-            return outPut;
+            output.set(0, new Byte(index));
+            output.set(1, key);
+            output.set(2, value);
+            return output;
         }
     }
 

Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java?rev=697229&r1=697228&r2=697229&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java Fri Sep 19 13:39:31 2008
@@ -26,9 +26,10 @@
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataType;
-import org.apache.pig.data.IndexedTuple;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.io.NullableTuple;
+import org.apache.pig.impl.io.PigNullableWritable;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
@@ -58,8 +59,8 @@
     //The iterator of indexed Tuples
     //that is typically provided by
     //Hadoop
-    Iterator<IndexedTuple> indTupIter;
-    
+    Iterator<NullableTuple> tupIter;
+
     //The key being worked on
     Object key;
     
@@ -123,16 +124,16 @@
      * @param inp - iterator of indexed tuples typically
      *              obtained from Hadoop
      */
-    public void attachInput(Object k, Iterator<IndexedTuple> inp) {
-        indTupIter = inp;
-        key = k;
+    public void attachInput(PigNullableWritable k, Iterator<NullableTuple> inp) {
+        tupIter = inp;
+        key = k.getValueAsPigType();
     }
 
     /**
      * attachInput's better half!
      */
     public void detachInput() {
-        indTupIter = null;
+        tupIter = null;
         key = null;
     }
 
@@ -171,9 +172,19 @@
         //For each indexed tup in the inp, sort them
         //into their corresponding bags based
         //on the index
-        while (indTupIter.hasNext()) {
-            IndexedTuple it = indTupIter.next();
-            if (numInputs > 0) dbs[it.index].add(it.toTuple());
+        while (tupIter.hasNext()) {
+            NullableTuple ntup = tupIter.next();
+            // Need to make a copy of the value, as hadoop uses the same ntup
+            // to represent each value.
+            Tuple val = (Tuple)ntup.getValueAsPigType();
+            /*
+            Tuple copy = mTupleFactory.newTuple(val.size());
+            for (int i = 0; i < val.size(); i++) {
+                copy.set(i, val.get(i));
+            }
+            */
+            Tuple copy = mTupleFactory.newTuple(val.getAll());
+            if (numInputs > 0) dbs[ntup.getIndex()].add(copy);
             if(reporter!=null) reporter.progress();
         }
         

Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPostCombinerPackage.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPostCombinerPackage.java?rev=697229&r1=697228&r2=697229&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPostCombinerPackage.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPostCombinerPackage.java Fri Sep 19 13:39:31 2008
@@ -30,17 +30,17 @@
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataType;
-import org.apache.pig.data.IndexedTuple;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.io.NullableTuple;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.VisitorException;
 /**
  * The package operator that packages the globally rearranged tuples into
  * output format after the combiner stage.  It differs from POPackage in that
- * instead of using the index in the IndexedTuple to find the bag to put a
- * tuple in, it instead uses the index to find the key.  All other inputs are
+ * it does not use the index in the NullableTuple to find the bag to put a
+ * tuple in.  Instead, the inputs are
  * put in a bag corresponding to their offset in the tuple.
  */
 public class POPostCombinerPackage extends POPackage {
@@ -83,11 +83,6 @@
         return "PostCombinerPackage" + "[" + DataType.findTypeName(resultType) + "]" + "{" + DataType.findTypeName(keyType) + "}" +" - " + mKey.toString();
     }
 
-    /**
-     * From the inputs, constructs the output tuple
-     * for this co-group in the required format which
-     * is (key, {bag of tuples from input 1}, {bag of tuples from input 2}, ...)
-     */
     @Override
     public Result getNext(Tuple t) throws ExecException {
         int keyField = -1;
@@ -99,10 +94,10 @@
         
         // For each indexed tup in the inp, split them up and place their
         // fields into the proper bags.  If the given field isn't a bag, just
-        // return set the value as is.
-        while (indTupIter.hasNext()) {
-            IndexedTuple it = indTupIter.next();
-            Tuple tup = it.toTuple();
+        // set the value as is.
+        while (tupIter.hasNext()) {
+            NullableTuple ntup = tupIter.next();
+            Tuple tup = (Tuple)ntup.getValueAsPigType();
             for (int i = 0; i < tup.size(); i++) {
                 if (mBags[i]) ((DataBag)fields[i]).add((Tuple)tup.get(i));
                 else fields[i] = tup.get(i);

Modified: incubator/pig/branches/types/src/org/apache/pig/data/DataReaderWriter.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/data/DataReaderWriter.java?rev=697229&r1=697228&r2=697229&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/data/DataReaderWriter.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/data/DataReaderWriter.java Fri Sep 19 13:39:31 2008
@@ -95,6 +95,9 @@
             case DataType.BOOLEAN:
                 return new Boolean(in.readBoolean());
 
+            case DataType.BYTE:
+                return new Byte(in.readByte());
+
             case DataType.BYTEARRAY: {
                 int size = in.readInt();
                 byte[] ba = new byte[size];
@@ -176,6 +179,11 @@
                 out.writeBoolean((Boolean)val);
                 break;
 
+            case DataType.BYTE:
+                out.writeByte(DataType.BYTE);
+                out.writeByte((Byte)val);
+                break;
+
             case DataType.BYTEARRAY: {
                 out.writeByte(DataType.BYTEARRAY);
                 DataByteArray bytes = (DataByteArray)val;

Modified: incubator/pig/branches/types/src/org/apache/pig/data/DataType.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/data/DataType.java?rev=697229&r1=697228&r2=697229&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/data/DataType.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/data/DataType.java Fri Sep 19 13:39:31 2008
@@ -49,7 +49,8 @@
     // values or creating order issues.  
     public static final byte UNKNOWN   =   0;
     public static final byte NULL      =   1;
-    public static final byte BOOLEAN   =   5;
+    public static final byte BOOLEAN   =   5; // internal use only
+    public static final byte BYTE      =   6; // internal use only
     public static final byte INTEGER   =  10;
     public static final byte LONG      =  15;
     public static final byte FLOAT     =  20;
@@ -80,6 +81,7 @@
         else if (o instanceof Float) return FLOAT;
         else if (o instanceof Double) return DOUBLE;
         else if (o instanceof Boolean) return BOOLEAN;
+        else if (o instanceof Byte) return BYTE;
         else return ERROR;
     }
 
@@ -100,6 +102,7 @@
         else if (t == Float.class) return FLOAT;
         else if (t == Double.class) return DOUBLE;
         else if (t == Boolean.class) return BOOLEAN;
+        else if (t == Byte.class) return BYTE;
         else {
             // Might be a tuple or a bag, need to check the interfaces it
             // implements
@@ -135,13 +138,13 @@
         return types.length;
     }
     public static byte[] genAllTypes(){
-        byte[] types = { DataType.BAG, DataType.BOOLEAN, DataType.BYTEARRAY, DataType.CHARARRAY, 
+        byte[] types = { DataType.BAG, DataType.BOOLEAN, DataType.BYTE, DataType.BYTEARRAY, DataType.CHARARRAY, 
                 DataType.DOUBLE, DataType.FLOAT, DataType.INTEGER, DataType.LONG, DataType.MAP, DataType.TUPLE};
         return types;
     }
     
     private static String[] genAllTypeNames(){
-        String[] names = { "BAG", "BOOLEAN", "BYTEARRAY", "CHARARRAY", "DOUBLE", "FLOAT", "INTEGER", "LONG", 
+        String[] names = { "BAG", "BOOLEAN", "BYTE", "BYTEARRAY", "CHARARRAY", "DOUBLE", "FLOAT", "INTEGER", "LONG", 
                 "MAP", "TUPLE" };
         return names;
     }
@@ -184,6 +187,7 @@
         switch (dt) {
         case NULL:      return "NULL";
         case BOOLEAN:   return "boolean";
+        case BYTE:      return "byte";
         case INTEGER:   return "integer";
         case LONG:      return "long";
         case FLOAT:     return "float";
@@ -229,7 +233,8 @@
                 (dataType == LONG) ||
                 (dataType == FLOAT) ||
                 (dataType == DOUBLE) ||
-                (dataType == BOOLEAN));
+                (dataType == BOOLEAN) ||
+                (dataType == BYTE));
     }
 
     /**
@@ -265,7 +270,7 @@
      * Compare two objects to each other.  This function is necessary
      * because there's no super class that implements compareTo.  This
      * function provides an (arbitrary) ordering of objects of different
-     * types as follows:  NULL &lt; BOOLEAN &lt; INTEGER &lt; LONG &lt;
+     * types as follows:  NULL &lt; BOOLEAN &lt; BYTE &lt; INTEGER &lt; LONG &lt;
      * FLOAT &lt; DOUBLE * &lt; BYTEARRAY &lt; STRING &lt; MAP &lt;
      * TUPLE &lt; BAG.  No other functions should implement this cross
      * object logic.  They should call this function for it instead.
@@ -285,6 +290,9 @@
             case BOOLEAN:
                 return ((Boolean)o1).compareTo((Boolean)o2);
 
+            case BYTE:
+                return ((Byte)o1).compareTo((Byte)o2);
+
             case INTEGER:
                 return ((Integer)o1).compareTo((Integer)o2);
 
@@ -371,6 +379,9 @@
             if (((Boolean)o) == true) return new Integer(1);
             else return new Integer(0);
 
+        case BYTE:
+            return new Integer(((Byte)o).intValue());
+
         case INTEGER:
             return (Integer)o;
 
@@ -418,6 +429,9 @@
             if (((Boolean)o) == true) return new Long(1);
             else return new Long(0);
 
+        case BYTE:
+            return new Long(((Byte)o).longValue());
+
         case INTEGER:
             return new Long(((Integer)o).longValue());
 
@@ -483,6 +497,7 @@
             return null;
 
         case BOOLEAN:
+        case BYTE:
         case MAP:
         case TUPLE:
         case BAG:
@@ -527,6 +542,7 @@
             return null;
 
         case BOOLEAN:
+        case BYTE:
         case MAP:
         case TUPLE:
         case BAG:

Added: incubator/pig/branches/types/src/org/apache/pig/impl/io/NullableBag.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/io/NullableBag.java?rev=697229&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/io/NullableBag.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/io/NullableBag.java Fri Sep 19 13:39:31 2008
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.io;
+
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+
+/**
+ * A PigNullableWritable whose value is a DataBag.  The no-argument
+ * constructor starts with a new default bag from the BagFactory.
+ */
+public class NullableBag extends PigNullableWritable {
+
+    /** Creates an instance holding a new default bag. */
+    public NullableBag() {
+        mValue = BagFactory.getInstance().newDefaultBag();
+    }
+
+    /**
+     * @param b the bag to hold; the reference is stored as-is, not copied
+     */
+    public NullableBag(DataBag b) {
+        mValue = b;
+    }
+
+    /**
+     * @return the held bag, or null when isNull() reports true
+     */
+    public Object getValueAsPigType() {
+        return isNull() ? null : (DataBag)mValue;
+    }
+}