Posted to commits@pig.apache.org by th...@apache.org on 2010/08/26 19:11:37 UTC

svn commit: r989828 - in /hadoop/pig/trunk: ./ src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/par...

Author: thejas
Date: Thu Aug 26 17:11:36 2010
New Revision: 989828

URL: http://svn.apache.org/viewvc?rev=989828&view=rev
Log:
PIG-1501: need to investigate the impact of compression on pig performance (yanz via thejas)

Added:
    hadoop/pig/trunk/src/org/apache/pig/impl/io/TFileRecordReader.java
    hadoop/pig/trunk/src/org/apache/pig/impl/io/TFileRecordWriter.java
    hadoop/pig/trunk/src/org/apache/pig/impl/io/TFileStorage.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestTmpFileCompression.java
Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/PigServer.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SampleOptimizer.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartitionRearrange.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/BinStorage.java
    hadoop/pig/trunk/src/org/apache/pig/impl/io/InterStorage.java
    hadoop/pig/trunk/src/org/apache/pig/impl/util/Utils.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestSampleOptimizer.java
    hadoop/pig/trunk/test/org/apache/pig/test/utils/GenPhyOp.java
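
The patch adds optional compression for the temporary files Pig writes
between the MR jobs of a query. It is driven by two properties,
"pig.tmpfilecompression" (default false) and "pig.tmpfilecompression.codec"
("gz" or "lzo"), read through new helpers in Utils. A minimal sketch of
turning it on from Java, assuming MAPREDUCE mode as in the new test at the
end of this patch:

    import java.util.Properties;
    import org.apache.pig.ExecType;
    import org.apache.pig.PigServer;

    Properties props = new Properties();
    // compress intermediate (tmp) files between MR jobs
    props.setProperty("pig.tmpfilecompression", "true");
    // TFileOutputFormat accepts only "gz" or "lzo"
    props.setProperty("pig.tmpfilecompression.codec", "gz");
    PigServer pig = new PigServer(ExecType.MAPREDUCE, props);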

Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=989828&r1=989827&r2=989828&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Thu Aug 26 17:11:36 2010
@@ -26,6 +26,8 @@ PIG-1249: Safe-guards against misconfigu
 
 IMPROVEMENTS
 
+PIG-1501: need to investigate the impact of compression on pig performance (yanz via thejas)
+
 PIG-1497: Mandatory rule PartitionFilterOptimizer (xuefuz via daijy)
 
 PIG-1514: Migrate logical optimization rule: OpLimitOptimizer (xuefuz via daijy)

Modified: hadoop/pig/trunk/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/PigServer.java?rev=989828&r1=989827&r2=989828&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/PigServer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/PigServer.java Thu Aug 26 17:11:36 2010
@@ -63,7 +63,6 @@ import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.FileSpec;
-import org.apache.pig.impl.io.InterStorage;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.logicalLayer.LOConst;
 import org.apache.pig.impl.logicalLayer.LODefine;
@@ -97,6 +96,7 @@ import org.apache.pig.impl.streaming.Str
 import org.apache.pig.impl.util.LogUtils;
 import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.PropertiesUtil;
+import org.apache.pig.impl.util.Utils;
 import org.apache.pig.newplan.logical.LogicalPlanMigrationVistor;
 import org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer;
 import org.apache.pig.pen.ExampleGenerator;
@@ -708,7 +708,7 @@ public class PigServer {
             }
             
             ExecJob job = store(id, FileLocalizer.getTemporaryPath(pigContext)
-                    .toString(), InterStorage.class.getName() + "()");
+                    .toString(), Utils.getTmpFileCompressorName(pigContext) + "()");
             
             // invocation of "execute" is synchronous!
 

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=989828&r1=989827&r2=989828&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java Thu Aug 26 17:11:36 2010
@@ -55,12 +55,12 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.FileSpec;
-import org.apache.pig.impl.io.InterStorage;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.logicalLayer.LogicalPlan;
 import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.pig.impl.util.Utils;
 import org.apache.pig.newplan.logical.LogicalPlanMigrationVistor;
 import org.apache.pig.newplan.logical.optimizer.SchemaResetter;
 import org.apache.pig.tools.pigstats.OutputStats;
@@ -411,7 +411,7 @@ public class HExecutionEngine {
                     NodeIdGenerator.getGenerator().getNextNodeId(scope)));
                 spec = new FileSpec(FileLocalizer.getTemporaryPath(
                     pigContext).toString(),
-                    new FuncSpec(InterStorage.class.getName()));
+                    new FuncSpec(Utils.getTmpFileCompressorName(pigContext)));
                 str.setSFile(spec);
                 plan.addAsLeaf(str);
             } else{

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=989828&r1=989827&r2=989828&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Thu Aug 26 17:11:36 2010
@@ -88,6 +88,7 @@ import org.apache.pig.impl.util.JarManag
 import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.Pair;
 import org.apache.pig.impl.util.UDFContext;
+import org.apache.pig.impl.util.Utils;
 import org.apache.pig.tools.pigstats.ScriptState;
 
 
@@ -627,6 +628,12 @@ public class JobControlCompiler{
             for (POStore st: mapStores) { st.setInputs(null); st.setParentPlan(null);}
             for (POStore st: reduceStores) { st.setInputs(null); st.setParentPlan(null);}
 
+            // tmp file compression setups
+            if (Utils.tmpFileCompression(pigContext)) {
+                conf.setBoolean("pig.tmpfilecompression", true);
+                conf.set("pig.tmpfilecompression.codec", Utils.tmpFileCompressionCodec(pigContext));
+            }
+
             conf.set(PIG_MAP_STORES, ObjectSerializer.serialize(mapStores));
             conf.set(PIG_REDUCE_STORES, ObjectSerializer.serialize(reduceStores));
                         

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=989828&r1=989827&r2=989828&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Thu Aug 26 17:11:36 2010
@@ -84,7 +84,6 @@ import org.apache.pig.impl.builtin.Poiss
 import org.apache.pig.impl.builtin.RandomSampleLoader;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.FileSpec;
-import org.apache.pig.impl.io.InterStorage;
 import org.apache.pig.impl.logicalLayer.LogicalOperator;
 import org.apache.pig.impl.plan.CompilationMessageCollector;
 import org.apache.pig.impl.plan.DepthFirstWalker;
@@ -99,6 +98,7 @@ import org.apache.pig.impl.util.Compiler
 import org.apache.pig.impl.util.MultiMap;
 import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.Pair;
+import org.apache.pig.impl.util.Utils;
 
 /**
  * The compiler that compiles a given physical plan
@@ -613,7 +613,7 @@ public class MRCompiler extends PhyPlanV
      */
     private FileSpec getTempFileSpec() throws IOException {
         return new FileSpec(FileLocalizer.getTemporaryPath(pigContext).toString(),
-                new FuncSpec(InterStorage.class.getName()));
+                new FuncSpec(Utils.getTmpFileCompressorName(pigContext)));
     }
     
     /**
@@ -2151,7 +2151,7 @@ public class MRCompiler extends PhyPlanV
         // SampleLoader expects string version of FuncSpec 
         // as its first constructor argument.
         
-        rslargs[0] = (new FuncSpec(InterStorage.class.getName())).toString();
+        rslargs[0] = (new FuncSpec(Utils.getTmpFileCompressorName(pigContext))).toString();
         
         rslargs[1] = "100"; // The value is calculated based on the file size for skewed join
         FileSpec quantLdFilName = new FileSpec(lFile.getFileName(),

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=989828&r1=989827&r2=989828&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Thu Aug 26 17:11:36 2010
@@ -452,7 +452,7 @@ public class MapReduceLauncher extends L
         
         // Optimize the jobs that have a load/store only first MR job followed
         // by a sample job.
-        SampleOptimizer so = new SampleOptimizer(plan);
+        SampleOptimizer so = new SampleOptimizer(plan, pc);
         so.visit();
         
         // Optimize to use secondary sort key if possible

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SampleOptimizer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SampleOptimizer.java?rev=989828&r1=989827&r2=989828&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SampleOptimizer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SampleOptimizer.java Thu Aug 26 17:11:36 2010
@@ -32,10 +32,11 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
 import org.apache.pig.impl.io.FileSpec;
-import org.apache.pig.impl.io.InterStorage;
 import org.apache.pig.impl.plan.DepthFirstWalker;
 import org.apache.pig.impl.plan.PlanException;
 import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.Utils;
+import org.apache.pig.impl.PigContext;
 
 /**
  * A visitor to optimize plans that have a sample job that immediately follows a
@@ -47,9 +48,11 @@ import org.apache.pig.impl.plan.VisitorE
 public class SampleOptimizer extends MROpPlanVisitor {
 
     private Log log = LogFactory.getLog(getClass());
+    private PigContext pigContext;
 
-    public SampleOptimizer(MROperPlan plan) {
+    public SampleOptimizer(MROperPlan plan, PigContext pigContext) {
         super(plan, new DepthFirstWalker<MapReduceOper, MROperPlan>(plan));
+        this.pigContext = pigContext;
     }
 
     private List<MapReduceOper> opsToRemove = new ArrayList<MapReduceOper>();
@@ -136,7 +139,7 @@ public class SampleOptimizer extends MRO
         MapReduceOper succ = succs.get(0);
 
         // Find the load the correlates with the file the sampler is loading, and
-        // check that it is using BinaryStorage.
+        // check that it is using the tmp file storage format.
         if (succ.mapPlan == null) { // Huh?
             log.debug("Successor has no map plan.");
             return;
@@ -150,7 +153,7 @@ public class SampleOptimizer extends MRO
             }
             POLoad sl = (POLoad)root;
             if (loadFile.equals(sl.getLFile().getFileName()) && 
-                    InterStorage.class.getName().equals(sl.getLFile().getFuncName())) {
+                    Utils.getTmpFileCompressorName(pigContext).equals(sl.getLFile().getFuncName())) {
                 succLoad = sl;
                 break;
             }

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java?rev=989828&r1=989827&r2=989828&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java Thu Aug 26 17:11:36 2010
@@ -38,7 +38,6 @@ import org.apache.pig.data.DataBag;
 import org.apache.pig.data.InternalMap;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.builtin.FindQuantiles;
-import org.apache.pig.impl.io.InterStorage;
 import org.apache.pig.impl.io.NullableBytesWritable;
 import org.apache.pig.impl.io.NullableDoubleWritable;
 import org.apache.pig.impl.io.NullableFloatWritable;
@@ -48,6 +47,7 @@ import org.apache.pig.impl.io.NullableTe
 import org.apache.pig.impl.io.NullableTuple;
 import org.apache.pig.impl.io.PigNullableWritable;
 import org.apache.pig.impl.io.ReadToEndLoader;
+import org.apache.pig.impl.util.Utils;
 
 public class WeightedRangePartitioner extends Partitioner<PigNullableWritable, Writable>   
                                       implements Configurable {
@@ -101,9 +101,15 @@ public class WeightedRangePartitioner ex
                 conf.set("fs.file.impl", configuration.get("fs.file.impl"));
             if (configuration.get("fs.hdfs.impl")!=null)
                 conf.set("fs.hdfs.impl", configuration.get("fs.hdfs.impl"));
+            if (configuration.getBoolean("pig.tmpfilecompression", false))
+            {
+                conf.setBoolean("pig.tmpfilecompression", true);
+                if (configuration.get("pig.tmpfilecompression.codec")!=null)
+                    conf.set("pig.tmpfilecompression.codec", configuration.get("pig.tmpfilecompression.codec"));
+            }
             conf.set(MapRedUtil.FILE_SYSTEM_NAME, "file:///");
             
-            ReadToEndLoader loader = new ReadToEndLoader(new InterStorage(),
+            ReadToEndLoader loader = new ReadToEndLoader(Utils.getTmpFileStorageObject(conf),
                     conf, quantilesFile, 0);
             DataBag quantilesList;
             Tuple t = loader.getNext();

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java?rev=989828&r1=989827&r2=989828&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java Thu Aug 26 17:11:36 2010
@@ -45,7 +45,6 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.impl.builtin.GFCross;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.FileSpec;
-import org.apache.pig.impl.io.InterStorage;
 import org.apache.pig.impl.logicalLayer.*;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.plan.DependencyOrderWalker;
@@ -60,6 +59,7 @@ import org.apache.pig.impl.plan.VisitorE
 import org.apache.pig.impl.util.CompilerUtils;
 import org.apache.pig.impl.util.LinkedMultiMap;
 import org.apache.pig.impl.util.MultiMap;
+import org.apache.pig.impl.util.Utils;
 
 public class LogToPhyTranslationVisitor extends LOVisitor {
 
@@ -1505,7 +1505,7 @@ public class LogToPhyTranslationVisitor 
         physOp.setAlias(split.getAlias());
         FileSpec splStrFile;
         try {
-            splStrFile = new FileSpec(FileLocalizer.getTemporaryPath(pc).toString(),new FuncSpec(InterStorage.class.getName()));
+            splStrFile = new FileSpec(FileLocalizer.getTemporaryPath(pc).toString(),new FuncSpec(Utils.getTmpFileCompressorName(pc)));
         } catch (IOException e1) {
             byte errSrc = pc.getErrorSource();
             int errCode = 0;

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartitionRearrange.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartitionRearrange.java?rev=989828&r1=989827&r2=989828&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartitionRearrange.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartitionRearrange.java Thu Aug 26 17:11:36 2010
@@ -37,6 +37,7 @@ import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.util.Pair;
+import org.apache.pig.impl.util.Utils;
 
 
 /**
@@ -86,7 +87,17 @@ public class POPartitionRearrange extend
             "Internal error: missing key distribution file property.");
         }
 
+        boolean tmpFileCompression = Utils.tmpFileCompression(pigContext);
+        if (tmpFileCompression) {
+            PigMapReduce.sJobConf.setBoolean("pig.tmpfilecompression", true);
+            try {
+                PigMapReduce.sJobConf.set("pig.tmpfilecompression.codec", Utils.tmpFileCompressionCodec(pigContext));
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
         try {
+          
             Integer [] redCnt = new Integer[1]; 
             
             reducerMap = MapRedUtil.loadPartitionFileFromLocalCache(

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java?rev=989828&r1=989827&r2=989828&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java Thu Aug 26 17:11:36 2010
@@ -47,12 +47,12 @@ import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.builtin.PartitionSkewedKeys;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.FileSpec;
-import org.apache.pig.impl.io.InterStorage;
 import org.apache.pig.impl.io.ReadToEndLoader;
 import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.util.Pair;
 import org.apache.pig.impl.util.UDFContext;
+import org.apache.pig.impl.util.Utils;
 
 /**
  * A class of utility static methods to be used in the hadoop map reduce backend
@@ -84,9 +84,15 @@ public class MapRedUtil {
             conf.set("fs.file.impl", PigMapReduce.sJobConf.get("fs.file.impl"));
         if (PigMapReduce.sJobConf.get("fs.hdfs.impl")!=null)
             conf.set("fs.hdfs.impl", PigMapReduce.sJobConf.get("fs.hdfs.impl"));
+        if (PigMapReduce.sJobConf.getBoolean("pig.tmpfilecompression", false))
+        {
+            conf.setBoolean("pig.tmpfilecompression", true);
+            if (PigMapReduce.sJobConf.get("pig.tmpfilecompression.codec")!=null)
+                conf.set("pig.tmpfilecompression.codec", PigMapReduce.sJobConf.get("pig.tmpfilecompression.codec"));
+        }
         conf.set(MapRedUtil.FILE_SYSTEM_NAME, "file:///");
 
-        ReadToEndLoader loader = new ReadToEndLoader(new InterStorage(), conf, 
+        ReadToEndLoader loader = new ReadToEndLoader(Utils.getTmpFileStorageObject(PigMapReduce.sJobConf), conf, 
                 keyDistFile, 0);
         DataBag partitionList;
         Tuple t = loader.getNext();
@@ -156,7 +162,7 @@ public class MapRedUtil {
                     NodeIdGenerator.getGenerator().getNextNodeId(scope)));
                 spec = new FileSpec(FileLocalizer.getTemporaryPath(
                     pigContext).toString(),
-                    new FuncSpec(InterStorage.class.getName()));
+                    new FuncSpec(Utils.getTmpFileCompressorName(pigContext)));
                 str.setSFile(spec);
                 plan.addAsLeaf(str);
             } else{

Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/BinStorage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/BinStorage.java?rev=989828&r1=989827&r2=989828&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/BinStorage.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/BinStorage.java Thu Aug 26 17:11:36 2010
@@ -55,16 +55,14 @@ import org.apache.pig.backend.hadoop.dat
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataReaderWriter;
-import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.io.BinStorageInputFormat;
 import org.apache.pig.impl.io.BinStorageOutputFormat;
 import org.apache.pig.impl.io.BinStorageRecordReader;
 import org.apache.pig.impl.io.BinStorageRecordWriter;
 import org.apache.pig.impl.io.FileLocalizer;
-import org.apache.pig.impl.io.ReadToEndLoader;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.util.LogUtils;
+import org.apache.pig.impl.util.Utils;
 
 /**
  * Load and store data in a binary format.  This class is used by Pig to move
@@ -408,27 +406,7 @@ implements LoadCaster, StoreFuncInterfac
             }
         }
 
-        ReadToEndLoader loader = new ReadToEndLoader(this, conf, location, 0);
-        // get the first record from the input file
-        // and figure out the schema from the data in
-        // the first record
-        Tuple t = loader.getNext();
-        if(t == null) {
-            // we couldn't get a valid record from the input
-            return null;
-        }
-        int numFields = t.size();
-        Schema s = new Schema();
-        for (int i = 0; i < numFields; i++) {
-            try {
-                s.add(DataType.determineFieldSchema(t.get(i)));
-            } catch (Exception e) {
-                int errCode = 2104;
-                String msg = "Error while determining schema of BinStorage data.";
-                throw new ExecException(msg, errCode, PigException.BUG, e);
-            } 
-        }
-        return new ResourceSchema(s);
+        return Utils.getSchema(this, location, false, job);
     }
 
     @Override

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/io/InterStorage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/io/InterStorage.java?rev=989828&r1=989827&r2=989828&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/io/InterStorage.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/io/InterStorage.java Thu Aug 26 17:11:36 2010
@@ -18,9 +18,6 @@
 package org.apache.pig.impl.io;
 
 import java.io.IOException;
-import java.util.Iterator;
-import java.util.Properties;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -42,20 +39,15 @@ import org.apache.pig.Expression;
 import org.apache.pig.FileInputLoadFunc;
 import org.apache.pig.LoadFunc;
 import org.apache.pig.LoadMetadata;
-import org.apache.pig.PigException;
 import org.apache.pig.ResourceSchema;
 import org.apache.pig.ResourceStatistics;
 import org.apache.pig.StoreFunc;
 import org.apache.pig.StoreFuncInterface;
-import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
-import org.apache.pig.backend.hadoop.datastorage.HDataStorage;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigFileInputFormat;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
 import org.apache.pig.classification.InterfaceAudience;
-import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.util.Utils;
 
 /**
  * LOAD FUNCTION FOR PIG INTERNAL USE ONLY!
@@ -68,10 +60,7 @@ import org.apache.pig.impl.logicalLayer.
 public class InterStorage extends FileInputLoadFunc 
 implements StoreFuncInterface, LoadMetadata {
 
-
-    Iterator<Tuple>     i              = null;
     private static final Log mLog = LogFactory.getLog(InterStorage.class);
-    protected long                end            = Long.MAX_VALUE;
     
     private InterRecordReader recReader = null;
     private InterRecordWriter recWriter = null;
@@ -80,6 +69,7 @@ implements StoreFuncInterface, LoadMetad
      * Simple binary nested reader format
      */
     public InterStorage() {
+        mLog.info("Pig Internal storage in use");
     }
 
     @Override
@@ -190,38 +180,7 @@ implements StoreFuncInterface, LoadMetad
     @Override
     public ResourceSchema getSchema(String location, Job job)
             throws IOException {
-        Configuration conf = job.getConfiguration();
-        // since local mode now is implemented as hadoop's local mode
-        // we can treat either local or hadoop mode as hadoop mode - hence
-        // we can use HDataStorage and FileLocalizer.openDFSFile below
-        Path path = new Path(location);
-        if(! FileSystem.get(conf).exists(path)){
-            // At compile time in batch mode, the file may not exist
-            // (such as intermediate file). Just return null - the
-            // same way as we would if we did not get a valid record
-            return null;
-        }
-        ReadToEndLoader loader = new ReadToEndLoader(this, conf, location, 0);
-        // get the first record from the input file
-        // and figure out the schema from the data in
-        // the first record
-        Tuple t = loader.getNext();
-        if(t == null) {
-            // we couldn't get a valid record from the input
-            return null;
-        }
-        int numFields = t.size();
-        Schema s = new Schema();
-        for (int i = 0; i < numFields; i++) {
-            try {
-                s.add(DataType.determineFieldSchema(t.get(i)));
-            } catch (Exception e) {
-                int errCode = 2104;
-                String msg = "Error while determining schema of InterStorage data.";
-                throw new ExecException(msg, errCode, PigException.BUG, e);
-            } 
-        }
-        return new ResourceSchema(s);
+        return Utils.getSchema(this, location, true, job);
     }
 
     @Override

Added: hadoop/pig/trunk/src/org/apache/pig/impl/io/TFileRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/io/TFileRecordReader.java?rev=989828&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/io/TFileRecordReader.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/io/TFileRecordReader.java Thu Aug 26 17:11:36 2010
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.io;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.BinInterSedes;
+import org.apache.pig.data.InterSedes;
+import org.apache.pig.data.InterSedesFactory;
+import org.apache.pig.data.Tuple;
+import org.apache.hadoop.io.file.tfile.TFile.Reader;
+
+/**
+ * A record reader used to read data written using {@link TFileRecordWriter}.
+ * It uses the default InterSedes object for deserialization.
+ */
+public class TFileRecordReader extends RecordReader<Text, Tuple> {
+
+    private long start;
+    private long pos;
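+    // note: pos is declared but never advanced by nextKeyValue(), so
+    // getProgress() below does not reflect actual progress through the split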
+    private long end;
+    Reader reader = null;
+    Reader.Scanner scanner = null;
+    private Tuple value = null;
+    private FSDataInputStream fileIn = null;
+    private static InterSedes sedes = InterSedesFactory.getInterSedesInstance();
+
+    public void initialize(InputSplit genericSplit, TaskAttemptContext context)
+                    throws IOException {
+        FileSplit split = (FileSplit) genericSplit;
+        Configuration job = context.getConfiguration();
+        start = split.getStart();
+        end = start + split.getLength();
+        final Path file = split.getPath();
+
+        // open the file and seek to the start of the split
+        FileSystem fs = file.getFileSystem(job);
+        fileIn = fs.open(split.getPath());
+        reader = new Reader(fileIn, fs.getFileStatus(file).getLen(), job);
+        scanner = reader.createScannerByByteRange(start, split.getLength());
+    }
+
+    public boolean nextKeyValue() throws IOException {
+        //    skip to next record
+        if (scanner.atEnd()) {
+            value = null;
+            return false;
+        }
+
+        DataInputStream in = scanner.entry().getValueStream();
+        try {
+            // each TFile entry's value stream holds exactly one serialized
+            // tuple - read it with the default InterSedes
+            value = (Tuple) sedes.readDatum(in);
+            scanner.advance();
+            return true;
+        }
+        finally {
+            in.close();
+        }
+    }
+
+    @Override
+    public Text getCurrentKey() {
+        // the key is always null since we don't really have a key for each
+        // input record
+        return null;
+    }
+
+    @Override
+    public Tuple getCurrentValue() {
+        return value;
+    }
+
+    /**
+     * Get the progress within the split
+     */
+    public float getProgress() {
+        if (start == end) {
+            return 0.0f;
+        }
+        else {
+            return Math.min(1.0f, (pos - start) / (float) (end - start));
+        }
+    }
+
+    public synchronized void close() throws IOException {
+        if (scanner != null) {
+            scanner.close();
+        }
+        if (reader != null) reader.close();
+        if (fileIn != null) fileIn.close();
+    }
+}

Added: hadoop/pig/trunk/src/org/apache/pig/impl/io/TFileRecordWriter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/io/TFileRecordWriter.java?rev=989828&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/io/TFileRecordWriter.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/io/TFileRecordWriter.java Thu Aug 26 17:11:36 2010
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.io;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.pig.data.InterSedes;
+import org.apache.pig.data.InterSedesFactory;
+import org.apache.pig.data.Tuple;
+import org.apache.hadoop.io.file.tfile.TFile.Writer;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * A record writer used to write data that can be read back by
+ * {@link TFileRecordReader}. It uses the default InterSedes object for
+ * serialization.
+ */
+public class TFileRecordWriter extends
+                RecordWriter<org.apache.hadoop.io.WritableComparable, Tuple> {
+
+    final private BytesWritable KEY0 = new BytesWritable(new byte[0]);
+    private static InterSedes sedes = InterSedesFactory.getInterSedesInstance();
+    /**
+     * the outputstream to write out on
+     */
+    Writer writer = null;
+    FSDataOutputStream fileOut = null;
+
+    /**
+     * 
+     */
+    public TFileRecordWriter(Path file, String codec, Configuration conf)
+                    throws IOException {
+        FileSystem fs = file.getFileSystem(conf);
+        fileOut = fs.create(file, false);
+        writer = new Writer(fileOut, 1024 * 1024, codec, null, conf);
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.hadoop.mapreduce.RecordWriter#close(org.apache.hadoop.mapreduce.TaskAttemptContext)
+     */
+    @Override
+    public void close(TaskAttemptContext arg0) throws IOException,
+                    InterruptedException {
+        if (writer != null) writer.close();
+        if (fileOut != null) fileOut.close();
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.hadoop.mapreduce.RecordWriter#write(java.lang.Object, java.lang.Object)
+     */
+    @Override
+    public void write(WritableComparable wc, Tuple t) throws IOException,
+                    InterruptedException {
+        DataOutputStream outputKey = writer.prepareAppendKey(KEY0.getLength());
+        try {
+            outputKey.write(KEY0.getBytes(), 0, KEY0.getLength());
+        }
+        finally {
+            outputKey.close();
+        }
+        // we really only want to write the tuple (value) out here
+        DataOutputStream outputValue = writer.prepareAppendValue(-1);
+
+        try {
+            sedes.writeDatum(outputValue, t);
+        }
+        finally {
+            outputValue.close();
+        }
+    }
+
+}
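
A hedged usage sketch for the writer above; the output path and tuple
contents are made up for illustration, and the key argument is null just as
in TFileStorage.putNext() below (checked exceptions elided):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.data.TupleFactory;
    import org.apache.pig.impl.io.TFileRecordWriter;

    Configuration conf = new Configuration();
    // codec must be "gz" or "lzo"; the path here is hypothetical
    TFileRecordWriter writer = new TFileRecordWriter(
            new Path("/tmp/part-m-00000"), "gz", conf);
    Tuple t = TupleFactory.getInstance().newTuple(2);
    t.set(0, "name");
    t.set(1, 42);
    writer.write(null, t); // key is ignored; only the tuple is serialized
    writer.close(null);    // the TaskAttemptContext argument is unused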

Added: hadoop/pig/trunk/src/org/apache/pig/impl/io/TFileStorage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/io/TFileStorage.java?rev=989828&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/io/TFileStorage.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/io/TFileStorage.java Thu Aug 26 17:11:36 2010
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.io;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.pig.Expression;
+import org.apache.pig.FileInputLoadFunc;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.LoadMetadata;
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.ResourceStatistics;
+import org.apache.pig.StoreFunc;
+import org.apache.pig.StoreFuncInterface;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigFileInputFormat;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
+import org.apache.pig.classification.InterfaceAudience;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.util.Utils;
+
+/**
+ * LOAD FUNCTION FOR PIG INTERNAL USE ONLY! This load function is used for
+ * storing intermediate data between MR jobs of a pig query. The serialization
+ * format of this load function can change in newer versions of pig, so this
+ * should NOT be used to store any persistent data.
+ */
+@InterfaceAudience.Private
+public class TFileStorage extends FileInputLoadFunc implements
+                StoreFuncInterface, LoadMetadata {
+
+    private static final Log mLog = LogFactory.getLog(TFileStorage.class);
+
+    private TFileRecordReader recReader = null;
+    private TFileRecordWriter recWriter = null;
+
+    /**
+     * Simple binary nested storage format, written into a TFile
+     */
+    public TFileStorage() throws IOException {
+        mLog.info("TFile storage in use");
+    }
+
+    @Override
+    public Tuple getNext() throws IOException {
+        if (recReader.nextKeyValue()) {
+            return recReader.getCurrentValue();
+        }
+        else {
+            return null;
+        }
+    }
+
+    @Override
+    public void putNext(Tuple t) throws IOException {
+        try {
+            recWriter.write(null, t);
+        }
+        catch (InterruptedException e) {
+            throw new IOException(e);
+        }
+    }
+
+    public static class TFileInputFormat extends
+                    PigFileInputFormat<Text, Tuple> {
+
+        /* (non-Javadoc)
+         * @see org.apache.hadoop.mapreduce.InputFormat#createRecordReader(org.apache.hadoop.mapreduce.InputSplit, org.apache.hadoop.mapreduce.TaskAttemptContext)
+         */
+        @Override
+        public RecordReader<Text, Tuple> createRecordReader(InputSplit split,
+                        TaskAttemptContext context) throws IOException,
+                        InterruptedException {
+            return new TFileRecordReader();
+        }
+
+    }
+
+    @Override
+    public InputFormat getInputFormat() {
+        return new TFileInputFormat();
+    }
+
+    @Override
+    public int hashCode() {
+        return 42;
+    }
+
+    @Override
+    public void prepareToRead(RecordReader reader, PigSplit split) {
+        recReader = (TFileRecordReader) reader;
+    }
+
+    @Override
+    public void setLocation(String location, Job job) throws IOException {
+        FileInputFormat.setInputPaths(job, location);
+    }
+
+    public static class TFileOutputFormat
+                    extends
+                    FileOutputFormat<org.apache.hadoop.io.WritableComparable, Tuple> {
+
+        /* (non-Javadoc)
+         * @see org.apache.hadoop.mapreduce.lib.output.FileOutputFormat#getRecordWriter(org.apache.hadoop.mapreduce.TaskAttemptContext)
+         */
+        @Override
+        public RecordWriter<WritableComparable, Tuple> getRecordWriter(
+                        TaskAttemptContext job) throws IOException,
+                        InterruptedException {
+            Configuration conf = job.getConfiguration();
+            String codec = conf.get("pig.tmpfilecompression.codec", "");
+            if (!codec.equals("lzo") && !codec.equals("gz"))
+                throw new IOException(
+                                "Invalid temporary file compression codec [" + codec + "]. Expected compression codecs are gz and lzo");
+            mLog.info(codec + " compression codec in use");
+            Path file = getDefaultWorkFile(job, "");
+            return new TFileRecordWriter(file, codec, conf);
+        }
+    }
+
+    @Override
+    public OutputFormat getOutputFormat() {
+        return new TFileOutputFormat();
+    }
+
+    @Override
+    public void prepareToWrite(RecordWriter writer) {
+        this.recWriter = (TFileRecordWriter) writer;
+    }
+
+    @Override
+    public void setStoreLocation(String location, Job job) throws IOException {
+        FileOutputFormat.setOutputPath(job, new Path(location));
+    }
+
+    @Override
+    public void checkSchema(ResourceSchema s) throws IOException {
+
+    }
+
+    @Override
+    public String relToAbsPathForStoreLocation(String location, Path curDir)
+                    throws IOException {
+        return LoadFunc.getAbsolutePath(location, curDir);
+    }
+
+    @Override
+    public String[] getPartitionKeys(String location, Job job)
+                    throws IOException {
+        return null;
+    }
+
+    @Override
+    public ResourceSchema getSchema(String location, Job job)
+                    throws IOException {
+        return Utils.getSchema(this, location, true, job);
+    }
+
+    @Override
+    public ResourceStatistics getStatistics(String location, Job job)
+                    throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void setPartitionFilter(Expression plan) throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void setStoreFuncUDFContextSignature(String signature) {
+    }
+
+    @Override
+    public void cleanupOnFailure(String location, Job job) throws IOException {
+        StoreFunc.cleanupOnFailureImpl(location, job);
+    }
+
+}
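
Reading intermediate data back goes through Utils.getTmpFileStorageObject(),
mirroring the MapRedUtil and WeightedRangePartitioner hunks above. A minimal
sketch, assuming the two conf flags were propagated as in those hunks and
"/tmp/pigtmp" stands in for a real intermediate file:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.pig.FileInputLoadFunc;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.impl.io.ReadToEndLoader;
    import org.apache.pig.impl.util.Utils;

    Configuration conf = new Configuration();
    conf.setBoolean("pig.tmpfilecompression", true);
    conf.set("pig.tmpfilecompression.codec", "gz");
    // returns a TFileStorage when compression is on, an InterStorage otherwise
    FileInputLoadFunc storage = Utils.getTmpFileStorageObject(conf);
    ReadToEndLoader loader = new ReadToEndLoader(storage, conf, "/tmp/pigtmp", 0);
    Tuple t;
    while ((t = loader.getNext()) != null) {
        // process one intermediate tuple at a time
    }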

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/util/Utils.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/util/Utils.java?rev=989828&r1=989827&r2=989828&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/util/Utils.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/util/Utils.java Thu Aug 26 17:11:36 2010
@@ -18,11 +18,27 @@
 package org.apache.pig.impl.util;
 
 import java.io.ByteArrayInputStream;
+import java.io.IOException;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+
+import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.logicalLayer.parser.ParseException;
 import org.apache.pig.impl.logicalLayer.parser.QueryParser;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.impl.io.InterStorage;
+import org.apache.pig.impl.io.ReadToEndLoader;
+import org.apache.pig.impl.io.TFileStorage;
+import org.apache.pig.FileInputLoadFunc;
+import org.apache.pig.PigException;
+import org.apache.pig.ResourceSchema;
 
 /**
  * Class with utility static methods
@@ -77,6 +93,42 @@ public class Utils {
         }
     }
     
+    public static ResourceSchema getSchema(LoadFunc wrappedLoadFunc, String location, boolean checkExistence, Job job)
+                    throws IOException {
+        Configuration conf = job.getConfiguration();
+        if (checkExistence) {
+            Path path = new Path(location);
+            if (!FileSystem.get(conf).exists(path)) {
+                // At compile time in batch mode, the file may not exist
+                // (such as intermediate file). Just return null - the
+                // same way as we would if we did not get a valid record
+                return null;
+            }
+        }
+        ReadToEndLoader loader = new ReadToEndLoader(wrappedLoadFunc, conf, location, 0);
+        // get the first record from the input file
+        // and figure out the schema from the data in
+        // the first record
+        Tuple t = loader.getNext();
+        if (t == null) {
+            // we couldn't get a valid record from the input
+            return null;
+        }
+        int numFields = t.size();
+        Schema s = new Schema();
+        for (int i = 0; i < numFields; i++) {
+            try {
+                s.add(DataType.determineFieldSchema(t.get(i)));
+            }
+            catch (Exception e) {
+                int errCode = 2104;
+                String msg = "Error while determining schema of SequenceFileStorage data.";
+                throw new ExecException(msg, errCode, PigException.BUG, e);
+            }
+        }
+        return new ResourceSchema(s);
+    }
+    
     public static Schema getSchemaFromString(String schemaString) throws ParseException {
         return Utils.getSchemaFromString(schemaString, DataType.BYTEARRAY);
     }
@@ -89,6 +141,40 @@ public class Utils {
         return schema;
     }
     
+    public static String getTmpFileCompressorName(PigContext pigContext) {
+        if (pigContext == null)
+            return InterStorage.class.getName();
+        boolean tmpFileCompression = pigContext.getProperties().getProperty("pig.tmpfilecompression", "false").equals("true");
+        String codec = pigContext.getProperties().getProperty("pig.tmpfilecompression.codec", "");
+        if (tmpFileCompression) {
+            if (codec.equals("lzo"))
+                pigContext.getProperties().setProperty("io.compression.codec.lzo.class", "com.hadoop.compression.lzo.LzoCodec");
+            return TFileStorage.class.getName();
+        } else
+            return InterStorage.class.getName();
+    }
+    
+    public static FileInputLoadFunc getTmpFileStorageObject(Configuration conf) throws IOException {
+        boolean tmpFileCompression = conf.getBoolean("pig.tmpfilecompression", false);
+        return tmpFileCompression ? new TFileStorage() : new InterStorage();
+    }
+    
+    public static boolean tmpFileCompression(PigContext pigContext) {
+        if (pigContext == null)
+            return false;
+        return pigContext.getProperties().getProperty("pig.tmpfilecompression", "false").equals("true");
+    }
+
+    public static String tmpFileCompressionCodec(PigContext pigContext) throws IOException {
+        if (pigContext == null)
+            return "";
+        String codec = pigContext.getProperties().getProperty("pig.tmpfilecompression.codec", "");
+        if (codec.equals("gz") || codec.equals("lzo"))
+            return codec;
+        else
+            throw new IOException("Invalid temporary file compression codec ["+codec+"]. Expected compression codecs are gz and lzo");
+    }
+
     public static String getStringFromArray(String[] arr) {
         StringBuilder str = new StringBuilder();
         for(String s: arr) {
@@ -97,5 +183,4 @@ public class Utils {
         }
         return str.toString();
     }
-    
 }
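
All the compiler-side call sites in this patch reduce to the same pattern; a
short sketch, with pigContext assumed in scope:

    import org.apache.pig.FuncSpec;
    import org.apache.pig.impl.util.Utils;

    // yields org.apache.pig.impl.io.TFileStorage when pig.tmpfilecompression
    // is true, and org.apache.pig.impl.io.InterStorage otherwise
    FuncSpec spec = new FuncSpec(Utils.getTmpFileCompressorName(pigContext));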

Modified: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java?rev=989828&r1=989827&r2=989828&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java Thu Aug 26 17:11:36 2010
@@ -62,7 +62,6 @@ import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.builtin.GFCross;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.FileSpec;
-import org.apache.pig.impl.io.InterStorage;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.logicalLayer.LogicalOperator;
 import org.apache.pig.impl.logicalLayer.LogicalPlan;
@@ -86,6 +85,7 @@ import org.apache.pig.newplan.logical.ex
 import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
 import org.apache.pig.newplan.logical.expression.ProjectExpression;
 import org.apache.pig.newplan.logical.relational.LogicalSchema.LogicalFieldSchema;
+import org.apache.pig.impl.util.Utils;
 
 public class LogToPhyTranslationVisitor extends LogicalRelationalNodesVisitor {
     
@@ -1216,7 +1216,7 @@ public class LogToPhyTranslationVisitor 
         physOp.setAlias(loSplit.getAlias());
         FileSpec splStrFile;
         try {
-            splStrFile = new FileSpec(FileLocalizer.getTemporaryPath(pc).toString(),new FuncSpec(InterStorage.class.getName()));
+            splStrFile = new FileSpec(FileLocalizer.getTemporaryPath(pc).toString(),new FuncSpec(Utils.getTmpFileCompressorName(pc)));
         } catch (IOException e1) {
             byte errSrc = pc.getErrorSource();
             int errCode = 0;

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestSampleOptimizer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestSampleOptimizer.java?rev=989828&r1=989827&r2=989828&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestSampleOptimizer.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestSampleOptimizer.java Thu Aug 26 17:11:36 2010
@@ -82,7 +82,7 @@ public class TestSampleOptimizer {
         // Before optimizer visits, number of MR jobs = 3.
         assertEquals(3,count);   
 
-        SampleOptimizer so = new SampleOptimizer(mrPlan);
+        SampleOptimizer so = new SampleOptimizer(mrPlan, pc);
         so.visit();
 
         count = 1;
@@ -129,7 +129,7 @@ public class TestSampleOptimizer {
         // Before optimizer visits, number of MR jobs = 3.
         assertEquals(3,count);
 
-        SampleOptimizer so = new SampleOptimizer(mrPlan);
+        SampleOptimizer so = new SampleOptimizer(mrPlan, pc);
         so.visit();
 
         count = 1;
@@ -211,7 +211,7 @@ public class TestSampleOptimizer {
         // Before optimizer visits, number of MR jobs = 3.
         assertEquals(3,count);
 
-        SampleOptimizer so = new SampleOptimizer(mrPlan);
+        SampleOptimizer so = new SampleOptimizer(mrPlan, pc);
         so.visit();
 
         count = 1;
@@ -243,7 +243,7 @@ public class TestSampleOptimizer {
         // Before optimizer visits, number of MR jobs = 3.
         assertEquals(3,count);
 
-        SampleOptimizer so = new SampleOptimizer(mrPlan);
+        SampleOptimizer so = new SampleOptimizer(mrPlan, pc);
         so.visit();
 
         count = 1;

Added: hadoop/pig/trunk/test/org/apache/pig/test/TestTmpFileCompression.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestTmpFileCompression.java?rev=989828&view=auto
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestTmpFileCompression.java (added)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestTmpFileCompression.java Thu Aug 26 17:11:36 2010
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+
+import java.io.BufferedReader;
+import java.io.FileReader;
+
+
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.TFileStorage;
+import org.apache.pig.impl.io.InterStorage;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.log4j.FileAppender;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.log4j.SimpleLayout;
+
+public class TestTmpFileCompression {
+    private PigServer pigServer;
+    static MiniCluster cluster = MiniCluster.buildCluster();
+    File logFile;
+
+    @Before
+    public void setUp() throws Exception {
+        pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+    }
+
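+    // Route the given class's log4j output to a fresh temporary file so the
+    // tests below can assert on which tmp-file storage implementation ran.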
+    private void resetLog(Class<?> clazz) throws Exception {
+        if (logFile != null)
+            logFile.delete();
+        Logger logger = Logger.getLogger(clazz);
+        logger.removeAllAppenders();
+        logger.setLevel(Level.INFO);
+        SimpleLayout layout = new SimpleLayout();
+        logFile = File.createTempFile("log", "");
+        FileAppender appender = new FileAppender(layout, logFile.toString(),
+                        false, false, 0);
+        logger.addAppender(appender);
+    }
+
+    public boolean checkLogFileMessage(String[] messages) {
+        BufferedReader reader = null;
+        try {
+            reader = new BufferedReader(new FileReader(logFile));
+            StringBuilder logMessage = new StringBuilder();
+            String line;
+            while ((line = reader.readLine()) != null) {
+                logMessage.append(line).append("\n");
+            }
+            String log = logMessage.toString();
+            for (int i = 0; i < messages.length; i++) {
+                if (!log.contains(messages[i])) return false;
+            }
+            return true;
+        }
+        catch (IOException e) {
+            return false;
+        }
+        finally {
+            // close the reader to avoid leaking the file handle
+            if (reader != null) {
+                try {
+                    reader.close();
+                }
+                catch (IOException e) {
+                    // ignore; the result has already been determined
+                }
+            }
+        }
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        if (logFile != null)
+            logFile.delete();
+    }
+
+    @AfterClass
+    public static void oneTimeTearDown() throws Exception {
+        cluster.shutDown();
+    }
+
+    @Test
+    public void testImplicitSplitUncompressed() throws Exception {
+        resetLog(InterStorage.class);
+        int LOOP_SIZE = 20;
+        String[] input = new String[LOOP_SIZE];
+        for (int i = 1; i <= LOOP_SIZE; i++) {
+            input[i - 1] = Integer.toString(i);
+        }
+        String inputFileName = "testImplicitSplit-input.txt";
+        Util.createInputFile(cluster, inputFileName, input);
+        pigServer.registerQuery("A = LOAD '" + inputFileName + "';");
+        pigServer.registerQuery("B = filter A by $0<=10;");
+        pigServer.registerQuery("C = filter A by $0>10;");
+        pigServer.registerQuery("D = union B,C;");
+        Iterator<Tuple> iter = pigServer.openIterator("D");
+        if (!iter.hasNext()) fail("No output received");
+        int cnt = 0;
+        while (iter.hasNext()) {
+            iter.next();
+            ++cnt;
+        }
+        assertEquals(LOOP_SIZE, cnt);
+        Util.deleteFile(cluster, inputFileName);
+        assertTrue(checkLogFileMessage(new String[] {
+            "Pig Internal storage in use"
+        }));
+    }
+
+    @Test
+    public void testImplicitSplitInCoGroupUncompressed() throws Exception {
+        // This query is similar to the one reported in PIG-537.
+        // Create the two input files.
+        resetLog(InterStorage.class);
+        String input1 = "testImplicitSplitInCoGroup-input1.txt";
+        String input2 = "testImplicitSplitInCoGroup-input2.txt";
+        Util.createInputFile(cluster, input1, new String[] {
+                        "a:1", "b:2", "b:20", "c:3", "c:30"
+        });
+        Util.createInputFile(cluster, input2, new String[] {
+                        "a:first", "b:second", "c:third"
+        });
+        pigServer.registerQuery("a = load '" + input1 + "' using PigStorage(':') as (name:chararray, marks:int);");
+        pigServer.registerQuery("b = load '" + input2 + "' using PigStorage(':') as (name:chararray, rank:chararray);");
+        pigServer.registerQuery("c = cogroup a by name, b by name;");
+        pigServer.registerQuery("d = foreach c generate group, FLATTEN(a.marks) as newmarks;");
+        pigServer.registerQuery("e = cogroup a by marks, d by newmarks;");
+        pigServer.registerQuery("f = foreach e generate group, flatten(a), flatten(d);");
+        HashMap<Integer, Object[]> results = new HashMap<Integer, Object[]>();
+        results.put(1, new Object[] {
+                        "a", 1, "a", 1
+        });
+        results.put(2, new Object[] {
+                        "b", 2, "b", 2
+        });
+        results.put(3, new Object[] {
+                        "c", 3, "c", 3
+        });
+        results.put(20, new Object[] {
+                        "b", 20, "b", 20
+        });
+        results.put(30, new Object[] {
+                        "c", 30, "c", 30
+        });
+
+        Iterator<Tuple> it = pigServer.openIterator("f");
+        while (it.hasNext()) {
+            Tuple t = it.next();
+            System.err.println("Tuple:" + t);
+            Integer group = (Integer) t.get(0);
+            Object[] groupValues = results.get(group);
+            assertNotNull("Unexpected group key: " + group, groupValues);
+            for (int i = 0; i < 4; i++) {
+                assertEquals(groupValues[i], t.get(i + 1));
+            }
+        }
+        Util.deleteFile(cluster, input1);
+        Util.deleteFile(cluster, input2);
+        assertTrue(checkLogFileMessage(new String[] {
+            "Pig Internal storage in use"
+        }));
+    }
+
+    @Test
+    public void testImplicitSplit() throws Exception {
+        resetLog(TFileStorage.class);
+        int LOOP_SIZE = 20;
+        String[] input = new String[LOOP_SIZE];
+        for (int i = 1; i <= LOOP_SIZE; i++) {
+            input[i - 1] = Integer.toString(i);
+        }
+        String inputFileName = "testImplicitSplit-input.txt";
+        Util.createInputFile(cluster, inputFileName, input);
+        pigServer.getPigContext().getProperties().setProperty(
+                        "pig.tmpfilecompression", "true");
+        pigServer.getPigContext().getProperties().setProperty(
+                        "pig.tmpfilecompression.codec", "gz");
+        pigServer.registerQuery("A = LOAD '" + inputFileName + "';");
+        pigServer.registerQuery("B = filter A by $0<=10;");
+        pigServer.registerQuery("C = filter A by $0>10;");
+        pigServer.registerQuery("D = union B,C;");
+        Iterator<Tuple> iter = pigServer.openIterator("D");
+        if (!iter.hasNext()) fail("No output received");
+        int cnt = 0;
+        while (iter.hasNext()) {
+            iter.next();
+            ++cnt;
+        }
+        assertEquals(LOOP_SIZE, cnt);
+        Util.deleteFile(cluster, inputFileName);
+        assertTrue(checkLogFileMessage(new String[] {
+            "TFile storage in use"
+        }));
+    }
+
+    @Test
+    public void testImplicitSplitInCoGroup() throws Exception {
+        // This query is similar to the one reported in PIG-537.
+        // Create the two input files.
+        resetLog(TFileStorage.class);
+        String input1 = "testImplicitSplitInCoGroup-input1.txt";
+        String input2 = "testImplicitSplitInCoGroup-input2.txt";
+        Util.createInputFile(cluster, input1, new String[] {
+                        "a:1", "b:2", "b:20", "c:3", "c:30"
+        });
+        Util.createInputFile(cluster, input2, new String[] {
+                        "a:first", "b:second", "c:third"
+        });
+        pigServer.getPigContext().getProperties().setProperty(
+                        "pig.tmpfilecompression", "true");
+        pigServer.getPigContext().getProperties().setProperty(
+                        "pig.tmpfilecompression.codec", "gz");
+        pigServer.registerQuery("a = load '" + input1 + "' using PigStorage(':') as (name:chararray, marks:int);");
+        pigServer.registerQuery("b = load '" + input2 + "' using PigStorage(':') as (name:chararray, rank:chararray);");
+        pigServer.registerQuery("c = cogroup a by name, b by name;");
+        pigServer.registerQuery("d = foreach c generate group, FLATTEN(a.marks) as newmarks;");
+        pigServer.registerQuery("e = cogroup a by marks, d by newmarks;");
+        pigServer.registerQuery("f = foreach e generate group, flatten(a), flatten(d);");
+        HashMap<Integer, Object[]> results = new HashMap<Integer, Object[]>();
+        results.put(1, new Object[] {
+                        "a", 1, "a", 1
+        });
+        results.put(2, new Object[] {
+                        "b", 2, "b", 2
+        });
+        results.put(3, new Object[] {
+                        "c", 3, "c", 3
+        });
+        results.put(20, new Object[] {
+                        "b", 20, "b", 20
+        });
+        results.put(30, new Object[] {
+                        "c", 30, "c", 30
+        });
+
+        Iterator<Tuple> it = pigServer.openIterator("f");
+        while (it.hasNext()) {
+            Tuple t = it.next();
+            System.err.println("Tuple:" + t);
+            Integer group = (Integer) t.get(0);
+            Object[] groupValues = results.get(group);
+            assertNotNull("Unexpected group key: " + group, groupValues);
+            for (int i = 0; i < 4; i++) {
+                assertEquals(groupValues[i], t.get(i + 1));
+            }
+        }
+        Util.deleteFile(cluster, input1);
+        Util.deleteFile(cluster, input2);
+        assertTrue(checkLogFileMessage(new String[] {
+            "TFile storage in use"
+        }));
+    }
+
+}
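
The compressed variants above differ from their uncompressed counterparts only in the two properties they set before registering queries. A minimal standalone sketch of flipping the same switch (the property names and the "gz" codec value are taken verbatim from the tests; the class name and cluster setup here are illustrative assumptions):

    import java.util.Properties;

    import org.apache.pig.ExecType;
    import org.apache.pig.PigServer;

    public class TmpFileCompressionExample {
        public static void main(String[] args) throws Exception {
            // Assumes a reachable Hadoop cluster; the tests above obtain
            // their Properties from a MiniCluster instead.
            PigServer server = new PigServer(ExecType.MAPREDUCE, new Properties());
            // The two settings exercised by the compressed tests:
            server.getPigContext().getProperties()
                  .setProperty("pig.tmpfilecompression", "true");
            server.getPigContext().getProperties()
                  .setProperty("pig.tmpfilecompression.codec", "gz");
            // Register queries as usual; per the log line asserted above,
            // intermediate results are then written via TFile storage.
        }
    }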

Modified: hadoop/pig/trunk/test/org/apache/pig/test/utils/GenPhyOp.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/utils/GenPhyOp.java?rev=989828&r1=989827&r2=989828&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/utils/GenPhyOp.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/utils/GenPhyOp.java Thu Aug 26 17:11:36 2010
@@ -33,7 +33,6 @@ import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.FileSpec;
-import org.apache.pig.impl.io.InterStorage;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
@@ -41,6 +40,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.*;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.*;
 import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.impl.util.Utils;
 
 public class GenPhyOp{
     static Random r = new Random();
@@ -776,7 +776,7 @@ public class GenPhyOp{
     
     private static FileSpec getTempFileSpec() throws IOException {
         return new FileSpec(FileLocalizer.getTemporaryPath(pc).toString(),
-                new FuncSpec(InterStorage.class.getName())
+                new FuncSpec(Utils.getTmpFileCompressorName(pc))
         );
     }
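
GenPhyOp previously hard-coded InterStorage for temporary files and now asks Utils for the configured storage class. The real helper lives in org.apache.pig.impl.util.Utils, which this commit modifies but which is not shown in this excerpt; the sketch below is an illustrative guess at its shape, based only on the two storage classes whose log lines the new tests assert on:

    // Hypothetical sketch, not the committed implementation: pick the
    // TFile-based (compressible) storage when tmp-file compression is
    // enabled, and the plain internal format otherwise.
    public static String getTmpFileCompressorName(PigContext pc) {
        boolean compress = Boolean.parseBoolean(pc.getProperties()
                .getProperty("pig.tmpfilecompression", "false"));
        return compress ? TFileStorage.class.getName()
                        : InterStorage.class.getName();
    }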