You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ga...@apache.org on 2011/02/25 20:05:39 UTC
svn commit: r1074670 - in /pig/trunk/contrib: CHANGES.txt
piggybank/java/src/main/java/org/apache/pig/piggybank/storage/XMLLoader.java
piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestXMLLoader.java
Author: gates
Date: Fri Feb 25 19:05:38 2011
New Revision: 1074670
URL: http://svn.apache.org/viewvc?rev=1074670&view=rev
Log:
PIG-1842 Improve Scalability of the XMLLoader for large datasets such as wikipedia
Modified:
pig/trunk/contrib/CHANGES.txt
pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/XMLLoader.java
pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestXMLLoader.java
Modified: pig/trunk/contrib/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/CHANGES.txt?rev=1074670&r1=1074669&r2=1074670&view=diff
==============================================================================
--- pig/trunk/contrib/CHANGES.txt (original)
+++ pig/trunk/contrib/CHANGES.txt Fri Feb 25 19:05:38 2011
@@ -24,6 +24,9 @@ INCOMPATIBLE CHANGES
IMPROVEMENTS
+PIG-1842 Improve Scalability of the XMLLoader for large datasets such as
+wikipedia (vivekp via gates)
+
PIG-1781 Piggybank: ISOToDay disregards timezone (should use ISODateTimeFormat
instead of DateTime to parse) (misterbeebee via gates)
Modified: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/XMLLoader.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/XMLLoader.java?rev=1074670&r1=1074669&r2=1074670&view=diff
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/XMLLoader.java (original)
+++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/XMLLoader.java Fri Feb 25 19:05:38 2011
@@ -53,7 +53,7 @@ import org.apache.tools.bzip2r.CBZip2Inp
/**
* A <code>XMLLoaderBufferedPositionedInputStream</code> is the package class and is the
- * decorater overthe BufferedPositionedInputStream which in turn decorate
+ * decorator over the BufferedPositionedInputStream which in turn decorates
* BufferedInputStream. It contains <code>BufferedPositionedInputStream<code>
* input stream, which it uses as
* its basic source of data, possibly reading or providing additional
@@ -69,7 +69,7 @@ import org.apache.tools.bzip2r.CBZip2Inp
* array between the tag which is a xml record. i.e <tag> .*</tag> will be returned
*
* @note we can't use the standard SAX or STAX parser as for a big xml
- * the intermetant hadoop block may not be the valid xml and hence those
+ * an intermediate hadoop block may not be valid xml and hence those
* parser may create pb.
*
* @since pig 2.0
@@ -92,6 +92,21 @@ class XMLLoaderBufferedPositionedInputSt
boolean _isReadable;
/**
+ * The maximum number of bytes this stream may read (split end minus split start).
+ */
+ private long maxBytesReadable = 0;
+
+ /**
+ * The number of bytes read so far by this stream.
+ */
+ long bytesRead = 0;
+
+ /**
+ * The end offset of the current split.
+ */
+ long end = 0;
+
+ /**
* Creates a <code>XMLLoaderBufferedPositionedInputStream</code>
* by assigning the argument <code>in</code>
* to the field <code>this.wrapperIn</code> so as
@@ -104,16 +119,17 @@ class XMLLoaderBufferedPositionedInputSt
this.wrapperIn = in;
setReadable(true);
}
-
+
/**
- * Since the input stream is control by Pig or hadoop
- * stream and there seems to be issue with multiple closing
- * with hadoop and pig
- *
- * @exception IOException if an I/O error occurs.
- */
- public void close() throws IOException {
- // throw new IOException("Closing stream BAD");
+ * Creates a split aware <code>XMLLoaderBufferedPositionedInputStream</code>.
+ * @param in the underlying input stream
+ * @param start start location of the split
+ * @param end end location of the split
+ */
+ public XMLLoaderBufferedPositionedInputStream(InputStream in,long start,long end){
+ this(in);
+ this.end = end;
+ maxBytesReadable = end - start;
}
/**
@@ -137,7 +153,7 @@ class XMLLoaderBufferedPositionedInputSt
}
/**
- * @Override org.apache.pig.impl.io.BufferedPositionedInputStream.read
+ * org.apache.pig.impl.io.BufferedPositionedInputStream.read
* It is just the wrapper for now.
* Reads the next byte of data from this input stream. The value
* byte is returned as an <code>int</code> in the range
@@ -185,14 +201,21 @@ class XMLLoaderBufferedPositionedInputSt
tag[2+i] = tmp[i];
}
tag[tmp.length+2] = (byte)'>';
- // System.out.println("[collectUntilEndTag] TAG " + tag + tagName); // DEBUG
ByteArrayOutputStream collectBuf = new ByteArrayOutputStream(1024);
int idxTagChar = 0;
+
+ /*
+ * Read until an end tag is found. No split-boundary check is made here: once a
+ * start tag has been found, the record is read up to its end tag even if that lies
+ * beyond the split. If the xml content is very large, or the end tag is missing
+ * in a huge file, everything read is buffered and may exhaust memory.
+ */
while (true) {
int b = -1;
try {
b = this.read();
+ ++bytesRead; // Add one to the bytes read
if (b == -1) {
collectBuf.reset();
this.setReadable(false);
@@ -215,8 +238,6 @@ class XMLLoaderBufferedPositionedInputSt
return null;
}
}
- // DEBUG
- //System.out.println("Match = " + new String(collectBuf.toByteArray()));
return collectBuf.toByteArray();
}
@@ -235,7 +256,7 @@ class XMLLoaderBufferedPositionedInputSt
* @see loader.XMLLoaderBufferedPositionedInputStream.collectUntilEndTag
*
*/
- private byte[] skipToTag(String tagName, long limit) {
+ private byte[] skipToTag(String tagName, long limit) throws IOException {
//@todo use the charset and get the charset encoding from the xml encoding.
byte[] tmp = tagName.getBytes();
@@ -244,15 +265,22 @@ class XMLLoaderBufferedPositionedInputSt
for (int i = 0; i < tmp.length; ++i) {
tag[1+i] = tmp[i];
}
- //System.out.println("[skipToTag] TAG " + tag + tagName); // DEBUG
ByteArrayOutputStream matchBuf = new ByteArrayOutputStream(512);
int idxTagChar = 0;
int state = S_START;
- while (true) {
+
+ /*
+ * Read until the tag is found in this split. matchBuf holds the bytes of the tag
+ * matched so far. If the end of the split is reached while a partial tag is
+ * matched (matchBuf is non-empty), keep reading past the split boundary so the
+ * tag can be completed.
+ */
+ while (splitBoundaryCriteria(wrapperIn) || (matchBuf.size() > 0 )) {
int b = -1;
try {
b = this.read();
+ ++bytesRead; // Increment the bytes read by 1
if (b == -1) {
state = S_START;
matchBuf.reset();
@@ -298,30 +326,31 @@ class XMLLoaderBufferedPositionedInputSt
// need to break, no record in this block
break;
}
- // DEBUG
- /*
- if (idxTagChar > 0) {
- System.out.println("Match b='" + (char)b + "'"
- + ", tag='" + (char)tag[idxTagChar-1] + "'"
- + ", idxTagChar=" + (idxTagChar-1)
- + ", tagLength=" + tag.length);
- } else {
- System.out.println("Mismatch b='" + (char)b + "'"
- + ", tag='" + (char)tag[idxTagChar] + "'"
- + ", idxTagChar=" + (idxTagChar)
- + ", tagLength=" + tag.length);
- }
- */
}
catch (IOException e) {
this.setReadable(false);
return null;
}
}
- // DEBUG
- //System.out.println("Match = " + new String(matchBuf.toByteArray()));
return matchBuf.toByteArray();
}
+ /**
+ * Returns whether reading may continue within the current split.
+ * For uncompressed files the stream may read until maxBytesReadable bytes
+ * (the split length) have been consumed. For gz files end is set to
+ * Long.MAX_VALUE before this stream is built, so maxBytesReadable becomes
+ * very large and the entire file is read. For bz2/bz files the decision is
+ * based on the position reported by CBZip2InputStream.getPos() compared with
+ * the split end.
+ *
+ * The byte-count comparison is strict: the byte at offset end belongs to the
+ * next split, and reading it here would let both splits emit a record whose
+ * start tag begins exactly on the split boundary.
+ *
+ * @param stream the underlying (possibly decompressing) input stream
+ * @return true if the split boundary has not been reached yet, false otherwise
+ * @throws IOException if querying the bzip2 stream position fails
+ */
+ private boolean splitBoundaryCriteria(InputStream stream) throws IOException {
+ if (stream instanceof CBZip2InputStream) {
+ return ((CBZip2InputStream) stream).getPos() <= end;
+ }
+ // bytesRead counts bytes already consumed, so the next byte is at offset
+ // start + bytesRead; it is inside the split only while that is < end.
+ return bytesRead < maxBytesReadable;
+ }
/**
* This is collect bytes from start and end tag both inclusive
@@ -341,15 +370,18 @@ class XMLLoaderBufferedPositionedInputSt
byte[] collectTag(String tagName, long limit) throws IOException {
ByteArrayOutputStream collectBuf = new ByteArrayOutputStream(1024);
byte[] beginTag = skipToTag(tagName, limit);
- byte[] untilTag = collectUntilEndTag(tagName, limit);
- if (beginTag.length > 0 && untilTag.length > 0) {
- for (byte b: beginTag) {
- collectBuf.write(b);
+ // No need to search for the end tag if the start tag is not found
+ if(beginTag.length > 0 ){
+ byte[] untilTag = collectUntilEndTag(tagName, limit);
+ if (untilTag.length > 0) {
+ for (byte b: beginTag) {
+ collectBuf.write(b);
}
for (byte b: untilTag) {
collectBuf.write(b);
}
+ }
}
return collectBuf.toByteArray();
}
@@ -429,6 +461,7 @@ public class XMLLoader extends LoadFunc
*/
public String recordIdentifier = "document";
+ private String loadLocation;
public XMLLoader() {
@@ -467,13 +500,11 @@ public class XMLLoader extends LoadFunc
if (!next) return null;
Tuple t = null;
-
+
try {
byte[] tagContent = (byte[]) reader.getCurrentValue();
- if(tagContent.length > 0)
- {
- t = createTuple(tagContent);
- }
+ // No need to create the tuple if there are no contents
+ t = (tagContent.length > 0) ? createTuple(tagContent) : null;
} catch (Exception e) {
throw new IOException(e);
}
@@ -514,7 +545,11 @@ public class XMLLoader extends LoadFunc
@SuppressWarnings("unchecked")
@Override
public InputFormat getInputFormat() throws IOException {
- return new XMLFileInputFormat(recordIdentifier);
+ XMLFileInputFormat inputFormat = new XMLFileInputFormat(recordIdentifier);
+ if(loadLocation.endsWith(".bz2") || loadLocation.endsWith(".bz")) {
+ inputFormat.isSplitable = true;
+ }
+ return inputFormat;
}
@SuppressWarnings("unchecked")
@@ -526,6 +561,7 @@ public class XMLLoader extends LoadFunc
@Override
public void setLocation(String location, Job job) throws IOException {
+ loadLocation = location;
FileInputFormat.setInputPaths(job, location);
}
@@ -534,6 +570,11 @@ public class XMLLoader extends LoadFunc
public static class XMLFileInputFormat extends FileInputFormat {
+ /**
+ * Whether compressed input may be split; getInputFormat sets it for .bz2/.bz locations.
+ */
+ private boolean isSplitable = false;
+
private String recordIdentifier;
public XMLFileInputFormat(String recordIdentifier) {
@@ -551,10 +592,9 @@ public class XMLLoader extends LoadFunc
@Override
protected boolean isSplitable(JobContext context, Path filename) {
- // Always returns false since this version of XMLLoader will read an entire file.
- // ie.w/o this , any file > block size, MR will compute multiple splits but all
- // mappers will read the full file which is functionally wrong.
- return false;
+ CompressionCodec codec =
+ new CompressionCodecFactory(context.getConfiguration()).getCodec(filename);
+ return (!(codec == null)) ? isSplitable : true;
}
}
@@ -590,26 +630,30 @@ public class XMLLoader extends LoadFunc
// open the file and seek to the start of the split
FileSystem fs = file.getFileSystem(job);
FSDataInputStream fileIn = fs.open(split.getPath());
-
- if(file.toString().endsWith(".bz2") )
+ // Seek to the start of the split
+ fileIn.seek(start);
+
+ if(file.toString().endsWith(".bz2") || file.toString().endsWith(".bz"))
{
// For bzip2 files use CBZip2InputStream to read and supply the upper input stream.
- this.xmlLoaderBPIS = new XMLLoaderBufferedPositionedInputStream(new CBZip2InputStream(fileIn, 9, end));
+ CBZip2InputStream in = new CBZip2InputStream(fileIn,9, end);
+ this.xmlLoaderBPIS = new XMLLoaderBufferedPositionedInputStream(in,start,end);
}
else if (file.toString().endsWith(".gz"))
{
CompressionCodecFactory compressionCodecs = new CompressionCodecFactory(job);
final CompressionCodec codec = compressionCodecs.getCodec(file);
if (codec != null) {
+ end = Long.MAX_VALUE;
CompressionInputStream stream = codec.createInputStream(fileIn);
- this.xmlLoaderBPIS = new XMLLoaderBufferedPositionedInputStream(stream);
+ this.xmlLoaderBPIS = new XMLLoaderBufferedPositionedInputStream(stream,start,end);
}
}
else
{
- this.xmlLoaderBPIS = new XMLLoaderBufferedPositionedInputStream(fileIn);
+ this.xmlLoaderBPIS = new XMLLoaderBufferedPositionedInputStream(fileIn,start,end);
}
}
Modified: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestXMLLoader.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestXMLLoader.java?rev=1074670&r1=1074669&r2=1074670&view=diff
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestXMLLoader.java (original)
+++ pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestXMLLoader.java Fri Feb 25 19:05:38 2011
@@ -171,4 +171,113 @@ public class TestXMLLoader extends TestC
new File(filename).delete();
}
}
+
+ public void testXMLLoaderShouldNotConfusedWithTagsHavingSimilarPrefix () throws Exception
+ {
+ ArrayList<String[]> testData = new ArrayList<String[]>();
+ testData.add(new String[] { "<namethisalso> foobar9 </namethisalso>"});
+ testData.addAll(data);
+ String filename = TestHelper.createTempFile(testData, "");
+ PigServer pig = new PigServer(LOCAL);
+ filename = filename.replace("\\", "\\\\");
+ patternString = patternString.replace("\\", "\\\\");
+ String query = "A = LOAD 'file:" + filename + "' USING org.apache.pig.piggybank.storage.XMLLoader('name') as (doc:chararray);";
+ pig.registerQuery(query);
+ Iterator<?> it = pig.openIterator("A");
+ int tupleCount = 0;
+ while (it.hasNext()) {
+ Tuple tuple = (Tuple) it.next();
+ if (tuple == null)
+ break;
+ else {
+ if (tuple.size() > 0) {
+ tupleCount++;
+ }
+ }
+ }
+ assertEquals(3, tupleCount);
+
+ }
+
+ public void testShouldReturn1ForIntermediateTagData () throws Exception
+ {
+ String filename = TestHelper.createTempFile(data, "");
+ PigServer pig = new PigServer(LOCAL);
+ filename = filename.replace("\\", "\\\\");
+ patternString = patternString.replace("\\", "\\\\");
+ String query = "A = LOAD 'file:" + filename + "' USING org.apache.pig.piggybank.storage.XMLLoader('ignoreProperty') as (doc:chararray);";
+ pig.registerQuery(query);
+ Iterator<?> it = pig.openIterator("A");
+ int tupleCount = 0;
+ while (it.hasNext()) {
+ Tuple tuple = (Tuple) it.next();
+ if (tuple == null)
+ break;
+ else {
+ if (tuple.size() > 0) {
+ tupleCount++;
+ }
+ }
+ }
+ assertEquals(1, tupleCount);
+ }
+
+ public void testShouldReturn0TupleCountIfNoEndTagIsFound() throws Exception
+ {
+ // modify the data content to avoid end tag for </ignoreProperty>
+ ArrayList<String[]> testData = new ArrayList<String[]>();
+ for (String content[] : data) {
+
+ if(false == data.equals(new String[] { "</ignoreProperty>"}))
+ {
+ testData.add(content);
+ }
+ }
+
+ String filename = TestHelper.createTempFile(testData, "");
+ PigServer pig = new PigServer(LOCAL);
+ filename = filename.replace("\\", "\\\\");
+ patternString = patternString.replace("\\", "\\\\");
+ String query = "A = LOAD 'file:" + filename + "' USING org.apache.pig.piggybank.storage.XMLLoader('</ignoreProperty>') as (doc:chararray);";
+ pig.registerQuery(query);
+ Iterator<?> it = pig.openIterator("A");
+ int tupleCount = 0;
+ while (it.hasNext()) {
+ Tuple tuple = (Tuple) it.next();
+ if (tuple == null)
+ break;
+ else {
+ if (tuple.size() > 0) {
+ tupleCount++;
+ }
+ }
+ }
+ assertEquals(0, tupleCount);
+ }
+ public void testShouldReturn0TupleCountIfEmptyFileIsPassed() throws Exception
+ {
+ // modify the data content to avoid end tag for </ignoreProperty>
+ ArrayList<String[]> testData = new ArrayList<String[]>();
+
+ String filename = TestHelper.createTempFile(testData, "");
+ PigServer pig = new PigServer(LOCAL);
+ filename = filename.replace("\\", "\\\\");
+ patternString = patternString.replace("\\", "\\\\");
+ String query = "A = LOAD 'file:" + filename + "' USING org.apache.pig.piggybank.storage.XMLLoader('</ignoreProperty>') as (doc:chararray);";
+ pig.registerQuery(query);
+ Iterator<?> it = pig.openIterator("A");
+ int tupleCount = 0;
+ while (it.hasNext()) {
+ Tuple tuple = (Tuple) it.next();
+ if (tuple == null)
+ break;
+ else {
+ if (tuple.size() > 0) {
+ tupleCount++;
+ }
+ }
+ }
+ assertEquals(0, tupleCount);
+ }
+
}