You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ga...@apache.org on 2011/02/25 20:05:39 UTC

svn commit: r1074670 - in /pig/trunk/contrib: CHANGES.txt piggybank/java/src/main/java/org/apache/pig/piggybank/storage/XMLLoader.java piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestXMLLoader.java

Author: gates
Date: Fri Feb 25 19:05:38 2011
New Revision: 1074670

URL: http://svn.apache.org/viewvc?rev=1074670&view=rev
Log:
PIG-1842 Improve Scalability of the XMLLoader for large datasets such as wikipedia

Modified:
    pig/trunk/contrib/CHANGES.txt
    pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/XMLLoader.java
    pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestXMLLoader.java

Modified: pig/trunk/contrib/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/CHANGES.txt?rev=1074670&r1=1074669&r2=1074670&view=diff
==============================================================================
--- pig/trunk/contrib/CHANGES.txt (original)
+++ pig/trunk/contrib/CHANGES.txt Fri Feb 25 19:05:38 2011
@@ -24,6 +24,9 @@ INCOMPATIBLE CHANGES
 
 IMPROVEMENTS
 
+PIG-1842 Improve Scalability of the XMLLoader for large datasets such as
+wikipedia (vivekp via gates)
+
 PIG-1781 Piggybank: ISOToDay disregards timezone (should use ISODateTimeFormat
 instead of DateTime to parse) (misterbeebee via gates)
 

