You are viewing a plain-text version of this content. (The canonical link to the original message was not preserved in this copy.)
Posted to commits@pig.apache.org by da...@apache.org on 2011/01/20 02:57:11 UTC

svn commit: r1061098 - in /pig/branches/branch-0.8: CHANGES.txt contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/XMLLoader.java contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestXMLLoader.java

Author: daijy
Date: Thu Jan 20 01:57:11 2011
New Revision: 1061098

URL: http://svn.apache.org/viewvc?rev=1061098&view=rev
Log:
PIG-1561: XMLLoader in Piggybank does not support bz2 or gzip compressed XML files

Modified:
    pig/branches/branch-0.8/CHANGES.txt
    pig/branches/branch-0.8/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/XMLLoader.java
    pig/branches/branch-0.8/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestXMLLoader.java

Modified: pig/branches/branch-0.8/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.8/CHANGES.txt?rev=1061098&r1=1061097&r2=1061098&view=diff
==============================================================================
--- pig/branches/branch-0.8/CHANGES.txt (original)
+++ pig/branches/branch-0.8/CHANGES.txt Thu Jan 20 01:57:11 2011
@@ -28,6 +28,8 @@ PIG-1249: Safe-guards against misconfigu
 
 IMPROVEMENTS
 
+PIG-1561: XMLLoader in Piggybank does not support bz2 or gzip compressed XML files (vivekp via daijy)
+
 PIG-1728: doc updates (chandec via olgan)
 
 PIG-1756: doc updates (chandec via olgan)

Modified: pig/branches/branch-0.8/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/XMLLoader.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.8/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/XMLLoader.java?rev=1061098&r1=1061097&r2=1061098&view=diff
==============================================================================
--- pig/branches/branch-0.8/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/XMLLoader.java (original)
+++ pig/branches/branch-0.8/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/XMLLoader.java Thu Jan 20 01:57:11 2011
@@ -29,9 +29,13 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.io.compress.CompressionInputStream;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
@@ -42,6 +46,8 @@ import org.apache.pig.data.DataByteArray
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.io.BufferedPositionedInputStream;
+import org.apache.tools.bzip2r.CBZip2InputStream;
+
 
 
 
@@ -538,7 +544,15 @@ public class XMLLoader extends LoadFunc 
                 InterruptedException {
             
             return new XMLFileRecordReader(recordIdentifier);
-        }     
+        }  
+        
+        @Override
+        protected boolean isSplitable(JobContext context, Path filename) {
+        	// Always returns false since this version of XMLLoader will read an entire file.
+        	// ie.w/o this , any file > block size, MR will compute multiple splits but all 
+        	// mappers will read the full file which is functionally wrong.
+        	return false;
+        }
     }
     
     //------------------------------------------------------------------------
@@ -574,7 +588,26 @@ public class XMLLoader extends LoadFunc 
             FileSystem fs = file.getFileSystem(job);
             FSDataInputStream fileIn = fs.open(split.getPath());
         
-            this.xmlLoaderBPIS = new XMLLoaderBufferedPositionedInputStream(fileIn);
+            
+            if(file.toString().endsWith(".bz2") )
+            {
+            	// For bzip2 files use CBZip2InputStream to read and supply the upper input stream.
+            	this.xmlLoaderBPIS = new XMLLoaderBufferedPositionedInputStream(new CBZip2InputStream(fileIn, 9, end));
+            }
+            else if (file.toString().endsWith(".gz"))
+            {
+            	CompressionCodecFactory compressionCodecs =  new CompressionCodecFactory(job);
+            	final CompressionCodec codec = compressionCodecs.getCodec(file);
+            	 if (codec != null) {
+            	      CompressionInputStream stream = codec.createInputStream(fileIn);
+            	      this.xmlLoaderBPIS = new XMLLoaderBufferedPositionedInputStream(stream);
+            	    }
+            }
+            
+            else
+            {
+            	this.xmlLoaderBPIS = new XMLLoaderBufferedPositionedInputStream(fileIn);
+            }
         }
 
         

