You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by rd...@apache.org on 2010/11/13 01:05:55 UTC
svn commit: r1034606 - in /pig/branches/branch-0.8: CHANGES.txt
src/org/apache/pig/Main.java src/org/apache/pig/builtin/PigStorage.java
test/org/apache/pig/test/TestBZip.java
Author: rding
Date: Sat Nov 13 00:05:55 2010
New Revision: 1034606
URL: http://svn.apache.org/viewvc?rev=1034606&view=rev
Log:
PIG-1714: Option mapred.output.compress doesn't work in Pig 0.8 but worked in 0.7
Modified:
pig/branches/branch-0.8/CHANGES.txt
pig/branches/branch-0.8/src/org/apache/pig/Main.java
pig/branches/branch-0.8/src/org/apache/pig/builtin/PigStorage.java
pig/branches/branch-0.8/test/org/apache/pig/test/TestBZip.java
Modified: pig/branches/branch-0.8/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.8/CHANGES.txt?rev=1034606&r1=1034605&r2=1034606&view=diff
==============================================================================
--- pig/branches/branch-0.8/CHANGES.txt (original)
+++ pig/branches/branch-0.8/CHANGES.txt Sat Nov 13 00:05:55 2010
@@ -207,6 +207,9 @@ PIG-1309: Map-side Cogroup (ashutoshc)
BUG FIXES
+PIG-1714: Option mapred.output.compress doesn't work in Pig 0.8 but worked in
+0.7 (xuefuz via rding)
+
PIG-1706: New logical plan: PushDownFlattenForEach fail if flattened field has user defined schema (daijy)
PIG-1705: New logical plan: self-join fail for some queries (daijy)
Modified: pig/branches/branch-0.8/src/org/apache/pig/Main.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.8/src/org/apache/pig/Main.java?rev=1034606&r1=1034605&r2=1034606&view=diff
==============================================================================
--- pig/branches/branch-0.8/src/org/apache/pig/Main.java (original)
+++ pig/branches/branch-0.8/src/org/apache/pig/Main.java Sat Nov 13 00:05:55 2010
@@ -179,6 +179,16 @@ static int run(String args[], PigProgres
properties.setProperty("stop.on.failure", ""+false);
}
+ if( "true".equals( properties.getProperty( "mapred.output.compress" ) ) ) {
+ properties.setProperty( "output.compression.enabled", "true" );
+ String codec = properties.getProperty( "mapred.output.compression.codec" );
+ if( codec == null ) {
+ throw new RuntimeException( "'mapred.output.compress' is set but no value is specified for 'mapred.output.compression.codec'." );
+ } else {
+ properties.setProperty( "output.compression.codec", codec );
+ }
+ }
+
// set up client side system properties in UDF context
UDFContext.getUDFContext().setClientSystemProps();
Modified: pig/branches/branch-0.8/src/org/apache/pig/builtin/PigStorage.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.8/src/org/apache/pig/builtin/PigStorage.java?rev=1034606&r1=1034605&r2=1034606&view=diff
==============================================================================
--- pig/branches/branch-0.8/src/org/apache/pig/builtin/PigStorage.java (original)
+++ pig/branches/branch-0.8/src/org/apache/pig/builtin/PigStorage.java Sat Nov 13 00:05:55 2010
@@ -28,6 +28,7 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.BZip2Codec;
+import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
@@ -232,14 +233,24 @@ LoadPushDown {
+ /**
+ * Points the job's output at {@code location} and configures output compression.
+ * If "output.compression.enabled" is "true" in the job configuration, output is
+ * compressed with the codec class named by "output.compression.codec". Otherwise
+ * the codec is chosen from the location's extension (.bz2/.bz -> BZip2Codec,
+ * .gz -> GzipCodec) and any other location is written uncompressed.
+ *
+ * @throws RuntimeException if compression is enabled but the codec name is
+ * missing, the class cannot be loaded, or it is not a CompressionCodec
+ */
public void setStoreLocation(String location, Job job) throws IOException {
job.getConfiguration().set("mapred.textoutputformat.separator", "");
FileOutputFormat.setOutputPath(job, new Path(location));
- if (location.endsWith(".bz2") || location.endsWith(".bz")) {
- FileOutputFormat.setCompressOutput(job, true);
- FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class);
- } else if (location.endsWith(".gz")) {
- FileOutputFormat.setCompressOutput(job, true);
- FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
+ if( "true".equals( job.getConfiguration().get( "output.compression.enabled" ) ) ) {
+ String codec = job.getConfiguration().get( "output.compression.codec" );
+ // Main checks this for the command-line path, but PigServer users can set
+ // output.compression.enabled directly; report the misconfiguration clearly
+ // instead of letting Class.forName(null) throw a bare NullPointerException.
+ if( codec == null ) {
+ throw new RuntimeException( "'output.compression.enabled' is set but no value is specified for 'output.compression.codec'." );
+ }
+ Class<? extends CompressionCodec> codecClass;
+ try {
+ // asSubclass verifies the type at runtime instead of an unchecked cast.
+ codecClass = Class.forName( codec ).asSubclass( CompressionCodec.class );
+ } catch (ClassNotFoundException e) {
+ throw new RuntimeException( "Class not found: " + codec, e );
+ } catch (ClassCastException e) {
+ throw new RuntimeException( "Class " + codec + " does not implement " + CompressionCodec.class.getName(), e );
+ }
+ FileOutputFormat.setCompressOutput( job, true );
+ FileOutputFormat.setOutputCompressorClass( job, codecClass );
} else {
- FileOutputFormat.setCompressOutput(job, false);
+ // No explicit codec configured: infer compression from the file extension.
+ if (location.endsWith(".bz2") || location.endsWith(".bz")) {
+ FileOutputFormat.setCompressOutput(job, true);
+ FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class);
+ } else if (location.endsWith(".gz")) {
+ FileOutputFormat.setCompressOutput(job, true);
+ FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
+ } else {
+ FileOutputFormat.setCompressOutput( job, false);
+ }
}
}
Modified: pig/branches/branch-0.8/test/org/apache/pig/test/TestBZip.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.8/test/org/apache/pig/test/TestBZip.java?rev=1034606&r1=1034605&r2=1034606&view=diff
==============================================================================
--- pig/branches/branch-0.8/test/org/apache/pig/test/TestBZip.java (original)
+++ pig/branches/branch-0.8/test/org/apache/pig/test/TestBZip.java Sat Nov 13 00:05:55 2010
@@ -440,4 +440,35 @@ public class TestBZip {
stat = fs.getFileStatus(new Path("output.bz2/part-m-00000.bz2"));
assertTrue(stat.getLen() > 0);
}
+
+ @Test
+ public void testBzipStoreInMultiQuery2() throws Exception {
+ // Compression is enabled through Pig properties rather than the store
+ // location's extension, so every store in the batch should write .bz2 parts.
+ String inputFileName = "input2.txt";
+ Util.createInputFile(cluster, inputFileName, new String[] { "1\t2\r3\t4" });
+
+ PigServer server = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ PigContext ctx = server.getPigContext();
+ ctx.getProperties().setProperty( "output.compression.enabled", "true" );
+ ctx.getProperties().setProperty( "output.compression.codec", "org.apache.hadoop.io.compress.BZip2Codec" );
+
+ // Both stores run together as one multi-query batch.
+ server.setBatchOn();
+ server.registerQuery("a = load '" + inputFileName + "';");
+ server.registerQuery("store a into 'output2.bz2';");
+ server.registerQuery("store a into 'output2';");
+ server.executeBatch();
+
+ FileSystem fs = FileSystem.get(ConfigurationUtil.toConfiguration(
+ server.getPigContext().getProperties()));
+ // The plain 'output2' store must also produce a non-empty bzip2 part file.
+ String[] expectedParts = new String[] {
+ "output2/part-m-00000.bz2",
+ "output2.bz2/part-m-00000.bz2"
+ };
+ for (String part : expectedParts) {
+ FileStatus status = fs.getFileStatus(new Path(part));
+ assertTrue(status.getLen() > 0);
+ }
+ }
+
}