You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ga...@apache.org on 2010/03/17 17:26:59 UTC

svn commit: r924355 - in /hadoop/pig/trunk/contrib: CHANGES.txt piggybank/java/src/main/java/org/apache/pig/piggybank/storage/XMLLoader.java piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestXMLLoader.java

Author: gates
Date: Wed Mar 17 16:26:59 2010
New Revision: 924355

URL: http://svn.apache.org/viewvc?rev=924355&view=rev
Log:
PIG-1284 Added XMLLoader to piggybank.

Added:
    hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/XMLLoader.java
    hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestXMLLoader.java
Modified:
    hadoop/pig/trunk/contrib/CHANGES.txt

Modified: hadoop/pig/trunk/contrib/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/CHANGES.txt?rev=924355&r1=924354&r2=924355&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/CHANGES.txt (original)
+++ hadoop/pig/trunk/contrib/CHANGES.txt Wed Mar 17 16:26:59 2010
@@ -1,10 +1,111 @@
-PIG-1126: updated fieldsToRead function (olgan)
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+Pig Change Log
+
+Trunk (unreleased changes)
+
+INCOMPATIBLE CHANGES
+
+IMPROVEMENTS
+
+PIG-1284 Added XMLLoader to piggybank (aloknsingh via gates)
+
+OPTIMIZATIONS
+
+BUG FIXES
+
+Release 0.6.0
+
+INCOMPATIBLE CHANGES
+
+PIG-1126: updated fieldsToRead function for piggybank loaders (olgan)
+
+IMPROVEMENTS
+
 PIG-1015: [piggybank] DateExtractor should take into account timezones
 (dryaboy via  olgan)
-PIG-911: Added SequenceFileLoader (dryaboy via gates)
+
+OPTIMIZATIONS
+
+BUG FIXES
+
+Release 0.5.0 - Released
+
+INCOMPATIBLE CHANGES
+
+IMPROVEMENTS
+
+PIG-911: Added SequenceFileLoader to piggybank (dryaboy via gates)
+
+OPTIMIZATIONS
+
+BUG FIXES
+
+Release 0.4.0 - Released
+
+INCOMPATIBLE CHANGES
+
+IMPROVEMENTS
+
 PIG-885: New UDFs for piggybank (Bin, Decode, LookupInFiles, RegexExtract, RegexMatch, HashFVN, DiffDate) (daijy)
-PIG-868: added strin manipulation functions (bennies via olgan)
-PIG-273: addition of Top and SearchQuery UDFs (ankur via olgan)
+
+PIG-868: added strin manipulation functions to piggybank (bennies via olgan)
+
+OPTIMIZATIONS
+
+BUG FIXES
+
+Release 0.3.0 - Released
+
+INCOMPATIBLE CHANGES
+
+IMPROVEMENTS
+
+PIG-732: addition of Top and SearchQuery UDFs to piggybank (ankur via olgan)
+
+OPTIMIZATIONS
+
+BUG FIXES
+
+Release 0.2.0 - Released
+
+INCOMPATIBLE CHANGES
+
+IMPROVEMENTS
+
+OPTIMIZATIONS
+
+BUG FIXES
+
+Release 0.1.0 - Released
+
+INCOMPATIBLE CHANGES
+
+IMPROVEMENTS
+
 PIG-246: created UDF repository (olgan)
-PIG-245: UDF wrappers for Java Math functions (ajaygarg via olgan)
-PIG-277: UDF for computing correlation and covariance between data sets (ajaygarg via olgan)
+
+PIG-245: UDF wrappers for Java Math functions added to piggybank (ajaygarg via olgan)
+
+PIG-277: UDF for computing correlation and covariance between data sets added to piggybank (ajaygarg via olgan)
+
+OPTIMIZATIONS
+
+BUG FIXES
+

