You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2014/04/25 01:58:40 UTC
svn commit: r1589911 - in /pig/trunk: ./
contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/
contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/evaluation/xml/data/
contrib/piggybank/java/src/test/java/org/apache/pig/...
Author: daijy
Date: Thu Apr 24 23:58:39 2014
New Revision: 1589911
URL: http://svn.apache.org/r1589911
Log:
PIG-3865: Remodel the XMLLoader to be faster and more maintainable
Added:
pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/evaluation/xml/data/
pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/evaluation/xml/data/test-file-2.xml.bz2 (with props)
Modified:
pig/trunk/CHANGES.txt
pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/XMLLoader.java
pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestXMLLoader.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1589911&r1=1589910&r2=1589911&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Thu Apr 24 23:58:39 2014
@@ -32,6 +32,8 @@ PIG-2207: Support custom counters for ag
IMPROVEMENTS
+PIG-3865: Remodel the XMLLoader to be faster and more maintainable (aseldawy via daijy)
+
PIG-3737: Bundle dependent jars in distribution in %PIG_HOME%/lib folder (daijy)
PIG-3771: Piggybank Avrostorage makes a lot of namenode calls in the backend (rohini)
Modified: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/XMLLoader.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/XMLLoader.java?rev=1589911&r1=1589910&r2=1589911&view=diff
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/XMLLoader.java (original)
+++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/XMLLoader.java Thu Apr 24 23:58:39 2014
@@ -18,693 +18,303 @@
package org.apache.pig.piggybank.storage;
-import java.io.ByteArrayOutputStream;
import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.CompressionCodecFactory;
-import org.apache.hadoop.io.compress.CompressionInputStream;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.pig.LoadFunc;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextInputFormat;
+import org.apache.pig.bzip2r.Bzip2TextInputFormat;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
-import org.apache.pig.impl.io.BufferedPositionedInputStream;
-import org.apache.tools.bzip2r.CBZip2InputStream;
-
-
-
/**
- * A <code>XMLLoaderBufferedPositionedInputStream</code> is the package class and is the
- * decorator over the BufferedPositionedInputStream which in turn decorate
- * BufferedInputStream. It contains <code>BufferedPositionedInputStream<code>
- * input stream, which it uses as
- * its basic source of data, possibly reading or providing additional
- * functionality. The class <code>XMLLoaderBufferedPositionedInputStream</code>
- * itself simply overrides the necessary medthod for reading i.e
- * <code>read</code> <code>getPosition<code> with versions that
- * pass all requests to the contained input
- * stream or do some special processing. Subclasses of <code>XMLLoaderBufferedPositionedInputStream</code>
- * may further override some of these methods
- * and may also provide additional methods
- * and fields.
- * It also provides additional method <code>collectTag<collect> which will give the byte
- * array between the tag which is a xml record. i.e <tag> .*</tag> will be returned
+ * Parses an XML input file given a specified identifier of tags to be loaded.
+ * The output is a bag of XML elements where each element is returned as
+ * a chararray containing the text of the matched XML element including the
+ * start and end tags as well as the data between them. In case of nested
+ * elements of the matching tag, only the top-level one is returned.
*
- * @note we can't use the standard SAX or STAX parser as for a big xml
- * the intermittent hadoop block may not be the valid xml and hence those
- * parser may create pb.
- *
- * @since pig 2.0
*/
+public class XMLLoader extends LoadFunc {
-class XMLLoaderBufferedPositionedInputStream extends BufferedPositionedInputStream {
-
- public final static int S_START = 0;
- public final static int S_MATCH_PREFIX = 1;
- public final static int S_MATCH_TAG = 2;
+ /**
+ * Use this record reader to read XML tags out of a text file. It matches only
+ * the tags identified by an identifier configured through a call to
+ * {@link #setXMLIdentifier(String)}. If there are nested tags of the given
+ * identifier, only the top-level one is returned, which also includes all
+ * enclosed tags.
+ */
+ public static class XMLRecordReader extends RecordReader<LongWritable, Text> {
+ protected final RecordReader<LongWritable, Text> wrapped;
+ /**Regular expression for XML tag identifier*/
+ private static final String XMLTagNameRegExp = "[a-zA-Z\\_][0-9a-zA-Z\\-_]+";
/**
- * The input streamed to be filtered
+ * A regular expression that matches key parts in the XML text needed to
+ * correctly parse it and find matches of the given identifier
*/
- InputStream wrapperIn;
+ private Pattern identifiersPattern;
- /**
- * The field to know if the underlying buffer contains any more bytes
- */
- boolean _isReadable;
+ private LongWritable key;
+ private Text value;
- /**
- * The field set the maximum bytes that is readable by this instance of stream.
- */
- private long maxBytesReadable = 0;
-
- /**
- * The field denote the number of bytes read by this stream.
- */
- long bytesRead = 0;
-
- /**
- * Denotes the end of the current split location
- */
- long end = 0;
-
- /**
- * Creates a <code>XMLLoaderBufferedPositionedInputStream</code>
- * by assigning the argument <code>in</code>
- * to the field <code>this.wrapperIn</code> so as
- * to remember it for later use.
- *
- * @param in the underlying input stream,
- */
- public XMLLoaderBufferedPositionedInputStream(InputStream in){
- super(in);
- this.wrapperIn = in;
- setReadable(true);
- }
-
- /**
- * Creates a split aware <code>XMLLoaderBufferedPositionedInputStream</code>.
- * @param in the underlying input stream
- * @param start start location of the split
- * @param end end location of the split
- */
- public XMLLoaderBufferedPositionedInputStream(InputStream in,long start,long end){
- this(in);
- this.end = end;
- maxBytesReadable = end - start;
- }
+ /**Position of the current buffer in the file*/
+ private long bufferPos;
- /**
- * Set the stream readable or non readable. This is needed
- * to control the xml parsing.
- * @param flag The boolean flag to be set
- * @see XMLLoaderBufferedPositionedInputStream#isReadable
- */
- private void setReadable(boolean flag) {
- _isReadable = flag;
- }
+ /**Holds parts of the input file that were read but not parsed yet*/
+ private String buffer;
- /**
- * See if the stream readable or non readable. This is needed
- * to control the xml parsing.
- * @return true if readable otherwise false
- * @see XMLLoaderBufferedPositionedInputStream#setReadable
- */
- public boolean isReadable() {
- return _isReadable == true;
- }
+ /**Original end of the split to parse*/
+ private long originalEnd;
- /**
- * org.apache.pig.impl.io.BufferedPositionedInputStream.read
- * It is just the wrapper for now.
- * Reads the next byte of data from this input stream. The value
- * byte is returned as an <code>int</code> in the range
- * <code>0</code> to <code>255</code>. If no byte is available
- * because the end of the stream has been reached, the value
- * <code>-1</code> is returned. This method blocks until input data
- * is available, the end of the stream is detected, or an exception
- * is thrown.
- * <p>
- * This method
- * simply performs <code>in.read()</code> and returns the result.
- *
- * @return the next byte of data, or <code>-1</code> if the end of the
- * stream is reached.
- * @exception IOException if an I/O error occurs.
- * @see XMLLoaderBufferedPositionedInputStreamInputStream#wrapperIn
- */
- public int read() throws IOException {
- return wrapperIn.read();
- }
-
- /**
- * This is collect the bytes from current position to the ending tag.
- * This scans for the tags and do the pattern match byte by byte
- * this must be used along with
- * XMLLoaderBufferedPositionedInputStream#skipToTag
- *
- * @param tagName the end tag to search for
- *
- * @param limit the end pointer for the block for this mapper
- *
- * @return the byte array containing the documents until the end of tag
- *
- * @see loader.XMLLoaderBufferedPositionedInputStream.collectUntilEndTag
- *
- */
- private byte[] collectUntilEndTag(String tagName, long limit) {
+ private boolean terminated;
- //@todo use the charset and get the charset encoding from the xml encoding.
- byte[] tmp = tagName.getBytes();
- ByteArrayOutputStream collectBuf = new ByteArrayOutputStream(1024);
- // Levels of elements we went inside matched node
- int depth = 0;
-
- //Since skipToTag was called before this function, we know that we are
- //currently inside the matched tag. Assuming the XML file is well
- //structured, we read till we encounter the first close tag. Since
- //the matched element might contain nested element, we keep track of the
- //current depth and terminate only when we encounter a closing tag at
- //level zero
-
- // A flag to indicate the parsing is currently inside a (start/end) tag
- boolean insideTag = false;
- // A flag to indicate that the current tag is a closing (end) tag
- boolean closingTag = false;
-
- // Last byte read
- int last_b = -1;
- while (true) {
- int b = -1;
- try {
- b = this.read();
- ++bytesRead; // Add one to the bytes read
- if (b == -1) {
- collectBuf.reset();
- this.setReadable(false);
- break;
- }
- collectBuf.write((byte)(b));
-
- // Check if the start tag has matched except for the last char
- if (b == '<') {
- insideTag = true;
- closingTag = false;
- } else if (b == '>') {
- // Detect the pattern />
- if (last_b == '/')
- closingTag = true;
- insideTag = false;
- if (closingTag) {
- if (depth == 0)
- break;
- depth--;
- }
- } else if (b == '/' && last_b == '<') {
- // Detected the pattern </
- closingTag = true;
- } else if (insideTag && last_b == '<') {
- // First character after '<' which is not a '/'
- depth++;
- }
- }
- catch (IOException e) {
- this.setReadable(false);
- return null;
- }
- last_b = b;
- }
- return collectBuf.toByteArray();
+ public XMLRecordReader(RecordReader<LongWritable, Text> wrapped) {
+ this.wrapped = wrapped;
}
/**
- * This is collect the from the matching tag.
- * This scans for the tags and do the pattern match byte by byte
- * This returns a part doc. it must be used along with
- * XMLLoaderBufferedPositionedInputStream#collectUntilEndTag
- *
- * @param tagName the start tag to search for
- *
- * @param limit the end pointer for the block for this mapper
- *
- * @return the byte array containing match of the tag.
- *
- * @see loader.XMLLoaderBufferedPositionedInputStream.collectUntilEndTag
- *
+ * Delegates initialization to the wrapped reader after extending the
+ * split length (up to 10x, capped at the file end) so that a record
+ * starting near the split boundary can still be read to completion.
*/
- private byte[] skipToTag(String tagName, long limit) throws IOException {
-
- //@todo use the charset and get the charset encoding from the xml encoding.
- byte[] tmp = tagName.getBytes();
- byte[] tag = new byte[tmp.length + 1];
- tag[0] = (byte)'<';
- for (int i = 0; i < tmp.length; ++i) {
- tag[1+i] = tmp[i];
- }
-
- ByteArrayOutputStream matchBuf = new ByteArrayOutputStream(512);
- int idxTagChar = 0;
- int state = S_START;
-
- /*
- * Read till the tag is found in this block. If a partial tag block is found
- * then continue on to the next block.matchBuf contains the data that is currently
- * matched. If the read has reached the end of split and there are matched data
- * then continue on to the next block.
- */
- while (splitBoundaryCriteria(wrapperIn) || (matchBuf.size() > 0 )) {
- int b = -1;
- try {
- b = this.read();
- ++bytesRead; // Increment the bytes read by 1
- if (b == -1) {
- state = S_START;
- matchBuf.reset();
- this.setReadable(false);
- break;
- }
- switch (state) {
- case S_START:
- // start to match the target open tag
- if (b == tag[idxTagChar]) {
- ++idxTagChar;
- matchBuf.write((byte)(b));
- if (idxTagChar == tag.length) {
- state = S_MATCH_PREFIX;
- }
- } else { // mismatch
- idxTagChar = 0;
- matchBuf.reset();
- }
- break;
- case S_MATCH_PREFIX:
- // tag match iff next character is whitespaces or close tag mark
- if (Character.isWhitespace(b) || b == '/' || b == '>') {
- matchBuf.write((byte)(b));
- state = S_MATCH_TAG;
- } else {
- idxTagChar = 0;
- matchBuf.reset();
- state = S_START;
- }
- break;
- case S_MATCH_TAG:
- // keep copy characters until we hit the close tag mark
- matchBuf.write((byte)(b));
- break;
- default:
- throw new IllegalArgumentException("Invalid state: " + state);
- }
- if (state == S_MATCH_TAG && (b == '>' || Character.isWhitespace(b))) {
- break;
- }
- if (state != S_MATCH_TAG && this.getPosition() > limit) {
- // need to break, no record in this block
- break;
- }
- }
- catch (IOException e) {
- this.setReadable(false);
- return null;
- }
+ @Override
+ public void initialize(InputSplit split, TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ key = new LongWritable();
+ value = new Text();
+ if (split instanceof FileSplit) {
+ FileSplit fsplit = (FileSplit) split;
+ originalEnd = fsplit.getStart() + fsplit.getLength();
+ Path path = fsplit.getPath();
+ long fileEnd = path.getFileSystem(context.getConfiguration()).getFileStatus(path).getLen();
+ FileSplit extendedSplit = new FileSplit(path, fsplit.getStart(),
+ Math.min(fsplit.getLength() * 10, fileEnd - fsplit.getStart()), fsplit.getLocations());
+ this.wrapped.initialize(extendedSplit, context);
+ } else {
+ throw new RuntimeException("Cannot override a split of type'"+
+ split.getClass()+"'");
}
- return matchBuf.toByteArray();
- }
- /**
- * Returns whether the split boundary condition has reached or not.
- * For normal files ; the condition is to read till the split end reaches.
- * Gz files will have maxBytesReadable set to near Long.MAXVALUE, hence
- * this will cause the entire file to be read. For bz2 and bz files, the
- * condition lies on the position which until which it is read.
- *
- * @param wrapperIn2
- * @return true/false depending on whether split boundary has reached or no
- * @throws IOException
- */
- private boolean splitBoundaryCriteria(InputStream wrapperIn2) throws IOException {
- if(wrapperIn2 instanceof CBZip2InputStream)
- return ((CBZip2InputStream)wrapperIn2).getPos() <= end;
- else
- return bytesRead <= maxBytesReadable;
}
- /**
- * This is collect bytes from start and end tag both inclusive
- * This scans for the tags and do the pattern match byte by byte
- *
- * @param tagName the start tag to search for
- *
- * @param limit the end pointer for the block for this mapper
- *
- * @return the byte array containing match of the <code><tag>.*</tag><code>.
- *
- * @see loader.XMLLoaderBufferedPositionedInputStream.skipToTag
- *
- * @see loader.XMLLoaderBufferedPositionedInputStream.collectUntilEndTag
- *
- */
- byte[] collectTag(String tagName, long limit) throws IOException {
- ByteArrayOutputStream collectBuf = new ByteArrayOutputStream(1024);
- byte[] beginTag = skipToTag(tagName, limit);
-
- // Check if the tag is closed inline
- if (beginTag.length > 2 && beginTag[beginTag.length - 2] == '/' &&
- beginTag[beginTag.length-1] == '>') {
- return beginTag;
- }
-
- // No need to search for the end tag if the start tag is not found
- if(beginTag.length > 0 ){
- byte[] untilTag = collectUntilEndTag(tagName, limit);
- if (untilTag.length > 0) {
- for (byte b: beginTag) {
- collectBuf.write(b);
- }
- for (byte b: untilTag) {
- collectBuf.write(b);
- }
- }
- }
- return collectBuf.toByteArray();
+ public void setXMLIdentifier(String identifier) {
+ if (!identifier.matches(XMLTagNameRegExp))
+ throw new RuntimeException("XML tag identifier '"+identifier+"' does not match the regular expression /"+XMLTagNameRegExp+"/");
+ String inlineClosedTagRegExp = "<\\s*"+identifier+"\\s*[^>]*/>";
+ String openTagRegExp = "<\\s*"+identifier+"(?:\\s*|\\s+(?:[^/>]*|[^>]*[^>/]))>";
+ String closeTagRegExp = "</\\s*"+identifier+"\\s*>";
+ identifiersPattern = Pattern.compile("("+inlineClosedTagRegExp+")|("+openTagRegExp+")|("+closeTagRegExp+")");
}
-}
-
-
-/**
- * The load function to load the XML file
- * This implements the LoadFunc interface which is used to parse records
- * from a dataset. The various helper adaptor function is extended from loader.Utf8StorageConverter
- * which included various functions to cast raw byte data into various datatypes.
- * other sections of the code can call back to the loader to do the cast.
- * This takes a xmlTag as the arg which it will use to split the inputdataset into
- * multiple records.
- * <code>
- *
- * For example if the input xml (input.xml) is like this
- * <configuration>
- * <property>
- * <name> foobar </name>
- * <value> barfoo </value>
- * </property>
- * <ignoreProperty>
- * <name> foo </name>
- * </ignoreProperty>
- * <property>
- * <name> justname </name>
- * </property>
- * </configuration>
- *
- * And your pig script is like this
- *
- * --load the jar files
- * register /homes/aloks/pig/udfLib/loader.jar;
- * -- load the dataset using XMLLoader
- * -- A is the bag containing the tuple which contains one atom i.e doc see output
- * A = load '/user/aloks/pig/input.xml using loader.XMLLoader('property') as (doc:chararray);
- * --dump the result
- * dump A;
- *
- *
- * Then you will get the output
- *
- * (<property>
- * <name> foobar </name>
- * <value> barfoo </value>
- * </property>)
- * (<property>
- * <name> justname </name>
- * </property>)
- *
- *
- * Where each () indicate one record
- *
- *
- * </code>
- */
-
-public class XMLLoader extends LoadFunc {
-
- /**
- * logger from pig
- */
- protected final Log mLog = LogFactory.getLog(getClass());
-
- private XMLFileRecordReader reader = null;
-
-
- /**
- * the tuple content which is used while returning
- */
- private ArrayList<Object> mProtoTuple = null;
-
- /**
- * The record seperated. The default value is 'document'
- */
- public String recordIdentifier = "document";
-
- private String loadLocation;
-
- public XMLLoader() {
-
+ /* Delegate all methods to the wrapped stream */
+ public void close() throws IOException {
+ wrapped.close();
}
- /**
- * Constructs a Pig loader that uses specified string as the record seperater
- * for example if the recordIdentifier is document. It will consider the record as
- * <document> .* </document>
- *
- * @param recordIdentifier the xml tag which is used to pull records
- *
- */
- public XMLLoader(String recordIdentifier) {
- this();
- this.recordIdentifier = recordIdentifier;
+ public boolean equals(Object obj) {
+ return wrapped.equals(obj);
}
- /**
- * Retrieves the next tuple to be processed.
- * @return the next tuple to be processed or null if there are no more tuples
- * to be processed.
- * @throws IOException
- */
- @Override
- public Tuple getNext() throws IOException {
-
- boolean next = false;
-
- try {
- next = reader.nextKeyValue();
- } catch (InterruptedException e) {
- throw new IOException(e);
- }
-
- if (!next) return null;
-
- Tuple t = null;
-
- try {
- byte[] tagContent = (byte[]) reader.getCurrentValue();
- // No need to create the tuple if there are no contents
- t = (tagContent.length > 0) ? createTuple(tagContent) : null;
- } catch (Exception e) {
- throw new IOException(e);
- }
-
- return t;
-
- }
-
- public Tuple createTuple(byte[] content) throws Exception {
- if (mProtoTuple == null) {
- mProtoTuple = new ArrayList<Object>();
- }
- if (content.length > 0) {
- mProtoTuple.add(new DataByteArray(content));
- }
- Tuple t = TupleFactory.getInstance().newTupleNoCopy(mProtoTuple);
- mProtoTuple = null;
-
- return t;
+ public LongWritable getCurrentKey() throws IOException, InterruptedException {
+ return key;
}
- /**
- * to check for equality
- * @param object
- */
- public boolean equals(Object obj) {
- return equals((XMLLoader)obj);
+ public Text getCurrentValue() throws IOException, InterruptedException {
+ return value;
}
- /**
- * to check for equality
- * @param XMLLoader object
- */
- public boolean equals(XMLLoader other) {
- return this.recordIdentifier.equals(other.recordIdentifier);
+ public float getProgress() throws IOException, InterruptedException {
+ return Math.max(1.0f, this.wrapped.getProgress() * 10);
}
- @SuppressWarnings("unchecked")
- @Override
- public InputFormat getInputFormat() throws IOException {
- XMLFileInputFormat inputFormat = new XMLFileInputFormat(recordIdentifier);
- if(loadLocation.endsWith(".bz2") || loadLocation.endsWith(".bz")) {
- inputFormat.isSplitable = true;
- }
- return inputFormat;
+ public int hashCode() {
+ return wrapped.hashCode();
}
- @SuppressWarnings("unchecked")
- @Override
- public void prepareToRead(RecordReader reader, PigSplit split)
- throws IOException {
- this.reader = (XMLFileRecordReader) reader;
+ public boolean nextKeyValue() throws IOException, InterruptedException {
+ if (this.terminated)
+ return false;
+ int depth = 0;
+ // In case a tag is matched with separate open and close tags, this buffer
+ // is used to accumulate the matched element if it spans multiple lines.
+ StringBuffer currentMatch = new StringBuffer();
+ // The start offset of first matched open tag. This marks the first byte
+ // in the range to be copied to output.
+ int offsetOfFirstMatchedOpenTag = 0;
+ try {
+ while (true) {
+ while (buffer == null || buffer.length() == 0) {
+ if (!wrapped.nextKeyValue())
+ return false; // End of split
+ // if passed the end offset of current split, terminate the matching
+ if (bufferPos >= originalEnd && depth == 0) {
+ this.terminated = true;
+ return false;
+ }
+
+ bufferPos = wrapped.getCurrentKey().get();
+ buffer = wrapped.getCurrentValue().toString();
+ }
+ Matcher matcher = identifiersPattern.matcher(buffer);
+ while (matcher.find()) {
+ int startOfCurrentMatch = matcher.start();
+ int endOfCurrentMatch = matcher.end();
+ String group;
+ if ((group = matcher.group(1)) != null) {
+ // Matched an inline-closed tag
+ value = new Text(group);
+ this.key.set(bufferPos + matcher.start(1));
+ bufferPos += matcher.end(1);
+ buffer = buffer.substring(endOfCurrentMatch);
+ return true;
+ } else if ((group = matcher.group(2)) != null) {
+ // Matched an open tag
+ // If this is a top-level match (i.e., not enclosed in another matched
+ // tag), all bytes starting from this offset will be copied to output
+ // in one of two cases:
+ // 1- When a matching close tag is found
+ // 2- When an end of line is encountered
+ if (depth == 0) {
+ offsetOfFirstMatchedOpenTag = startOfCurrentMatch;
+ this.key.set(bufferPos + startOfCurrentMatch);
+ }
+ depth++;
+ } else if ((group = matcher.group(3)) != null) {
+ // Matched a closed tag
+ if (depth > 0) {
+ depth--;
+ if (depth == 0) {
+ // A full top-level match
+ // Copy all bytes to output
+ if (currentMatch.length() == 0) {
+ // A full match in one line, return it immediately
+ value = new Text(buffer.substring(offsetOfFirstMatchedOpenTag, endOfCurrentMatch));
+ } else {
+ currentMatch.append(buffer, offsetOfFirstMatchedOpenTag, endOfCurrentMatch);
+ value = new Text(currentMatch.toString());
+ }
+ // Copy remaining non matched part to the buffer for next call
+ buffer = buffer.substring(endOfCurrentMatch);
+ bufferPos += endOfCurrentMatch;
+ return true;
+ }
+ }
+ } else {
+ throw new RuntimeException("Invalid match '"+matcher.group()+"' in string '"+buffer+"'");
+ }
+ }
+ // No more matches in current line. If we are inside a match (i.e.,
+ // an open tag has been matched) copy all parts to the match.
+ // Otherwise, just drop it.
+ if (depth > 0) {
+ // Inside a match
+ currentMatch.append(buffer, offsetOfFirstMatchedOpenTag, buffer.length());
+ }
+ buffer = null;
+ }
+ } catch (InterruptedException e) {
+ throw new IOException("Error getting input");
+ }
+
}
- @Override
- public void setLocation(String location, Job job) throws IOException {
- loadLocation = location;
- FileInputFormat.setInputPaths(job, location);
- }
-
- //------------------------------------------------------------------------
- // Implementation of InputFormat
-
- public static class XMLFileInputFormat extends FileInputFormat {
-
- /**
- * Boolean flag used to identify whether splittable property is explicitly set.
- */
- private boolean isSplitable = false;
-
- private String recordIdentifier;
-
- public XMLFileInputFormat(String recordIdentifier) {
- this.recordIdentifier = recordIdentifier;
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public RecordReader createRecordReader(InputSplit split,
- TaskAttemptContext context) throws IOException,
- InterruptedException {
-
- return new XMLFileRecordReader(recordIdentifier);
- }
-
- @Override
- protected boolean isSplitable(JobContext context, Path filename) {
- CompressionCodec codec =
- new CompressionCodecFactory(context.getConfiguration()).getCodec(filename);
- return (!(codec == null)) ? isSplitable : true;
- }
- }
-
- //------------------------------------------------------------------------
- // Implementation of RecordReader
-
- public static class XMLFileRecordReader extends RecordReader {
-
- private long start;
- private long end;
- private String recordIdentifier;
-
- /*
- * xmlloader input stream which has the ability to split the input
- * dataset into records by the specified tag
- */
- private XMLLoaderBufferedPositionedInputStream xmlLoaderBPIS = null;
-
- public XMLFileRecordReader(String recordIdentifier) {
- this.recordIdentifier = recordIdentifier;
- }
-
- @Override
- public void initialize(InputSplit genericSplit, TaskAttemptContext context)
- throws IOException, InterruptedException {
- FileSplit split = (FileSplit) genericSplit;
- Configuration job = context.getConfiguration();
-
- start = split.getStart();
- end = start + split.getLength();
- final Path file = split.getPath();
-
- // open the file and seek to the start of the split
- FileSystem fs = file.getFileSystem(job);
- FSDataInputStream fileIn = fs.open(split.getPath());
-
- // Seek to the start of the file
- fileIn.seek(start);
-
- if(file.toString().endsWith(".bz2") || file.toString().endsWith(".bz"))
- {
- // For bzip2 files use CBZip2InputStream to read and supply the upper input stream.
- CBZip2InputStream in = new CBZip2InputStream(fileIn,9, end);
- this.xmlLoaderBPIS = new XMLLoaderBufferedPositionedInputStream(in,start,end);
- }
- else if (file.toString().endsWith(".gz"))
- {
- CompressionCodecFactory compressionCodecs = new CompressionCodecFactory(job);
- final CompressionCodec codec = compressionCodecs.getCodec(file);
- if (codec != null) {
- end = Long.MAX_VALUE;
- CompressionInputStream stream = codec.createInputStream(fileIn);
- this.xmlLoaderBPIS = new XMLLoaderBufferedPositionedInputStream(stream,start,end);
- }
- }
-
- else
- {
- this.xmlLoaderBPIS = new XMLLoaderBufferedPositionedInputStream(fileIn,start,end);
- }
- }
-
-
- @Override
- public void close() throws IOException {
- xmlLoaderBPIS.close();
- }
-
- @Override
- public Object getCurrentKey() throws IOException, InterruptedException {
- return null;
- }
-
- @Override
- public Object getCurrentValue() throws IOException,
- InterruptedException {
- return xmlLoaderBPIS.collectTag(recordIdentifier, end);
- }
-
- @Override
- public float getProgress() throws IOException, InterruptedException {
-
- return 0;
- }
-
-
- @Override
- public boolean nextKeyValue() throws IOException, InterruptedException {
- return xmlLoaderBPIS.isReadable();
- }
-
+ public String toString() {
+ return wrapped.toString();
}
+ }
+
+ /**Location of the file loaded*/
+ private String loadLocation;
+
+ /**Underlying record reader*/
+ @SuppressWarnings("rawtypes")
+ protected RecordReader in = null;
+
+ /**XML tag to parse*/
+ private String identifier;
+
+ public XMLLoader(String identifier) {
+ this.identifier = identifier;
+ }
+
+ @Override
+ public void prepareToRead(RecordReader reader, PigSplit split)
+ throws IOException {
+ in = reader;
+ }
+
+ @Override
+ public Tuple getNext() throws IOException {
+ try {
+ if (!in.nextKeyValue())
+ return null;
+ Tuple tuple = createTuple(in.getCurrentValue().toString());
+ return tuple;
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ return null;
+ }
+ }
+
+
+ /**
+ * Creates a tuple from a matched string
+ */
+ public Tuple createTuple(String str) {
+ return TupleFactory.getInstance().newTuple(new DataByteArray(str));
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Override
+ public InputFormat getInputFormat() throws IOException {
+ if(loadLocation.endsWith(".bz2") || loadLocation.endsWith(".bz")) {
+ return new Bzip2TextInputFormat() {
+ @Override
+ public RecordReader<LongWritable, Text> createRecordReader(
+ InputSplit split, TaskAttemptContext context) {
+ try {
+ RecordReader<LongWritable, Text> originalReader =
+ super.createRecordReader(split, context);
+ XMLRecordReader reader = new XMLRecordReader(originalReader);
+ reader.setXMLIdentifier(identifier);
+ return reader;
+ } catch (IOException e) {
+ throw new RuntimeException("Cannot create input split", e);
+ } catch (InterruptedException e) {
+ throw new RuntimeException("Cannot create input split", e);
+ }
+ }
+ };
+ } else {
+ return new PigTextInputFormat() {
+ @Override
+ public RecordReader<LongWritable, Text> createRecordReader(
+ InputSplit split, TaskAttemptContext context) {
+ RecordReader<LongWritable, Text> originalReader =
+ super.createRecordReader(split, context);
+ XMLRecordReader reader = new XMLRecordReader(originalReader);
+ reader.setXMLIdentifier(identifier);
+ return reader;
+ }
+ };
+ }
+ }
+
+ @Override
+ public void setLocation(String location, Job job) throws IOException {
+ loadLocation = location;
+ FileInputFormat.setInputPaths(job, location);
+ }
}
Added: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/evaluation/xml/data/test-file-2.xml.bz2
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/evaluation/xml/data/test-file-2.xml.bz2?rev=1589911&view=auto
==============================================================================
Binary file - no diff available.
Propchange: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/evaluation/xml/data/test-file-2.xml.bz2
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream
Modified: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestXMLLoader.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestXMLLoader.java?rev=1589911&r1=1589910&r2=1589911&view=diff
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestXMLLoader.java (original)
+++ pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestXMLLoader.java Thu Apr 24 23:58:39 2014
@@ -17,21 +17,29 @@ import static org.apache.pig.ExecType.LO
import java.io.ByteArrayInputStream;
import java.io.File;
+import java.io.PrintStream;
import java.util.ArrayList;
import java.util.Iterator;
+import java.util.Random;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import junit.framework.TestCase;
+import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
import org.apache.pig.data.Tuple;
import org.apache.pig.test.Util;
+import org.w3c.dom.Document;
public class TestXMLLoader extends TestCase {
- private static String patternString = "(\\d+)!+(\\w+)~+(\\w+)";
public static ArrayList<String[]> data = new ArrayList<String[]>();
static {
data.add(new String[] { "<configuration>"});
@@ -89,13 +97,11 @@ public class TestXMLLoader extends TestC
inlineClosedTags.add(new String[] { "<event id='33'><tag k='a' v='b'/></event>"});
inlineClosedTags.add(new String[] { "</events>"});
}
-
public void testShouldReturn0TupleCountIfSearchTagIsNotFound () throws Exception
{
String filename = TestHelper.createTempFile(data, "");
PigServer pig = new PigServer(LOCAL);
filename = filename.replace("\\", "\\\\");
- patternString = patternString.replace("\\", "\\\\");
String query = "A = LOAD '" + filename + "' USING org.apache.pig.piggybank.storage.XMLLoader('invalid') as (doc:chararray);";
pig.registerQuery(query);
Iterator<?> it = pig.openIterator("A");
@@ -118,7 +124,6 @@ public class TestXMLLoader extends TestC
String filename = TestHelper.createTempFile(data, "");
PigServer pig = new PigServer(LOCAL);
filename = filename.replace("\\", "\\\\");
- patternString = patternString.replace("\\", "\\\\");
String query = "A = LOAD '" + filename + "' USING org.apache.pig.piggybank.storage.XMLLoader('property') as (doc:chararray);";
pig.registerQuery(query);
Iterator<?> it = pig.openIterator("A");
@@ -175,7 +180,6 @@ public class TestXMLLoader extends TestC
}
}
-
public void testLoaderShouldLoadBasicGzFile() throws Exception {
String filename = TestHelper.createTempFile(data, "");
@@ -224,7 +228,6 @@ public class TestXMLLoader extends TestC
String filename = TestHelper.createTempFile(testData, "");
PigServer pig = new PigServer(LOCAL);
filename = filename.replace("\\", "\\\\");
- patternString = patternString.replace("\\", "\\\\");
String query = "A = LOAD '" + filename + "' USING org.apache.pig.piggybank.storage.XMLLoader('name') as (doc:chararray);";
pig.registerQuery(query);
Iterator<?> it = pig.openIterator("A");
@@ -248,7 +251,6 @@ public class TestXMLLoader extends TestC
String filename = TestHelper.createTempFile(data, "");
PigServer pig = new PigServer(LOCAL);
filename = filename.replace("\\", "\\\\");
- patternString = patternString.replace("\\", "\\\\");
String query = "A = LOAD '" + filename + "' USING org.apache.pig.piggybank.storage.XMLLoader('ignoreProperty') as (doc:chararray);";
pig.registerQuery(query);
Iterator<?> it = pig.openIterator("A");
@@ -265,14 +267,13 @@ public class TestXMLLoader extends TestC
}
assertEquals(1, tupleCount);
}
-
public void testShouldReturn0TupleCountIfNoEndTagIsFound() throws Exception
{
// modify the data content to avoid end tag for </ignoreProperty>
ArrayList<String[]> testData = new ArrayList<String[]>();
for (String content[] : data) {
- if(false == data.equals(new String[] { "</ignoreProperty>"}))
+ if(!content[0].equals("</ignoreProperty>"))
{
testData.add(content);
}
@@ -281,8 +282,7 @@ public class TestXMLLoader extends TestC
String filename = TestHelper.createTempFile(testData, "");
PigServer pig = new PigServer(LOCAL);
filename = filename.replace("\\", "\\\\");
- patternString = patternString.replace("\\", "\\\\");
- String query = "A = LOAD '" + filename + "' USING org.apache.pig.piggybank.storage.XMLLoader('</ignoreProperty>') as (doc:chararray);";
+ String query = "A = LOAD '" + filename + "' USING org.apache.pig.piggybank.storage.XMLLoader('ignoreProperty') as (doc:chararray);";
pig.registerQuery(query);
Iterator<?> it = pig.openIterator("A");
int tupleCount = 0;
@@ -307,8 +307,7 @@ public class TestXMLLoader extends TestC
String filename = TestHelper.createTempFile(testData, "");
PigServer pig = new PigServer(LOCAL);
filename = filename.replace("\\", "\\\\");
- patternString = patternString.replace("\\", "\\\\");
- String query = "A = LOAD '" + filename + "' USING org.apache.pig.piggybank.storage.XMLLoader('</ignoreProperty>') as (doc:chararray);";
+ String query = "A = LOAD '" + filename + "' USING org.apache.pig.piggybank.storage.XMLLoader('ignoreProperty') as (doc:chararray);";
pig.registerQuery(query);
Iterator<?> it = pig.openIterator("A");
int tupleCount = 0;
@@ -330,7 +329,6 @@ public class TestXMLLoader extends TestC
String filename = TestHelper.createTempFile(nestedTags, "");
PigServer pig = new PigServer(LOCAL);
filename = filename.replace("\\", "\\\\");
- patternString = patternString.replace("\\", "\\\\");
String query = "A = LOAD '" + filename + "' USING org.apache.pig.piggybank.storage.XMLLoader('event') as (doc:chararray);";
pig.registerQuery(query);
Iterator<?> it = pig.openIterator("A");
@@ -353,7 +351,6 @@ public class TestXMLLoader extends TestC
String filename = TestHelper.createTempFile(inlineClosedTags, "");
PigServer pig = new PigServer(LOCAL);
filename = filename.replace("\\", "\\\\");
- patternString = patternString.replace("\\", "\\\\");
String query = "A = LOAD '" + filename + "' USING org.apache.pig.piggybank.storage.XMLLoader('event') as (doc:chararray);";
pig.registerQuery(query);
Iterator<?> it = pig.openIterator("A");
@@ -375,7 +372,6 @@ public class TestXMLLoader extends TestC
String filename = TestHelper.createTempFile(inlineClosedTags, "");
PigServer pig = new PigServer(LOCAL);
filename = filename.replace("\\", "\\\\");
- patternString = patternString.replace("\\", "\\\\");
String query = "A = LOAD '" + filename + "' USING org.apache.pig.piggybank.storage.XMLLoader('event') as (doc:chararray);";
pig.registerQuery(query);
Iterator<?> it = pig.openIterator("A");
@@ -391,4 +387,108 @@ public class TestXMLLoader extends TestC
}
}
}
+
+ /**
+ * This test case tests the special case when a non-matching tag spans two file
+ * splits in a .bz2 compressed file. At the same time, the part that falls in
+ * the first split is a prefix of the matching tag.
+ * In other words, up to the end of the first split the tag appears to
+ * match, but it does not actually match.
+ *
+ * @throws Exception
+ */
+ public void testXMLLoaderShouldNotReturnLastNonMatchedTag() throws Exception {
+ Configuration conf = new Configuration();
+ long blockSize = 100 * 1024;
+ conf.setLong("fs.local.block.size", blockSize);
+
+ String tagName = "event";
+
+ PigServer pig = new PigServer(LOCAL, conf);
+ FileSystem localFs = FileSystem.getLocal(conf);
+ FileStatus[] testFiles = localFs.globStatus(new Path("src/test/java/org/apache/pig/piggybank/test/evaluation/xml/data/*xml.bz2"));
+ assertTrue("No test files", testFiles.length > 0);
+ for (FileStatus testFile : testFiles) {
+ String testFileName = testFile.getPath().toUri().getPath().replace("\\", "\\\\");
+ String query = "A = LOAD '" + testFileName + "' USING org.apache.pig.piggybank.storage.XMLLoader('event') as (doc:chararray);";
+ pig.registerQuery(query);
+ Iterator<?> it = pig.openIterator("A");
+ while (it.hasNext()) {
+ Tuple tuple = (Tuple) it.next();
+ if (tuple == null)
+ break;
+ else {
+ if (tuple.size() > 0) {
+ assertTrue(((String)tuple.get(0)).startsWith("<"+tagName+">"));
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * This test checks that a multi-line tag spanning two splits is correctly
+ * matched.
+ * @throws Exception
+ */
+ public void testXMLLoaderShouldMatchTagSpanningSplits() throws Exception {
+ Configuration conf = new Configuration();
+ long blockSize = 512;
+ conf.setLong("fs.local.block.size", blockSize);
+ conf.setLong("mapred.max.split.size", blockSize);
+
+ String tagName = "event";
+ File tempFile = File.createTempFile("long-file", ".xml");
+ FileSystem localFs = FileSystem.getLocal(conf);
+ FSDataOutputStream directOut = localFs.create(new Path(tempFile.getAbsolutePath()), true);
+
+ String matchingElement = "<event>\ndata\n</event>\n";
+ long pos = 0;
+ int matchingCount = 0;
+ PrintStream ps = new PrintStream(directOut);
+ // 1- Write some elements that fit completely in the first block
+ while (pos + 2 * matchingElement.length() < blockSize) {
+ ps.print(matchingElement);
+ pos += matchingElement.length();
+ matchingCount++;
+ }
+ // 2- Write a long element that spans multiple lines and multiple blocks
+ String longElement = matchingElement.replace("data",
+ "data\ndata\ndata\ndata\ndata\ndata\ndata\ndata\ndata\ndata\ndata\n");
+ ps.print(longElement);
+ pos += longElement.length();
+ matchingCount++;
+ // 3- Write some more elements to fill in the second block completely
+ while (pos < 2 * blockSize) {
+ ps.print(matchingElement);
+ pos += matchingElement.length();
+ matchingCount++;
+ }
+ ps.close();
+
+
+ PigServer pig = new PigServer(LOCAL, conf);
+ String tempFileName = tempFile.getAbsolutePath().replace("\\", "\\\\");
+ String query = "A = LOAD '" + tempFileName + "' USING org.apache.pig.piggybank.storage.XMLLoader('event') as (doc:chararray);";
+ pig.registerQuery(query);
+ Iterator<?> it = pig.openIterator("A");
+
+ int count = 0;
+ while (it.hasNext()) {
+ Tuple tuple = (Tuple) it.next();
+ if (tuple == null)
+ break;
+ else {
+ if (tuple.size() > 0) {
+ count++;
+ // Make sure the returned text is a proper XML element
+ DocumentBuilder docBuilder =
+ DocumentBuilderFactory.newInstance().newDocumentBuilder();
+ Document doc = docBuilder.parse(new ByteArrayInputStream(((String)tuple.get(0)).getBytes()));
+ assertTrue(doc.getDocumentElement().getNodeName().equals(tagName));
+ }
+ }
+ }
+ assertEquals(matchingCount, count);
+ }
}