You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by th...@apache.org on 2010/08/26 19:11:37 UTC
svn commit: r989828 - in /hadoop/pig/trunk: ./ src/org/apache/pig/
src/org/apache/pig/backend/hadoop/executionengine/
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/par...
Author: thejas
Date: Thu Aug 26 17:11:36 2010
New Revision: 989828
URL: http://svn.apache.org/viewvc?rev=989828&view=rev
Log:
PIG-1501: need to investigate the impact of compression on pig performance (yanz via thejas)
Added:
hadoop/pig/trunk/src/org/apache/pig/impl/io/TFileRecordReader.java
hadoop/pig/trunk/src/org/apache/pig/impl/io/TFileRecordWriter.java
hadoop/pig/trunk/src/org/apache/pig/impl/io/TFileStorage.java
hadoop/pig/trunk/test/org/apache/pig/test/TestTmpFileCompression.java
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/src/org/apache/pig/PigServer.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SampleOptimizer.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartitionRearrange.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java
hadoop/pig/trunk/src/org/apache/pig/builtin/BinStorage.java
hadoop/pig/trunk/src/org/apache/pig/impl/io/InterStorage.java
hadoop/pig/trunk/src/org/apache/pig/impl/util/Utils.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java
hadoop/pig/trunk/test/org/apache/pig/test/TestSampleOptimizer.java
hadoop/pig/trunk/test/org/apache/pig/test/utils/GenPhyOp.java
Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=989828&r1=989827&r2=989828&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Thu Aug 26 17:11:36 2010
@@ -26,6 +26,8 @@ PIG-1249: Safe-guards against misconfigu
IMPROVEMENTS
+PIG-1501: need to investigate the impact of compression on pig performance (yanz via thejas)
+
PIG-1497: Mandatory rule PartitionFilterOptimizer (xuefuz via daijy)
PIG-1514: Migrate logical optimization rule: OpLimitOptimizer (xuefuz via daijy)
Modified: hadoop/pig/trunk/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/PigServer.java?rev=989828&r1=989827&r2=989828&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/PigServer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/PigServer.java Thu Aug 26 17:11:36 2010
@@ -63,7 +63,6 @@ import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.io.FileSpec;
-import org.apache.pig.impl.io.InterStorage;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.LOConst;
import org.apache.pig.impl.logicalLayer.LODefine;
@@ -97,6 +96,7 @@ import org.apache.pig.impl.streaming.Str
import org.apache.pig.impl.util.LogUtils;
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.PropertiesUtil;
+import org.apache.pig.impl.util.Utils;
import org.apache.pig.newplan.logical.LogicalPlanMigrationVistor;
import org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer;
import org.apache.pig.pen.ExampleGenerator;
@@ -708,7 +708,7 @@ public class PigServer {
}
ExecJob job = store(id, FileLocalizer.getTemporaryPath(pigContext)
- .toString(), InterStorage.class.getName() + "()");
+ .toString(), Utils.getTmpFileCompressorName(pigContext) + "()");
// invocation of "execute" is synchronous!
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=989828&r1=989827&r2=989828&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java Thu Aug 26 17:11:36 2010
@@ -55,12 +55,12 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.io.FileSpec;
-import org.apache.pig.impl.io.InterStorage;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.LogicalPlan;
import org.apache.pig.impl.plan.NodeIdGenerator;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.pig.impl.util.Utils;
import org.apache.pig.newplan.logical.LogicalPlanMigrationVistor;
import org.apache.pig.newplan.logical.optimizer.SchemaResetter;
import org.apache.pig.tools.pigstats.OutputStats;
@@ -411,7 +411,7 @@ public class HExecutionEngine {
NodeIdGenerator.getGenerator().getNextNodeId(scope)));
spec = new FileSpec(FileLocalizer.getTemporaryPath(
pigContext).toString(),
- new FuncSpec(InterStorage.class.getName()));
+ new FuncSpec(Utils.getTmpFileCompressorName(pigContext)));
str.setSFile(spec);
plan.addAsLeaf(str);
} else{
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=989828&r1=989827&r2=989828&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Thu Aug 26 17:11:36 2010
@@ -88,6 +88,7 @@ import org.apache.pig.impl.util.JarManag
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.Pair;
import org.apache.pig.impl.util.UDFContext;
+import org.apache.pig.impl.util.Utils;
import org.apache.pig.tools.pigstats.ScriptState;
@@ -627,6 +628,12 @@ public class JobControlCompiler{
for (POStore st: mapStores) { st.setInputs(null); st.setParentPlan(null);}
for (POStore st: reduceStores) { st.setInputs(null); st.setParentPlan(null);}
+ // tmp file compression setups
+ if (Utils.tmpFileCompression(pigContext)) {
+ conf.setBoolean("pig.tmpfilecompression", true);
+ conf.set("pig.tmpfilecompression.codec", Utils.tmpFileCompressionCodec(pigContext));
+ }
+
conf.set(PIG_MAP_STORES, ObjectSerializer.serialize(mapStores));
conf.set(PIG_REDUCE_STORES, ObjectSerializer.serialize(reduceStores));
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=989828&r1=989827&r2=989828&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Thu Aug 26 17:11:36 2010
@@ -84,7 +84,6 @@ import org.apache.pig.impl.builtin.Poiss
import org.apache.pig.impl.builtin.RandomSampleLoader;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.io.FileSpec;
-import org.apache.pig.impl.io.InterStorage;
import org.apache.pig.impl.logicalLayer.LogicalOperator;
import org.apache.pig.impl.plan.CompilationMessageCollector;
import org.apache.pig.impl.plan.DepthFirstWalker;
@@ -99,6 +98,7 @@ import org.apache.pig.impl.util.Compiler
import org.apache.pig.impl.util.MultiMap;
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.Pair;
+import org.apache.pig.impl.util.Utils;
/**
* The compiler that compiles a given physical plan
@@ -613,7 +613,7 @@ public class MRCompiler extends PhyPlanV
*/
private FileSpec getTempFileSpec() throws IOException {
return new FileSpec(FileLocalizer.getTemporaryPath(pigContext).toString(),
- new FuncSpec(InterStorage.class.getName()));
+ new FuncSpec(Utils.getTmpFileCompressorName(pigContext)));
}
/**
@@ -2151,7 +2151,7 @@ public class MRCompiler extends PhyPlanV
// SampleLoader expects string version of FuncSpec
// as its first constructor argument.
- rslargs[0] = (new FuncSpec(InterStorage.class.getName())).toString();
+ rslargs[0] = (new FuncSpec(Utils.getTmpFileCompressorName(pigContext))).toString();
rslargs[1] = "100"; // The value is calculated based on the file size for skewed join
FileSpec quantLdFilName = new FileSpec(lFile.getFileName(),
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=989828&r1=989827&r2=989828&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Thu Aug 26 17:11:36 2010
@@ -452,7 +452,7 @@ public class MapReduceLauncher extends L
// Optimize the jobs that have a load/store only first MR job followed
// by a sample job.
- SampleOptimizer so = new SampleOptimizer(plan);
+ SampleOptimizer so = new SampleOptimizer(plan, pc);
so.visit();
// Optimize to use secondary sort key if possible
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SampleOptimizer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SampleOptimizer.java?rev=989828&r1=989827&r2=989828&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SampleOptimizer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SampleOptimizer.java Thu Aug 26 17:11:36 2010
@@ -32,10 +32,11 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
import org.apache.pig.impl.io.FileSpec;
-import org.apache.pig.impl.io.InterStorage;
import org.apache.pig.impl.plan.DepthFirstWalker;
import org.apache.pig.impl.plan.PlanException;
import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.Utils;
+import org.apache.pig.impl.PigContext;
/**
* A visitor to optimize plans that have a sample job that immediately follows a
@@ -47,9 +48,11 @@ import org.apache.pig.impl.plan.VisitorE
public class SampleOptimizer extends MROpPlanVisitor {
private Log log = LogFactory.getLog(getClass());
+ private PigContext pigContext;
- public SampleOptimizer(MROperPlan plan) {
+ public SampleOptimizer(MROperPlan plan, PigContext pigContext) {
super(plan, new DepthFirstWalker<MapReduceOper, MROperPlan>(plan));
+ this.pigContext = pigContext;
}
private List<MapReduceOper> opsToRemove = new ArrayList<MapReduceOper>();
@@ -136,7 +139,7 @@ public class SampleOptimizer extends MRO
MapReduceOper succ = succs.get(0);
// Find the load the correlates with the file the sampler is loading, and
- // check that it is using BinaryStorage.
+ // check that it is using the tmp file storage format.
if (succ.mapPlan == null) { // Huh?
log.debug("Successor has no map plan.");
return;
@@ -150,7 +153,7 @@ public class SampleOptimizer extends MRO
}
POLoad sl = (POLoad)root;
if (loadFile.equals(sl.getLFile().getFileName()) &&
- InterStorage.class.getName().equals(sl.getLFile().getFuncName())) {
+ Utils.getTmpFileCompressorName(pigContext).equals(sl.getLFile().getFuncName())) {
succLoad = sl;
break;
}
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java?rev=989828&r1=989827&r2=989828&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java Thu Aug 26 17:11:36 2010
@@ -38,7 +38,6 @@ import org.apache.pig.data.DataBag;
import org.apache.pig.data.InternalMap;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.builtin.FindQuantiles;
-import org.apache.pig.impl.io.InterStorage;
import org.apache.pig.impl.io.NullableBytesWritable;
import org.apache.pig.impl.io.NullableDoubleWritable;
import org.apache.pig.impl.io.NullableFloatWritable;
@@ -48,6 +47,7 @@ import org.apache.pig.impl.io.NullableTe
import org.apache.pig.impl.io.NullableTuple;
import org.apache.pig.impl.io.PigNullableWritable;
import org.apache.pig.impl.io.ReadToEndLoader;
+import org.apache.pig.impl.util.Utils;
public class WeightedRangePartitioner extends Partitioner<PigNullableWritable, Writable>
implements Configurable {
@@ -101,9 +101,15 @@ public class WeightedRangePartitioner ex
conf.set("fs.file.impl", configuration.get("fs.file.impl"));
if (configuration.get("fs.hdfs.impl")!=null)
conf.set("fs.hdfs.impl", configuration.get("fs.hdfs.impl"));
+ if (configuration.getBoolean("pig.tmpfilecompression", false))
+ {
+ conf.setBoolean("pig.tmpfilecompression", true);
+ if (configuration.get("pig.tmpfilecompression.codec")!=null)
+ conf.set("pig.tmpfilecompression.codec", configuration.get("pig.tmpfilecompression.codec"));
+ }
conf.set(MapRedUtil.FILE_SYSTEM_NAME, "file:///");
- ReadToEndLoader loader = new ReadToEndLoader(new InterStorage(),
+ ReadToEndLoader loader = new ReadToEndLoader(Utils.getTmpFileStorageObject(conf),
conf, quantilesFile, 0);
DataBag quantilesList;
Tuple t = loader.getNext();
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java?rev=989828&r1=989827&r2=989828&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java Thu Aug 26 17:11:36 2010
@@ -45,7 +45,6 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.impl.builtin.GFCross;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.io.FileSpec;
-import org.apache.pig.impl.io.InterStorage;
import org.apache.pig.impl.logicalLayer.*;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.plan.DependencyOrderWalker;
@@ -60,6 +59,7 @@ import org.apache.pig.impl.plan.VisitorE
import org.apache.pig.impl.util.CompilerUtils;
import org.apache.pig.impl.util.LinkedMultiMap;
import org.apache.pig.impl.util.MultiMap;
+import org.apache.pig.impl.util.Utils;
public class LogToPhyTranslationVisitor extends LOVisitor {
@@ -1505,7 +1505,7 @@ public class LogToPhyTranslationVisitor
physOp.setAlias(split.getAlias());
FileSpec splStrFile;
try {
- splStrFile = new FileSpec(FileLocalizer.getTemporaryPath(pc).toString(),new FuncSpec(InterStorage.class.getName()));
+ splStrFile = new FileSpec(FileLocalizer.getTemporaryPath(pc).toString(),new FuncSpec(Utils.getTmpFileCompressorName(pc)));
} catch (IOException e1) {
byte errSrc = pc.getErrorSource();
int errCode = 0;
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartitionRearrange.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartitionRearrange.java?rev=989828&r1=989827&r2=989828&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartitionRearrange.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartitionRearrange.java Thu Aug 26 17:11:36 2010
@@ -37,6 +37,7 @@ import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.util.Pair;
+import org.apache.pig.impl.util.Utils;
/**
@@ -86,7 +87,17 @@ public class POPartitionRearrange extend
"Internal error: missing key distribution file property.");
}
+ boolean tmpFileCompression = Utils.tmpFileCompression(pigContext);
+ if (tmpFileCompression) {
+ PigMapReduce.sJobConf.setBoolean("pig.tmpfilecompression", true);
+ try {
+ PigMapReduce.sJobConf.set("pig.tmpfilecompression.codec", Utils.tmpFileCompressionCodec(pigContext));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
try {
+
Integer [] redCnt = new Integer[1];
reducerMap = MapRedUtil.loadPartitionFileFromLocalCache(
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java?rev=989828&r1=989827&r2=989828&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java Thu Aug 26 17:11:36 2010
@@ -47,12 +47,12 @@ import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.builtin.PartitionSkewedKeys;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.io.FileSpec;
-import org.apache.pig.impl.io.InterStorage;
import org.apache.pig.impl.io.ReadToEndLoader;
import org.apache.pig.impl.plan.NodeIdGenerator;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.util.Pair;
import org.apache.pig.impl.util.UDFContext;
+import org.apache.pig.impl.util.Utils;
/**
* A class of utility static methods to be used in the hadoop map reduce backend
@@ -84,9 +84,15 @@ public class MapRedUtil {
conf.set("fs.file.impl", PigMapReduce.sJobConf.get("fs.file.impl"));
if (PigMapReduce.sJobConf.get("fs.hdfs.impl")!=null)
conf.set("fs.hdfs.impl", PigMapReduce.sJobConf.get("fs.hdfs.impl"));
+ if (PigMapReduce.sJobConf.getBoolean("pig.tmpfilecompression", false))
+ {
+ conf.setBoolean("pig.tmpfilecompression", true);
+ if (PigMapReduce.sJobConf.get("pig.tmpfilecompression.codec")!=null)
+ conf.set("pig.tmpfilecompression.codec", PigMapReduce.sJobConf.get("pig.tmpfilecompression.codec"));
+ }
conf.set(MapRedUtil.FILE_SYSTEM_NAME, "file:///");
- ReadToEndLoader loader = new ReadToEndLoader(new InterStorage(), conf,
+ ReadToEndLoader loader = new ReadToEndLoader(Utils.getTmpFileStorageObject(PigMapReduce.sJobConf), conf,
keyDistFile, 0);
DataBag partitionList;
Tuple t = loader.getNext();
@@ -156,7 +162,7 @@ public class MapRedUtil {
NodeIdGenerator.getGenerator().getNextNodeId(scope)));
spec = new FileSpec(FileLocalizer.getTemporaryPath(
pigContext).toString(),
- new FuncSpec(InterStorage.class.getName()));
+ new FuncSpec(Utils.getTmpFileCompressorName(pigContext)));
str.setSFile(spec);
plan.addAsLeaf(str);
} else{
Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/BinStorage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/BinStorage.java?rev=989828&r1=989827&r2=989828&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/BinStorage.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/BinStorage.java Thu Aug 26 17:11:36 2010
@@ -55,16 +55,14 @@ import org.apache.pig.backend.hadoop.dat
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataReaderWriter;
-import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.io.BinStorageInputFormat;
import org.apache.pig.impl.io.BinStorageOutputFormat;
import org.apache.pig.impl.io.BinStorageRecordReader;
import org.apache.pig.impl.io.BinStorageRecordWriter;
import org.apache.pig.impl.io.FileLocalizer;
-import org.apache.pig.impl.io.ReadToEndLoader;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.util.LogUtils;
+import org.apache.pig.impl.util.Utils;
/**
* Load and store data in a binary format. This class is used by Pig to move
@@ -408,27 +406,7 @@ implements LoadCaster, StoreFuncInterfac
}
}
- ReadToEndLoader loader = new ReadToEndLoader(this, conf, location, 0);
- // get the first record from the input file
- // and figure out the schema from the data in
- // the first record
- Tuple t = loader.getNext();
- if(t == null) {
- // we couldn't get a valid record from the input
- return null;
- }
- int numFields = t.size();
- Schema s = new Schema();
- for (int i = 0; i < numFields; i++) {
- try {
- s.add(DataType.determineFieldSchema(t.get(i)));
- } catch (Exception e) {
- int errCode = 2104;
- String msg = "Error while determining schema of BinStorage data.";
- throw new ExecException(msg, errCode, PigException.BUG, e);
- }
- }
- return new ResourceSchema(s);
+ return Utils.getSchema(this, location, false, job);
}
@Override
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/io/InterStorage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/io/InterStorage.java?rev=989828&r1=989827&r2=989828&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/io/InterStorage.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/io/InterStorage.java Thu Aug 26 17:11:36 2010
@@ -18,9 +18,6 @@
package org.apache.pig.impl.io;
import java.io.IOException;
-import java.util.Iterator;
-import java.util.Properties;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -42,20 +39,15 @@ import org.apache.pig.Expression;
import org.apache.pig.FileInputLoadFunc;
import org.apache.pig.LoadFunc;
import org.apache.pig.LoadMetadata;
-import org.apache.pig.PigException;
import org.apache.pig.ResourceSchema;
import org.apache.pig.ResourceStatistics;
import org.apache.pig.StoreFunc;
import org.apache.pig.StoreFuncInterface;
-import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
-import org.apache.pig.backend.hadoop.datastorage.HDataStorage;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigFileInputFormat;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.classification.InterfaceAudience;
-import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.util.Utils;
/**
* LOAD FUNCTION FOR PIG INTERNAL USE ONLY!
@@ -68,10 +60,7 @@ import org.apache.pig.impl.logicalLayer.
public class InterStorage extends FileInputLoadFunc
implements StoreFuncInterface, LoadMetadata {
-
- Iterator<Tuple> i = null;
private static final Log mLog = LogFactory.getLog(InterStorage.class);
- protected long end = Long.MAX_VALUE;
private InterRecordReader recReader = null;
private InterRecordWriter recWriter = null;
@@ -80,6 +69,7 @@ implements StoreFuncInterface, LoadMetad
* Simple binary nested reader format
*/
public InterStorage() {
+ mLog.info("Pig Internal storage in use");
}
@Override
@@ -190,38 +180,7 @@ implements StoreFuncInterface, LoadMetad
@Override
public ResourceSchema getSchema(String location, Job job)
throws IOException {
- Configuration conf = job.getConfiguration();
- // since local mode now is implemented as hadoop's local mode
- // we can treat either local or hadoop mode as hadoop mode - hence
- // we can use HDataStorage and FileLocalizer.openDFSFile below
- Path path = new Path(location);
- if(! FileSystem.get(conf).exists(path)){
- // At compile time in batch mode, the file may not exist
- // (such as intermediate file). Just return null - the
- // same way as we would if we did not get a valid record
- return null;
- }
- ReadToEndLoader loader = new ReadToEndLoader(this, conf, location, 0);
- // get the first record from the input file
- // and figure out the schema from the data in
- // the first record
- Tuple t = loader.getNext();
- if(t == null) {
- // we couldn't get a valid record from the input
- return null;
- }
- int numFields = t.size();
- Schema s = new Schema();
- for (int i = 0; i < numFields; i++) {
- try {
- s.add(DataType.determineFieldSchema(t.get(i)));
- } catch (Exception e) {
- int errCode = 2104;
- String msg = "Error while determining schema of InterStorage data.";
- throw new ExecException(msg, errCode, PigException.BUG, e);
- }
- }
- return new ResourceSchema(s);
+ return Utils.getSchema(this, location, true, job);
}
@Override
Added: hadoop/pig/trunk/src/org/apache/pig/impl/io/TFileRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/io/TFileRecordReader.java?rev=989828&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/io/TFileRecordReader.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/io/TFileRecordReader.java Thu Aug 26 17:11:36 2010
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.io;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.BinInterSedes;
+import org.apache.pig.data.InterSedes;
+import org.apache.pig.data.InterSedesFactory;
+import org.apache.pig.data.Tuple;
+import org.apache.hadoop.io.file.tfile.TFile.Reader;
+
+/**
+ * A record reader used to read data written using {@link TFileRecordWriter}. It
+ * uses the default InterSedes object for deserialization.
+ */
+public class TFileRecordReader extends RecordReader<Text, Tuple> {
+
+ private long start;
+ private long pos;
+ private long end;
+ Reader reader = null;
+ Reader.Scanner scanner = null;
+ private Tuple value = null;
+ private FSDataInputStream fileIn = null;
+ private static InterSedes sedes = InterSedesFactory.getInterSedesInstance();
+
+ public void initialize(InputSplit genericSplit, TaskAttemptContext context)
+ throws IOException {
+ FileSplit split = (FileSplit) genericSplit;
+ Configuration job = context.getConfiguration();
+ start = split.getStart();
+ end = start + split.getLength();
+ final Path file = split.getPath();
+
+ // open the file and seek to the start of the split
+ FileSystem fs = file.getFileSystem(job);
+ fileIn = fs.open(split.getPath());
+ reader = new Reader(fileIn, fs.getFileStatus(file).getLen(), job);
+ scanner = reader.createScannerByByteRange(start, split.getLength());
+ }
+
+ public boolean nextKeyValue() throws IOException {
+ // skip to next record
+ if (scanner.atEnd()) {
+ value = null;
+ return false;
+ }
+
+ DataInputStream in = scanner.entry().getValueStream();
+ try {
+ // if we got here, we have seen RECORD_1-RECORD_2-RECORD_3-TUPLE_MARKER
+ // sequence - lets now read the contents of the tuple
+ value = (Tuple) sedes.readDatum(in);
+ scanner.advance();
+ return true;
+ }
+ finally {
+ in.close();
+ }
+ }
+
+ @Override
+ public Text getCurrentKey() {
+ // the key is always null since we don't really have a key for each
+ // input record
+ return null;
+ }
+
+ @Override
+ public Tuple getCurrentValue() {
+ return value;
+ }
+
+ /**
+ * Get the progress within the split
+ */
+ public float getProgress() {
+ if (start == end) {
+ return 0.0f;
+ }
+ else {
+ return Math.min(1.0f, (pos - start) / (float) (end - start));
+ }
+ }
+
+ public synchronized void close() throws IOException {
+ if (scanner != null) {
+ scanner.close();
+ }
+ if (reader != null) reader.close();
+ if (fileIn != null) fileIn.close();
+ }
+}
Added: hadoop/pig/trunk/src/org/apache/pig/impl/io/TFileRecordWriter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/io/TFileRecordWriter.java?rev=989828&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/io/TFileRecordWriter.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/io/TFileRecordWriter.java Thu Aug 26 17:11:36 2010
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.io;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.pig.data.InterSedes;
+import org.apache.pig.data.InterSedesFactory;
+import org.apache.pig.data.Tuple;
+import org.apache.hadoop.io.file.tfile.TFile.Writer;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * A record reader used to write data compatible with {@link InterRecordWriter}
+ * It uses the default InterSedes object for serialization.
+ */
+public class TFileRecordWriter extends
+ RecordWriter<org.apache.hadoop.io.WritableComparable, Tuple> {
+
+ final private BytesWritable KEY0 = new BytesWritable(new byte[0]);
+ private static InterSedes sedes = InterSedesFactory.getInterSedesInstance();
+ /**
+ * the outputstream to write out on
+ */
+ Writer writer = null;
+ FSDataOutputStream fileOut = null;
+
+ /**
+ *
+ */
+ public TFileRecordWriter(Path file, String codec, Configuration conf)
+ throws IOException {
+ FileSystem fs = file.getFileSystem(conf);
+ fileOut = fs.create(file, false);
+ writer = new Writer(fileOut, 1024 * 1024, codec, null, conf);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.mapreduce.RecordWriter#close(org.apache.hadoop.mapreduce.TaskAttemptContext)
+ */
+ @Override
+ public void close(TaskAttemptContext arg0) throws IOException,
+ InterruptedException {
+ if (writer != null) writer.close();
+ if (fileOut != null) fileOut.close();
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.mapreduce.RecordWriter#write(java.lang.Object, java.lang.Object)
+ */
+ @Override
+ public void write(WritableComparable wc, Tuple t) throws IOException,
+ InterruptedException {
+ DataOutputStream outputKey = writer.prepareAppendKey(KEY0.getLength());
+ try {
+ outputKey.write(KEY0.getBytes(), 0, KEY0.getLength());
+ }
+ finally {
+ outputKey.close();
+ }
+ // we really only want to write the tuple (value) out here
+ DataOutputStream outputValue = writer.prepareAppendValue(-1);
+
+ try {
+ sedes.writeDatum(outputValue, t);
+ }
+ finally {
+ outputValue.close();
+ }
+ }
+
+}
Added: hadoop/pig/trunk/src/org/apache/pig/impl/io/TFileStorage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/io/TFileStorage.java?rev=989828&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/io/TFileStorage.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/io/TFileStorage.java Thu Aug 26 17:11:36 2010
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.io;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.pig.Expression;
+import org.apache.pig.FileInputLoadFunc;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.LoadMetadata;
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.ResourceStatistics;
+import org.apache.pig.StoreFunc;
+import org.apache.pig.StoreFuncInterface;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigFileInputFormat;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
+import org.apache.pig.classification.InterfaceAudience;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.util.Utils;
+
+/**
+ * LOAD FUNCTION FOR PIG INTERNAL USE ONLY! This load function is used for
+ * storing intermediate data between MR jobs of a pig query. The serialization
+ * format of this load function can change in newer versions of pig, so this
+ * should NOT be used to store any persistent data.
+ */
+@InterfaceAudience.Private
+public class TFileStorage extends FileInputLoadFunc implements
+ StoreFuncInterface, LoadMetadata {
+
+ private static final Log mLog = LogFactory.getLog(TFileStorage.class);
+
+ private TFileRecordReader recReader = null;
+ private TFileRecordWriter recWriter = null;
+
+ /**
+ * Simple binary nested reader format
+ */
+ public TFileStorage() throws IOException {
+ mLog.info("TFile storage in use");
+ }
+
+ @Override
+ public Tuple getNext() throws IOException {
+ if (recReader.nextKeyValue()) {
+ return recReader.getCurrentValue();
+ }
+ else {
+ return null;
+ }
+ }
+
+ @Override
+ public void putNext(Tuple t) throws IOException {
+ try {
+ recWriter.write(null, t);
+ }
+ catch (InterruptedException e) {
+ throw new IOException(e);
+ }
+ }
+
+ public static class TFileInputFormat extends
+ PigFileInputFormat<Text, Tuple> {
+
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.mapreduce.InputFormat#createRecordReader(org.apache.hadoop.mapreduce.InputSplit, org.apache.hadoop.mapreduce.TaskAttemptContext)
+ */
+ @Override
+ public RecordReader<Text, Tuple> createRecordReader(InputSplit split,
+ TaskAttemptContext context) throws IOException,
+ InterruptedException {
+ return new TFileRecordReader();
+ }
+
+ }
+
+ @Override
+ public InputFormat getInputFormat() {
+ return new TFileInputFormat();
+ }
+
+ @Override
+ public int hashCode() {
+ return 42;
+ }
+
+ @Override
+ public void prepareToRead(RecordReader reader, PigSplit split) {
+ recReader = (TFileRecordReader) reader;
+ }
+
+ @Override
+ public void setLocation(String location, Job job) throws IOException {
+ FileInputFormat.setInputPaths(job, location);
+ }
+
+ public static class TFileOutputFormat
+ extends
+ FileOutputFormat<org.apache.hadoop.io.WritableComparable, Tuple> {
+
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.mapreduce.lib.output.FileOutputFormat#getRecordWriter(org.apache.hadoop.mapreduce.TaskAttemptContext)
+ */
+ @Override
+ public RecordWriter<WritableComparable, Tuple> getRecordWriter(
+ TaskAttemptContext job) throws IOException,
+ InterruptedException {
+ Configuration conf = job.getConfiguration();
+ String codec = conf.get("pig.tmpfilecompression.codec", "");
+ if (!codec.equals("lzo") && !codec.equals("gz"))
+ throw new IOException(
+ "Invalid temporary file compression codec [" + codec + "]. Expected compression codecs are gz and lzo");
+ mLog.info(codec + " compression codec in use");
+ Path file = getDefaultWorkFile(job, "");
+ return new TFileRecordWriter(file, codec, conf);
+ }
+ }
+
+ @Override
+ public OutputFormat getOutputFormat() {
+ return new TFileOutputFormat();
+ }
+
+ @Override
+ public void prepareToWrite(RecordWriter writer) {
+ this.recWriter = (TFileRecordWriter) writer;
+ }
+
+ @Override
+ public void setStoreLocation(String location, Job job) throws IOException {
+ FileOutputFormat.setOutputPath(job, new Path(location));
+ }
+
+ @Override
+ public void checkSchema(ResourceSchema s) throws IOException {
+
+ }
+
+ @Override
+ public String relToAbsPathForStoreLocation(String location, Path curDir)
+ throws IOException {
+ return LoadFunc.getAbsolutePath(location, curDir);
+ }
+
+ @Override
+ public String[] getPartitionKeys(String location, Job job)
+ throws IOException {
+ return null;
+ }
+
+ @Override
+ public ResourceSchema getSchema(String location, Job job)
+ throws IOException {
+ return Utils.getSchema(this, location, true, job);
+ }
+
+ @Override
+ public ResourceStatistics getStatistics(String location, Job job)
+ throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setPartitionFilter(Expression plan) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setStoreFuncUDFContextSignature(String signature) {
+ }
+
+ @Override
+ public void cleanupOnFailure(String location, Job job) throws IOException {
+ StoreFunc.cleanupOnFailureImpl(location, job);
+ }
+
+}
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/util/Utils.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/util/Utils.java?rev=989828&r1=989827&r2=989828&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/util/Utils.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/util/Utils.java Thu Aug 26 17:11:36 2010
@@ -18,11 +18,27 @@
package org.apache.pig.impl.util;
import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+
+import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
import org.apache.pig.impl.logicalLayer.parser.ParseException;
import org.apache.pig.impl.logicalLayer.parser.QueryParser;
import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.impl.io.InterStorage;
+import org.apache.pig.impl.io.ReadToEndLoader;
+import org.apache.pig.impl.io.TFileStorage;
+import org.apache.pig.FileInputLoadFunc;
+import org.apache.pig.PigException;
+import org.apache.pig.ResourceSchema;
/**
* Class with utility static methods
@@ -77,6 +93,42 @@ public class Utils {
}
}
+ public static ResourceSchema getSchema(LoadFunc wrappedLoadFunc, String location, boolean checkExistence, Job job)
+ throws IOException {
+ Configuration conf = job.getConfiguration();
+ if (checkExistence) {
+ Path path = new Path(location);
+ if (!FileSystem.get(conf).exists(path)) {
+ // At compile time in batch mode, the file may not exist
+ // (such as intermediate file). Just return null - the
+ // same way as we would if we did not get a valid record
+ return null;
+ }
+ }
+ ReadToEndLoader loader = new ReadToEndLoader(wrappedLoadFunc, conf, location, 0);
+ // get the first record from the input file
+ // and figure out the schema from the data in
+ // the first record
+ Tuple t = loader.getNext();
+ if (t == null) {
+ // we couldn't get a valid record from the input
+ return null;
+ }
+ int numFields = t.size();
+ Schema s = new Schema();
+ for (int i = 0; i < numFields; i++) {
+ try {
+ s.add(DataType.determineFieldSchema(t.get(i)));
+ }
+ catch (Exception e) {
+ int errCode = 2104;
+ String msg = "Error while determining schema of SequenceFileStorage data.";
+ throw new ExecException(msg, errCode, PigException.BUG, e);
+ }
+ }
+ return new ResourceSchema(s);
+ }
+
public static Schema getSchemaFromString(String schemaString) throws ParseException {
return Utils.getSchemaFromString(schemaString, DataType.BYTEARRAY);
}
@@ -89,6 +141,40 @@ public class Utils {
return schema;
}
+ public static String getTmpFileCompressorName(PigContext pigContext) {
+ if (pigContext == null)
+ return InterStorage.class.getName();
+ boolean tmpFileCompression = pigContext.getProperties().getProperty("pig.tmpfilecompression", "false").equals("true");
+ String codec = pigContext.getProperties().getProperty("pig.tmpfilecompression.codec", "");
+ if (tmpFileCompression) {
+ if (codec.equals("lzo"))
+ pigContext.getProperties().setProperty("io.compression.codec.lzo.class", "com.hadoop.compression.lzo.LzoCodec");
+ return TFileStorage.class.getName();
+ } else
+ return InterStorage.class.getName();
+ }
+
+ public static FileInputLoadFunc getTmpFileStorageObject(Configuration conf) throws IOException {
+ boolean tmpFileCompression = conf.getBoolean("pig.tmpfilecompression", false);
+ return tmpFileCompression ? new TFileStorage() : new InterStorage();
+ }
+
+ public static boolean tmpFileCompression(PigContext pigContext) {
+ if (pigContext == null)
+ return false;
+ return pigContext.getProperties().getProperty("pig.tmpfilecompression", "false").equals("true");
+ }
+
+ public static String tmpFileCompressionCodec(PigContext pigContext) throws IOException {
+ if (pigContext == null)
+ return "";
+ String codec = pigContext.getProperties().getProperty("pig.tmpfilecompression.codec", "");
+ if (codec.equals("gz") || codec.equals("lzo"))
+ return codec;
+ else
+ throw new IOException("Invalid temporary file compression codec ["+codec+"]. Expected compression codecs are gz and lzo");
+ }
+
public static String getStringFromArray(String[] arr) {
StringBuilder str = new StringBuilder();
for(String s: arr) {
@@ -97,5 +183,4 @@ public class Utils {
}
return str.toString();
}
-
}
Modified: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java?rev=989828&r1=989827&r2=989828&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java Thu Aug 26 17:11:36 2010
@@ -62,7 +62,6 @@ import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.builtin.GFCross;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.io.FileSpec;
-import org.apache.pig.impl.io.InterStorage;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.LogicalOperator;
import org.apache.pig.impl.logicalLayer.LogicalPlan;
@@ -86,6 +85,7 @@ import org.apache.pig.newplan.logical.ex
import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
import org.apache.pig.newplan.logical.expression.ProjectExpression;
import org.apache.pig.newplan.logical.relational.LogicalSchema.LogicalFieldSchema;
+import org.apache.pig.impl.util.Utils;
public class LogToPhyTranslationVisitor extends LogicalRelationalNodesVisitor {
@@ -1216,7 +1216,7 @@ public class LogToPhyTranslationVisitor
physOp.setAlias(loSplit.getAlias());
FileSpec splStrFile;
try {
- splStrFile = new FileSpec(FileLocalizer.getTemporaryPath(pc).toString(),new FuncSpec(InterStorage.class.getName()));
+ splStrFile = new FileSpec(FileLocalizer.getTemporaryPath(pc).toString(),new FuncSpec(Utils.getTmpFileCompressorName(pc)));
} catch (IOException e1) {
byte errSrc = pc.getErrorSource();
int errCode = 0;
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestSampleOptimizer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestSampleOptimizer.java?rev=989828&r1=989827&r2=989828&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestSampleOptimizer.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestSampleOptimizer.java Thu Aug 26 17:11:36 2010
@@ -82,7 +82,7 @@ public class TestSampleOptimizer {
// Before optimizer visits, number of MR jobs = 3.
assertEquals(3,count);
- SampleOptimizer so = new SampleOptimizer(mrPlan);
+ SampleOptimizer so = new SampleOptimizer(mrPlan, pc);
so.visit();
count = 1;
@@ -129,7 +129,7 @@ public class TestSampleOptimizer {
// Before optimizer visits, number of MR jobs = 3.
assertEquals(3,count);
- SampleOptimizer so = new SampleOptimizer(mrPlan);
+ SampleOptimizer so = new SampleOptimizer(mrPlan, pc);
so.visit();
count = 1;
@@ -211,7 +211,7 @@ public class TestSampleOptimizer {
// Before optimizer visits, number of MR jobs = 3.
assertEquals(3,count);
- SampleOptimizer so = new SampleOptimizer(mrPlan);
+ SampleOptimizer so = new SampleOptimizer(mrPlan, pc);
so.visit();
count = 1;
@@ -243,7 +243,7 @@ public class TestSampleOptimizer {
// Before optimizer visits, number of MR jobs = 3.
assertEquals(3,count);
- SampleOptimizer so = new SampleOptimizer(mrPlan);
+ SampleOptimizer so = new SampleOptimizer(mrPlan, pc);
so.visit();
count = 1;
Added: hadoop/pig/trunk/test/org/apache/pig/test/TestTmpFileCompression.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestTmpFileCompression.java?rev=989828&view=auto
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestTmpFileCompression.java (added)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestTmpFileCompression.java Thu Aug 26 17:11:36 2010
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+
+import java.io.BufferedReader;
+import java.io.FileReader;
+
+
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.TFileStorage;
+import org.apache.pig.impl.io.InterStorage;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.log4j.FileAppender;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.log4j.SimpleLayout;
+
+public class TestTmpFileCompression {
+ private PigServer pigServer;
+ static MiniCluster cluster = MiniCluster.buildCluster();
+ File logFile;
+
+ @Before
+ public void setUp() throws Exception {
+ pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ }
+
+ private void resetLog(Class clazz) throws Exception {
+ if (logFile != null)
+ logFile.delete();
+ Logger logger = Logger.getLogger(clazz);
+ logger.removeAllAppenders();
+ logger.setLevel(Level.INFO);
+ SimpleLayout layout = new SimpleLayout();
+ logFile = File.createTempFile("log", "");
+ FileAppender appender = new FileAppender(layout, logFile.toString(),
+ false, false, 0);
+ logger.addAppender(appender);
+ }
+
+ public boolean checkLogFileMessage(String[] messages) {
+ BufferedReader reader = null;
+
+ try {
+ reader = new BufferedReader(new FileReader(logFile));
+ String logMessage = "";
+ String line;
+ while ((line = reader.readLine()) != null) {
+ logMessage = logMessage + line + "\n";
+ }
+ for (int i = 0; i < messages.length; i++) {
+ if (!logMessage.contains(messages[i])) return false;
+ }
+ return true;
+ }
+ catch (IOException e) {
+ return false;
+ }
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ if (logFile != null)
+ logFile.delete();
+ }
+
+ @AfterClass
+ public static void oneTimeTearDown() throws Exception {
+ cluster.shutDown();
+ }
+
+ @Test
+ public void testImplicitSplitUncompressed() throws Exception {
+ resetLog(InterStorage.class);
+ int LOOP_SIZE = 20;
+ String[] input = new String[LOOP_SIZE];
+ for (int i = 1; i <= LOOP_SIZE; i++) {
+ input[i - 1] = Integer.toString(i);
+ }
+ String inputFileName = "testImplicitSplit-input.txt";
+ Util.createInputFile(cluster, inputFileName, input);
+ pigServer.registerQuery("A = LOAD '" + inputFileName + "';");
+ pigServer.registerQuery("B = filter A by $0<=10;");
+ pigServer.registerQuery("C = filter A by $0>10;");
+ pigServer.registerQuery("D = union B,C;");
+ Iterator<Tuple> iter = pigServer.openIterator("D");
+ if (!iter.hasNext()) fail("No Output received");
+ int cnt = 0;
+ while (iter.hasNext()) {
+ Tuple t = iter.next();
+ ++cnt;
+ }
+ assertEquals(20, cnt);
+ Util.deleteFile(cluster, inputFileName);
+ assertTrue(checkLogFileMessage(new String[] {
+ "Pig Internal storage in use"
+ }));
+ }
+
+ @Test
+ public void testImplicitSplitInCoGroupUncompressed() throws Exception {
+ // this query is similar to the one reported in JIRA - PIG-537
+ // Create input file
+ resetLog(InterStorage.class);
+ String input1 = "testImplicitSplitInCoGroup-input1.txt";
+ String input2 = "testImplicitSplitInCoGroup-input2.txt";
+ Util.createInputFile(cluster, input1, new String[] {
+ "a:1", "b:2", "b:20", "c:3", "c:30"
+ });
+ Util.createInputFile(cluster, input2, new String[] {
+ "a:first", "b:second", "c:third"
+ });
+ pigServer.registerQuery("a = load '" + input1 + "' using PigStorage(':') as (name:chararray, marks:int);");
+ pigServer.registerQuery("b = load '" + input2 + "' using PigStorage(':') as (name:chararray, rank:chararray);");
+ pigServer.registerQuery("c = cogroup a by name, b by name;");
+ pigServer.registerQuery("d = foreach c generate group, FLATTEN(a.marks) as newmarks;");
+ pigServer.registerQuery("e = cogroup a by marks, d by newmarks;");
+ pigServer.registerQuery("f = foreach e generate group, flatten(a), flatten(d);");
+ HashMap<Integer, Object[]> results = new HashMap<Integer, Object[]>();
+ results.put(1, new Object[] {
+ "a", 1, "a", 1
+ });
+ results.put(2, new Object[] {
+ "b", 2, "b", 2
+ });
+ results.put(3, new Object[] {
+ "c", 3, "c", 3
+ });
+ results.put(20, new Object[] {
+ "b", 20, "b", 20
+ });
+ results.put(30, new Object[] {
+ "c", 30, "c", 30
+ });
+
+ Iterator<Tuple> it = pigServer.openIterator("f");
+ while (it.hasNext()) {
+ Tuple t = it.next();
+ System.err.println("Tuple:" + t);
+ Integer group = (Integer) t.get(0);
+ Object[] groupValues = results.get(group);
+ for (int i = 0; i < 4; i++) {
+ assertEquals(groupValues[i], t.get(i + 1));
+ }
+ }
+ Util.deleteFile(cluster, input1);
+ Util.deleteFile(cluster, input2);
+ assertTrue(checkLogFileMessage(new String[] {
+ "Pig Internal storage in use"
+ }));
+ }
+
+ @Test
+ public void testImplicitSplit() throws Exception {
+ resetLog(TFileStorage.class);
+ int LOOP_SIZE = 20;
+ String[] input = new String[LOOP_SIZE];
+ for (int i = 1; i <= LOOP_SIZE; i++) {
+ input[i - 1] = Integer.toString(i);
+ }
+ String inputFileName = "testImplicitSplit-input.txt";
+ Util.createInputFile(cluster, inputFileName, input);
+ pigServer.getPigContext().getProperties().setProperty(
+ "pig.tmpfilecompression", "true");
+ pigServer.getPigContext().getProperties().setProperty(
+ "pig.tmpfilecompression.codec", "gz");
+ pigServer.registerQuery("A = LOAD '" + inputFileName + "';");
+ pigServer.registerQuery("B = filter A by $0<=10;");
+ pigServer.registerQuery("C = filter A by $0>10;");
+ pigServer.registerQuery("D = union B,C;");
+ Iterator<Tuple> iter = pigServer.openIterator("D");
+ if (!iter.hasNext()) fail("No Output received");
+ int cnt = 0;
+ while (iter.hasNext()) {
+ Tuple t = iter.next();
+ ++cnt;
+ }
+ assertEquals(20, cnt);
+ Util.deleteFile(cluster, inputFileName);
+ assertTrue(checkLogFileMessage(new String[] {
+ "TFile storage in use"
+ }));
+ }
+
+ @Test
+ public void testImplicitSplitInCoGroup() throws Exception {
+ // this query is similar to the one reported in JIRA - PIG-537
+ // Create input file
+ resetLog(TFileStorage.class);
+ String input1 = "testImplicitSplitInCoGroup-input1.txt";
+ String input2 = "testImplicitSplitInCoGroup-input2.txt";
+ Util.createInputFile(cluster, input1, new String[] {
+ "a:1", "b:2", "b:20", "c:3", "c:30"
+ });
+ Util.createInputFile(cluster, input2, new String[] {
+ "a:first", "b:second", "c:third"
+ });
+ pigServer.getPigContext().getProperties().setProperty(
+ "pig.tmpfilecompression", "true");
+ pigServer.getPigContext().getProperties().setProperty(
+ "pig.tmpfilecompression.codec", "gz");
+ pigServer.registerQuery("a = load '" + input1 + "' using PigStorage(':') as (name:chararray, marks:int);");
+ pigServer.registerQuery("b = load '" + input2 + "' using PigStorage(':') as (name:chararray, rank:chararray);");
+ pigServer.registerQuery("c = cogroup a by name, b by name;");
+ pigServer.registerQuery("d = foreach c generate group, FLATTEN(a.marks) as newmarks;");
+ pigServer.registerQuery("e = cogroup a by marks, d by newmarks;");
+ pigServer.registerQuery("f = foreach e generate group, flatten(a), flatten(d);");
+ HashMap<Integer, Object[]> results = new HashMap<Integer, Object[]>();
+ results.put(1, new Object[] {
+ "a", 1, "a", 1
+ });
+ results.put(2, new Object[] {
+ "b", 2, "b", 2
+ });
+ results.put(3, new Object[] {
+ "c", 3, "c", 3
+ });
+ results.put(20, new Object[] {
+ "b", 20, "b", 20
+ });
+ results.put(30, new Object[] {
+ "c", 30, "c", 30
+ });
+
+ Iterator<Tuple> it = pigServer.openIterator("f");
+ while (it.hasNext()) {
+ Tuple t = it.next();
+ System.err.println("Tuple:" + t);
+ Integer group = (Integer) t.get(0);
+ Object[] groupValues = results.get(group);
+ for (int i = 0; i < 4; i++) {
+ assertEquals(groupValues[i], t.get(i + 1));
+ }
+ }
+ Util.deleteFile(cluster, input1);
+ Util.deleteFile(cluster, input2);
+ assertTrue(checkLogFileMessage(new String[] {
+ "TFile storage in use"
+ }));
+ }
+
+}
Modified: hadoop/pig/trunk/test/org/apache/pig/test/utils/GenPhyOp.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/utils/GenPhyOp.java?rev=989828&r1=989827&r2=989828&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/utils/GenPhyOp.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/utils/GenPhyOp.java Thu Aug 26 17:11:36 2010
@@ -33,7 +33,6 @@ import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.io.FileSpec;
-import org.apache.pig.impl.io.InterStorage;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
@@ -41,6 +40,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.*;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.*;
import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.impl.util.Utils;
public class GenPhyOp{
static Random r = new Random();
@@ -776,7 +776,7 @@ public class GenPhyOp{
private static FileSpec getTempFileSpec() throws IOException {
return new FileSpec(FileLocalizer.getTemporaryPath(pc).toString(),
- new FuncSpec(InterStorage.class.getName())
+ new FuncSpec(Utils.getTmpFileCompressorName(pc))
);
}