Added: hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/XMLLoader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/XMLLoader.java?rev=924355&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/XMLLoader.java (added)
+++ hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/XMLLoader.java Wed Mar 17 16:26:59 2010
@@ -0,0 +1,610 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.piggybank.storage;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.io.BufferedPositionedInputStream;
+
+
+
+/**
+ * A <code>XMLLoaderBufferedPositionedInputStream</code> is the package class and is the 
+ * decorater overthe BufferedPositionedInputStream which in turn decorate
+ * BufferedInputStream. It contains <code>BufferedPositionedInputStream<code>
+ * input stream, which it uses as
+ * its  basic source of data, possibly reading or providing  additional
+ * functionality. The class <code>XMLLoaderBufferedPositionedInputStream</code>
+ * itself simply overrides the necessary medthod for reading i.e 
+ * <code>read</code> <code>getPosition<code> with versions that
+ * pass all requests to the contained  input
+ * stream or do some special processing. Subclasses of <code>XMLLoaderBufferedPositionedInputStream</code>
+ * may further override some of  these methods
+ * and may also provide additional methods
+ * and fields.
+ * It also provides additional method <code>collectTag<collect> which will give the byte 
+ * array between the tag which is a xml record. i.e <tag> .*</tag> will be returned
+ *
+ * @note we can't use the standard SAX or STAX parser as for a big xml 
+ *       the intermetant hadoop block may not be the valid xml and hence those
+ *       parser may create pb. 
+ *
+ * @since   pig 2.0
+ */
+
+class XMLLoaderBufferedPositionedInputStream extends BufferedPositionedInputStream { 
+
+    public final static int S_START = 0;
+    public final static int S_MATCH_PREFIX = 1;
+    public final static int S_MATCH_TAG = 2;
+
+    /**
+     * The input streamed to be filtered 
+     */
+    InputStream wrapperIn;
+
+    /**
+     * The field to know if the underlying buffer contains any more bytes
+     */
+    boolean _isReadable;
+
+    /**
+     * Creates a <code>XMLLoaderBufferedPositionedInputStream</code>
+     * by assigning the  argument <code>in</code>
+     * to the field <code>this.wrapperIn</code> so as
+     * to remember it for later use.
+     *
+     * @param   in   the underlying input stream,
+     */
+    public XMLLoaderBufferedPositionedInputStream(InputStream in){
+        super(in);
+        this.wrapperIn = in;
+        setReadable(true);
+    }
+
+    /**
+     * Since the input stream is control by Pig or hadoop 
+     * stream and there seems to be issue with multiple closing 
+     * with hadoop and pig
+     *
+     * @exception  IOException  if an I/O error occurs.
+     */
+    public void close() throws IOException {
+      //  throw new IOException("Closing stream BAD");
+    }
+
+    /**
+     * Set the stream readable or non readable. This is needed
+     * to control the xml parsing.
+     * @param flag The boolean flag to be set
+     * @see XMLLoaderBufferedPositionedInputStream#isReadable
+     */
+    private void setReadable(boolean flag) {
+       _isReadable = flag;
+    }
+
+    /**
+     * See if the stream readable or non readable. This is needed
+     * to control the xml parsing.
+     * @return  true if readable otherwise false
+     * @see XMLLoaderBufferedPositionedInputStream#setReadable
+     */
+    public boolean isReadable() {
+       return _isReadable == true;
+    }
+
+    /**
+     * @Override org.apache.pig.impl.io.BufferedPositionedInputStream.read
+     * It is just the wrapper for now.
+     * Reads the next byte of data from this input stream. The value
+     * byte is returned as an <code>int</code> in the range
+     * <code>0</code> to <code>255</code>. If no byte is available
+     * because the end of the stream has been reached, the value
+     * <code>-1</code> is returned. This method blocks until input data
+     * is available, the end of the stream is detected, or an exception
+     * is thrown.
+     * <p>
+     * This method
+     * simply performs <code>in.read()</code> and returns the result.
+     *
+     * @return     the next byte of data, or <code>-1</code> if the end of the
+     *             stream is reached.
+     * @exception  IOException  if an I/O error occurs.
+     * @see        XMLLoaderBufferedPositionedInputStreamInputStream#wrapperIn
+     */
+    public int read() throws IOException {
+       return wrapperIn.read();
+    }
+
+    /**
+     * This is collect the bytes from current position to the ending tag.
+     * This scans for the tags and do the pattern match byte by byte
+     * this must be used along with 
+     *  XMLLoaderBufferedPositionedInputStream#skipToTag
+     *
+     * @param tagName the end tag to search for
+     *
+     * @param limit the end pointer for the block for this mapper
+     *
+     * @return the byte array containing the documents until the end of tag
+     *
+     * @see loader.XMLLoaderBufferedPositionedInputStream.collectUntilEndTag
+     *
+     */
+    private byte[] collectUntilEndTag(String tagName, long limit) {
+
+      //@todo use the charset and get the charset encoding from the xml encoding.
+      byte[] tmp = tagName.getBytes();
+      byte[] tag = new byte[tmp.length + 3];
+      tag[0] = (byte)'<';
+      tag[1] = (byte)'/';
+      for (int i = 0; i < tmp.length; ++i) {
+        tag[2+i] = tmp[i];
+      }
+      tag[tmp.length+2] = (byte)'>';
+      // System.out.println("[collectUntilEndTag] TAG " + tag + tagName); // DEBUG
+
+      ByteArrayOutputStream collectBuf = new ByteArrayOutputStream(1024);
+      int idxTagChar = 0;
+      while (true) {
+        int b = -1;
+        try {
+          b = this.read();
+          if (b == -1) {
+            collectBuf.reset();
+            this.setReadable(false);
+            break;
+          }
+          collectBuf.write((byte)(b));
+
+          // start to match the target close tag
+          if (b == tag[idxTagChar]) {
+            ++idxTagChar;
+            if (idxTagChar == tag.length) {
+              break;
+            }
+          } else {
+            idxTagChar = 0;
+          }
+        }
+        catch (IOException e) {
+          this.setReadable(false);
+          return null;
+        }
+      }
+      // DEBUG
+      //System.out.println("Match = " + new String(collectBuf.toByteArray()));
+      return collectBuf.toByteArray();
+    }
+
+    /**
+     * This is collect the from the matching tag.
+     * This scans for the tags and do the pattern match byte by byte
+     * This returns a part doc. it must be used along with 
+     * XMLLoaderBufferedPositionedInputStream#collectUntilEndTag
+     * 
+     * @param tagName the start tag to search for
+     *
+     * @param limit the end pointer for the block for this mapper
+     *
+     * @return the byte array containing match of the tag.
+     *
+     * @see loader.XMLLoaderBufferedPositionedInputStream.collectUntilEndTag
+     *
+     */
+    private byte[] skipToTag(String tagName, long limit) {
+      
+      //@todo use the charset and get the charset encoding from the xml encoding.
+      byte[] tmp = tagName.getBytes();
+      byte[] tag = new byte[tmp.length + 1];
+      tag[0] = (byte)'<';
+      for (int i = 0; i < tmp.length; ++i) {
+        tag[1+i] = tmp[i];
+      }
+      //System.out.println("[skipToTag] TAG " + tag + tagName); // DEBUG
+
+      ByteArrayOutputStream matchBuf = new ByteArrayOutputStream(512);
+      int idxTagChar = 0;
+      int state = S_START;
+      while (true) {
+        int b = -1;
+        try {
+          b = this.read();
+          if (b == -1) {
+            state = S_START;
+            matchBuf.reset();
+            this.setReadable(false);
+            break;
+          }
+          switch (state) {
+            case S_START:
+              // start to match the target open tag
+              if (b == tag[idxTagChar]) {
+                ++idxTagChar;
+                matchBuf.write((byte)(b));
+                if (idxTagChar == tag.length) {
+                  state = S_MATCH_PREFIX;
+                }
+              } else {  // mismatch
+                idxTagChar = 0;
+                matchBuf.reset();
+              }
+              break;
+            case S_MATCH_PREFIX:
+              // tag match iff next character is whitespaces or close tag mark
+              if (b == ' ' || b == '\t' || b == '>') {
+                matchBuf.write((byte)(b));
+                state = S_MATCH_TAG;
+              } else {
+                idxTagChar = 0;
+                matchBuf.reset();
+                state = S_START;
+              }
+              break;
+            case S_MATCH_TAG:
+              // keep copy characters until we hit the close tag mark
+              matchBuf.write((byte)(b));
+              break;
+            default:
+              throw new IllegalArgumentException("Invalid state: " + state);
+          }
+          if (state == S_MATCH_TAG && b == '>') {
+            break;
+          }
+          if (state != S_MATCH_TAG && this.getPosition() > limit) {
+            // need to break, no record in this block
+            break;
+          }
+          // DEBUG
+          /*
+          if (idxTagChar > 0) {
+            System.out.println("Match b='" + (char)b + "'"
+                + ", tag='" + (char)tag[idxTagChar-1] + "'"
+                + ", idxTagChar=" + (idxTagChar-1)
+                + ", tagLength=" + tag.length);
+          } else {
+            System.out.println("Mismatch b='" + (char)b + "'"
+                + ", tag='" + (char)tag[idxTagChar] + "'"
+                + ", idxTagChar=" + (idxTagChar)
+                + ", tagLength=" + tag.length);
+          }
+          */
+        }
+        catch (IOException e) {
+          this.setReadable(false);
+          return null;
+        }
+      }
+      // DEBUG
+      //System.out.println("Match = " + new String(matchBuf.toByteArray()));
+      return matchBuf.toByteArray();
+    }
+
+    /**
+     * This is collect bytes from start and end tag both inclusive
+     * This scans for the tags and do the pattern match byte by byte
+     * 
+     * @param tagName the start tag to search for
+     *
+     * @param limit the end pointer for the block for this mapper
+     *
+     * @return the byte array containing match of the <code><tag>.*</tag><code>.
+     *
+     * @see loader.XMLLoaderBufferedPositionedInputStream.skipToTag
+     *
+     * @see loader.XMLLoaderBufferedPositionedInputStream.collectUntilEndTag
+     *
+     */
+    byte[] collectTag(String tagName, long limit) throws IOException {
+       ByteArrayOutputStream collectBuf = new ByteArrayOutputStream(1024);
+       byte[] beginTag = skipToTag(tagName, limit);
+       byte[] untilTag = collectUntilEndTag(tagName, limit);
+
+       if (beginTag.length > 0 && untilTag.length > 0) {
+           for (byte b: beginTag) {
+              collectBuf.write(b);
+           }
+           for (byte b: untilTag) {
+              collectBuf.write(b);
+           }
+       }
+       return collectBuf.toByteArray();
+    }
+
+}
+
+
+/**
+ * The load function to load the XML file
+ * This implements the LoadFunc interface which is used to parse records
+ * from a dataset. The various helper adaptor function is extended from loader.Utf8StorageConverter
+ * which included various functions to cast raw byte data into various datatypes. 
+ * other sections of the code can call back to the loader to do the cast.
+ * This takes a xmlTag as the arg which it will use to split the inputdataset into
+ * multiple records. 
+ * <code>
+ *    
+ * For example if the input xml (input.xml) is like this
+ *     <configuration>
+ *         <property>
+ *            <name> foobar </name>
+ *            <value> barfoo </value>
+ *         </property>
+ *         <ignoreProperty>
+ *           <name> foo </name>
+ *         </ignoreProperty>
+ *         <property>
+ *            <name> justname </name>
+ *         </property>
+ *     </configuration>
+ *
+ *    And your pig script is like this
+ *
+ *    --load the jar files
+ *    register /homes/aloks/pig/udfLib/loader.jar;
+ *    -- load the dataset using XMLLoader
+ *    -- A is the bag containing the tuple which contains one atom i.e doc see output
+ *    A = load '/user/aloks/pig/input.xml using loader.XMLLoader('property') as (doc:chararray);
+ *    --dump the result
+ *    dump A;
+ *
+ *
+ *    Then you will get the output
+ *
+ *    (<property>
+ *             <name> foobar </name>
+ *             <value> barfoo </value>
+ *          </property>)
+ *    (<property>
+ *             <name> justname </name>
+ *          </property>)
+ *
+ *
+ *    Where each () indicate one record 
+ *
+ * 
+ * </code>
+ */
+
+public class XMLLoader extends LoadFunc {
+
+    /**
+     * logger from pig
+     */
+    protected final Log mLog = LogFactory.getLog(getClass());
+
+    private XMLFileRecordReader reader = null;
+
+
+    /**
+     * the tuple content which is used while returning
+     */
+    private ArrayList<Object> mProtoTuple = null;
+
+    /**
+     * The record seperated. The default value is 'document'
+     */
+    public String recordIdentifier = "document";
+
+    
+    public XMLLoader() {
+
+    }
+
+    /**
+     * Constructs a Pig loader that uses specified string as the record seperater
+     * for example if the recordIdentifier is document. It will consider the record as 
+     * <document> .* </document>
+     * 
+     * @param recordIdentifier the xml tag which is used to pull records
+     *
+     */
+    public XMLLoader(String recordIdentifier) {
+        this();
+        this.recordIdentifier = recordIdentifier;
+    }
+
+    /**
+     * Retrieves the next tuple to be processed.
+     * @return the next tuple to be processed or null if there are no more tuples
+     * to be processed.
+     * @throws IOException
+     */
+    @Override
+    public Tuple getNext() throws IOException {
+ 
+        boolean next = false;
+        
+        try {
+            next = reader.nextKeyValue();
+        } catch (InterruptedException e) {
+            throw new IOException(e);
+        }
+        
+        if (!next) return null;
+        
+        Tuple t = null;
+        
+        try {
+            byte[] tagContent = (byte[]) reader.getCurrentValue();
+            t = createTuple(tagContent);
+        } catch (Exception e) {
+            throw new IOException(e);
+        }
+
+        return t; 
+ 
+    }
+    
+    public Tuple createTuple(byte[] content) throws Exception {
+        if (mProtoTuple == null) {
+            mProtoTuple = new ArrayList<Object>();
+        }
+        if (content.length > 0) {
+            mProtoTuple.add(new DataByteArray(content));
+        }
+        Tuple t = TupleFactory.getInstance().newTupleNoCopy(mProtoTuple);
+        mProtoTuple = null;
+
+        return t;
+    }
+
+    /**
+     * to check for equality 
+     * @param object 
+     */
+    public boolean equals(Object obj) {
+        return equals((XMLLoader)obj);
+    }
+
+    /**
+     * to check for equality 
+     * @param XMLLoader object 
+     */
+    public boolean equals(XMLLoader other) {
+        return this.recordIdentifier.equals(other.recordIdentifier);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public InputFormat getInputFormat() throws IOException {
+        return new XMLFileInputFormat(recordIdentifier);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void prepareToRead(RecordReader reader, PigSplit split)
+            throws IOException {
+        this.reader = (XMLFileRecordReader) reader;
+    }
+
+    @Override
+    public void setLocation(String location, Job job) throws IOException {
+        FileInputFormat.setInputPaths(job, location);
+    }
+    
+    //------------------------------------------------------------------------
+    // Implementation of InputFormat
+    
+    public static class XMLFileInputFormat extends FileInputFormat {
+
+        private String recordIdentifier;
+        
+        public XMLFileInputFormat(String recordIdentifier) {
+            this.recordIdentifier = recordIdentifier;
+        }
+        
+        @SuppressWarnings("unchecked")
+        @Override
+        public RecordReader createRecordReader(InputSplit split,
+                TaskAttemptContext context) throws IOException,
+                InterruptedException {
+            
+            return new XMLFileRecordReader(recordIdentifier);
+        }     
+    }
+    
+    //------------------------------------------------------------------------
+    // Implementation of RecordReader
+    
+    public static class XMLFileRecordReader extends RecordReader {
+
+        private long start;
+        private long end;
+        private String recordIdentifier;
+        
+        /*
+         * xmlloader input stream which has the ability to split the input
+         * dataset into records by the specified tag
+         */
+        private XMLLoaderBufferedPositionedInputStream xmlLoaderBPIS = null;
+
+        public XMLFileRecordReader(String recordIdentifier) {
+            this.recordIdentifier = recordIdentifier;
+        }
+        
+        @Override
+        public void initialize(InputSplit genericSplit, TaskAttemptContext context)
+                throws IOException, InterruptedException {
+            FileSplit split = (FileSplit) genericSplit;
+            Configuration job = context.getConfiguration();
+ 
+            start = split.getStart();
+            end = start + split.getLength();
+            final Path file = split.getPath();
+ 
+            // open the file and seek to the start of the split
+            FileSystem fs = file.getFileSystem(job);
+            FSDataInputStream fileIn = fs.open(split.getPath());
+        
+            this.xmlLoaderBPIS = new XMLLoaderBufferedPositionedInputStream(fileIn);
+        }
+
+        
+        @Override
+        public void close() throws IOException {
+            xmlLoaderBPIS.close();
+        }
+
+        @Override
+        public Object getCurrentKey() throws IOException, InterruptedException {
+            return null;
+        }
+
+        @Override
+        public Object getCurrentValue() throws IOException,
+                InterruptedException {            
+            return xmlLoaderBPIS.collectTag(recordIdentifier, end);
+        }
+
+        @Override
+        public float getProgress() throws IOException, InterruptedException {
+ 
+            return 0;
+        }
+
+
+        @Override
+        public boolean nextKeyValue() throws IOException, InterruptedException {
+            return xmlLoaderBPIS.isReadable();
+        }
+        
+    }
+}

Added: hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestXMLLoader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestXMLLoader.java?rev=924355&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestXMLLoader.java (added)
+++ hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestXMLLoader.java Wed Mar 17 16:26:59 2010
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the
+ * NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.pig.piggybank.test.storage;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.regex.Pattern;
+
+import junit.framework.TestCase;
+
+import org.apache.pig.PigServer;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import static org.apache.pig.ExecType.LOCAL;
+
+public class TestXMLLoader extends TestCase {
+  private static String patternString = "(\\d+)!+(\\w+)~+(\\w+)";
+  private final static Pattern pattern = Pattern.compile(patternString);
+  public static ArrayList<String[]> data = new ArrayList<String[]>();
+  static {
+    data.add(new String[] { "<configuration>"});
+    data.add(new String[] { "<property>"});
+    data.add(new String[] { "<name> foobar </name>"});
+    data.add(new String[] { "<value> barfoo </value>"});
+    data.add(new String[] { "</property>"});
+    data.add(new String[] { "<ignoreProperty>"});
+    data.add(new String[] { "<name> foo </name>"});
+    data.add(new String[] { "</ignoreProperty>"});
+    data.add(new String[] { "<property>"});
+    data.add(new String[] { "<name> justname </name>"});
+    data.add(new String[] { "</property>"});
+    data.add(new String[] { "</configuration>"});
+  }
+
+  public void testLoadXMLLoader() throws Exception {
+    //ArrayList<DataByteArray[]> expected = TestHelper.getExpected(data, pattern);
+    String filename = TestHelper.createTempFile(data, "");
+    PigServer pig = new PigServer(LOCAL);
+    filename = filename.replace("\\", "\\\\");
+    patternString = patternString.replace("\\", "\\\\");
+    String query = "A = LOAD 'file:" + filename + "' USING org.apache.pig.piggybank.storage.XMLLoader('property') as (doc:chararray);";
+    pig.registerQuery(query);
+    Iterator<?> it = pig.openIterator("A");
+    int tupleCount = 0;
+    while (it.hasNext()) {
+      Tuple tuple = (Tuple) it.next();
+      if (tuple == null)
+        break;
+      else {
+        //TestHelper.examineTuple(expected, tuple, tupleCount);
+        if (tuple.size() > 0) {
+            tupleCount++;
+            //System.out.println("tuple=" + tuple+":"+tuple.size());
+        }
+      }
+    }
+    assertEquals(3, tupleCount); // pig adds extra 
+  }
+}