You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by th...@apache.org on 2010/07/13 00:09:11 UTC

svn commit: r963504 [1/2] - in /hadoop/pig/trunk: ./ src/org/apache/pig/ src/org/apache/pig/backend/hadoop/ src/org/apache/pig/backend/hadoop/executionengine/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/...

Author: thejas
Date: Mon Jul 12 22:09:09 2010
New Revision: 963504

URL: http://svn.apache.org/viewvc?rev=963504&view=rev
Log:
PIG-1472: Optimize serialization/deserialization between Map and Reduce and between MR jobs

Added:
    hadoop/pig/trunk/src/org/apache/pig/data/BinInterSedes.java
    hadoop/pig/trunk/src/org/apache/pig/data/BinSedesTuple.java
    hadoop/pig/trunk/src/org/apache/pig/data/BinSedesTupleFactory.java
    hadoop/pig/trunk/src/org/apache/pig/data/InterSedes.java
    hadoop/pig/trunk/src/org/apache/pig/data/InterSedesFactory.java
    hadoop/pig/trunk/src/org/apache/pig/impl/io/InterRecordReader.java
    hadoop/pig/trunk/src/org/apache/pig/impl/io/InterRecordWriter.java
    hadoop/pig/trunk/src/org/apache/pig/impl/io/InterStorage.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestBinInterSedes.java
Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/PigServer.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/HDataType.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/NoopStoreRemover.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SampleOptimizer.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/BinStorage.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/COR.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/COV.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/Utf8StorageConverter.java
    hadoop/pig/trunk/src/org/apache/pig/data/DataReaderWriter.java
    hadoop/pig/trunk/src/org/apache/pig/data/DefaultAbstractBag.java
    hadoop/pig/trunk/src/org/apache/pig/data/DefaultTuple.java
    hadoop/pig/trunk/src/org/apache/pig/data/DefaultTupleFactory.java
    hadoop/pig/trunk/src/org/apache/pig/data/TupleFactory.java
    hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogToPhyTranslationVisitor.java
    hadoop/pig/trunk/src/org/apache/pig/impl/io/BinStorageRecordWriter.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestBuiltin.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestPOSort.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestStreaming.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestStreamingLocal.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestTextDataParser.java
    hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC1.gld
    hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC10.gld
    hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC11.gld
    hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC12.gld
    hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC13.gld
    hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC14.gld
    hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC15.gld
    hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC16.gld
    hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC18.gld
    hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC2.gld
    hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC3.gld
    hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC4.gld
    hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC6.gld
    hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC7.gld
    hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC8.gld
    hadoop/pig/trunk/test/org/apache/pig/test/utils/GenPhyOp.java
    hadoop/pig/trunk/test/org/apache/pig/test/utils/GenRandomData.java
    hadoop/pig/trunk/tutorial/src/org/apache/pig/tutorial/NGramGenerator.java
    hadoop/pig/trunk/tutorial/src/org/apache/pig/tutorial/ScoreGenerator.java
    hadoop/pig/trunk/tutorial/src/org/apache/pig/tutorial/TutorialTest.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=963504&r1=963503&r2=963504&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Mon Jul 12 22:09:09 2010
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
 
 IMPROVEMENTS
 
+PIG-1472: Optimize serialization/deserialization between Map and Reduce and between MR jobs (thejas)
+
 PIG-1389: Implement Pig counter to track number of rows for each input files
 (rding)
 

Modified: hadoop/pig/trunk/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/PigServer.java?rev=963504&r1=963503&r2=963504&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/PigServer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/PigServer.java Mon Jul 12 22:09:09 2010
@@ -54,7 +54,6 @@ import org.apache.pig.backend.executione
 import org.apache.pig.backend.hadoop.executionengine.HJob;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
-import org.apache.pig.builtin.BinStorage;
 import org.apache.pig.builtin.PigStorage;
 import org.apache.pig.classification.InterfaceAudience;
 import org.apache.pig.classification.InterfaceStability;
@@ -65,6 +64,7 @@ import org.apache.pig.experimental.logic
 import org.apache.pig.experimental.logical.optimizer.UidStamper;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.io.InterStorage;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.logicalLayer.LOConst;
 import org.apache.pig.impl.logicalLayer.LODefine;
@@ -679,7 +679,7 @@ public class PigServer {
             }
             
             ExecJob job = store(id, FileLocalizer.getTemporaryPath(pigContext)
-                    .toString(), BinStorage.class.getName() + "()");
+                    .toString(), InterStorage.class.getName() + "()");
             
             // invocation of "execute" is synchronous!
 

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/HDataType.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/HDataType.java?rev=963504&r1=963503&r2=963504&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/HDataType.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/HDataType.java Mon Jul 12 22:09:09 2010
@@ -19,24 +19,12 @@ package org.apache.pig.backend.hadoop;
 
 import java.util.Map;
 
-import org.apache.hadoop.io.BooleanWritable;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.WritableComparable;
-
 import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.backend.hadoop.DoubleWritable;
-import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
-import org.apache.pig.data.DataType;
 import org.apache.pig.data.DataByteArray;
-import org.apache.pig.data.DefaultTupleFactory;
+import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.io.NullableBag;
 import org.apache.pig.impl.io.NullableBooleanWritable;
 import org.apache.pig.impl.io.NullableBytesWritable;

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=963504&r1=963503&r2=963504&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java Mon Jul 12 22:09:09 2010
@@ -50,12 +50,12 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
-import org.apache.pig.builtin.BinStorage;
 import org.apache.pig.experimental.logical.LogicalPlanMigrationVistor;
 import org.apache.pig.experimental.logical.optimizer.UidStamper;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.io.InterStorage;
 import org.apache.pig.impl.logicalLayer.LogicalPlan;
 import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.OperatorKey;
