You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2011/01/20 02:17:06 UTC
svn commit: r1061089 - in /pig/trunk: CHANGES.txt
contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/XMLLoader.java
contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestXMLLoader.java
Author: daijy
Date: Thu Jan 20 01:17:06 2011
New Revision: 1061089
URL: http://svn.apache.org/viewvc?rev=1061089&view=rev
Log:
PIG-1561: XMLLoader in Piggybank does not support bz2 or gzip compressed XML files
Modified:
pig/trunk/CHANGES.txt
pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/XMLLoader.java
pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestXMLLoader.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1061089&r1=1061088&r2=1061089&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Thu Jan 20 01:17:06 2011
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
IMPROVEMENTS
+PIG-1561: XMLLoader in Piggybank does not support bz2 or gzip compressed XML files (daijy)
+
PIG-1806: Modify embedded Pig API for usability (rding)
PIG-1799: Provide deployable maven artifacts for pigunit and pig smoke tests
Modified: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/XMLLoader.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/XMLLoader.java?rev=1061089&r1=1061088&r2=1061089&view=diff
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/XMLLoader.java (original)
+++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/XMLLoader.java Thu Jan 20 01:17:06 2011
@@ -29,9 +29,13 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
@@ -42,6 +46,8 @@ import org.apache.pig.data.DataByteArray
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.io.BufferedPositionedInputStream;
+import org.apache.tools.bzip2r.CBZip2InputStream;
+
@@ -538,7 +544,15 @@ public class XMLLoader extends LoadFunc
InterruptedException {
return new XMLFileRecordReader(recordIdentifier);
- }
+ }
+
+ /**
+  * Reports that input files must never be split.
+  *
+  * @param context  the job context (not consulted)
+  * @param filename the input file (not consulted)
+  * @return {@code false} always
+  */
+ @Override
+ protected boolean isSplitable(JobContext context, Path filename) {
+ // Always returns false because this version of XMLLoader reads an entire file.
+ // Without this override, for any file larger than the block size MapReduce would
+ // compute multiple splits, yet every mapper would read the full file, which is
+ // functionally wrong.
+ return false;
+ }
}
//------------------------------------------------------------------------
@@ -574,7 +588,26 @@ public class XMLLoader extends LoadFunc
FileSystem fs = file.getFileSystem(job);
FSDataInputStream fileIn = fs.open(split.getPath());
- this.xmlLoaderBPIS = new XMLLoaderBufferedPositionedInputStream(fileIn);
+
+ if(file.toString().endsWith(".bz2") )
+ {
+ // For bzip2 files use CBZip2InputStream to read and supply the upper input stream.
+ this.xmlLoaderBPIS = new XMLLoaderBufferedPositionedInputStream(new CBZip2InputStream(fileIn, 9, end));
+ }
+ else if (file.toString().endsWith(".gz"))
+ {
+ CompressionCodecFactory compressionCodecs = new CompressionCodecFactory(job);
+ final CompressionCodec codec = compressionCodecs.getCodec(file);
+ if (codec != null) {
+ CompressionInputStream stream = codec.createInputStream(fileIn);
+ this.xmlLoaderBPIS = new XMLLoaderBufferedPositionedInputStream(stream);
+ }
+ }
+
+ else
+ {
+ this.xmlLoaderBPIS = new XMLLoaderBufferedPositionedInputStream(fileIn);
+ }
}
Modified: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestXMLLoader.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestXMLLoader.java?rev=1061089&r1=1061088&r2=1061089&view=diff
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestXMLLoader.java (original)
+++ pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestXMLLoader.java Thu Jan 20 01:17:06 2011
@@ -13,16 +13,18 @@
package org.apache.pig.piggybank.test.storage;
+import static org.apache.pig.ExecType.LOCAL;
+
+import java.io.File;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.regex.Pattern;
import junit.framework.TestCase;
+import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
-import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.Tuple;
-import static org.apache.pig.ExecType.LOCAL;
public class TestXMLLoader extends TestCase {
private static String patternString = "(\\d+)!+(\\w+)~+(\\w+)";
@@ -67,4 +69,85 @@ public class TestXMLLoader extends TestC
}
assertEquals(3, tupleCount); // pig adds extra
}
+
+    /**
+     * Checks that XMLLoader can load a bzip2-compressed XML file.
+     *
+     * Writes the shared test data to a temp file, compresses it with the external
+     * {@code bzip2} command, loads the resulting {@code .bz2} file through
+     * XMLLoader('property') and counts the non-empty tuples returned.
+     *
+     * @throws Exception if the temp file, the external command or the Pig query fails
+     */
+    public void testXMLLoaderShouldLoadBasicBzip2Files() throws Exception {
+        String filename = TestHelper.createTempFile(data, "");
+
+        // Use the argument-array form of exec: the single-string form tokenizes on
+        // whitespace and would break on a temp path that contains spaces.
+        Process bzipProc = Runtime.getRuntime().exec(new String[] {"bzip2", filename});
+        if (bzipProc.waitFor() != 0) {
+            // Compression failed, so the uncompressed temp file is still on disk.
+            new File(filename).delete();
+            fail("Failed to compress " + filename + " with bzip2");
+        }
+
+        // bzip2 replaces the input file with <name>.bz2
+        filename = filename + ".bz2";
+
+        try {
+            PigServer pigServer = new PigServer(ExecType.LOCAL);
+            String loadQuery = "A = LOAD 'file:" + filename + "' USING org.apache.pig.piggybank.storage.XMLLoader('property') as (doc:chararray);";
+            pigServer.registerQuery(loadQuery);
+
+            Iterator<Tuple> it = pigServer.openIterator("A");
+            int tupleCount = 0;
+            while (it.hasNext()) {
+                Tuple tuple = it.next();
+                if (tuple == null) {
+                    break;
+                }
+                // Only non-empty tuples count towards the expected total.
+                if (tuple.size() > 0) {
+                    tupleCount++;
+                }
+            }
+
+            assertEquals(3, tupleCount); // pig adds extra
+        } finally {
+            new File(filename).delete();
+        }
+    }
+
+    /**
+     * Checks that XMLLoader can load a gzip-compressed XML file.
+     *
+     * Writes the shared test data to a temp file, compresses it with the external
+     * {@code gzip} command, loads the resulting {@code .gz} file through
+     * XMLLoader('property') and counts the non-empty tuples returned.
+     *
+     * @throws Exception if the temp file, the external command or the Pig query fails
+     */
+    public void testLoaderShouldLoadBasicGzFile() throws Exception {
+        String filename = TestHelper.createTempFile(data, "");
+
+        // Use the argument-array form of exec: the single-string form tokenizes on
+        // whitespace and would break on a temp path that contains spaces.
+        Process gzipProc = Runtime.getRuntime().exec(new String[] {"gzip", filename});
+        if (gzipProc.waitFor() != 0) {
+            // Compression failed, so the uncompressed temp file is still on disk.
+            new File(filename).delete();
+            fail("Failed to compress " + filename + " with gzip");
+        }
+
+        // gzip replaces the input file with <name>.gz
+        filename = filename + ".gz";
+
+        try {
+            PigServer pigServer = new PigServer(ExecType.LOCAL);
+            String loadQuery = "A = LOAD 'file:" + filename + "' USING org.apache.pig.piggybank.storage.XMLLoader('property') as (doc:chararray);";
+            pigServer.registerQuery(loadQuery);
+
+            Iterator<Tuple> it = pigServer.openIterator("A");
+            int tupleCount = 0;
+            while (it.hasNext()) {
+                Tuple tuple = it.next();
+                if (tuple == null) {
+                    break;
+                }
+                // Only non-empty tuples count towards the expected total.
+                if (tuple.size() > 0) {
+                    tupleCount++;
+                }
+            }
+
+            assertEquals(3, tupleCount); // pig adds extra
+        } finally {
+            new File(filename).delete();
+        }
+    }
}