You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2011/12/08 23:04:36 UTC

svn commit: r1212163 - in /pig/trunk: CHANGES.txt src/org/apache/pig/builtin/PigStorage.java test/org/apache/pig/test/TestBZip.java

Author: daijy
Date: Thu Dec  8 22:04:36 2011
New Revision: 1212163

URL: http://svn.apache.org/viewvc?rev=1212163&view=rev
Log:
PIG-2391: Bzip_2 test is broken

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/builtin/PigStorage.java
    pig/trunk/test/org/apache/pig/test/TestBZip.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1212163&r1=1212162&r2=1212163&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Thu Dec  8 22:04:36 2011
@@ -190,6 +190,8 @@ PIG-2228: support partial aggregation in
 
 BUG FIXES
 
+PIG-2391: Bzip_2 test is broken (xutingz via daijy)
+
 PIG-2358: JobStats.getHadoopCounters() is never set and always returns null (xutingz via daijy)
 
 PIG-2184: Not able to provide positional reference to macro invocations (xutingz via daijy)

Modified: pig/trunk/src/org/apache/pig/builtin/PigStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/PigStorage.java?rev=1212163&r1=1212162&r2=1212163&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/PigStorage.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/PigStorage.java Thu Dec  8 22:04:36 2011
@@ -33,8 +33,10 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.BZip2Codec;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.io.compress.GzipCodec;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.OutputFormat;
@@ -380,13 +382,15 @@ LoadPushDown, LoadMetadata, StoreMetadat
     }
 
     private void setCompression(Path path, Job job) {
-        CompressionCodecFactory codecFactory = new CompressionCodecFactory(job.getConfiguration());
-        CompressionCodec codec = codecFactory.getCodec(path);
-        if (codec != null) {
+     	String location=path.getName();
+        if (location.endsWith(".bz2") || location.endsWith(".bz")) {
             FileOutputFormat.setCompressOutput(job, true);
-            FileOutputFormat.setOutputCompressorClass(job, codec.getClass());
-        }else {
-            FileOutputFormat.setCompressOutput(job, false);  
+            FileOutputFormat.setOutputCompressorClass(job,  BZip2Codec.class);
+        }  else if (location.endsWith(".gz")) {
+            FileOutputFormat.setCompressOutput(job, true);
+            FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
+        } else {
+            FileOutputFormat.setCompressOutput( job, false);
         }
     }
 

Modified: pig/trunk/test/org/apache/pig/test/TestBZip.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestBZip.java?rev=1212163&r1=1212162&r2=1212163&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestBZip.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestBZip.java Thu Dec  8 22:04:36 2011
@@ -181,6 +181,55 @@ public class TestBZip {
         out.delete();
     }
 
+    // see PIG-2391: store into a '.bz' location, then load it back and iterate the result
+    @Test
+    public void testBz2() throws Exception {
+        String[] inputData = new String[] {
+                "1\t2\r3\t4", // '\r' case - this will be split into two tuples
+                "5\t6\r", // '\r\n' case
+                "7\t8", // '\n' case
+                "9\t10\r" // '\r\n' at the end of file
+        };
+        
+        // bzip compressed input
+        File in = File.createTempFile("junit", ".bz2");
+        String compressedInputFileName = in.getAbsolutePath();
+        in.deleteOnExit();
+        
+        try {
+            CBZip2OutputStream cos = 
+                new CBZip2OutputStream(new FileOutputStream(in));
+            for (int i = 0; i < inputData.length; i++) {
+                StringBuffer sb = new StringBuffer();
+                sb.append(inputData[i]).append("\n");
+                byte bytes[] = sb.toString().getBytes();
+                cos.write(bytes);
+            }
+            cos.close();
+            
+            Util.copyFromLocalToCluster(cluster, compressedInputFileName,
+                    compressedInputFileName);
+            
+            // pig server in MAPREDUCE mode, configured from the test cluster's properties
+            PigServer pig = new PigServer(ExecType.MAPREDUCE, cluster
+                    .getProperties());
+            
+            // pig script to read compressed input
+            String script ="a = load '" + compressedInputFileName +"';";
+            pig.registerQuery(script);
+            
+            pig.registerQuery("store a into 'intermediate.bz';");
+            pig.registerQuery("b = load 'intermediate.bz';");
+            Iterator<Tuple> it2 = pig.openIterator("b");
+			while (it2.hasNext()) {
+				it2.next();
+			}
+        } finally {
+            in.delete();
+            Util.deleteFile(cluster, "intermediate.bz");
+            Util.deleteFile(cluster, "final.bz");
+        }
+    }
     /** 
      * Tests that '\n', '\r' and '\r\n' are treated as record delims when using
      * bzip just like they are when using uncompressed text