@@ -393,7 +393,7 @@ public class HExecutionEngine {
                     NodeIdGenerator.getGenerator().getNextNodeId(scope)));
                 spec = new FileSpec(FileLocalizer.getTemporaryPath(
                     pigContext).toString(),
-                    new FuncSpec(BinStorage.class.getName()));
+                    new FuncSpec(InterStorage.class.getName()));
                 str.setSFile(spec);
                 plan.addAsLeaf(str);
             } else{

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=963504&r1=963503&r2=963504&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Mon Jul 12 22:09:09 2010
@@ -72,7 +72,6 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage.PackageType;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
-import org.apache.pig.builtin.BinStorage;
 import org.apache.pig.data.DataType;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.builtin.DefaultIndexableLoader;
@@ -83,6 +82,7 @@ import org.apache.pig.impl.builtin.Poiss
 import org.apache.pig.impl.builtin.RandomSampleLoader;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.io.InterStorage;
 import org.apache.pig.impl.plan.CompilationMessageCollector;
 import org.apache.pig.impl.plan.DepthFirstWalker;
 import org.apache.pig.impl.plan.NodeIdGenerator;
@@ -594,7 +594,7 @@ public class MRCompiler extends PhyPlanV
      */
     private FileSpec getTempFileSpec() throws IOException {
         return new FileSpec(FileLocalizer.getTemporaryPath(pigContext).toString(),
-                new FuncSpec(BinStorage.class.getName()));
+                new FuncSpec(InterStorage.class.getName()));
     }
     
     /**
@@ -2077,7 +2077,7 @@ public class MRCompiler extends PhyPlanV
         // SampleLoader expects string version of FuncSpec 
         // as its first constructor argument.
         
-        rslargs[0] = (new FuncSpec(BinStorage.class.getName())).toString();
+        rslargs[0] = (new FuncSpec(InterStorage.class.getName())).toString();
         
         rslargs[1] = "100"; // The value is calculated based on the file size for skewed join
         FileSpec quantLdFilName = new FileSpec(lFile.getFileName(),

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/NoopStoreRemover.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/NoopStoreRemover.java?rev=963504&r1=963503&r2=963504&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/NoopStoreRemover.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/NoopStoreRemover.java Mon Jul 12 22:09:09 2010
@@ -41,7 +41,7 @@ import org.apache.pig.backend.hadoop.exe
  *
  *    ------------- Split ---------
  *    |                           |
- *  Store(BinStorage)         Store(StoreFunc)
+ *  Store(InterStorage)         Store(StoreFunc)
  * 
  * Followed by a load of the tmp store in a dependent MapReduceOper.
  *

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SampleOptimizer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SampleOptimizer.java?rev=963504&r1=963503&r2=963504&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SampleOptimizer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SampleOptimizer.java Mon Jul 12 22:09:09 2010
@@ -24,7 +24,6 @@ import java.util.List;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.pig.FuncSpec;
-import org.apache.pig.LoadFunc;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROpPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
@@ -32,9 +31,8 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
-import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.builtin.RandomSampleLoader;
 import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.io.InterStorage;
 import org.apache.pig.impl.plan.DepthFirstWalker;
 import org.apache.pig.impl.plan.PlanException;
 import org.apache.pig.impl.plan.VisitorException;
@@ -151,7 +149,7 @@ public class SampleOptimizer extends MRO
             }
             POLoad sl = (POLoad)root;
             if (loadFile.equals(sl.getLFile().getFileName()) && 
-                    "org.apache.pig.builtin.BinStorage".equals(sl.getLFile().getFuncName())) {
+                    InterStorage.class.getName().equals(sl.getLFile().getFuncName())) {
                 succLoad = sl;
                 break;
             }

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java?rev=963504&r1=963503&r2=963504&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java Mon Jul 12 22:09:09 2010
@@ -21,22 +21,20 @@ package org.apache.pig.backend.hadoop.ex
 import java.util.HashMap;
 import java.util.Map;
 
-
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.Partitioner;
 import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
+import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
+import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.io.NullablePartitionWritable;
 import org.apache.pig.impl.io.NullableTuple;
 import org.apache.pig.impl.io.PigNullableWritable;
-import org.apache.pig.impl.io.NullablePartitionWritable;
-
 import org.apache.pig.impl.util.Pair;
-import org.apache.pig.data.DefaultTupleFactory;
-import org.apache.pig.data.DataType;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
-import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
 
 
 /**
@@ -62,7 +60,7 @@ public class SkewedPartitioner extends P
         // for partition table, compute the index based on the sampler output
         Pair <Integer, Integer> indexes;
         Integer curIndex = -1;
-        Tuple keyTuple = DefaultTupleFactory.getInstance().newTuple(1);
+        Tuple keyTuple = TupleFactory.getInstance().newTuple(1);
 
         // extract the key from nullablepartitionwritable
         PigNullableWritable key = ((NullablePartitionWritable) wrappedKey).getKey();

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java?rev=963504&r1=963503&r2=963504&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java Mon Jul 12 22:09:09 2010
@@ -20,27 +20,25 @@ package org.apache.pig.backend.hadoop.ex
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.Partitioner;
-import org.apache.pig.LoadFunc;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.HDataType;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
 import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
-import org.apache.pig.builtin.BinStorage;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.InternalMap;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.builtin.FindQuantiles;
-import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.io.InterStorage;
 import org.apache.pig.impl.io.NullableBytesWritable;
 import org.apache.pig.impl.io.NullableDoubleWritable;
 import org.apache.pig.impl.io.NullableFloatWritable;
@@ -50,10 +48,6 @@ import org.apache.pig.impl.io.NullableTe
 import org.apache.pig.impl.io.NullableTuple;
 import org.apache.pig.impl.io.PigNullableWritable;
 import org.apache.pig.impl.io.ReadToEndLoader;
-import org.apache.pig.impl.util.ObjectSerializer;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 
 public class WeightedRangePartitioner extends Partitioner<PigNullableWritable, Writable>   
                                       implements Configurable {
@@ -109,7 +103,7 @@ public class WeightedRangePartitioner ex
                 conf.set("fs.hdfs.impl", configuration.get("fs.hdfs.impl"));
             conf.set(MapRedUtil.FILE_SYSTEM_NAME, "file:///");
             
-            ReadToEndLoader loader = new ReadToEndLoader(new BinStorage(),
+            ReadToEndLoader loader = new ReadToEndLoader(new InterStorage(),
                     conf, quantilesFile, 0);
             DataBag quantilesList;
             Tuple t = loader.getNext();

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java?rev=963504&r1=963503&r2=963504&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java Mon Jul 12 22:09:09 2010
@@ -42,10 +42,10 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.*;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ExpressionOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.BinaryExpressionOperator;
-import org.apache.pig.builtin.BinStorage;
 import org.apache.pig.impl.builtin.GFCross;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.io.InterStorage;
 import org.apache.pig.impl.logicalLayer.*;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.plan.DependencyOrderWalker;
@@ -1490,7 +1490,7 @@ public class LogToPhyTranslationVisitor 
         physOp.setAlias(split.getAlias());
         FileSpec splStrFile;
         try {
-            splStrFile = new FileSpec(FileLocalizer.getTemporaryPath(pc).toString(),new FuncSpec(BinStorage.class.getName()));
+            splStrFile = new FileSpec(FileLocalizer.getTemporaryPath(pc).toString(),new FuncSpec(InterStorage.class.getName()));
         } catch (IOException e1) {
             byte errSrc = pc.getErrorSource();
             int errCode = 0;

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java?rev=963504&r1=963503&r2=963504&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java Mon Jul 12 22:09:09 2010
@@ -513,6 +513,8 @@ public class POPackage extends PhysicalO
                      } else {
                             bags[index].add(copy);
                      }
+                }else{
+                    break;
                 }
             }
         } 

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java?rev=963504&r1=963503&r2=963504&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java Mon Jul 12 22:09:09 2010
@@ -35,19 +35,19 @@ import org.apache.hadoop.fs.PathFilter;
 import org.apache.pig.FuncSpec;
 import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
-import org.apache.pig.builtin.BinStorage;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataType;
-import org.apache.pig.data.DefaultTupleFactory;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.builtin.PartitionSkewedKeys;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.io.InterStorage;
 import org.apache.pig.impl.io.ReadToEndLoader;
 import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.OperatorKey;
@@ -86,7 +86,7 @@ public class MapRedUtil {
             conf.set("fs.hdfs.impl", PigMapReduce.sJobConf.get("fs.hdfs.impl"));
         conf.set(MapRedUtil.FILE_SYSTEM_NAME, "file:///");
 
-        ReadToEndLoader loader = new ReadToEndLoader(new BinStorage(), conf, 
+        ReadToEndLoader loader = new ReadToEndLoader(new InterStorage(), conf, 
                 keyDistFile, 0);
         DataBag partitionList;
         Tuple t = loader.getNext();
@@ -113,14 +113,14 @@ public class MapRedUtil {
             if (idxTuple.size() > 3) {
                 // remove the last 2 fields of the tuple, i.e: minIndex and maxIndex and store
                 // it in the reducer map
-                Tuple keyTuple = DefaultTupleFactory.getInstance().newTuple();
+                Tuple keyTuple = TupleFactory.getInstance().newTuple();
                 for (int i=0; i < idxTuple.size() - 2; i++) {
                     keyTuple.append(idxTuple.get(i));	
                 }
                 keyT = (E) keyTuple;
             } else {
                 if (keyType == DataType.TUPLE) {
-                    keyT = (E)DefaultTupleFactory.getInstance().newTuple(1);
+                    keyT = (E)TupleFactory.getInstance().newTuple(1);
                     ((Tuple)keyT).set(0,idxTuple.get(0));
                 } else {
                     keyT = (E) idxTuple.get(0);
@@ -151,7 +151,7 @@ public class MapRedUtil {
                     NodeIdGenerator.getGenerator().getNextNodeId(scope)));
                 spec = new FileSpec(FileLocalizer.getTemporaryPath(
                     pigContext).toString(),
-                    new FuncSpec(BinStorage.class.getName()));
+                    new FuncSpec(InterStorage.class.getName()));
                 str.setSFile(spec);
                 plan.addAsLeaf(str);
             } else{

Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/BinStorage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/BinStorage.java?rev=963504&r1=963503&r2=963504&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/BinStorage.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/BinStorage.java Mon Jul 12 22:09:09 2010
@@ -69,10 +69,6 @@ import org.apache.pig.impl.util.LogUtils
 public class BinStorage extends FileInputLoadFunc 
 implements LoadCaster, StoreFuncInterface, LoadMetadata {
 
-    
-    public static final int RECORD_1 = 0x01;
-    public static final int RECORD_2 = 0x02;
-    public static final int RECORD_3 = 0x03;
 
     Iterator<Tuple>     i              = null;
     private static final Log mLog = LogFactory.getLog(BinStorage.class);

Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/COR.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/COR.java?rev=963504&r1=963503&r2=963504&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/COR.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/COR.java Mon Jul 12 22:09:09 2010
@@ -28,7 +28,7 @@ import org.apache.pig.EvalFunc;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.DefaultBagFactory;
-import org.apache.pig.data.DefaultTupleFactory;
+import org.apache.pig.data.TupleFactory;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 
@@ -71,7 +71,7 @@ public class COR extends EvalFunc<DataBa
         DataBag output = DefaultBagFactory.getInstance().newDefaultBag();
         for(int i=0;i<input.size();i++){
             for(int j=i+1;j<input.size();j++){
-                Tuple temp = DefaultTupleFactory.getInstance().newTuple(3);
+                Tuple temp = TupleFactory.getInstance().newTuple(3);
                 try{
                     if(flag){
                         temp.set(0, schemaName.elementAt(i));
@@ -142,7 +142,7 @@ public class COR extends EvalFunc<DataBa
         public Tuple exec(Tuple input) throws IOException {
             if (input == null || input.size() == 0)
                 return null;
-            Tuple output = DefaultTupleFactory.getInstance().newTuple(input.size() * 2); 
+            Tuple output = TupleFactory.getInstance().newTuple(input.size() * 2); 
             try {
                 int k = -1;
                 for(int i=0;i<input.size();i++){
@@ -211,7 +211,7 @@ public class COR extends EvalFunc<DataBa
             DataBag output = DefaultBagFactory.getInstance().newDefaultBag();
             for(int i=0;i<totalSchemas;i++){
                 for(int j=i+1;j<totalSchemas;j++){
-                    Tuple result = DefaultTupleFactory.getInstance().newTuple(3);
+                    Tuple result = TupleFactory.getInstance().newTuple(3);
                     try{
                         if(flag){
                             result.set(0, schemaName.elementAt(i));
@@ -250,9 +250,9 @@ public class COR extends EvalFunc<DataBa
      * @throws IOException
      */
     static protected Tuple combine(DataBag values) throws IOException {
-        Tuple output = DefaultTupleFactory.getInstance().newTuple();
+        Tuple output = TupleFactory.getInstance().newTuple();
         Tuple tuple; // copy of DataBag values 
-        tuple =  DefaultTupleFactory.getInstance().newTuple(values.size());
+        tuple =  TupleFactory.getInstance().newTuple(values.size());
         int ct=0;
 
         try{
@@ -281,7 +281,7 @@ public class COR extends EvalFunc<DataBa
                     sum_x_square += (Double)tem.get(3);
                     sum_y_square += (Double)tem.get(4);
                 }
-                Tuple result = DefaultTupleFactory.getInstance().newTuple(5);
+                Tuple result = TupleFactory.getInstance().newTuple(5);
                 result.set(0, sum_x_y);
                 result.set(1, sum_x);
                 result.set(2, sum_y);
@@ -325,7 +325,7 @@ public class COR extends EvalFunc<DataBa
             throw new IOException("Caught exception processing input", e);
         }
         