Modified: pig/branches/branch-0.8/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestXMLLoader.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.8/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestXMLLoader.java?rev=1061098&r1=1061097&r2=1061098&view=diff
==============================================================================
--- pig/branches/branch-0.8/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestXMLLoader.java (original)
+++ pig/branches/branch-0.8/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestXMLLoader.java Thu Jan 20 01:57:11 2011
@@ -13,16 +13,18 @@
 
 package org.apache.pig.piggybank.test.storage;
 
+import static org.apache.pig.ExecType.LOCAL;
+
+import java.io.File;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.regex.Pattern;
 
 import junit.framework.TestCase;
 
+import org.apache.pig.ExecType;
 import org.apache.pig.PigServer;
-import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.Tuple;
-import static org.apache.pig.ExecType.LOCAL;
 
 public class TestXMLLoader extends TestCase {
   private static String patternString = "(\\d+)!+(\\w+)~+(\\w+)";
@@ -67,4 +69,85 @@ public class TestXMLLoader extends TestC
     }
     assertEquals(3, tupleCount); // pig adds extra 
   }
+  
+  public void testXMLLoaderShouldLoadBasicBzip2Files() throws Exception {
+    String filename = TestHelper.createTempFile(data, "");
+    Process bzipProc = Runtime.getRuntime().exec("bzip2 "+filename);
+    int waitFor = bzipProc.waitFor();
+  
+    if(waitFor != 0)
+    {
+        fail ("Failed to create the class");
+    }
+
+    filename = filename + ".bz2";
+
+    try
+    {
+        PigServer pigServer = new PigServer (ExecType.LOCAL);
+        String loadQuery = "A = LOAD 'file:" + filename + "' USING org.apache.pig.piggybank.storage.XMLLoader('property') as (doc:chararray);";
+        pigServer.registerQuery(loadQuery);
+        Iterator<Tuple> it = pigServer.openIterator("A");
+        int tupleCount = 0;
+        while (it.hasNext()) {
+        Tuple tuple = (Tuple) it.next();
+        if (tuple == null)
+        break;
+        else {
+        //TestHelper.examineTuple(expected, tuple, tupleCount);
+        if (tuple.size() > 0) {
+            tupleCount++;
+	        }
+	     }
+        }
+	    
+        assertEquals(3, tupleCount); // pig adds extra
+    
+    }finally
+    {
+        new File(filename).delete();
+    }
+	    
+   }
+
+   public void testLoaderShouldLoadBasicGzFile() throws Exception {
+    String filename = TestHelper.createTempFile(data, "");
+	  
+    Process bzipProc = Runtime.getRuntime().exec("gzip "+filename);
+    int waitFor = bzipProc.waitFor();
+	  
+    if(waitFor != 0)
+    {
+        fail ("Failed to create the class");
+    }
+	  
+    filename = filename + ".gz";
+    
+    try
+    {
+
+    PigServer pigServer = new PigServer (ExecType.LOCAL);
+    String loadQuery = "A = LOAD 'file:" + filename + "' USING org.apache.pig.piggybank.storage.XMLLoader('property') as (doc:chararray);";
+    pigServer.registerQuery(loadQuery);
+
+    Iterator<Tuple> it = pigServer.openIterator("A");
+    int tupleCount = 0;
+    while (it.hasNext()) {
+    Tuple tuple = (Tuple) it.next();
+    if (tuple == null)
+        break;
+    else {
+        //TestHelper.examineTuple(expected, tuple, tupleCount);
+        if (tuple.size() > 0) {
+        tupleCount++;
+             }
+        }
+    }	
+    assertEquals(3, tupleCount); // pig adds extra
+   
+    }finally
+    {
+        new File(filename).delete();
+    }
+ }
 }