You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by rd...@apache.org on 2010/11/13 01:05:55 UTC

svn commit: r1034606 - in /pig/branches/branch-0.8: CHANGES.txt src/org/apache/pig/Main.java src/org/apache/pig/builtin/PigStorage.java test/org/apache/pig/test/TestBZip.java

Author: rding
Date: Sat Nov 13 00:05:55 2010
New Revision: 1034606

URL: http://svn.apache.org/viewvc?rev=1034606&view=rev
Log:
PIG-1714: Option mapred.output.compress doesn't work in Pig 0.8 but worked in 0.7

Modified:
    pig/branches/branch-0.8/CHANGES.txt
    pig/branches/branch-0.8/src/org/apache/pig/Main.java
    pig/branches/branch-0.8/src/org/apache/pig/builtin/PigStorage.java
    pig/branches/branch-0.8/test/org/apache/pig/test/TestBZip.java

Modified: pig/branches/branch-0.8/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.8/CHANGES.txt?rev=1034606&r1=1034605&r2=1034606&view=diff
==============================================================================
--- pig/branches/branch-0.8/CHANGES.txt (original)
+++ pig/branches/branch-0.8/CHANGES.txt Sat Nov 13 00:05:55 2010
@@ -207,6 +207,9 @@ PIG-1309: Map-side Cogroup (ashutoshc)
 
 BUG FIXES
 
+PIG-1714: Option mapred.output.compress doesn't work in Pig 0.8 but worked in
+0.7 (xuefuz via rding)
+
 PIG-1706: New logical plan: PushDownFlattenForEach fail if flattened field has user defined schema (daijy)
 
 PIG-1705: New logical plan: self-join fail for some queries (daijy)

Modified: pig/branches/branch-0.8/src/org/apache/pig/Main.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.8/src/org/apache/pig/Main.java?rev=1034606&r1=1034605&r2=1034606&view=diff
==============================================================================
--- pig/branches/branch-0.8/src/org/apache/pig/Main.java (original)
+++ pig/branches/branch-0.8/src/org/apache/pig/Main.java Sat Nov 13 00:05:55 2010
@@ -179,6 +179,16 @@ static int run(String args[], PigProgres
             properties.setProperty("stop.on.failure", ""+false);
         }
         
+        if( "true".equals( properties.getProperty( "mapred.output.compress" ) ) ) {
+            properties.setProperty( "output.compression.enabled",  "true" );
+            String codec = properties.getProperty( "mapred.output.compression.codec" );
+            if( codec == null ) {
+                throw new RuntimeException( "'mapred.output.compress' is set but no value is specified for 'mapred.output.compression.codec'." );
+            } else {
+                properties.setProperty( "output.compression.codec", codec );
+            }
+        }
+        
         // set up client side system properties in UDF context
         UDFContext.getUDFContext().setClientSystemProps();
 

Modified: pig/branches/branch-0.8/src/org/apache/pig/builtin/PigStorage.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.8/src/org/apache/pig/builtin/PigStorage.java?rev=1034606&r1=1034605&r2=1034606&view=diff
==============================================================================
--- pig/branches/branch-0.8/src/org/apache/pig/builtin/PigStorage.java (original)
+++ pig/branches/branch-0.8/src/org/apache/pig/builtin/PigStorage.java Sat Nov 13 00:05:55 2010
@@ -28,6 +28,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.compress.BZip2Codec;
+import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.GzipCodec;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
@@ -232,14 +233,24 @@ LoadPushDown {
     public void setStoreLocation(String location, Job job) throws IOException {
         job.getConfiguration().set("mapred.textoutputformat.separator", "");
         FileOutputFormat.setOutputPath(job, new Path(location));
-        if (location.endsWith(".bz2") || location.endsWith(".bz")) {
-            FileOutputFormat.setCompressOutput(job, true);
-            FileOutputFormat.setOutputCompressorClass(job,  BZip2Codec.class);
-        }  else if (location.endsWith(".gz")) {
-            FileOutputFormat.setCompressOutput(job, true);
-            FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
+        if( "true".equals( job.getConfiguration().get( "output.compression.enabled" ) ) ) {
+            FileOutputFormat.setCompressOutput( job, true );
+            String codec = job.getConfiguration().get( "output.compression.codec" );
+            try {
+                FileOutputFormat.setOutputCompressorClass( job,  (Class<? extends CompressionCodec>) Class.forName( codec ) );
+            } catch (ClassNotFoundException e) {
+                throw new RuntimeException("Class not found: " + codec );
+            }
         } else {
-            FileOutputFormat.setCompressOutput(job, false);
+            if (location.endsWith(".bz2") || location.endsWith(".bz")) {
+                FileOutputFormat.setCompressOutput(job, true);
+                FileOutputFormat.setOutputCompressorClass(job,  BZip2Codec.class);
+            }  else if (location.endsWith(".gz")) {
+                FileOutputFormat.setCompressOutput(job, true);
+                FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
+            } else {
+                FileOutputFormat.setCompressOutput( job, false);
+            }
         }
     }
 

Modified: pig/branches/branch-0.8/test/org/apache/pig/test/TestBZip.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.8/test/org/apache/pig/test/TestBZip.java?rev=1034606&r1=1034605&r2=1034606&view=diff
==============================================================================
--- pig/branches/branch-0.8/test/org/apache/pig/test/TestBZip.java (original)
+++ pig/branches/branch-0.8/test/org/apache/pig/test/TestBZip.java Sat Nov 13 00:05:55 2010
@@ -440,4 +440,35 @@ public class TestBZip {
         stat = fs.getFileStatus(new Path("output.bz2/part-m-00000.bz2"));
         assertTrue(stat.getLen() > 0);     
     }
+
+    @Test
+    public void testBzipStoreInMultiQuery2() throws Exception {
+        String[] inputData = new String[] {
+                "1\t2\r3\t4"
+        };
+        
+        String inputFileName = "input2.txt";
+        Util.createInputFile(cluster, inputFileName, inputData);
+        
+        PigServer pig = new PigServer(ExecType.MAPREDUCE, cluster
+                .getProperties());
+        PigContext pigContext = pig.getPigContext();
+        pigContext.getProperties().setProperty( "output.compression.enabled", "true" );
+        pigContext.getProperties().setProperty( "output.compression.codec", "org.apache.hadoop.io.compress.BZip2Codec" );
+        
+        pig.setBatchOn();
+        pig.registerQuery("a = load '" +  inputFileName + "';");
+        pig.registerQuery("store a into 'output2.bz2';");
+        pig.registerQuery("store a into 'output2';");
+        pig.executeBatch();
+        
+        FileSystem fs = FileSystem.get(ConfigurationUtil.toConfiguration(
+                pig.getPigContext().getProperties()));
+        FileStatus stat = fs.getFileStatus(new Path("output2/part-m-00000.bz2"));        
+        assertTrue(stat.getLen() > 0);     
+        
+        stat = fs.getFileStatus(new Path("output2.bz2/part-m-00000.bz2"));
+        assertTrue(stat.getLen() > 0);     
+    }
+    
 }