-        Tuple result = DefaultTupleFactory.getInstance().newTuple(5);
+        Tuple result = TupleFactory.getInstance().newTuple(5);
         try{
             result.set(0, sum_x_y);
             result.set(1, sum_x);

Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/COV.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/COV.java?rev=963504&r1=963503&r2=963504&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/COV.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/COV.java Mon Jul 12 22:09:09 2010
@@ -28,7 +28,7 @@ import org.apache.pig.EvalFunc;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.DefaultBagFactory;
-import org.apache.pig.data.DefaultTupleFactory;
+import org.apache.pig.data.TupleFactory;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 
@@ -73,7 +73,7 @@ public class COV extends EvalFunc<DataBa
         try{
             for(int i=0;i<input.size();i++){
                 for(int j=i+1;j<input.size();j++){
-                    Tuple temp = DefaultTupleFactory.getInstance().newTuple(3);
+                    Tuple temp = TupleFactory.getInstance().newTuple(3);
                     if(flag){
                         temp.set(0, schemaName.elementAt(i));
                         temp.set(1, schemaName.elementAt(j));
@@ -143,7 +143,7 @@ public class COV extends EvalFunc<DataBa
             if (input == null || input.size() == 0)
                 return null;
 
-            Tuple output = DefaultTupleFactory.getInstance().newTuple();
+            Tuple output = TupleFactory.getInstance().newTuple();
             try {
                 for(int i=0;i<input.size();i++){
                     for(int j=i+1;j<input.size();j++){
@@ -208,7 +208,7 @@ public class COV extends EvalFunc<DataBa
                 }
                 for(int i=0;i<totalSchemas;i++){
                     for(int j=i+1;j<totalSchemas;j++){
-                        Tuple result = DefaultTupleFactory.getInstance().newTuple(3);
+                        Tuple result = TupleFactory.getInstance().newTuple(3);
                         if(flag){
                             result.set(0, schemaName.elementAt(i));
                             result.set(1, schemaName.elementAt(j));
@@ -243,8 +243,8 @@ public class COV extends EvalFunc<DataBa
      * @throws IOException
      */
     static protected Tuple combine(DataBag values) throws IOException {
-        Tuple tuple = DefaultTupleFactory.getInstance().newTuple(Double.valueOf(values.size()).intValue());
-        Tuple output = DefaultTupleFactory.getInstance().newTuple();
+        Tuple tuple = TupleFactory.getInstance().newTuple(Double.valueOf(values.size()).intValue());
+        Tuple output = TupleFactory.getInstance().newTuple();
 
         int ct=0;
         for (Iterator<Tuple> it = values.iterator(); it.hasNext();ct++) {
@@ -268,7 +268,7 @@ public class COV extends EvalFunc<DataBa
                     sum_x+=(Double)tem.get(1);
                     sum_y+=(Double)tem.get(2);
                 }
-                Tuple result = DefaultTupleFactory.getInstance().newTuple(3);
+                Tuple result = TupleFactory.getInstance().newTuple(3);
                 result.set(0, sum_x_y);
                 result.set(1, sum_x);
                 result.set(2, sum_y);
@@ -306,7 +306,7 @@ public class COV extends EvalFunc<DataBa
             throw new IOException("Caught exception processing input", e);
         }
         
-        Tuple result = DefaultTupleFactory.getInstance().newTuple(3);
+        Tuple result = TupleFactory.getInstance().newTuple(3);
         try{
             result.set(0, sum_x_y);
             result.set(1, sum_x);

Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/Utf8StorageConverter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/Utf8StorageConverter.java?rev=963504&r1=963503&r2=963504&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/Utf8StorageConverter.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/Utf8StorageConverter.java Mon Jul 12 22:09:09 2010
@@ -36,7 +36,6 @@ import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.DefaultBagFactory;
-import org.apache.pig.data.DefaultTupleFactory;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.util.LogUtils;
@@ -117,7 +116,7 @@ public class Utf8StorageConverter implem
                 return null;
             }
         }
-        Tuple t = DefaultTupleFactory.getInstance().newTuple();
+        Tuple t = TupleFactory.getInstance().newTuple();
         if (fieldSchema.getSchema()!=null && fieldSchema.getSchema().getFields().length!=0) {
             ResourceFieldSchema[] fss = fieldSchema.getSchema().getFields();
             // Interpret item inside tuple one by one based on the inner schema

Added: hadoop/pig/trunk/src/org/apache/pig/data/BinInterSedes.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/BinInterSedes.java?rev=963504&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/BinInterSedes.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/data/BinInterSedes.java Mon Jul 12 22:09:09 2010
@@ -0,0 +1,574 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.data;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.pig.PigException;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.classification.InterfaceAudience;
+import org.apache.pig.classification.InterfaceStability;
+
+/**
+ * A class to handle reading and writing of intermediate results of data
+ * types. The serialization format used by this class more efficient than 
+ * what was used in DataReaderWriter . 
+ * The format used by the functions in this class is subject to change, so it
+ * should be used ONLY to store intermediate results within a pig query.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Stable
+public class BinInterSedes implements InterSedes {
+
+    public static final byte BOOLEAN_TRUE = 0;
+    static final byte BOOLEAN_FALSE = 1;
+
+    public static final byte BYTE = 2;
+
+    public static final byte INTEGER = 3;
+    // since boolean is not supported yet(v0.7) as external type,
+    // lot of people use int instead
+    // and some data with old schema is likely stay for some time.
+    // so optimizing for that case as well
+    public static final byte INTEGER_0 = 4; 
+    public static final byte INTEGER_1 = 5;
+    public static final byte INTEGER_INSHORT = 6;
+    public static final byte INTEGER_INBYTE = 7;
+
+
+    public static final byte LONG = 8;
+    public static final byte FLOAT = 9;
+    public static final byte DOUBLE = 10;
+
+    public static final byte BYTEARRAY = 11;
+    public static final byte SMALLBYTEARRAY = 12;
+    public static final byte TINYBYTEARRAY = 13;
+
+    public static final byte CHARARRAY = 14;
+    public static final byte SMALLCHARARRAY = 15;
+
+    public static final byte MAP = 16;
+    public static final byte SMALLMAP = 17;
+    public static final byte TINYMAP = 18;
+
+    public static final byte TUPLE = 19;
+    public static final byte SMALLTUPLE = 20;
+    public static final byte TINYTUPLE = 21;
+
+    public static final byte BAG = 22;
+    public static final byte SMALLBAG = 23;
+    public static final byte TINYBAG = 24;
+
+    public static final byte GENERIC_WRITABLECOMPARABLE = 25;
+    public static final byte INTERNALMAP = 26;
+
+    public static final byte NULL = 27;
+
+    private static TupleFactory mTupleFactory = TupleFactory.getInstance();
+    private static BagFactory mBagFactory = BagFactory.getInstance();
+    static final int UNSIGNED_SHORT_MAX = 65535;
+    static final int UNSIGNED_BYTE_MAX = 255;
+    static final String UTF8 = "UTF-8";
+
+    
+    
+    private Tuple readTuple(DataInput in, byte type) throws IOException {
+        // Read the size.
+        int sz = getTupleSize(in, type);
+
+        Tuple t = mTupleFactory.newTuple(sz);
+        for (int i = 0; i < sz; i++) {
+            t.set(i, readDatum(in));
+        }
+        return t;
+
+    }
+    
+
+    
+    private int getTupleSize(DataInput in, byte type) throws IOException {
+        int sz ;
+        switch(type){
+        case TINYTUPLE:
+            sz = in.readUnsignedByte();
+            break;
+        case SMALLTUPLE:
+            sz = in.readUnsignedShort();
+            break;
+        case TUPLE:
+            sz = in.readInt();
+            break;
+        default: {
+            int errCode = 2112;
+            String msg = "Unexpected datatype " + type + " while reading tuple" +
+            "from binary file.";
+            throw new ExecException(msg, errCode, PigException.BUG);
+        }
+        }
+        // if sz == 0, we construct an "empty" tuple -
+        // presumably the writer wrote an empty tuple!
+        if (sz < 0) {
+            throw new IOException("Invalid size " + sz + " for a tuple");
+        }      
+        return sz;
+    }
+
+
+
+    private DataBag readBag(DataInput in, byte type) throws IOException {
+        DataBag bag = mBagFactory.newDefaultBag();
+        long size;
+        //determine size of bag
+        switch(type){
+        case TINYBAG:
+            size = in.readUnsignedByte();
+            break;
+        case SMALLBAG:
+            size = in.readUnsignedShort();
+            break;
+        case BAG:
+            size = in.readLong();
+            break;
+        default:
+            int errCode = 2219;
+            String msg = "Unexpected data while reading bag " +
+            "from binary file.";
+            throw new ExecException(msg, errCode, PigException.BUG);            
+        }
+
+        for (long i = 0; i < size; i++) {
+            try {
+                Object o = readDatum(in);
+                bag.add((Tuple)o);
+            } catch (ExecException ee) {
+                throw ee;
+            }
+        }   
+        return bag;
+    }
+    
+    
+    private Map<String, Object> readMap(DataInput in, byte type) throws IOException {
+        int size ;
+        switch(type){
+        case TINYMAP:
+            size = in.readUnsignedByte();
+            break;
+        case SMALLMAP:
+            size = in.readUnsignedShort();
+            break;
+        case MAP:
+            size = in.readInt();
+            break;
+        default: {
+            int errCode = 2220;
+            String msg = "Unexpected data while reading map" +
+            "from binary file.";
+            throw new ExecException(msg, errCode, PigException.BUG);  
+        }
+        }
+        Map<String, Object> m = new HashMap<String, Object>(size);
+        for (int i = 0; i < size; i++) {
+            String key = (String)readDatum(in);
+            m.put(key, readDatum(in));
+        }
+        return m;    
+    }
+
+    private InternalMap readInternalMap(DataInput in) throws IOException {
+        int size = in.readInt();    
+        InternalMap m = new InternalMap(size);
+        for (int i = 0; i < size; i++) {
+            Object key = readDatum(in);
+            m.put(key, readDatum(in));
+        }
+        return m;    
+    }
+    
+    private static String readCharArray(DataInput in) throws IOException{
+        return in.readUTF();
+    }
+
+    private static String readBigCharArray(DataInput in) throws IOException{
+        int size = in.readInt();
+        byte[] ba = new byte[size];
+        in.readFully(ba);
+        return new String(ba, UTF8);
+    }
+    
+    private Writable readWritable(DataInput in) throws IOException {
+        String className = (String) readDatum(in);
+        // create the writeable class . It needs to have a default constructor
+        Class<?> objClass = null ;
+        try {
+            objClass = Class.forName(className);
+        } catch (ClassNotFoundException e) {
+            throw new IOException("Could not find class " + className + 
+                    ", while attempting to de-serialize it ", e);
+        }
+        Writable writable = null;
+        try {
+            writable = (Writable) objClass.newInstance();
+        } catch (Exception e) {
+            String msg = "Could create instance of class " + className + 
+            ", while attempting to de-serialize it. (no default constructor ?)";
+            throw new IOException(msg, e);
+        } 
+        
+        //read the fields of the object from DataInput
+        writable.readFields(in);
+        return writable;
+    }
+    
+    /* (non-Javadoc)
+     * @see org.apache.pig.data.InterSedes#readDatum(java.io.DataInput)
+     */
+    public Object readDatum(DataInput in) throws IOException, ExecException {
+        // Read the data type
+        byte b = in.readByte();
+        return readDatum(in, b);
+    }
+    
+    private static Object readBytes(DataInput in, int size) throws IOException {
+        byte[] ba = new byte[size];
+        in.readFully(ba);
+        return new DataByteArray(ba);
+    }
+        
+    /* (non-Javadoc)
+     * @see org.apache.pig.data.InterSedes#readDatum(java.io.DataInput, byte)
+     */
+    public Object readDatum(DataInput in, byte type) throws IOException, ExecException {
+        switch (type) {
+            case TUPLE: 
+            case TINYTUPLE:
+            case SMALLTUPLE:
+                return readTuple(in, type);
+            
+            case BAG: 
+            case TINYBAG:
+            case SMALLBAG:
+                return readBag(in, type);
+
+            case MAP: 
+            case TINYMAP:
+            case SMALLMAP:
+                return readMap(in, type);    
+
+            case INTERNALMAP: 
+                return readInternalMap(in);    
+
+            case INTEGER_0:
+                return Integer.valueOf(0);
+            case INTEGER_1:
+                return Integer.valueOf(1);
+            case INTEGER_INBYTE:
+                return Integer.valueOf(in.readByte());                
+            case INTEGER_INSHORT:
+                return Integer.valueOf(in.readShort());
+            case INTEGER:
+                return Integer.valueOf(in.readInt());
+
+            case LONG:
+                return Long.valueOf(in.readLong());
+
+            case FLOAT:
+                return Float.valueOf(in.readFloat());
+
+            case DOUBLE:
+                return Double.valueOf(in.readDouble());
+
+            case BOOLEAN_TRUE:
+                return Boolean.valueOf(true);
+                
+            case BOOLEAN_FALSE:
+                return Boolean.valueOf(false);
+
+            case BYTE:
+                return Byte.valueOf(in.readByte());
+
+            case TINYBYTEARRAY :{
+                int size = in.readUnsignedByte();
+                return readBytes(in, size);
+            }
+            
+            case SMALLBYTEARRAY :{
+                int size = in.readUnsignedShort();
+                return readBytes(in, size);
+            }
+                
+            case BYTEARRAY: {
+                int size = in.readInt();
+                return readBytes(in, size);
+            }
+            
+            case CHARARRAY: 
+                return readBigCharArray(in);
+
+            case SMALLCHARARRAY: 
+                return readCharArray(in);
+                
+            case GENERIC_WRITABLECOMPARABLE :
+                return readWritable(in);
+                
+            case NULL:
+                return null;
+
+            default:
+                throw new RuntimeException("Unexpected data type " + type +
+                    " found in stream.");
+        }
+    }
+
+
+    /* (non-Javadoc)
+     * @see org.apache.pig.data.InterSedes#writeDatum(java.io.DataOutput, java.lang.Object)
+     */
+    @SuppressWarnings("unchecked")
+    public void writeDatum(
+            DataOutput out,
+            Object val) throws IOException {
+        // Read the data type
+        byte type = DataType.findType(val);
+        switch (type) {
+            case DataType.TUPLE:
+                writeTuple(out, (Tuple)val);
+                break;
+                
+            case DataType.BAG:
+                writeBag(out, (DataBag)val);
+                break;
+
+            case DataType.MAP: {
+                writeMap(out, (Map<String, Object>)val);
+
+                break;
+                               }
+            
+            case DataType.INTERNALMAP: {
+                out.writeByte(INTERNALMAP);
+                Map<Object, Object> m = (Map<Object, Object>)val;
+                out.writeInt(m.size());
+                Iterator<Map.Entry<Object, Object> > i =
+                    m.entrySet().iterator();
+                while (i.hasNext()) {
+                    Map.Entry<Object, Object> entry = i.next();
+                    writeDatum(out, entry.getKey());
+                    writeDatum(out, entry.getValue());
+                }
+                break;
+                               }
+            
+            case DataType.INTEGER:
+                int i = (Integer)val;
+                if(i == 0){
+                    out.writeByte(INTEGER_0);
+                }else if(i == 1){
+                    out.writeByte(INTEGER_1);
+                }
+                else if(Byte.MIN_VALUE <= i && i <= Byte.MAX_VALUE  ){
+                    out.writeByte(INTEGER_INBYTE);
+                    out.writeByte(i);
+                }
+                else if(Short.MIN_VALUE <= i && i <= Short.MAX_VALUE ){
+                    out.writeByte(INTEGER_INSHORT);
+                    out.writeShort(i);
+                }
+                else{
+                    out.writeByte(INTEGER);
+                    out.writeInt(i);
+                }
+
+
+                break;
+
+            case DataType.LONG:
+                out.writeByte(LONG);
+                out.writeLong((Long)val);
+                break;
+
+            case DataType.FLOAT:
+                out.writeByte(FLOAT);
+                out.writeFloat((Float)val);
+                break;
+
+            case DataType.DOUBLE:
+                out.writeByte(DOUBLE);
+                out.writeDouble((Double)val);
+                break;
+
+            case DataType.BOOLEAN:
+                if(((Boolean)val) == true)
+                    out.writeByte(BOOLEAN_TRUE);
+                else
+                    out.writeByte(BOOLEAN_FALSE);
+                break;
+
+            case DataType.BYTE:
+                out.writeByte(BYTE);
+                out.writeByte((Byte)val);
+                break;
+
+            case DataType.BYTEARRAY: {
+                DataByteArray bytes = (DataByteArray)val;
+                final int sz = bytes.size();
+                if(sz < UNSIGNED_BYTE_MAX){
+                    out.writeByte(TINYBYTEARRAY);
+                    out.writeByte(sz);
+                }
+                else if(sz < UNSIGNED_SHORT_MAX){
+                    out.writeByte(SMALLBYTEARRAY);
+                    out.writeShort(sz);
+                }
+                else {
+                    out.writeByte(BYTEARRAY);
+                    out.writeInt(sz);
+                }
+                out.write(bytes.mData);
+                
+                break;
+
+            }
+
+            case DataType.CHARARRAY: {
+                String s = (String)val;
+                // a char can take up to 3 bytes in the modified utf8 encoding
+                // used by DataOutput.writeUTF, so use UNSIGNED_SHORT_MAX/3
+                if(s.length() < UNSIGNED_SHORT_MAX/3) {
+                    out.writeByte(SMALLCHARARRAY);
+                    out.writeUTF(s);
+                } else {
+                    byte[] utfBytes = s.getBytes(UTF8);
+                    int length = utfBytes.length;
+
+                    out.writeByte(CHARARRAY);
+                    out.writeInt(length);
+                    out.write(utfBytes);
+                }
+                break;
+            }
+            case DataType.GENERIC_WRITABLECOMPARABLE :
+                out.writeByte(GENERIC_WRITABLECOMPARABLE);
+                //store the class name, so we know the class to create on read
+                writeDatum(out, val.getClass().getName());
+                Writable writable = (Writable)val;
+                writable.write(out);
+                break;
+
+            case DataType.NULL:
+                out.writeByte(NULL);
+                break;
+
+            default:
+                throw new RuntimeException("Unexpected data type " + type +
+                    " found in stream.");
+        }
+    }
+
+    private void writeMap(DataOutput out, Map<String, Object> m)
+    throws IOException {
+
+        final int sz = m.size();
+        if(sz < UNSIGNED_BYTE_MAX){
+            out.writeByte(TINYMAP);
+            out.writeByte(sz);
+        }else if(sz < UNSIGNED_SHORT_MAX){
+            out.writeByte(SMALLMAP);
+            out.writeShort(sz);
+        }else {
+            out.writeByte(MAP);       
+            out.writeInt(sz);
+        }
+        Iterator<Map.Entry<String, Object> > i =
+            m.entrySet().iterator();
+        while (i.hasNext()) {
+            Map.Entry<String, Object> entry = i.next();
+            writeDatum(out, entry.getKey());
+            writeDatum(out, entry.getValue());
+        }
+    }
+
+
+
+    private void writeBag(DataOutput out, DataBag bag)
+    throws IOException {
+        // We don't care whether this bag was sorted or distinct because
+        // using the iterator to write it will guarantee those things come
+        // correctly.  And on the other end there'll be no reason to waste
+        // time re-sorting or re-applying distinct.
+        final long sz = bag.size();
+        if(sz < UNSIGNED_BYTE_MAX){
+            out.writeByte(TINYBAG);
+            out.writeByte((int)sz);
+        }else if(sz < UNSIGNED_SHORT_MAX){
+            out.writeByte(SMALLBAG);
+            out.writeShort((int)sz);           
+        }else {
+            out.writeByte(BAG);
+            out.writeLong(sz);
+        }
+
+        Iterator<Tuple> it = bag.iterator();
+        while (it.hasNext()) {
+            writeTuple(out, it.next());
+        } 
+        
+    }
+
+    private void writeTuple(DataOutput out, Tuple t) throws IOException {
+        final int sz = t.size();
+        if(sz < UNSIGNED_BYTE_MAX){
+            out.writeByte(TINYTUPLE);
+            out.writeByte(sz);
+        }else if(sz < UNSIGNED_SHORT_MAX){
+            out.writeByte(SMALLTUPLE);
+            out.writeShort(sz);
+        }else{
+            out.writeByte(TUPLE);
+            out.writeInt(sz);
+        }
+
+        for (int i = 0; i < sz; i++) {
+            writeDatum(out, t.get(i));
+        }
+    }
+
+
+
+    /* (non-Javadoc)
+     * @see org.apache.pig.data.InterSedes#addColsToTuple(java.io.DataInput, org.apache.pig.data.Tuple)
+     */
+    @Override
+    public void addColsToTuple(DataInput in, Tuple t)
+    throws IOException {
+        byte type = in.readByte();
+        int sz = getTupleSize(in, type);
+        for (int i = 0; i < sz; i++) {
+            t.append(readDatum(in));
+        }
+    }
+
+
+}
+

Added: hadoop/pig/trunk/src/org/apache/pig/data/BinSedesTuple.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/BinSedesTuple.java?rev=963504&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/BinSedesTuple.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/data/BinSedesTuple.java Mon Jul 12 22:09:09 2010
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.data;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.pig.classification.InterfaceAudience;
+
+/**
+ * This tuple has a faster (de)serialization mechanism. It to be used for
+ * storing intermediate data between Map and Reduce and between MR jobs.
+ * This is for internal pig use only. The serialization format can change, so
+ *  do not use it for storing any persistant data (ie in load/store functions).
+ */
+@InterfaceAudience.Private
+public class BinSedesTuple extends DefaultTuple {
+
+    private static final long serialVersionUID = 1L;
+    private static final InterSedes sedes = InterSedesFactory.getInterSedesInstance();
+    
+
+    public void write(DataOutput out) throws IOException {
+        sedes.writeDatum(out, this);
+    }
+
+    public void readFields(DataInput in) throws IOException {
+
+        // Clear our fields, in case we're being reused.
+        mFields.clear();
+        sedes.addColsToTuple(in, this);
+    } 
+    
+
+
+    /**
+     * Default constructor
+     */
+    BinSedesTuple() {
+       super();
+    }
+
+    /**
+     * Construct a tuple with a known number of fields.  Package level so
+     * that callers cannot directly invoke it.
+     * @param size Number of fields to allocate in the tuple.
+     */
+    BinSedesTuple(int size) {
+        super(size);
+    }
+
+    /**
+     * Construct a tuple from an existing list of objects.  Package
+     * level so that callers cannot directly invoke it.
+     * @param c List of objects to turn into a tuple.
+     */
+    BinSedesTuple(List<Object> c) {
+        super(c);
+    }
+
+    /**
+     * Construct a tuple from an existing list of objects.  Package
+     * level so that callers cannot directly invoke it.
+     * @param c List of objects to turn into a tuple.  This list will be kept
+     * as part of the tuple.
+     * @param junk Just used to differentiate from the constructor above that
+     * copies the list.
+     */
+    BinSedesTuple(List<Object> c, int junk) {
+        super(c, junk);
+    }
+    
+}

Added: hadoop/pig/trunk/src/org/apache/pig/data/BinSedesTupleFactory.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/BinSedesTupleFactory.java?rev=963504&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/BinSedesTupleFactory.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/data/BinSedesTupleFactory.java Mon Jul 12 22:09:09 2010
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.data;
+
+import java.lang.Class;
+import java.util.List;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.classification.InterfaceAudience;
+
+/**
+ * Default implementation of TupleFactory.
+ */
+@InterfaceAudience.Private
+public class BinSedesTupleFactory extends TupleFactory {
+    public Tuple newTuple() {
+        return new BinSedesTuple();
+    
+    }
+
+    public Tuple newTuple(int size) {
+        return new BinSedesTuple(size);
+    }
+    
+    @SuppressWarnings("unchecked")
+    public Tuple newTuple(List c) {
+        return new BinSedesTuple(c);
+    }
+
+    @SuppressWarnings("unchecked")
+    public Tuple newTupleNoCopy(List list) {
+        return new BinSedesTuple(list, 1);
+    }
+
+    public Tuple newTuple(Object datum) {
+        Tuple t = new BinSedesTuple(1);
+        try {
+            t.set(0, datum);
+        } catch (ExecException e) {
+            // The world has come to an end, we just allocated a tuple with one slot
+            // but we can't write to that slot.
+            throw new RuntimeException("Unable to write to field 0 in newly " +
+                "allocated tuple of size 1!", e);
+        }
+        return t;
+    }
+
+    public Class tupleClass() {
+        return BinSedesTuple.class;
+    }
+
+    BinSedesTupleFactory() {
+    }
+
+}
+

Modified: hadoop/pig/trunk/src/org/apache/pig/data/DataReaderWriter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/DataReaderWriter.java?rev=963504&r1=963503&r2=963504&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/DataReaderWriter.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/data/DataReaderWriter.java Mon Jul 12 22:09:09 2010
@@ -35,8 +35,10 @@ import org.apache.pig.classification.Int
 import org.apache.pig.backend.executionengine.ExecException;
 
 /**
- * A class to handle reading and writing of intermediate results of data
- * types.  This class could also be used for storing permanent results.
+ * This class was used to handle reading and writing of intermediate
+ *  results of data types. Now that functionality is in {@link BinInterSedes}
+ *  This class could also be used for storing permanent results, it used 
+ *  by BinStorage and Zebra through DefaultTuple class.
  */
 @InterfaceAudience.Private
 @InterfaceStability.Stable
@@ -68,7 +70,16 @@ public class DataReaderWriter {
     
     public static DataBag bytesToBag(DataInput in) throws IOException {
         DataBag bag = mBagFactory.newDefaultBag();
-        bag.readFields(in);
+        long size = in.readLong();
+        
+        for (long i = 0; i < size; i++) {
+            try {
+                Object o = readDatum(in);
+                bag.add((Tuple)o);
+            } catch (ExecException ee) {
+                throw ee;
+            }
+        }
         return bag;
     }
     
@@ -202,16 +213,23 @@ public class DataReaderWriter {
         byte type = DataType.findType(val);
         switch (type) {
             case DataType.TUPLE:
-                // Because tuples are written directly by hadoop, the
-                // tuple's write method needs to write the indicator byte.
-                // So don't write the indicator byte here as it is for
-                // everyone else.
-                ((Tuple)val).write(out);
+                Tuple t = (Tuple)val;
+                out.writeByte(DataType.TUPLE);
+                int sz = t.size();
+                out.writeInt(sz);
+                for (int i = 0; i < sz; i++) {
+                    DataReaderWriter.writeDatum(out, t.get(i));
+                }
                 break;
                 
             case DataType.BAG:
+                DataBag bag = (DataBag)val;
                 out.writeByte(DataType.BAG);
-                ((DataBag)val).write(out);
+                out.writeLong(bag.size());
+                Iterator<Tuple> it = bag.iterator();
+                while (it.hasNext()) {
+                    DataReaderWriter.writeDatum(out, it.next());
+                }  
                 break;
 
             case DataType.MAP: {

Modified: hadoop/pig/trunk/src/org/apache/pig/data/DefaultAbstractBag.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/DefaultAbstractBag.java?rev=963504&r1=963503&r2=963504&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/DefaultAbstractBag.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/data/DefaultAbstractBag.java Mon Jul 12 22:09:09 2010
@@ -47,10 +47,11 @@ import org.apache.commons.logging.LogFac
  */
 public abstract class DefaultAbstractBag implements DataBag {
 
-     private static final Log log = LogFactory.getLog(DataBag.class);
-     
-     private static PigLogger pigLogger = PhysicalOperator.getPigLogger();
+    private static final Log log = LogFactory.getLog(DataBag.class);
 
+    private static PigLogger pigLogger = PhysicalOperator.getPigLogger();
+
+    private static InterSedes sedes = InterSedesFactory.getInterSedesInstance();
     // Container that holds the tuples. Actual object instantiated by
     // subclasses.
     protected Collection<Tuple> mContents;
@@ -239,16 +240,7 @@ public abstract class DefaultAbstractBag
      * @throws IOException (passes it on from underlying calls).
      */
     public void write(DataOutput out) throws IOException {
-        // We don't care whether this bag was sorted or distinct because
-        // using the iterator to write it will guarantee those things come
-        // correctly.  And on the other end there'll be no reason to waste
-        // time re-sorting or re-applying distinct.
-        out.writeLong(size());
-        Iterator<Tuple> it = iterator();
-        while (it.hasNext()) {
-            Tuple item = it.next();
-            item.write(out);
-        }    
+        sedes.writeDatum(out, this);
     }
  
     /**
@@ -261,7 +253,7 @@ public abstract class DefaultAbstractBag
         
         for (long i = 0; i < size; i++) {
             try {
-                Object o = DataReaderWriter.readDatum(in);
+                Object o = sedes.readDatum(in);
                 add((Tuple)o);
             } catch (ExecException ee) {
                 throw ee;

Modified: hadoop/pig/trunk/src/org/apache/pig/data/DefaultTuple.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/DefaultTuple.java?rev=963504&r1=963503&r2=963504&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/DefaultTuple.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/data/DefaultTuple.java Mon Jul 12 22:09:09 2010
@@ -31,15 +31,17 @@ import org.apache.pig.backend.executione
 import org.apache.pig.impl.util.TupleFormat;
 
 /**
- * A default implementation of Tuple.  This class will be created by the
- * DefaultTupleFactory.
+ * This was the old default implementation of Tuple. The new default is
+ * {@link BinSedesTuple} .   
+ * Zebra and BinStorage load/store functions use the .write(..) and .readFields(..)
+ * functions here for (de)serialization.
  */
 public class DefaultTuple implements Tuple {
     
     protected boolean isNull = false;
     private static final long serialVersionUID = 2L;
     protected List<Object> mFields;
-    
+        
     /**
      * Default constructor.  This constructor is public so that hadoop can call
      * it directly.  However, inside pig you should never be calling this
@@ -257,12 +259,7 @@ public class DefaultTuple implements Tup
     }
 
     public void write(DataOutput out) throws IOException {
-        out.writeByte(DataType.TUPLE);
-        int sz = size();
-        out.writeInt(sz);
-        for (int i = 0; i < sz; i++) {
-            DataReaderWriter.writeDatum(out, mFields.get(i));
-        }
+        DataReaderWriter.writeDatum(out, this);
     }
 
     public void readFields(DataInput in) throws IOException {

Modified: hadoop/pig/trunk/src/org/apache/pig/data/DefaultTupleFactory.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/DefaultTupleFactory.java?rev=963504&r1=963503&r2=963504&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/DefaultTupleFactory.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/data/DefaultTupleFactory.java Mon Jul 12 22:09:09 2010
@@ -17,53 +17,21 @@
  */
 package org.apache.pig.data;
 
-import java.lang.Class;
-import java.util.List;
-
-import org.apache.pig.backend.executionengine.ExecException;
-
 /**
- * Default implementation of TupleFactory.
+ * This class was being used to create new tuple instances in some
+ * udfs and other places in pig code, even though the tuplefactory creation
+ * function (getInstance()) is static in TupleFactory
+ *  (ie DefaultTuple could not override it)
+ *  A typical call is - DefaultTupleFactory.getInstance().newTuple(..);
+ *  
+ *  So that such external udfs don't break, a DefaultTupleFactory is present.
+ *  Don't use this in your code, use TupleFactory directly instead.
  */
-public class DefaultTupleFactory extends TupleFactory {
-    public Tuple newTuple() {
-        return new DefaultTuple();
-    
-    }
-
-    public Tuple newTuple(int size) {
-        return new DefaultTuple(size);
-    }
-    
-    @SuppressWarnings("unchecked")
-    public Tuple newTuple(List c) {
-        return new DefaultTuple(c);
-    }
-
-    @SuppressWarnings("unchecked")
-    public Tuple newTupleNoCopy(List list) {
-        return new DefaultTuple(list, 1);
-    }
-
-    public Tuple newTuple(Object datum) {
-        Tuple t = new DefaultTuple(1);
-        try {
-            t.set(0, datum);
-        } catch (ExecException e) {
-            // The world has come to an end, we just allocated a tuple with one slot
-            // but we can't write to that slot.
-            throw new RuntimeException("Unable to write to field 0 in newly " +
-                "allocated tuple of size 1!", e);
-        }
-        return t;
-    }
 
-    public Class tupleClass() {
-        return DefaultTuple.class;
-    }
-
-    DefaultTupleFactory() {
-    }
+/**
+ * @deprecated Use {@link TupleFactory}
+ */
+@Deprecated 
+public class DefaultTupleFactory extends BinSedesTupleFactory {
 
 }
-

Added: hadoop/pig/trunk/src/org/apache/pig/data/InterSedes.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/InterSedes.java?rev=963504&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/InterSedes.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/data/InterSedes.java Mon Jul 12 22:09:09 2010
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.data;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.classification.InterfaceAudience;
+import org.apache.pig.classification.InterfaceStability;
+
+/**
+ * A class to handle reading and writing of intermediate results of data
+ * types. The serialization format used by this class more efficient than 
+ * what was used in DataReaderWriter . 
+ * The format used by the functions in this class is subject to change, so it
+ * should be used ONLY to store intermediate results within a pig query.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public interface InterSedes {
+    
+    /**
+     * Get the next object from DataInput in
+     * @param in
+     * @return Next object from DataInput in
+     * @throws IOException
+     * @throws ExecException
+     */
+    public Object readDatum(DataInput in)
+    throws IOException, ExecException;
+    
+    /**
+     * Get the next object from DataInput in of the type of type argument
+     * The type information has been read from DataInput.
+     * @param in
+     * @param type
+     * @return Next object from DataInput in
+     * @throws IOException
+     * @throws ExecException
+     */
+    public Object readDatum(DataInput in, byte type)
+    throws IOException, ExecException;
+    
+    /**
+     * The type of next object has been determined to be of type Tuple,
+     * add the columns that belong to the tuple to given tuple argument t
+     * @param in
+     * @param t
+     * @throws IOException
+     */
+    public void addColsToTuple(DataInput in, Tuple t)
+    throws IOException;
+
+    /**
+     * Write given object val to DataOutput out
+     * @param out
+     * @param val
+     * @throws IOException
+     */
+    public void writeDatum(DataOutput out, Object val) 
+    throws IOException;
+}
+

Added: hadoop/pig/trunk/src/org/apache/pig/data/InterSedesFactory.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/InterSedesFactory.java?rev=963504&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/InterSedesFactory.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/data/InterSedesFactory.java Mon Jul 12 22:09:09 2010
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.data;
+
+import org.apache.pig.classification.InterfaceAudience;
+import org.apache.pig.classification.InterfaceStability;
+
+
+/**
+ * Used to get hold of the single instance of InterSedes .
+ * In future, the subclass of InterSedes that gets instantiated would 
+ * be configurable.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class InterSedesFactory {
+    
+    private static InterSedes instance = null;
+    public static InterSedes getInterSedesInstance(){
+        if(instance == null){
+            instance = new BinInterSedes();
+        }
+        return instance;
+    }
+    
+
+}

Modified: hadoop/pig/trunk/src/org/apache/pig/data/TupleFactory.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/TupleFactory.java?rev=963504&r1=963503&r2=963504&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/TupleFactory.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/data/TupleFactory.java Mon Jul 12 22:09:09 2010
@@ -74,7 +74,7 @@ public abstract class TupleFactory {
                         + "tuple factory " + factoryName, e);
                 }
             } else {
-                gSelf = new DefaultTupleFactory();
+                gSelf = new BinSedesTupleFactory();
             }
         }
         return gSelf;

Modified: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogToPhyTranslationVisitor.java?rev=963504&r1=963503&r2=963504&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogToPhyTranslationVisitor.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogToPhyTranslationVisitor.java Mon Jul 12 22:09:09 2010
@@ -44,11 +44,9 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
-import org.apache.pig.builtin.BinStorage;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
-import org.apache.pig.experimental.logical.expression.BagDereferenceExpression;
 import org.apache.pig.experimental.logical.expression.ExpToPhyTranslationVisitor;
 import org.apache.pig.experimental.logical.expression.LogicalExpressionPlan;
 import org.apache.pig.experimental.logical.expression.ProjectExpression;
@@ -62,8 +60,8 @@ import org.apache.pig.experimental.plan.
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.io.InterStorage;
 import org.apache.pig.impl.logicalLayer.FrontendException;
-import org.apache.pig.impl.logicalLayer.LogicalOperator;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
 import org.apache.pig.impl.plan.NodeIdGenerator;
@@ -889,7 +887,7 @@ public class LogToPhyTranslationVisitor 
         physOp.setAlias(loSplit.getAlias());
         FileSpec splStrFile;
         try {
-            splStrFile = new FileSpec(FileLocalizer.getTemporaryPath(pc).toString(),new FuncSpec(BinStorage.class.getName()));
+            splStrFile = new FileSpec(FileLocalizer.getTemporaryPath(pc).toString(),new FuncSpec(InterStorage.class.getName()));
         } catch (IOException e1) {
             byte errSrc = pc.getErrorSource();
             int errCode = 0;

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/io/BinStorageRecordWriter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/io/BinStorageRecordWriter.java?rev=963504&r1=963503&r2=963504&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/io/BinStorageRecordWriter.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/io/BinStorageRecordWriter.java Mon Jul 12 22:09:09 2010
@@ -23,6 +23,8 @@ import java.io.IOException;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.pig.data.DataReaderWriter;
+import org.apache.pig.data.DefaultTuple;
 import org.apache.pig.data.Tuple;
 
 /**
@@ -66,7 +68,7 @@ public class BinStorageRecordWriter exte
         out.write(RECORD_1);
         out.write(RECORD_2);
         out.write(RECORD_3);
-        t.write(out);
+        DataReaderWriter.writeDatum(out, t);
         
     }
 

Added: hadoop/pig/trunk/src/org/apache/pig/impl/io/InterRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/io/InterRecordReader.java?rev=963504&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/io/InterRecordReader.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/io/InterRecordReader.java Mon Jul 12 22:09:09 2010
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.io;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.BinInterSedes;
+import org.apache.pig.data.InterSedes;
+import org.apache.pig.data.InterSedesFactory;
+import org.apache.pig.data.Tuple;
+
+/**
+ * A record reader used to read data written using {@link InterRecordWriter}
+ * It uses the default InterSedes object for deserialization.
+ */
+public class InterRecordReader extends RecordReader<Text, Tuple> {
+
+  private long start;
+  private long pos;
+  private long end;
+  private BufferedPositionedInputStream in;
+  private Tuple value = null;
+  public static final int RECORD_1 = 0x01;
+  public static final int RECORD_2 = 0x02;
+  public static final int RECORD_3 = 0x03;
+  private DataInputStream inData = null;
+  private static InterSedes sedes = InterSedesFactory.getInterSedesInstance();
+
+  public void initialize(InputSplit genericSplit,
+                         TaskAttemptContext context) throws IOException {
+    FileSplit split = (FileSplit) genericSplit;
+    Configuration job = context.getConfiguration();
+    start = split.getStart();
+    end = start + split.getLength();
+    final Path file = split.getPath();
+
+    // open the file and seek to the start of the split
+    FileSystem fs = file.getFileSystem(job);
+    FSDataInputStream fileIn = fs.open(split.getPath());
+    if (start != 0) {
+        fileIn.seek(start);
+    }
+    in = new BufferedPositionedInputStream(fileIn, start);
+    inData = new DataInputStream(in);
+  }
+  
+  public boolean nextKeyValue() throws IOException {
+      int b = 0;
+      //    skip to next record
+      while (true) {
+          if (in == null || in.getPosition() >=end) {
+              return false;
+          }
+          // check if we saw RECORD_1 in our last attempt
+          // this can happen if we have the following 
+          // sequence RECORD_1-RECORD_1-RECORD_2-RECORD_3
+          // After reading the second RECORD_1 in the above
+          // sequence, we should not look for RECORD_1 again
+          if(b != RECORD_1) {
+              b = in.read();
+              if(b != RECORD_1 && b != -1) {
+                  continue;
+              }
+              if(b == -1) return false;
+          }
+          b = in.read();
+          if(b != RECORD_2 && b != -1) {
+              continue;
+          }
+          if(b == -1) return false;
+          b = in.read();
+          if(b != RECORD_3 && b != -1) {
+              continue;
+          }
+          if(b == -1) return false;
+          b = in.read();
+          if(b != BinInterSedes.TINYTUPLE && 
+                  b != BinInterSedes.SMALLTUPLE &&
+                  b != BinInterSedes.TUPLE &&
+                  b != -1) {
+              continue;
+          }
+          if(b == -1) return false;
+          break;
+      }
+      try {
+          // if we got here, we have seen RECORD_1-RECORD_2-RECORD_3-TUPLE_MARKER
+          // sequence - lets now read the contents of the tuple 
+          value =  (Tuple)sedes.readDatum(inData, (byte)b);
+          return true;
+      } catch (ExecException ee) {
+          throw ee;
+      }
+
+  }
+
+  @Override
+  public Text getCurrentKey() {
+      // the key is always null since we don't really have a key for each
+      // input record
+      return null;
+  }
+
+  @Override
+  public Tuple getCurrentValue() {
+    return value;
+  }
+
+  /**
+   * Get the progress within the split
+   */
+  public float getProgress() {
+    if (start == end) {
+      return 0.0f;
+    } else {
+      return Math.min(1.0f, (pos - start) / (float)(end - start));
+    }
+  }
+  
+  public synchronized void close() throws IOException {
+    if (in != null) {
+      in.close(); 
+    }
+  }
+}

Added: hadoop/pig/trunk/src/org/apache/pig/impl/io/InterRecordWriter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/io/InterRecordWriter.java?rev=963504&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/io/InterRecordWriter.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/io/InterRecordWriter.java Mon Jul 12 22:09:09 2010
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.io;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.pig.data.InterSedes;
+import org.apache.pig.data.InterSedesFactory;
+import org.apache.pig.data.Tuple;
+
+
+/**
+ * A record reader used to write data compatible with {@link InterRecordWriter}
+ * It uses the default InterSedes object for serialization.
+ */
+public class InterRecordWriter extends
+        RecordWriter<org.apache.hadoop.io.WritableComparable, Tuple> {
+
+    public static final int RECORD_1 = 0x01;
+    public static final int RECORD_2 = 0x02;
+    public static final int RECORD_3 = 0x03;
+    private static InterSedes sedes = InterSedesFactory.getInterSedesInstance();
+    /**
+     * the outputstream to write out on
+     */
+    private DataOutputStream out;
+    
+    /**
+     * 
+     */
+    public InterRecordWriter(DataOutputStream out) {
+        this.out = out;
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.hadoop.mapreduce.RecordWriter#close(org.apache.hadoop.mapreduce.TaskAttemptContext)
+     */
+    @Override
+    public void close(TaskAttemptContext arg0) throws IOException,
+            InterruptedException {
+        out.close();        
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.hadoop.mapreduce.RecordWriter#write(java.lang.Object, java.lang.Object)
+     */
+    @Override
+    public void write(WritableComparable wc, Tuple t) throws IOException,
+            InterruptedException {
+        // we really only want to write the tuple (value) out here
+        out.write(RECORD_1);
+        out.write(RECORD_2);
+        out.write(RECORD_3);
+        sedes.writeDatum(out, t);
+        
+    }
+
+}