Modified: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/XMLLoader.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/XMLLoader.java?rev=1074670&r1=1074669&r2=1074670&view=diff
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/XMLLoader.java (original)
+++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/XMLLoader.java Fri Feb 25 19:05:38 2011
@@ -53,7 +53,7 @@ import org.apache.tools.bzip2r.CBZip2Inp
 
 /**
  * A <code>XMLLoaderBufferedPositionedInputStream</code> is the package class and is the 
- * decorater overthe BufferedPositionedInputStream which in turn decorate
+ * decorator over the BufferedPositionedInputStream which in turn decorate
  * BufferedInputStream. It contains <code>BufferedPositionedInputStream<code>
  * input stream, which it uses as
  * its  basic source of data, possibly reading or providing  additional
@@ -69,7 +69,7 @@ import org.apache.tools.bzip2r.CBZip2Inp
  * array between the tag which is a xml record. i.e <tag> .*</tag> will be returned
  *
  * @note we can't use the standard SAX or STAX parser as for a big xml 
- *       the intermetant hadoop block may not be the valid xml and hence those
+ *       the intermittent hadoop block may not be the valid xml and hence those
  *       parser may create pb. 
  *
  * @since   pig 2.0
@@ -92,6 +92,21 @@ class XMLLoaderBufferedPositionedInputSt
     boolean _isReadable;
 
     /**
+    * The field set the maximum bytes that is readable by this instance of stream.
+    */
+    private long maxBytesReadable = 0;
+    	
+    /**
+    * The field denote the number of bytes read by this stream. 
+    */
+    long bytesRead = 0;
+    	
+    /**
+    * Denotes the end of the current split location
+    */
+    long end = 0;
+    
+    /**
      * Creates a <code>XMLLoaderBufferedPositionedInputStream</code>
      * by assigning the  argument <code>in</code>
      * to the field <code>this.wrapperIn</code> so as
@@ -104,16 +119,17 @@ class XMLLoaderBufferedPositionedInputSt
         this.wrapperIn = in;
         setReadable(true);
     }
-
+    
     /**
-     * Since the input stream is control by Pig or hadoop 
-     * stream and there seems to be issue with multiple closing 
-     * with hadoop and pig
-     *
-     * @exception  IOException  if an I/O error occurs.
-     */
-    public void close() throws IOException {
-      //  throw new IOException("Closing stream BAD");
+     * Creates a  split aware <code>XMLLoaderBufferedPositionedInputStream</code>.
+     * @param in    the underlying input stream
+     * @param start    start location of the split
+     * @param end    end location of the split
+     */
+    public XMLLoaderBufferedPositionedInputStream(InputStream in,long start,long end){
+       this(in);
+       this.end = end;
+       maxBytesReadable = end - start;
     }
 
     /**
@@ -137,7 +153,7 @@ class XMLLoaderBufferedPositionedInputSt
     }
 
     /**
-     * @Override org.apache.pig.impl.io.BufferedPositionedInputStream.read
+     * org.apache.pig.impl.io.BufferedPositionedInputStream.read
      * It is just the wrapper for now.
      * Reads the next byte of data from this input stream. The value
      * byte is returned as an <code>int</code> in the range
@@ -185,14 +201,21 @@ class XMLLoaderBufferedPositionedInputSt
         tag[2+i] = tmp[i];
       }
       tag[tmp.length+2] = (byte)'>';
-      // System.out.println("[collectUntilEndTag] TAG " + tag + tagName); // DEBUG
 
       ByteArrayOutputStream collectBuf = new ByteArrayOutputStream(1024);
       int idxTagChar = 0;
+      
+      /*
+       * Read till an end tag is found.It need not check for any condition since it 
+       * tries to read it till end.One issue that may happen is that if the xml 
+       * content is very huge; or if the end tag is not there in a huge file, 
+       * then it may blow up the memory. 
+       */
       while (true) {
         int b = -1;
         try {
           b = this.read();
+          ++bytesRead; // Add one to the bytes read
           if (b == -1) {
             collectBuf.reset();
             this.setReadable(false);
@@ -215,8 +238,6 @@ class XMLLoaderBufferedPositionedInputSt
           return null;
         }
       }
-      // DEBUG
-      //System.out.println("Match = " + new String(collectBuf.toByteArray()));
       return collectBuf.toByteArray();
     }
 
@@ -235,7 +256,7 @@ class XMLLoaderBufferedPositionedInputSt
      * @see loader.XMLLoaderBufferedPositionedInputStream.collectUntilEndTag
      *
      */
-    private byte[] skipToTag(String tagName, long limit) {
+    private byte[] skipToTag(String tagName, long limit) throws IOException {
       
       //@todo use the charset and get the charset encoding from the xml encoding.
       byte[] tmp = tagName.getBytes();
@@ -244,15 +265,22 @@ class XMLLoaderBufferedPositionedInputSt
       for (int i = 0; i < tmp.length; ++i) {
         tag[1+i] = tmp[i];
       }
-      //System.out.println("[skipToTag] TAG " + tag + tagName); // DEBUG
 
       ByteArrayOutputStream matchBuf = new ByteArrayOutputStream(512);
       int idxTagChar = 0;
       int state = S_START;
-      while (true) {
+      
+      /*
+       * Read till the tag is found in this block. If a partial tag block is found
+       * then continue on to the next block.matchBuf contains the data that is currently 
+       * matched. If the read has reached the end of split and there are matched data 
+       * then continue on to the next block.
+       */
+      while (splitBoundaryCriteria(wrapperIn) ||  (matchBuf.size() > 0 )) {
         int b = -1;
         try {
           b = this.read();
+          ++bytesRead; // Increment the bytes read by 1
           if (b == -1) {
             state = S_START;
             matchBuf.reset();
@@ -298,30 +326,31 @@ class XMLLoaderBufferedPositionedInputSt
             // need to break, no record in this block
             break;
           }
-          // DEBUG
-          /*
-          if (idxTagChar > 0) {
-            System.out.println("Match b='" + (char)b + "'"
-                + ", tag='" + (char)tag[idxTagChar-1] + "'"
-                + ", idxTagChar=" + (idxTagChar-1)
-                + ", tagLength=" + tag.length);
-          } else {
-            System.out.println("Mismatch b='" + (char)b + "'"
-                + ", tag='" + (char)tag[idxTagChar] + "'"
-                + ", idxTagChar=" + (idxTagChar)
-                + ", tagLength=" + tag.length);
-          }
-          */
         }
         catch (IOException e) {
           this.setReadable(false);
           return null;
         }
       }
-      // DEBUG
-      //System.out.println("Match = " + new String(matchBuf.toByteArray()));
       return matchBuf.toByteArray();
     }
+    /**
+     * Returns whether the split boundary condition has reached or not.
+     * For normal files ; the condition is to read till the split end reaches.
+     * Gz files will have  maxBytesReadable set to near Long.MAXVALUE, hence
+     * this will cause the entire file to be read. For bz2 and bz files, the 
+     * condition lies on the position which until which it is read. 
+     *  
+     * @param wrapperIn2
+     * @return true/false depending on whether split boundary has reached or no
+     * @throws IOException
+     */
+    private boolean splitBoundaryCriteria(InputStream wrapperIn2) throws IOException {
+       if(wrapperIn2 instanceof CBZip2InputStream)
+          return ((CBZip2InputStream)wrapperIn2).getPos() <= end;
+       else
+          return bytesRead <= maxBytesReadable;
+    }
 
     /**
      * This is collect bytes from start and end tag both inclusive
@@ -341,15 +370,18 @@ class XMLLoaderBufferedPositionedInputSt
     byte[] collectTag(String tagName, long limit) throws IOException {
        ByteArrayOutputStream collectBuf = new ByteArrayOutputStream(1024);
        byte[] beginTag = skipToTag(tagName, limit);
-       byte[] untilTag = collectUntilEndTag(tagName, limit);
 
-       if (beginTag.length > 0 && untilTag.length > 0) {
-           for (byte b: beginTag) {
-              collectBuf.write(b);
+       // No need to search for the end tag if the start tag is not found
+       if(beginTag.length > 0 ){ 
+          byte[] untilTag = collectUntilEndTag(tagName, limit);
+          if (untilTag.length > 0) {
+             for (byte b: beginTag) {
+             collectBuf.write(b);
            }
            for (byte b: untilTag) {
               collectBuf.write(b);
            }
+          }
        }
        return collectBuf.toByteArray();
     }
@@ -429,6 +461,7 @@ public class XMLLoader extends LoadFunc 
      */
     public String recordIdentifier = "document";
 
+    private String loadLocation;
     
     public XMLLoader() {
 
@@ -467,13 +500,11 @@ public class XMLLoader extends LoadFunc 
         if (!next) return null;
         
         Tuple t = null;
-        
+     
         try {
             byte[] tagContent = (byte[]) reader.getCurrentValue();
-            if(tagContent.length > 0)
-            {
-               t = createTuple(tagContent);
-            }
+            // No need to create the tuple if there are no contents
+            t = (tagContent.length > 0) ? createTuple(tagContent) : null;
         } catch (Exception e) {
             throw new IOException(e);
         }
@@ -514,7 +545,11 @@ public class XMLLoader extends LoadFunc 
     @SuppressWarnings("unchecked")
     @Override
     public InputFormat getInputFormat() throws IOException {
-        return new XMLFileInputFormat(recordIdentifier);
+       XMLFileInputFormat inputFormat = new XMLFileInputFormat(recordIdentifier);
+       if(loadLocation.endsWith(".bz2") || loadLocation.endsWith(".bz")) {
+          inputFormat.isSplitable = true;
+         }
+       return inputFormat;
     }
 
     @SuppressWarnings("unchecked")
@@ -526,6 +561,7 @@ public class XMLLoader extends LoadFunc 
 
     @Override
     public void setLocation(String location, Job job) throws IOException {
+        loadLocation = location; 
         FileInputFormat.setInputPaths(job, location);
     }
     
@@ -534,6 +570,11 @@ public class XMLLoader extends LoadFunc 
     
     public static class XMLFileInputFormat extends FileInputFormat {
 
+       /**
+        * Boolean flag used to identify whether splittable property is explicitly set.
+        */
+        private boolean isSplitable = false;
+        
         private String recordIdentifier;
         
         public XMLFileInputFormat(String recordIdentifier) {
@@ -551,10 +592,9 @@ public class XMLLoader extends LoadFunc 
         
         @Override
         protected boolean isSplitable(JobContext context, Path filename) {
-        	// Always returns false since this version of XMLLoader will read an entire file.
-        	// ie.w/o this , any file > block size, MR will compute multiple splits but all 
-        	// mappers will read the full file which is functionally wrong.
-        	return false;
+           CompressionCodec codec = 
+              new CompressionCodecFactory(context.getConfiguration()).getCodec(filename);
+           return (!(codec == null)) ? isSplitable : true;
         }
     }
     
@@ -590,26 +630,30 @@ public class XMLLoader extends LoadFunc 
             // open the file and seek to the start of the split
             FileSystem fs = file.getFileSystem(job);
             FSDataInputStream fileIn = fs.open(split.getPath());
-        
             
-            if(file.toString().endsWith(".bz2") )
+            // Seek to the start of the file
+            fileIn.seek(start);
+        
+            if(file.toString().endsWith(".bz2") || file.toString().endsWith(".bz"))
             {
             	// For bzip2 files use CBZip2InputStream to read and supply the upper input stream.
-            	this.xmlLoaderBPIS = new XMLLoaderBufferedPositionedInputStream(new CBZip2InputStream(fileIn, 9, end));
+               CBZip2InputStream in = new CBZip2InputStream(fileIn,9, end);
+               this.xmlLoaderBPIS = new XMLLoaderBufferedPositionedInputStream(in,start,end);
             }
             else if (file.toString().endsWith(".gz"))
             {
             	CompressionCodecFactory compressionCodecs =  new CompressionCodecFactory(job);
             	final CompressionCodec codec = compressionCodecs.getCodec(file);
             	 if (codec != null) {
+            	    end = Long.MAX_VALUE;
             	      CompressionInputStream stream = codec.createInputStream(fileIn);
-            	      this.xmlLoaderBPIS = new XMLLoaderBufferedPositionedInputStream(stream);
+            	      this.xmlLoaderBPIS = new XMLLoaderBufferedPositionedInputStream(stream,start,end);
             	    }
             }
             
             else
             {
-            	this.xmlLoaderBPIS = new XMLLoaderBufferedPositionedInputStream(fileIn);
+               this.xmlLoaderBPIS = new XMLLoaderBufferedPositionedInputStream(fileIn,start,end);
             }
         }
 

Modified: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestXMLLoader.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestXMLLoader.java?rev=1074670&r1=1074669&r2=1074670&view=diff
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestXMLLoader.java (original)
+++ pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestXMLLoader.java Fri Feb 25 19:05:38 2011
@@ -171,4 +171,113 @@ public class TestXMLLoader extends TestC
         new File(filename).delete();
     }
  }
+   
+   public void testXMLLoaderShouldNotConfusedWithTagsHavingSimilarPrefix () throws Exception
+   {
+      ArrayList<String[]> testData = new ArrayList<String[]>();
+      testData.add(new String[] { "<namethisalso> foobar9 </namethisalso>"});
+      testData.addAll(data);
+      String filename = TestHelper.createTempFile(testData, "");
+      PigServer pig = new PigServer(LOCAL);
+      filename = filename.replace("\\", "\\\\");
+      patternString = patternString.replace("\\", "\\\\");
+      String query = "A = LOAD 'file:" + filename + "' USING org.apache.pig.piggybank.storage.XMLLoader('name') as (doc:chararray);";
+      pig.registerQuery(query);
+      Iterator<?> it = pig.openIterator("A");
+      int tupleCount = 0;
+      while (it.hasNext()) {
+        Tuple tuple = (Tuple) it.next();
+        if (tuple == null) 
+          break;
+        else {
+          if (tuple.size() > 0) {
+              tupleCount++;
+          }
+        }
+      }
+      assertEquals(3, tupleCount);
+      
+   }
+   
+   public void testShouldReturn1ForIntermediateTagData () throws Exception
+   {
+      String filename = TestHelper.createTempFile(data, "");
+      PigServer pig = new PigServer(LOCAL);
+      filename = filename.replace("\\", "\\\\");
+      patternString = patternString.replace("\\", "\\\\");
+      String query = "A = LOAD 'file:" + filename + "' USING org.apache.pig.piggybank.storage.XMLLoader('ignoreProperty') as (doc:chararray);";
+      pig.registerQuery(query);
+      Iterator<?> it = pig.openIterator("A");
+      int tupleCount = 0;
+      while (it.hasNext()) {
+        Tuple tuple = (Tuple) it.next();
+        if (tuple == null) 
+          break;
+        else {
+          if (tuple.size() > 0) {
+              tupleCount++;
+          }
+        }
+      }
+      assertEquals(1, tupleCount);  
+   }
+   
+   public void testShouldReturn0TupleCountIfNoEndTagIsFound() throws Exception
+   {
+      // modify the data content to avoid end tag for </ignoreProperty>
+      ArrayList<String[]> testData = new ArrayList<String[]>();
+      for (String content[] : data) {
+         
+         if(false == data.equals(new String[] { "</ignoreProperty>"}))
+         {
+            testData.add(content);
+         }
+      }
+      
+      String filename = TestHelper.createTempFile(testData, "");
+      PigServer pig = new PigServer(LOCAL);
+      filename = filename.replace("\\", "\\\\");
+      patternString = patternString.replace("\\", "\\\\");
+      String query = "A = LOAD 'file:" + filename + "' USING org.apache.pig.piggybank.storage.XMLLoader('</ignoreProperty>') as (doc:chararray);";
+      pig.registerQuery(query);
+      Iterator<?> it = pig.openIterator("A");
+      int tupleCount = 0;
+      while (it.hasNext()) {
+        Tuple tuple = (Tuple) it.next();
+        if (tuple == null)
+          break;
+        else {
+          if (tuple.size() > 0) {
+              tupleCount++;
+          }
+        }
+      }
+      assertEquals(0, tupleCount);  
+   }
+   public void testShouldReturn0TupleCountIfEmptyFileIsPassed() throws Exception
+   {
+      // modify the data content to avoid end tag for </ignoreProperty>
+      ArrayList<String[]> testData = new ArrayList<String[]>();
+      
+      String filename = TestHelper.createTempFile(testData, "");
+      PigServer pig = new PigServer(LOCAL);
+      filename = filename.replace("\\", "\\\\");
+      patternString = patternString.replace("\\", "\\\\");
+      String query = "A = LOAD 'file:" + filename + "' USING org.apache.pig.piggybank.storage.XMLLoader('</ignoreProperty>') as (doc:chararray);";
+      pig.registerQuery(query);
+      Iterator<?> it = pig.openIterator("A");
+      int tupleCount = 0;
+      while (it.hasNext()) {
+        Tuple tuple = (Tuple) it.next();
+        if (tuple == null)
+          break;
+        else {
+          if (tuple.size() > 0) {
+              tupleCount++;
+          }
+        }
+      }
+      assertEquals(0, tupleCount);  
+   }
+   
 }