You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2014/04/25 01:58:40 UTC

svn commit: r1589911 - in /pig/trunk: ./ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/ contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/evaluation/xml/data/ contrib/piggybank/java/src/test/java/org/apache/pig/...

Author: daijy
Date: Thu Apr 24 23:58:39 2014
New Revision: 1589911

URL: http://svn.apache.org/r1589911
Log:
PIG-3865: Remodel the XMLLoader to work to be faster and more maintainable

Added:
    pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/evaluation/xml/data/
    pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/evaluation/xml/data/test-file-2.xml.bz2   (with props)
Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/XMLLoader.java
    pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestXMLLoader.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1589911&r1=1589910&r2=1589911&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Thu Apr 24 23:58:39 2014
@@ -32,6 +32,8 @@ PIG-2207: Support custom counters for ag
 
 IMPROVEMENTS
 
+PIG-3865: Remodel the XMLLoader to work to be faster and more maintainable (aseldawy via daijy)
+
 PIG-3737: Bundle dependent jars in distribution in %PIG_HOME%/lib folder (daijy)
 
 PIG-3771: Piggybank Avrostorage makes a lot of namenode calls in the backend (rohini)

Modified: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/XMLLoader.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/XMLLoader.java?rev=1589911&r1=1589910&r2=1589911&view=diff
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/XMLLoader.java (original)
+++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/XMLLoader.java Thu Apr 24 23:58:39 2014
@@ -18,693 +18,303 @@
 
 package org.apache.pig.piggybank.storage;
 
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.CompressionCodecFactory;
-import org.apache.hadoop.io.compress.CompressionInputStream;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 import org.apache.pig.LoadFunc;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextInputFormat;
+import org.apache.pig.bzip2r.Bzip2TextInputFormat;
 import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
-import org.apache.pig.impl.io.BufferedPositionedInputStream;
-import org.apache.tools.bzip2r.CBZip2InputStream;
-
-
-
 
 /**
- * A <code>XMLLoaderBufferedPositionedInputStream</code> is the package class and is the 
- * decorator over the BufferedPositionedInputStream which in turn decorate
- * BufferedInputStream. It contains <code>BufferedPositionedInputStream<code>
- * input stream, which it uses as
- * its  basic source of data, possibly reading or providing  additional
- * functionality. The class <code>XMLLoaderBufferedPositionedInputStream</code>
- * itself simply overrides the necessary medthod for reading i.e 
- * <code>read</code> <code>getPosition<code> with versions that
- * pass all requests to the contained  input
- * stream or do some special processing. Subclasses of <code>XMLLoaderBufferedPositionedInputStream</code>
- * may further override some of  these methods
- * and may also provide additional methods
- * and fields.
- * It also provides additional method <code>collectTag<collect> which will give the byte 
- * array between the tag which is a xml record. i.e <tag> .*</tag> will be returned
+ * Parses an XML input file given a specified identifier of tags to be loaded.
+ * The output is a bag of XML elements where each element is returned as
+ * a chararray containing the text of the matched XML element including the
+ * start and end tags as well as the data between them. In case of nested
+ * elements of the matching tag, only the top-level one is returned.
  *
- * @note we can't use the standard SAX or STAX parser as for a big xml 
- *       the intermittent hadoop block may not be the valid xml and hence those
- *       parser may create pb. 
- *
- * @since   pig 2.0
  */
+public class XMLLoader extends LoadFunc {
 
-class XMLLoaderBufferedPositionedInputStream extends BufferedPositionedInputStream { 
-
-    public final static int S_START = 0;
-    public final static int S_MATCH_PREFIX = 1;
-    public final static int S_MATCH_TAG = 2;
+  /**
+   * Use this record reader to read XML tags out of a text file. It matches only
+   * the tags identified by an identifier configured through a call to
+   * {@link #setXMLIdentifier(String)}. If there are nested tags of the given
+   * identifier, only the top-level one is returned, which also includes all
+   * enclosed tags.
+   */
+  public static class XMLRecordReader extends RecordReader<LongWritable, Text> {
+    protected final RecordReader<LongWritable, Text> wrapped;
 
+    /**Regular expression for XML tag identifier*/
+    private static final String XMLTagNameRegExp = "[a-zA-Z\\_][0-9a-zA-Z\\-_]+";
     /**
-     * The input streamed to be filtered 
+     * A regular expression that matches key parts in the XML text needed to
+     * correctly parse it and find matches of the given identifier
      */
-    InputStream wrapperIn;
+    private Pattern identifiersPattern;
 
-    /**
-     * The field to know if the underlying buffer contains any more bytes
-     */
-    boolean _isReadable;
+    private LongWritable key;
+    private Text value;
 
-    /**
-    * The field set the maximum bytes that is readable by this instance of stream.
-    */
-    private long maxBytesReadable = 0;
-    	
-    /**
-    * The field denote the number of bytes read by this stream. 
-    */
-    long bytesRead = 0;
-    	
-    /**
-    * Denotes the end of the current split location
-    */
-    long end = 0;
-    
-    /**
-     * Creates a <code>XMLLoaderBufferedPositionedInputStream</code>
-     * by assigning the  argument <code>in</code>
-     * to the field <code>this.wrapperIn</code> so as
-     * to remember it for later use.
-     *
-     * @param   in   the underlying input stream,
-     */
-    public XMLLoaderBufferedPositionedInputStream(InputStream in){
-        super(in);
-        this.wrapperIn = in;
-        setReadable(true);
-    }
-    
-    /**
-     * Creates a  split aware <code>XMLLoaderBufferedPositionedInputStream</code>.
-     * @param in    the underlying input stream
-     * @param start    start location of the split
-     * @param end    end location of the split
-     */
-    public XMLLoaderBufferedPositionedInputStream(InputStream in,long start,long end){
-       this(in);
-       this.end = end;
-       maxBytesReadable = end - start;
-    }
+    /**Position of the current buffer in the file*/
+    private long bufferPos;
 
-    /**
-     * Set the stream readable or non readable. This is needed
-     * to control the xml parsing.
-     * @param flag The boolean flag to be set
-     * @see XMLLoaderBufferedPositionedInputStream#isReadable
-     */
-    private void setReadable(boolean flag) {
-       _isReadable = flag;
-    }
+    /**Holds parts of the input file that were read but not parsed yet*/
+    private String buffer;
 
-    /**
-     * See if the stream readable or non readable. This is needed
-     * to control the xml parsing.
-     * @return  true if readable otherwise false
-     * @see XMLLoaderBufferedPositionedInputStream#setReadable
-     */
-    public boolean isReadable() {
-       return _isReadable == true;
-    }
+    /**Original end of the split to parse*/
+    private long originalEnd;
 
-    /**
-     * org.apache.pig.impl.io.BufferedPositionedInputStream.read
-     * It is just the wrapper for now.
-     * Reads the next byte of data from this input stream. The value
-     * byte is returned as an <code>int</code> in the range
-     * <code>0</code> to <code>255</code>. If no byte is available
-     * because the end of the stream has been reached, the value
-     * <code>-1</code> is returned. This method blocks until input data
-     * is available, the end of the stream is detected, or an exception
-     * is thrown.
-     * <p>
-     * This method
-     * simply performs <code>in.read()</code> and returns the result.
-     *
-     * @return     the next byte of data, or <code>-1</code> if the end of the
-     *             stream is reached.
-     * @exception  IOException  if an I/O error occurs.
-     * @see        XMLLoaderBufferedPositionedInputStreamInputStream#wrapperIn
-     */
-    public int read() throws IOException {
-       return wrapperIn.read();
-    }
-
-    /**
-     * This is collect the bytes from current position to the ending tag.
-     * This scans for the tags and do the pattern match byte by byte
-     * this must be used along with 
-     *  XMLLoaderBufferedPositionedInputStream#skipToTag
-     *
-     * @param tagName the end tag to search for
-     *
-     * @param limit the end pointer for the block for this mapper
-     *
-     * @return the byte array containing the documents until the end of tag
-     *
-     * @see loader.XMLLoaderBufferedPositionedInputStream.collectUntilEndTag
-     *
-     */
-    private byte[] collectUntilEndTag(String tagName, long limit) {
+    private boolean terminated;
 
-      //@todo use the charset and get the charset encoding from the xml encoding.
-      byte[] tmp = tagName.getBytes();
-      ByteArrayOutputStream collectBuf = new ByteArrayOutputStream(1024);
-      // Levels of elements we went inside matched node
-      int depth = 0;
-      
-      //Since skipToTag was called before this function, we know that we are
-      //currently inside the matched tag. Assuming the XML file is well
-      //structured, we read till we encounter the first close tag. Since
-      //the matched element might contain nested element, we keep track of the
-      //current depth and terminate only when we encounter a closing tag at
-      //level zero
-      
-      // A flag to indicate the parsing is currently inside a (start/end) tag
-      boolean insideTag = false;
-      // A flag to indicate that the current tag is a closing (end) tag
-      boolean closingTag = false;
-      
-      // Last byte read
-      int last_b = -1;
-      while (true) {
-        int b = -1;
-        try {
-          b = this.read();
-          ++bytesRead; // Add one to the bytes read
-          if (b == -1) {
-            collectBuf.reset();
-            this.setReadable(false);
-            break;
-          }
-          collectBuf.write((byte)(b));
-
-          // Check if the start tag has matched except for the last char
-          if (b == '<') {
-            insideTag = true;
-            closingTag = false;
-          } else if (b == '>') {
-            // Detect the pattern />
-            if (last_b == '/')
-              closingTag = true;
-            insideTag = false;
-            if (closingTag) {
-              if (depth == 0)
-                break;
-              depth--;
-            }
-          } else if (b == '/' && last_b == '<') {
-            // Detected the pattern </
-            closingTag = true;
-          } else if (insideTag && last_b == '<') {
-            // First character after '<' which is not a '/'
-            depth++;
-          }
-        }
-        catch (IOException e) {
-          this.setReadable(false);
-          return null;
-        }
-        last_b = b;
-      }
-      return collectBuf.toByteArray();
+    public XMLRecordReader(RecordReader<LongWritable, Text> wrapped) {
+      this.wrapped = wrapped;
     }
 
     /**
-     * This is collect the from the matching tag.
-     * This scans for the tags and do the pattern match byte by byte
-     * This returns a part doc. it must be used along with 
-     * XMLLoaderBufferedPositionedInputStream#collectUntilEndTag
-     * 
-     * @param tagName the start tag to search for
-     *
-     * @param limit the end pointer for the block for this mapper
-     *
-     * @return the byte array containing match of the tag.
-     *
-     * @see loader.XMLLoaderBufferedPositionedInputStream.collectUntilEndTag
-     *
+     * Delegate initialization to the wrapped reader after extending the split
+     * to ten times its original length (capped at the end of the file).
      */
-    private byte[] skipToTag(String tagName, long limit) throws IOException {
-      
-      //@todo use the charset and get the charset encoding from the xml encoding.
-      byte[] tmp = tagName.getBytes();
-      byte[] tag = new byte[tmp.length + 1];
-      tag[0] = (byte)'<';
-      for (int i = 0; i < tmp.length; ++i) {
-        tag[1+i] = tmp[i];
-      }
-
-      ByteArrayOutputStream matchBuf = new ByteArrayOutputStream(512);
-      int idxTagChar = 0;
-      int state = S_START;
-      
-      /*
-       * Read till the tag is found in this block. If a partial tag block is found
-       * then continue on to the next block.matchBuf contains the data that is currently 
-       * matched. If the read has reached the end of split and there are matched data 
-       * then continue on to the next block.
-       */
-      while (splitBoundaryCriteria(wrapperIn) ||  (matchBuf.size() > 0 )) {
-        int b = -1;
-        try {
-          b = this.read();
-          ++bytesRead; // Increment the bytes read by 1
-          if (b == -1) {
-            state = S_START;
-            matchBuf.reset();
-            this.setReadable(false);
-            break;
-          }
-          switch (state) {
-            case S_START:
-              // start to match the target open tag
-              if (b == tag[idxTagChar]) {
-                ++idxTagChar;
-                matchBuf.write((byte)(b));
-                if (idxTagChar == tag.length) {
-                  state = S_MATCH_PREFIX;
-                }
-              } else {  // mismatch
-                idxTagChar = 0;
-                matchBuf.reset();
-              }
-              break;
-            case S_MATCH_PREFIX:
-              // tag match iff next character is whitespaces or close tag mark
-              if (Character.isWhitespace(b) || b == '/' || b == '>') {
-                matchBuf.write((byte)(b));
-                state = S_MATCH_TAG;
-              } else {
-                idxTagChar = 0;
-                matchBuf.reset();
-                state = S_START;
-              }
-              break;
-            case S_MATCH_TAG:
-              // keep copy characters until we hit the close tag mark
-              matchBuf.write((byte)(b));
-              break;
-            default:
-              throw new IllegalArgumentException("Invalid state: " + state);
-          }
-          if (state == S_MATCH_TAG && (b == '>' || Character.isWhitespace(b))) {
-            break;
-          }
-          if (state != S_MATCH_TAG && this.getPosition() > limit) {
-            // need to break, no record in this block
-            break;
-          }
-        }
-        catch (IOException e) {
-          this.setReadable(false);
-          return null;
-        }
+    @Override
+    public void initialize(InputSplit split, TaskAttemptContext context)
+	throws IOException, InterruptedException {
+      key = new LongWritable();
+      value = new Text();
+      if (split instanceof FileSplit) {
+	FileSplit fsplit = (FileSplit) split;
+	originalEnd = fsplit.getStart() + fsplit.getLength();
+	Path path = fsplit.getPath();
+	long fileEnd = path.getFileSystem(context.getConfiguration()).getFileStatus(path).getLen();
+	FileSplit extendedSplit = new FileSplit(path, fsplit.getStart(),
+	    Math.min(fsplit.getLength() * 10, fileEnd - fsplit.getStart()), fsplit.getLocations());
+	this.wrapped.initialize(extendedSplit, context);
+      } else {
+	throw new RuntimeException("Cannot override a split of type'"+
+	    split.getClass()+"'");
       }
-      return matchBuf.toByteArray();
-    }
-    /**
-     * Returns whether the split boundary condition has reached or not.
-     * For normal files ; the condition is to read till the split end reaches.
-     * Gz files will have  maxBytesReadable set to near Long.MAXVALUE, hence
-     * this will cause the entire file to be read. For bz2 and bz files, the 
-     * condition lies on the position which until which it is read. 
-     *  
-     * @param wrapperIn2
-     * @return true/false depending on whether split boundary has reached or no
-     * @throws IOException
-     */
-    private boolean splitBoundaryCriteria(InputStream wrapperIn2) throws IOException {
-       if(wrapperIn2 instanceof CBZip2InputStream)
-          return ((CBZip2InputStream)wrapperIn2).getPos() <= end;
-       else
-          return bytesRead <= maxBytesReadable;
     }
 
-    /**
-     * This is collect bytes from start and end tag both inclusive
-     * This scans for the tags and do the pattern match byte by byte
-     * 
-     * @param tagName the start tag to search for
-     *
-     * @param limit the end pointer for the block for this mapper
-     *
-     * @return the byte array containing match of the <code><tag>.*</tag><code>.
-     *
-     * @see loader.XMLLoaderBufferedPositionedInputStream.skipToTag
-     *
-     * @see loader.XMLLoaderBufferedPositionedInputStream.collectUntilEndTag
-     *
-     */
-    byte[] collectTag(String tagName, long limit) throws IOException {
-       ByteArrayOutputStream collectBuf = new ByteArrayOutputStream(1024);
-       byte[] beginTag = skipToTag(tagName, limit);
-       
-       // Check if the tag is closed inline
-       if (beginTag.length > 2 && beginTag[beginTag.length - 2] == '/' &&
-           beginTag[beginTag.length-1] == '>') {
-         return beginTag;
-       }
-
-       // No need to search for the end tag if the start tag is not found
-       if(beginTag.length > 0 ){ 
-          byte[] untilTag = collectUntilEndTag(tagName, limit);
-          if (untilTag.length > 0) {
-             for (byte b: beginTag) {
-             collectBuf.write(b);
-           }
-           for (byte b: untilTag) {
-              collectBuf.write(b);
-           }
-          }
-       }
-       return collectBuf.toByteArray();
+    public void setXMLIdentifier(String identifier) {
+      if (!identifier.matches(XMLTagNameRegExp))
+	throw new RuntimeException("XML tag identifier '"+identifier+"' does not match the regular expression /"+XMLTagNameRegExp+"/");
+      String inlineClosedTagRegExp = "<\\s*"+identifier+"\\s*[^>]*/>";
+      String openTagRegExp = "<\\s*"+identifier+"(?:\\s*|\\s+(?:[^/>]*|[^>]*[^>/]))>";
+      String closeTagRegExp = "</\\s*"+identifier+"\\s*>";
+      identifiersPattern = Pattern.compile("("+inlineClosedTagRegExp+")|("+openTagRegExp+")|("+closeTagRegExp+")");
     }
 
-}
-
-
-/**
- * The load function to load the XML file
- * This implements the LoadFunc interface which is used to parse records
- * from a dataset. The various helper adaptor function is extended from loader.Utf8StorageConverter
- * which included various functions to cast raw byte data into various datatypes. 
- * other sections of the code can call back to the loader to do the cast.
- * This takes a xmlTag as the arg which it will use to split the inputdataset into
- * multiple records. 
- * <code>
- *    
- * For example if the input xml (input.xml) is like this
- *     <configuration>
- *         <property>
- *            <name> foobar </name>
- *            <value> barfoo </value>
- *         </property>
- *         <ignoreProperty>
- *           <name> foo </name>
- *         </ignoreProperty>
- *         <property>
- *            <name> justname </name>
- *         </property>
- *     </configuration>
- *
- *    And your pig script is like this
- *
- *    --load the jar files
- *    register /homes/aloks/pig/udfLib/loader.jar;
- *    -- load the dataset using XMLLoader
- *    -- A is the bag containing the tuple which contains one atom i.e doc see output
- *    A = load '/user/aloks/pig/input.xml using loader.XMLLoader('property') as (doc:chararray);
- *    --dump the result
- *    dump A;
- *
- *
- *    Then you will get the output
- *
- *    (<property>
- *             <name> foobar </name>
- *             <value> barfoo </value>
- *          </property>)
- *    (<property>
- *             <name> justname </name>
- *          </property>)
- *
- *
- *    Where each () indicate one record 
- *
- * 
- * </code>
- */
-
-public class XMLLoader extends LoadFunc {
-
-    /**
-     * logger from pig
-     */
-    protected final Log mLog = LogFactory.getLog(getClass());
-
-    private XMLFileRecordReader reader = null;
-
-
-    /**
-     * the tuple content which is used while returning
-     */
-    private ArrayList<Object> mProtoTuple = null;
-
-    /**
-     * The record seperated. The default value is 'document'
-     */
-    public String recordIdentifier = "document";
-
-    private String loadLocation;
-    
-    public XMLLoader() {
-
+    /* Delegate all methods to the wrapped stream */
+    public void close() throws IOException {
+      wrapped.close();
     }
 
-    /**
-     * Constructs a Pig loader that uses specified string as the record seperater
-     * for example if the recordIdentifier is document. It will consider the record as 
-     * <document> .* </document>
-     * 
-     * @param recordIdentifier the xml tag which is used to pull records
-     *
-     */
-    public XMLLoader(String recordIdentifier) {
-        this();
-        this.recordIdentifier = recordIdentifier;
+    public boolean equals(Object obj) {
+      return wrapped.equals(obj);
     }
 
-    /**
-     * Retrieves the next tuple to be processed.
-     * @return the next tuple to be processed or null if there are no more tuples
-     * to be processed.
-     * @throws IOException
-     */
-    @Override
-    public Tuple getNext() throws IOException {
- 
-        boolean next = false;
-        
-        try {
-            next = reader.nextKeyValue();
-        } catch (InterruptedException e) {
-            throw new IOException(e);
-        }
-        
-        if (!next) return null;
-        
-        Tuple t = null;
-     
-        try {
-            byte[] tagContent = (byte[]) reader.getCurrentValue();
-            // No need to create the tuple if there are no contents
-            t = (tagContent.length > 0) ? createTuple(tagContent) : null;
-        } catch (Exception e) {
-            throw new IOException(e);
-        }
-
-        return t; 
- 
-    }
-    
-    public Tuple createTuple(byte[] content) throws Exception {
-        if (mProtoTuple == null) {
-            mProtoTuple = new ArrayList<Object>();
-        }
-        if (content.length > 0) {
-            mProtoTuple.add(new DataByteArray(content));
-        }
-        Tuple t = TupleFactory.getInstance().newTupleNoCopy(mProtoTuple);
-        mProtoTuple = null;
-
-        return t;
+    public LongWritable getCurrentKey() throws IOException, InterruptedException {
+      return key;
     }
 
-    /**
-     * to check for equality 
-     * @param object 
-     */
-    public boolean equals(Object obj) {
-        return equals((XMLLoader)obj);
+    public Text getCurrentValue() throws IOException, InterruptedException {
+      return value;
     }
 
-    /**
-     * to check for equality 
-     * @param XMLLoader object 
-     */
-    public boolean equals(XMLLoader other) {
-        return this.recordIdentifier.equals(other.recordIdentifier);
+    public float getProgress() throws IOException, InterruptedException {
+      return Math.max(1.0f, this.wrapped.getProgress() * 10);
     }
 
-    @SuppressWarnings("unchecked")
-    @Override
-    public InputFormat getInputFormat() throws IOException {
-       XMLFileInputFormat inputFormat = new XMLFileInputFormat(recordIdentifier);
-       if(loadLocation.endsWith(".bz2") || loadLocation.endsWith(".bz")) {
-          inputFormat.isSplitable = true;
-         }
-       return inputFormat;
+    public int hashCode() {
+      return wrapped.hashCode();
     }
 
-    @SuppressWarnings("unchecked")
-    @Override
-    public void prepareToRead(RecordReader reader, PigSplit split)
-            throws IOException {
-        this.reader = (XMLFileRecordReader) reader;
+    public boolean nextKeyValue() throws IOException, InterruptedException {
+      if (this.terminated)
+	return false;
+      int depth = 0;
+      // When an element is matched by separate open and close tags, this buffer
+      // accumulates the matched element if it spans multiple lines.
+      StringBuffer currentMatch = new StringBuffer();
+      // The start offset of first matched open tag. This marks the first byte
+      // in the range to be copied to output.
+      int offsetOfFirstMatchedOpenTag = 0;
+      try {
+      while (true) {
+	while (buffer == null || buffer.length() == 0) {
+	  if (!wrapped.nextKeyValue())
+	    return false; // End of split
+	  // if passed the end offset of current split, terminate the matching
+	  if (bufferPos >= originalEnd && depth == 0) {
+	    this.terminated = true;
+	    return false;
+	  }
+
+	  bufferPos = wrapped.getCurrentKey().get();
+	  buffer = wrapped.getCurrentValue().toString();
+	}
+	Matcher matcher = identifiersPattern.matcher(buffer);
+	while (matcher.find()) {
+	  int startOfCurrentMatch = matcher.start();
+	  int endOfCurrentMatch = matcher.end();
+	  String group;
+	  if ((group = matcher.group(1)) != null) {
+	    // Matched an inline-closed tag
+	    value = new Text(group);
+	    this.key.set(bufferPos + matcher.start(1));
+	    bufferPos += matcher.end(1);
+	    buffer = buffer.substring(endOfCurrentMatch);
+	    return true;
+	  } else if ((group = matcher.group(2)) != null) {
+	    // Matched an open tag
+	    // If this is a top-level match (i.e., not enclosed in another matched
+	    // tag), all bytes starting from this offset will be copied to output
+	    // in one of two cases:
+	    // 1- When a matching close tag is found
+	    // 2- When an end of line is encountered
+	    if (depth == 0) {
+	      offsetOfFirstMatchedOpenTag = startOfCurrentMatch;
+	      this.key.set(bufferPos + startOfCurrentMatch);
+	    }
+	    depth++;
+	  } else if ((group = matcher.group(3)) != null) {
+	    // Matched a closed tag
+	    if (depth > 0) {
+	      depth--;
+	      if (depth == 0) {
+		// A full top-level match
+		// Copy all bytes to output
+		if (currentMatch.length() == 0) {
+		  // A full match in one line, return it immediately
+		  value = new Text(buffer.substring(offsetOfFirstMatchedOpenTag, endOfCurrentMatch));
+		} else {
+		  currentMatch.append(buffer, offsetOfFirstMatchedOpenTag, endOfCurrentMatch);
+		  value = new Text(currentMatch.toString());
+		}
+		// Copy remaining non matched part to the buffer for next call
+		buffer = buffer.substring(endOfCurrentMatch);
+		bufferPos += endOfCurrentMatch;
+		return true;
+	      }
+	    }
+	  } else {
+	    throw new RuntimeException("Invalid match '"+matcher.group()+"' in string '"+buffer+"'");
+	  }
+	}
+	// No more matches in current line. If we are inside a match (i.e.,
+	// an open tag has been matched) copy all parts to the match.
+	// Otherwise, just drop it.
+	if (depth > 0) {
+	  // Inside a match
+	  currentMatch.append(buffer, offsetOfFirstMatchedOpenTag, buffer.length());
+	}
+	buffer = null;
+      }
+      } catch (InterruptedException e) {
+	throw new IOException("Error getting input");
+      }
+
     }
 
-    @Override
-    public void setLocation(String location, Job job) throws IOException {
-        loadLocation = location; 
-        FileInputFormat.setInputPaths(job, location);
-    }
-    
-    //------------------------------------------------------------------------
-    // Implementation of InputFormat
-    
-    public static class XMLFileInputFormat extends FileInputFormat {
-
-       /**
-        * Boolean flag used to identify whether splittable property is explicitly set.
-        */
-        private boolean isSplitable = false;
-        
-        private String recordIdentifier;
-        
-        public XMLFileInputFormat(String recordIdentifier) {
-            this.recordIdentifier = recordIdentifier;
-        }
-        
-        @SuppressWarnings("unchecked")
-        @Override
-        public RecordReader createRecordReader(InputSplit split,
-                TaskAttemptContext context) throws IOException,
-                InterruptedException {
-            
-            return new XMLFileRecordReader(recordIdentifier);
-        }  
-        
-        @Override
-        protected boolean isSplitable(JobContext context, Path filename) {
-           CompressionCodec codec = 
-              new CompressionCodecFactory(context.getConfiguration()).getCodec(filename);
-           return (!(codec == null)) ? isSplitable : true;
-        }
-    }
-    
-    //------------------------------------------------------------------------
-    // Implementation of RecordReader
-    
-    public static class XMLFileRecordReader extends RecordReader {
-
-        private long start;
-        private long end;
-        private String recordIdentifier;
-        
-        /*
-         * xmlloader input stream which has the ability to split the input
-         * dataset into records by the specified tag
-         */
-        private XMLLoaderBufferedPositionedInputStream xmlLoaderBPIS = null;
-
-        public XMLFileRecordReader(String recordIdentifier) {
-            this.recordIdentifier = recordIdentifier;
-        }
-        
-        @Override
-        public void initialize(InputSplit genericSplit, TaskAttemptContext context)
-                throws IOException, InterruptedException {
-            FileSplit split = (FileSplit) genericSplit;
-            Configuration job = context.getConfiguration();
- 
-            start = split.getStart();
-            end = start + split.getLength();
-            final Path file = split.getPath();
- 
-            // open the file and seek to the start of the split
-            FileSystem fs = file.getFileSystem(job);
-            FSDataInputStream fileIn = fs.open(split.getPath());
-            
-            // Seek to the start of the file
-            fileIn.seek(start);
-        
-            if(file.toString().endsWith(".bz2") || file.toString().endsWith(".bz"))
-            {
-            	// For bzip2 files use CBZip2InputStream to read and supply the upper input stream.
-               CBZip2InputStream in = new CBZip2InputStream(fileIn,9, end);
-               this.xmlLoaderBPIS = new XMLLoaderBufferedPositionedInputStream(in,start,end);
-            }
-            else if (file.toString().endsWith(".gz"))
-            {
-            	CompressionCodecFactory compressionCodecs =  new CompressionCodecFactory(job);
-            	final CompressionCodec codec = compressionCodecs.getCodec(file);
-            	 if (codec != null) {
-            	    end = Long.MAX_VALUE;
-            	      CompressionInputStream stream = codec.createInputStream(fileIn);
-            	      this.xmlLoaderBPIS = new XMLLoaderBufferedPositionedInputStream(stream,start,end);
-            	    }
-            }
-            
-            else
-            {
-               this.xmlLoaderBPIS = new XMLLoaderBufferedPositionedInputStream(fileIn,start,end);
-            }
-        }
-
-        
-        @Override
-        public void close() throws IOException {
-            xmlLoaderBPIS.close();
-        }
-
-        @Override
-        public Object getCurrentKey() throws IOException, InterruptedException {
-            return null;
-        }
-
-        @Override
-        public Object getCurrentValue() throws IOException,
-                InterruptedException {            
-            return xmlLoaderBPIS.collectTag(recordIdentifier, end);
-        }
-
-        @Override
-        public float getProgress() throws IOException, InterruptedException {
- 
-            return 0;
-        }
-
-
-        @Override
-        public boolean nextKeyValue() throws IOException, InterruptedException {
-            return xmlLoaderBPIS.isReadable();
-        }
-        
+    public String toString() {
+      return wrapped.toString();
     }
+  }
+
+  /**Location of the file loaded*/
+  private String loadLocation;
+
+  /**Underlying record reader*/
+  @SuppressWarnings("rawtypes")
+  protected RecordReader in = null;
+
+  /**XML tag to parse*/
+  private String identifier;
+
+  public XMLLoader(String identifier) {
+    this.identifier = identifier;
+  }
+
+  @Override
+  public void prepareToRead(RecordReader reader, PigSplit split)
+      throws IOException {
+    in = reader;
+  }
+
+  @Override
+  public Tuple getNext() throws IOException {
+    try {
+      if (!in.nextKeyValue())
+	return null;
+      Tuple tuple = createTuple(in.getCurrentValue().toString());
+      return tuple;
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+      return null;
+    }
+  }
+
+
+  /**
+   * Creates a tuple from a matched string
+   */
+  public Tuple createTuple(String str) {
+    return TupleFactory.getInstance().newTuple(new DataByteArray(str));
+  }
+
+  @SuppressWarnings("rawtypes")
+  @Override
+  public InputFormat getInputFormat() throws IOException {
+    if(loadLocation.endsWith(".bz2") || loadLocation.endsWith(".bz")) {
+      return new Bzip2TextInputFormat() {
+	@Override
+	public RecordReader<LongWritable, Text> createRecordReader(
+	    InputSplit split, TaskAttemptContext context) {
+	  try {
+	    RecordReader<LongWritable, Text> originalReader =
+		super.createRecordReader(split, context);
+	    XMLRecordReader reader = new XMLRecordReader(originalReader);
+	    reader.setXMLIdentifier(identifier);
+	    return reader;
+	  } catch (IOException e) {
+	    throw new RuntimeException("Cannot create input split", e);
+	  } catch (InterruptedException e) {
+	    throw new RuntimeException("Cannot create input split", e);
+	  }
+	}
+      };
+    } else {
+      return new PigTextInputFormat() {
+	@Override
+	public RecordReader<LongWritable, Text> createRecordReader(
+	    InputSplit split, TaskAttemptContext context) {
+	  RecordReader<LongWritable, Text> originalReader =
+	      super.createRecordReader(split, context);
+	  XMLRecordReader reader = new XMLRecordReader(originalReader);
+	  reader.setXMLIdentifier(identifier);
+	  return reader;
+	}
+      };
+    }
+  }
+
+  @Override
+  public void setLocation(String location, Job job) throws IOException {
+    loadLocation = location;
+    FileInputFormat.setInputPaths(job, location);
+  }
 }

Added: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/evaluation/xml/data/test-file-2.xml.bz2
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/evaluation/xml/data/test-file-2.xml.bz2?rev=1589911&view=auto
==============================================================================
Binary file - no diff available.

Propchange: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/evaluation/xml/data/test-file-2.xml.bz2
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Modified: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestXMLLoader.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestXMLLoader.java?rev=1589911&r1=1589910&r2=1589911&view=diff
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestXMLLoader.java (original)
+++ pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestXMLLoader.java Thu Apr 24 23:58:39 2014
@@ -17,21 +17,29 @@ import static org.apache.pig.ExecType.LO
 
 import java.io.ByteArrayInputStream;
 import java.io.File;
+import java.io.PrintStream;
 import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.Random;
 
 import javax.xml.parsers.DocumentBuilder;
 import javax.xml.parsers.DocumentBuilderFactory;
 
 import junit.framework.TestCase;
 
+import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.pig.ExecType;
 import org.apache.pig.PigServer;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.test.Util;
+import org.w3c.dom.Document;
 
 public class TestXMLLoader extends TestCase {
-  private static String patternString = "(\\d+)!+(\\w+)~+(\\w+)";
   public static ArrayList<String[]> data = new ArrayList<String[]>();
   static {
     data.add(new String[] { "<configuration>"});
@@ -89,13 +97,11 @@ public class TestXMLLoader extends TestC
     inlineClosedTags.add(new String[] { "<event id='33'><tag k='a' v='b'/></event>"});
     inlineClosedTags.add(new String[] { "</events>"});
   }
-  
   public void testShouldReturn0TupleCountIfSearchTagIsNotFound () throws Exception
   {
     String filename = TestHelper.createTempFile(data, "");
     PigServer pig = new PigServer(LOCAL);
     filename = filename.replace("\\", "\\\\");
-    patternString = patternString.replace("\\", "\\\\");
     String query = "A = LOAD '" + filename + "' USING org.apache.pig.piggybank.storage.XMLLoader('invalid') as (doc:chararray);";
     pig.registerQuery(query);
     Iterator<?> it = pig.openIterator("A");
@@ -118,7 +124,6 @@ public class TestXMLLoader extends TestC
     String filename = TestHelper.createTempFile(data, "");
     PigServer pig = new PigServer(LOCAL);
     filename = filename.replace("\\", "\\\\");
-    patternString = patternString.replace("\\", "\\\\");
     String query = "A = LOAD '" + filename + "' USING org.apache.pig.piggybank.storage.XMLLoader('property') as (doc:chararray);";
     pig.registerQuery(query);
     Iterator<?> it = pig.openIterator("A");
@@ -175,7 +180,6 @@ public class TestXMLLoader extends TestC
     }
 	    
    }
-
    public void testLoaderShouldLoadBasicGzFile() throws Exception {
     String filename = TestHelper.createTempFile(data, "");
 	  
@@ -224,7 +228,6 @@ public class TestXMLLoader extends TestC
       String filename = TestHelper.createTempFile(testData, "");
       PigServer pig = new PigServer(LOCAL);
       filename = filename.replace("\\", "\\\\");
-      patternString = patternString.replace("\\", "\\\\");
       String query = "A = LOAD '" + filename + "' USING org.apache.pig.piggybank.storage.XMLLoader('name') as (doc:chararray);";
       pig.registerQuery(query);
       Iterator<?> it = pig.openIterator("A");
@@ -248,7 +251,6 @@ public class TestXMLLoader extends TestC
       String filename = TestHelper.createTempFile(data, "");
       PigServer pig = new PigServer(LOCAL);
       filename = filename.replace("\\", "\\\\");
-      patternString = patternString.replace("\\", "\\\\");
       String query = "A = LOAD '" + filename + "' USING org.apache.pig.piggybank.storage.XMLLoader('ignoreProperty') as (doc:chararray);";
       pig.registerQuery(query);
       Iterator<?> it = pig.openIterator("A");
@@ -265,14 +267,13 @@ public class TestXMLLoader extends TestC
       }
       assertEquals(1, tupleCount);  
    }
-   
    public void testShouldReturn0TupleCountIfNoEndTagIsFound() throws Exception
    {
       // modify the data content to avoid end tag for </ignoreProperty>
       ArrayList<String[]> testData = new ArrayList<String[]>();
       for (String content[] : data) {
          
-         if(false == data.equals(new String[] { "</ignoreProperty>"}))
+         if(!content[0].equals("</ignoreProperty>"))
          {
             testData.add(content);
          }
@@ -281,8 +282,7 @@ public class TestXMLLoader extends TestC
       String filename = TestHelper.createTempFile(testData, "");
       PigServer pig = new PigServer(LOCAL);
       filename = filename.replace("\\", "\\\\");
-      patternString = patternString.replace("\\", "\\\\");
-      String query = "A = LOAD '" + filename + "' USING org.apache.pig.piggybank.storage.XMLLoader('</ignoreProperty>') as (doc:chararray);";
+      String query = "A = LOAD '" + filename + "' USING org.apache.pig.piggybank.storage.XMLLoader('ignoreProperty') as (doc:chararray);";
       pig.registerQuery(query);
       Iterator<?> it = pig.openIterator("A");
       int tupleCount = 0;
@@ -307,8 +307,7 @@ public class TestXMLLoader extends TestC
       String filename = TestHelper.createTempFile(testData, "");
       PigServer pig = new PigServer(LOCAL);
       filename = filename.replace("\\", "\\\\");
-      patternString = patternString.replace("\\", "\\\\");
-      String query = "A = LOAD '" + filename + "' USING org.apache.pig.piggybank.storage.XMLLoader('</ignoreProperty>') as (doc:chararray);";
+      String query = "A = LOAD '" + filename + "' USING org.apache.pig.piggybank.storage.XMLLoader('ignoreProperty') as (doc:chararray);";
       pig.registerQuery(query);
       Iterator<?> it = pig.openIterator("A");
       int tupleCount = 0;
@@ -330,7 +329,6 @@ public class TestXMLLoader extends TestC
       String filename = TestHelper.createTempFile(nestedTags, "");
       PigServer pig = new PigServer(LOCAL);
       filename = filename.replace("\\", "\\\\");
-      patternString = patternString.replace("\\", "\\\\");
       String query = "A = LOAD '" + filename + "' USING org.apache.pig.piggybank.storage.XMLLoader('event') as (doc:chararray);";
       pig.registerQuery(query);
       Iterator<?> it = pig.openIterator("A");
@@ -353,7 +351,6 @@ public class TestXMLLoader extends TestC
      String filename = TestHelper.createTempFile(inlineClosedTags, "");
      PigServer pig = new PigServer(LOCAL);
      filename = filename.replace("\\", "\\\\");
-     patternString = patternString.replace("\\", "\\\\");
      String query = "A = LOAD '" + filename + "' USING org.apache.pig.piggybank.storage.XMLLoader('event') as (doc:chararray);";
      pig.registerQuery(query);
      Iterator<?> it = pig.openIterator("A");
@@ -375,7 +372,6 @@ public class TestXMLLoader extends TestC
      String filename = TestHelper.createTempFile(inlineClosedTags, "");
      PigServer pig = new PigServer(LOCAL);
      filename = filename.replace("\\", "\\\\");
-     patternString = patternString.replace("\\", "\\\\");
      String query = "A = LOAD '" + filename + "' USING org.apache.pig.piggybank.storage.XMLLoader('event') as (doc:chararray);";
      pig.registerQuery(query);
      Iterator<?> it = pig.openIterator("A");
@@ -391,4 +387,108 @@ public class TestXMLLoader extends TestC
        }
      }
    }
+
+   /**
+    * This test case tests the special case when a non-matching tag spans two file
+    * splits in a .bz2 compressed file. At the same time, the part that falls in
+    * the first split is a prefix of the matching tag.
+    * In other words, till the end of the first split, it looks like the tag is
+    * matching but it is not actually matching.
+    * 
+    * @throws Exception
+    */
+   public void testXMLLoaderShouldNotReturnLastNonMatchedTag() throws Exception {
+     Configuration conf = new Configuration();
+     long blockSize = 100 * 1024;
+     conf.setLong("fs.local.block.size", blockSize);
+
+     String tagName = "event";
+
+     PigServer pig = new PigServer(LOCAL, conf);
+     FileSystem localFs = FileSystem.getLocal(conf);
+     FileStatus[] testFiles = localFs.globStatus(new Path("src/test/java/org/apache/pig/piggybank/test/evaluation/xml/data/*xml.bz2"));
+     assertTrue("No test files", testFiles.length > 0);
+     for (FileStatus testFile : testFiles) {
+       String testFileName = testFile.getPath().toUri().getPath().replace("\\", "\\\\");
+       String query = "A = LOAD '" + testFileName + "' USING org.apache.pig.piggybank.storage.XMLLoader('event') as (doc:chararray);";
+       pig.registerQuery(query);
+       Iterator<?> it = pig.openIterator("A");
+       while (it.hasNext()) {
+         Tuple tuple = (Tuple) it.next();
+         if (tuple == null)
+           break;
+         else {
+           if (tuple.size() > 0) {
+             assertTrue(((String)tuple.get(0)).startsWith("<"+tagName+">"));
+           }
+         }
+       }
+     }
+   }
+
+   /**
+    * This test checks that a multi-line tag spanning two splits is still
+    * matched.
+    * @throws Exception
+    */
+   public void testXMLLoaderShouldMatchTagSpanningSplits() throws Exception {
+     Configuration conf = new Configuration();
+     long blockSize = 512;
+     conf.setLong("fs.local.block.size", blockSize);
+     conf.setLong("mapred.max.split.size", blockSize);
+
+     String tagName = "event";
+     File tempFile = File.createTempFile("long-file", ".xml");
+     FileSystem localFs = FileSystem.getLocal(conf);
+     FSDataOutputStream directOut = localFs.create(new Path(tempFile.getAbsolutePath()), true);
+
+     String matchingElement = "<event>\ndata\n</event>\n";
+     long pos = 0;
+     int matchingCount = 0;
+     PrintStream ps = new PrintStream(directOut);
+     // 1- Write some elements that fit completely in the first block
+     while (pos + 2 * matchingElement.length() < blockSize) {
+       ps.print(matchingElement);
+       pos += matchingElement.length();
+       matchingCount++;
+     }
+     // 2- Write a long element that spans multiple lines and multiple blocks
+     String longElement = matchingElement.replace("data",
+         "data\ndata\ndata\ndata\ndata\ndata\ndata\ndata\ndata\ndata\ndata\n");
+     ps.print(longElement);
+     pos += longElement.length();
+     matchingCount++;
+     // 3- Write some more elements to fill in the second block completely
+     while (pos < 2 * blockSize) {
+       ps.print(matchingElement);
+       pos += matchingElement.length();
+       matchingCount++;
+     }
+     ps.close();
+     
+     
+     PigServer pig = new PigServer(LOCAL, conf);
+     String tempFileName = tempFile.getAbsolutePath().replace("\\", "\\\\");
+     String query = "A = LOAD '" + tempFileName + "' USING org.apache.pig.piggybank.storage.XMLLoader('event') as (doc:chararray);";
+     pig.registerQuery(query);
+     Iterator<?> it = pig.openIterator("A");
+     
+     int count = 0;
+     while (it.hasNext()) {
+       Tuple tuple = (Tuple) it.next();
+       if (tuple == null)
+         break;
+       else {
+         if (tuple.size() > 0) {
+           count++;
+           // Make sure the returned text is a proper XML element
+           DocumentBuilder docBuilder =
+               DocumentBuilderFactory.newInstance().newDocumentBuilder();
+           Document doc = docBuilder.parse(new ByteArrayInputStream(((String)tuple.get(0)).getBytes()));
+           assertTrue(doc.getDocumentElement().getNodeName().equals(tagName));
+         }
+       }
+     }
+     assertEquals(matchingCount, count);
+   }
 }