You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by pr...@apache.org on 2010/03/16 23:44:17 UTC

svn commit: r924034 - in /hadoop/pig/trunk: ./ lib-src/bzip2/org/apache/pig/ lib-src/bzip2/org/apache/pig/bzip2r/ lib-src/bzip2/org/apache/tools/bzip2r/ src/org/apache/pig/builtin/ test/org/apache/pig/test/ test/org/apache/pig/test/data/

Author: pradeepkth
Date: Tue Mar 16 22:44:16 2010
New Revision: 924034

URL: http://svn.apache.org/viewvc?rev=924034&view=rev
Log:
PIG-1257: PigStorage per the new load-store redesign should support splitting of bzip files (pradeepkth)

Added:
    hadoop/pig/trunk/lib-src/bzip2/org/apache/pig/
    hadoop/pig/trunk/lib-src/bzip2/org/apache/pig/bzip2r/
    hadoop/pig/trunk/lib-src/bzip2/org/apache/pig/bzip2r/Bzip2TextInputFormat.java
    hadoop/pig/trunk/test/org/apache/pig/test/data/blockEndingInCR.txt.bz2   (with props)
    hadoop/pig/trunk/test/org/apache/pig/test/data/blockHeaderEndsAt136500.txt.bz2   (with props)
    hadoop/pig/trunk/test/org/apache/pig/test/data/recordLossblockHeaderEndsAt136500.txt.bz2   (with props)
Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/lib-src/bzip2/org/apache/tools/bzip2r/CBZip2InputStream.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/PigStorage.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestBZip.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestMapReduce.java
    hadoop/pig/trunk/test/org/apache/pig/test/Util.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=924034&r1=924033&r2=924034&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Tue Mar 16 22:44:16 2010
@@ -68,6 +68,9 @@ manner (rding via pradeepkth)
 
 IMPROVEMENTS
 
+PIG-1257: PigStorage per the new load-store redesign should support splitting
+of bzip files (pradeepkth)
+
 PIG-1290: WeightedRangePartitioner should not check if input is empty if
 quantile file is empty (pradeepkth)
 

Added: hadoop/pig/trunk/lib-src/bzip2/org/apache/pig/bzip2r/Bzip2TextInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/lib-src/bzip2/org/apache/pig/bzip2r/Bzip2TextInputFormat.java?rev=924034&view=auto
==============================================================================
--- hadoop/pig/trunk/lib-src/bzip2/org/apache/pig/bzip2r/Bzip2TextInputFormat.java (added)
+++ hadoop/pig/trunk/lib-src/bzip2/org/apache/pig/bzip2r/Bzip2TextInputFormat.java Tue Mar 16 22:44:16 2010
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.bzip2r;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.tools.bzip2r.CBZip2InputStream;
+
+@SuppressWarnings("unchecked")
+public class Bzip2TextInputFormat extends FileInputFormat {
+
+    /**
+     * Treats keys as offset in file and value as line. Since the input file is
+     * compressed, the offset for a particular line is not well-defined. This
+     * implementation returns the starting position of a compressed block as the
+     * key for every line in that block.
+     */
+
+    private static class BZip2LineRecordReader extends RecordReader<LongWritable,Text> {
+
+        private long start;
+
+        private long end;
+
+        private long pos;
+
+        private CBZip2InputStream in;
+
+        private ByteArrayOutputStream buffer = new ByteArrayOutputStream(256);
+        
+        // flag to indicate if previous character read was Carriage Return ('\r')
+        // and the next character was not Line Feed ('\n')
+        private boolean CRFollowedByNonLF = false;
+        
+        // in the case where a Carriage Return ('\r') was not followed by a 
+        // Line Feed ('\n'), this variable will hold that non Line Feed character
+        // that was read from the underlying stream.
+        private byte nonLFChar;
+        
+        
+        /**
+         * Provide a bridge to get the bytes from the ByteArrayOutputStream without
+         * creating a new byte array.
+         */
+        private static class TextStuffer extends OutputStream {
+            public Text target;
+
+            @Override
+            public void write(int b) {
+                throw new UnsupportedOperationException("write(byte) not supported");
+            }
+
+            @Override
+            public void write(byte[] data, int offset, int len) throws IOException {
+                target.clear();
+                target.set(data, offset, len);
+            }
+        }
+
+        private TextStuffer bridge = new TextStuffer();
+
+        private LongWritable key = new LongWritable();
+        private Text value = new Text();
+
+        public BZip2LineRecordReader(Configuration job, FileSplit split)
+        throws IOException {
+            start = split.getStart();
+            end = start + split.getLength();
+            final Path file = split.getPath();
+
+            // open the file and seek to the start of the split
+            FileSystem fs = file.getFileSystem(job);
+            FSDataInputStream fileIn = fs.open(split.getPath());
+            fileIn.seek(start);
+
+            in = new CBZip2InputStream(fileIn, 9, end);
+            if (start != 0) {
+                // skip first line and re-establish "start".
+                // LineRecordReader.readLine(this.in, null);
+                readLine(this.in, null);
+                start = in.getPos();
+            }
+            pos = in.getPos();
+        }
+
+        public LongWritable createKey() {
+            return new LongWritable();
+        }
+
+        public Text createValue() {
+            return new Text();
+        }
+
+        /*
+         * LineRecordReader.readLine() is depricated in HAdoop 0.17. So it is added here
+         * locally.
+         */
+        private long readLine(InputStream in, 
+                OutputStream out) throws IOException {
+            long bytes = 0;
+            while (true) {
+                int b = -1;
+                if(CRFollowedByNonLF) {
+                    // In the previous call, a Carriage Return ('\r') was followed
+                    // by a non Line Feed ('\n') character - in that call we would
+                    // have not returned the non Line Feed character but would have
+                    // read it from the stream - lets use that already read character
+                    // now
+                    b = nonLFChar;
+                    CRFollowedByNonLF = false;
+                } else {
+                    b = in.read();
+                }
+                if (b == -1) {
+                    break;
+                }
+                bytes += 1;
+
+                byte c = (byte)b;
+                if (c == '\n') {
+                    break;
+                }
+
+                if (c == '\r') {
+                    byte nextC = (byte)in.read();
+                    if (nextC != '\n') {
+                        CRFollowedByNonLF = true;
+                        nonLFChar = nextC;
+                    } else {
+                        bytes += 1;
+                    }
+                    break;
+                }
+
+                if (out != null) {
+                    out.write(c);
+                }
+            }
+            return bytes;
+        }
+
+        /** Read a line. */
+        public  boolean next(LongWritable key, Text value)
+        throws IOException {
+            if (pos > end)
+                return false;
+
+            key.set(pos); // key is position
+            buffer.reset();
+            // long bytesRead = LineRecordReader.readLine(in, buffer); 
+            long bytesRead = readLine(in, buffer);
+            if (bytesRead == 0) {
+                return false;
+            }
+            pos = in.getPos();
+            // if we have read ahead because we encountered a carriage return
+            // char followed by a non line feed char, decrement the pos
+            if(CRFollowedByNonLF) {
+                pos--;
+            }
+
+            bridge.target = value;
+            buffer.writeTo(bridge);
+            return true;
+        }
+
+        /**
+         * Get the progress within the split
+         */
+        @Override
+        public float getProgress() {
+            if (start == end) {
+                return 0.0f;
+            } else {
+                return Math.min(1.0f, (pos - start) / (float) (end - start));
+            }
+        }
+
+        public  long getPos() throws IOException {
+            return pos;
+        }
+
+        @Override
+        public  void close() throws IOException {
+            in.close();
+        }
+
+        @Override
+        public LongWritable getCurrentKey() throws IOException,
+        InterruptedException {
+            return key;
+        }
+
+        @Override
+        public Text getCurrentValue() throws IOException, InterruptedException {
+            return value;
+        }
+
+        @Override
+        public void initialize(InputSplit split, TaskAttemptContext context)
+        throws IOException, InterruptedException {
+            // no op        
+        }
+
+        @Override
+        public boolean nextKeyValue() throws IOException, InterruptedException {
+            return next(key, value);
+        }
+
+    }
+
+    @Override
+    public RecordReader createRecordReader(InputSplit split,
+            TaskAttemptContext context) throws IOException, InterruptedException {
+        return new BZip2LineRecordReader(context.getConfiguration(), 
+                (FileSplit) split);
+    }
+
+}

Modified: hadoop/pig/trunk/lib-src/bzip2/org/apache/tools/bzip2r/CBZip2InputStream.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/lib-src/bzip2/org/apache/tools/bzip2r/CBZip2InputStream.java?rev=924034&r1=924033&r2=924034&view=diff
==============================================================================
--- hadoop/pig/trunk/lib-src/bzip2/org/apache/tools/bzip2r/CBZip2InputStream.java (original)
+++ hadoop/pig/trunk/lib-src/bzip2/org/apache/tools/bzip2r/CBZip2InputStream.java Tue Mar 16 22:44:16 2010
@@ -76,10 +76,10 @@
  */
 package org.apache.tools.bzip2r;
 
-import java.io.IOException;
 import java.io.InputStream;
-
-import org.apache.pig.backend.datastorage.SeekableInputStream;
+import java.io.IOException;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.mapreduce.InputSplit;
 
 /**
  * An input stream that decompresses from the BZip2 format (without the file
@@ -127,7 +127,13 @@ public class CBZip2InputStream extends I
 
     private boolean blockRandomised;
 
+    // a buffer to keep the read byte
     private int bsBuff;
+    
+    // since bzip is bit-aligned at block boundaries there can be a case wherein
+    // only few bits out of a read byte are consumed and the remaining bits
+    // need to be consumed while processing the next block.
+    // indicate how many bits in bsBuff have not been processed yet
     private int bsLive;
     private CRC mCrc = new CRC();
 
@@ -154,7 +160,7 @@ public class CBZip2InputStream extends I
     private int[][] perm = new int[N_GROUPS][MAX_ALPHA_SIZE];
     private int[] minLens = new int[N_GROUPS];
 
-    private SeekableInputStream innerBsStream;
+    private FSDataInputStream innerBsStream;
     long readLimit = Long.MAX_VALUE;
     public long getReadLimit() {
         return readLimit;
@@ -192,13 +198,20 @@ public class CBZip2InputStream extends I
     int j2;
     char z;
     
-    // The positioning is a bit tricky. we set newPos when we start reading a new block
-    // and we set retPos to newPos once we have read a character from that block.
-    // see getPos() for more detail
-    private long retPos, newPos = -1;
-
-    public CBZip2InputStream(SeekableInputStream zStream, int blockSize) throws IOException {
-        retPos = newPos = zStream.tell();
+    // see comment in getPos()
+    private long retPos = -1;
+    // the position offset which corresponds to the end of the InputSplit that
+    // will be processed by this instance 
+    private long endOffsetOfSplit;
+
+    private boolean signalToStopReading;
+
+    public CBZip2InputStream(FSDataInputStream zStream, int blockSize, long end)
+    throws IOException {
+        endOffsetOfSplit = end;
+        // initialize retPos to the beginning of the current InputSplit
+        // see comments in getPos() to understand how this is used.
+        retPos = zStream.getPos();
     	ll8 = null;
         tt = null;
         checkComputedCombinedCRC = blockSize == -1;
@@ -208,19 +221,26 @@ public class CBZip2InputStream extends I
         setupBlock();
     }
     
-    public CBZip2InputStream(SeekableInputStream zStream) throws IOException {
-    	this(zStream, -1);
+    public CBZip2InputStream(FSDataInputStream zStream) throws IOException {
+    	this(zStream, -1, Long.MAX_VALUE);
     }
 
+    @Override
     public int read() throws IOException {
         if (streamEnd) {
             return -1;
         } else {
-            if (retPos < newPos) {
-                retPos = newPos;
-            } else {
-                retPos = newPos+1;
+            
+            // if we just started reading a bzip block which starts at a position
+            // >= end of current split, then we should set up retpos such that
+            // after a record is read, future getPos() calls will get a value
+            // > end of current split - this way we will read only one record out
+            // of this bzip block - the rest of the records from this bzip block
+            // should be read by the next map task while processing the next split
+            if(signalToStopReading) {
+                retPos = endOffsetOfSplit + 1;
             }
+            
             int retChar = currentChar;
             switch(currentState) {
             case START_BLOCK_STATE:
@@ -249,12 +269,17 @@ public class CBZip2InputStream extends I
     }
 
     /**
-     * This is supposed to approximate the position in the underlying stream. However,
-     * with compression, the underlying stream position is very vague. One position may
-     * have multiple positions and visa versa. So we do something very subtle:
-     * The position of the first byte of a compressed block will have the position of
-     * the block header at the start of the block. Every byte after the first byte will
-     * be one plus the position of the block header.
+     * getPos is used by the caller to know when the processing of the current 
+     * {@link InputSplit} is complete. In this method, as we read each bzip
+     * block, we keep returning the beginning of the {@link InputSplit} as the
+     * return value until we hit a block  which starts at a position >= end of
+     * current split. At that point we should set up retpos such that after a 
+     * record is read, future getPos() calls will get a value > end of current 
+     * split - this way we will read only one record out of that bzip block - 
+     * the rest of the records from that bzip block should be read by the next 
+     * map task while processing the next split
+     * @return
+     * @throws IOException
      */
     public long getPos() throws IOException{
         return retPos;
@@ -291,8 +316,9 @@ public class CBZip2InputStream extends I
             streamEnd = true;
             return;
         }
-
-        newPos = innerBsStream.tell();
+        
+        // position before beginning of bzip block header        
+        long pos = innerBsStream.getPos();
         if (!searchForMagic) {
             char magic1, magic2, magic3, magic4;
             char magic5, magic6;
@@ -324,13 +350,31 @@ public class CBZip2InputStream extends I
                 magic <<= 1;
                 magic &= mask;
                 magic |= bsR(1);
+                // if we just found the block header, the beginning of the bzip 
+                // header would be 6 bytes before the current stream position
+                // when we eventually break from this while(), if it is because
+                // we found a block header then pos will have the correct start
+                // of header position
+                pos = innerBsStream.getPos() - 6;
             }
             if (magic == eos) {
                 complete();
                 return;
             }
+            
+        }
+        // if the previous block finished a few bits into the previous byte,
+        // then we will first be reading the remaining bits from the previous
+        // byte - so logically pos needs to be one behind
+        if(bsLive > 0)  {
+            pos--;
         }
         
+        if(pos >= endOffsetOfSplit) {
+            // we have reached a block which begins exactly at the next InputSplit
+            // or >1 byte into the next InputSplit - lets record this fact
+            signalToStopReading = true;
+        }
         storedBlockCRC = bsGetInt32();
 
         if (bsR(1) == 1) {
@@ -389,7 +433,7 @@ public class CBZip2InputStream extends I
         }
     }
 
-    private void bsSetStream(SeekableInputStream f) {
+    private void bsSetStream(FSDataInputStream f) {
         innerBsStream = f;
         bsLive = 0;
         bsBuff = 0;

Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/PigStorage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/PigStorage.java?rev=924034&r1=924033&r2=924034&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/PigStorage.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/PigStorage.java Tue Mar 16 22:44:16 2010
@@ -32,6 +32,7 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.compress.BZip2Codec;
 import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.pig.bzip2r.Bzip2TextInputFormat;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.OutputFormat;
@@ -74,6 +75,7 @@ LoadPushDown {
     private ArrayList<Object> mProtoTuple = null;
     private TupleFactory mTupleFactory = TupleFactory.getInstance();
     private static final int BUFFER_SIZE = 1024;
+    private String loadLocation;
     
     public PigStorage() {
     }
@@ -226,7 +228,11 @@ LoadPushDown {
 
     @Override
     public InputFormat getInputFormat() {
-        return new TextInputFormat();
+        if(loadLocation.endsWith("bz2") || loadLocation.endsWith("bz")) {
+            return new Bzip2TextInputFormat();
+        } else {
+            return new TextInputFormat();
+        }
     }
 
     @Override
@@ -237,6 +243,7 @@ LoadPushDown {
     @Override
     public void setLocation(String location, Job job)
             throws IOException {
+        loadLocation = location;
         FileInputFormat.setInputPaths(job, location);
     }
 
@@ -254,7 +261,7 @@ LoadPushDown {
     public void setStoreLocation(String location, Job job) throws IOException {
         job.getConfiguration().set("mapred.textoutputformat.separator", "");
         FileOutputFormat.setOutputPath(job, new Path(location));
-        if (location.endsWith(".bz2")) {
+        if (location.endsWith(".bz2") || location.endsWith("bz")) {
             FileOutputFormat.setCompressOutput(job, true);
             FileOutputFormat.setOutputCompressorClass(job,  BZip2Codec.class);
         }  else if (location.endsWith(".gz")) {

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestBZip.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestBZip.java?rev=924034&r1=924033&r2=924034&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestBZip.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestBZip.java Tue Mar 16 22:44:16 2010
@@ -25,13 +25,22 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.Properties;
+import java.util.Map.Entry;
 
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.pig.ExecType;
 import org.apache.pig.PigServer;
-import org.apache.pig.test.utils.LocalSeekableInputStream;
-import org.apache.pig.backend.datastorage.ElementDescriptor;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
 import org.apache.tools.bzip2r.CBZip2InputStream;
 import org.apache.tools.bzip2r.CBZip2OutputStream;
 import org.junit.Test;
@@ -68,15 +77,10 @@ public class TestBZip {
                 + "';");
         pig.registerQuery("A = foreach (group (filter AA by $0 > 0) all) generate flatten($1);");
         pig.registerQuery("store A into '" + out.getAbsolutePath() + "';");
-        
-        File dir = new File("testbzip");     
-        deleteFiles(dir);
-        
-        processCopyToLocal(pig, out.getAbsolutePath(), dir.getAbsolutePath());
-        
-        LocalSeekableInputStream is = new LocalSeekableInputStream(
-                new File(dir.getAbsolutePath() + "/part-r-00000.bz2")); 
-        
+        FileSystem fs = FileSystem.get(ConfigurationUtil.toConfiguration(
+                pig.getPigContext().getProperties()));
+        FSDataInputStream is = fs.open(new Path(out.getAbsolutePath() + 
+                "/part-r-00000.bz2"));
         CBZip2InputStream cis = new CBZip2InputStream(is);
         
         // Just a sanity check, to make sure it was a bzip file; we
@@ -101,8 +105,67 @@ public class TestBZip {
         
         in.delete();
         out.delete();
+    }
+    
+    /** 
+     * Tests that '\n', '\r' and '\r\n' are treated as record delims when using
+     * bzip just like they are when using uncompressed text
+     */
+    @Test
+    public void testRecordDelims() throws Exception {
+        String[] inputData = new String[] {
+                "1\t2\r3\t4", // '\r' case - this will be split into two tuples
+                "5\t6\r", // '\r\n' case
+                "7\t8", // '\n' case
+                "9\t10\r" // '\r\n' at the end of file
+        };
+        
+        // bzip compressed input
+        File in = File.createTempFile("junit", ".bz2");
+        String compressedInputFileName = in.getAbsolutePath();
+        in.deleteOnExit();
+        String unCompressedInputFileName = "testRecordDelims-uncomp.txt";
+        Util.createInputFile(cluster, unCompressedInputFileName, inputData);
+        
+        try {
+            CBZip2OutputStream cos = 
+                new CBZip2OutputStream(new FileOutputStream(in));
+            for (int i = 0; i < inputData.length; i++) {
+                StringBuffer sb = new StringBuffer();
+                sb.append(inputData[i]).append("\n");
+                byte bytes[] = sb.toString().getBytes();
+                cos.write(bytes);
+            }
+            cos.close();
+            
+            Util.copyFromLocalToCluster(cluster, compressedInputFileName,
+                    compressedInputFileName);
+            
+            // pig script to read uncompressed input
+            String script = "a = load '" + unCompressedInputFileName +"';";
+            PigServer pig = new PigServer(ExecType.MAPREDUCE, cluster
+                    .getProperties());
+            pig.registerQuery(script);
+            Iterator<Tuple> it1 = pig.openIterator("a");
+            
+            // pig script to read compressed input
+            script = "a = load '" + compressedInputFileName +"';";
+            pig.registerQuery(script);
+            Iterator<Tuple> it2 = pig.openIterator("a");
+            
+            while(it1.hasNext()) {
+                Tuple t1 = it1.next();
+                Tuple t2 = it2.next();
+                Assert.assertEquals(t1, t2);
+            }
+            
+            Assert.assertFalse(it2.hasNext());
+        
+        } finally {
+            in.delete();
+            Util.deleteFile(cluster, unCompressedInputFileName);
+        }
         
-        deleteFiles(dir);
     }
     
     /**
@@ -130,15 +193,10 @@ public class TestBZip {
                 + "';");
         pig.registerQuery("A=foreach (group (filter AA by $0 < '0') all) generate flatten($1);");
         pig.registerQuery("store A into '" + out.getAbsolutePath() + "';");
-            
-        File dir = new File("testbzip2");     
-        deleteFiles(dir);
-        
-        processCopyToLocal(pig, out.getAbsolutePath(), dir.getAbsolutePath());
-        
-        LocalSeekableInputStream is = new LocalSeekableInputStream(
-                new File(dir.getAbsolutePath() + "/part-r-00000.bz2")); 
-        
+        FileSystem fs = FileSystem.get(ConfigurationUtil.toConfiguration(
+                pig.getPigContext().getProperties()));
+        FSDataInputStream is = fs.open(new Path(out.getAbsolutePath() + 
+                "/part-r-00000.bz2"));
         CBZip2InputStream cis = new CBZip2InputStream(is);
         
         // Just a sanity check, to make sure it was a bzip file; we
@@ -152,7 +210,6 @@ public class TestBZip {
         in.delete();
         out.delete();
         
-        deleteFiles(dir);
     }
 
     /**
@@ -166,32 +223,86 @@ public class TestBZip {
                 tmp));
         cos.close();
         assertNotSame(0, tmp.length());
+        FileSystem fs = FileSystem.getLocal(new Configuration(false));
         CBZip2InputStream cis = new CBZip2InputStream(
-                new LocalSeekableInputStream(tmp));
+                fs.open(new Path(tmp.getAbsolutePath())));
         assertEquals(-1, cis.read(new byte[100]));
         cis.close();
         tmp.delete();
     }
     
-    private void processCopyToLocal(PigServer pig, String src, String dst) 
-            throws IOException {
-
-        ElementDescriptor srcPath = pig.getPigContext().getDfs().asElement(src);
-        ElementDescriptor dstPath = pig.getPigContext().getLfs().asElement(dst);
-            
-        srcPath.copy(dstPath, false);
+    /**
+     * Tests the case where a bzip block ends exactly at the end of the {@link InputSplit}
+     * with the block header ending a few bits into the last byte of current
+     * InputSplit. This case results in dropped records in Pig 0.6 release
+     */
+    @Test
+    public void testBlockHeaderEndingAtSplitNotByteAligned() throws IOException {
+        String inputFileName = 
+            "test/org/apache/pig/test/data/recordLossblockHeaderEndsAt136500.txt.bz2";
+        Long expectedCount = 74999L; // number of lines in above file
+        // the first block in the above file exactly ends a few bits into the 
+        // byte at position 136500 
+        int splitSize = 136500;
+        Util.copyFromLocalToCluster(cluster, inputFileName, inputFileName);
+        testCount(inputFileName, expectedCount, splitSize);
     }
     
-    private void deleteFiles(File file) {
-        if (!file.exists()) return;
-            
-        if (file.isDirectory()) {
-            File[] files = file.listFiles();
-            for (File f : files) {
-                deleteFiles(f);
+    /**
+     *  Tests the case where a bzip block ends exactly at the end of the input 
+     *  split (byte aligned with the last byte) and the last byte is a carriage
+     *  return.
+     */
+    @Test
+    public void testBlockHeaderEndingWithCR() throws IOException {
+        String inputFileName = 
+            "test/org/apache/pig/test/data/blockEndingInCR.txt.bz2";
+        // number of lines in above file (the value is 1 more than bzcat | wc -l
+        // since there is a '\r' which is also treated as a record delim
+        Long expectedCount = 82094L; 
+        // the first block in the above file exactly ends at the byte at 
+        // position 136498 and the last byte is a carriage return ('\r') 
+        int splitSize = 136498;
+        Util.copyFromLocalToCluster(cluster, inputFileName, inputFileName);
+        testCount(inputFileName, expectedCount, splitSize);
+    }
+    
+    /**
+     * Tests the case where a bzip block ends exactly at the end of the input
+     * split and has more data which results in overcounting (record duplication)
+     * in Pig 0.6
+     */
+    @Test
+    public void testBlockHeaderEndingAtSplitOverCounting() throws IOException {
+        String inputFileName = 
+            "test/org/apache/pig/test/data/blockHeaderEndsAt136500.txt.bz2";
+        Long expectedCount = 1041046L; // number of lines in above file
+        // the first block in the above file exactly ends a few bits into the 
+        // byte at position 136500 
+        int splitSize = 136500;
+        Util.copyFromLocalToCluster(cluster, inputFileName, inputFileName);
+        testCount(inputFileName, expectedCount, splitSize);
+    }
+    
+    private void testCount(String inputFileName, Long expectedCount, 
+            int splitSize) throws IOException {
+        try {
+            String script = "a = load '" + inputFileName + "';" +
+            		"b = group a all;" +
+            		"c = foreach b generate COUNT_STAR(a);";
+            Properties props = new Properties();
+            for (Entry<Object, Object> entry : cluster.getProperties().entrySet()) {
+                props.put(entry.getKey(), entry.getValue());
             }
+            props.setProperty("mapred.max.split.size", Integer.toString(splitSize));
+            PigContext pigContext = new PigContext(ExecType.MAPREDUCE, props);
+            PigServer pig = new PigServer(pigContext);
+            Util.registerMultiLineQuery(pig, script);
+            Iterator<Tuple> it = pig.openIterator("c");
+            Long result = (Long) it.next().get(0);
+            assertEquals(expectedCount, result);
+        } finally {
+            Util.deleteFile(cluster, inputFileName);
         }
-        System.out.println("delete file: " + file.getAbsolutePath() 
-                + " : " + file.delete());
     }
 }

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestMapReduce.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestMapReduce.java?rev=924034&r1=924033&r2=924034&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestMapReduce.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestMapReduce.java Tue Mar 16 22:44:16 2010
@@ -26,6 +26,8 @@ import java.io.InputStreamReader;
 import java.io.OutputStream;
 import java.io.PrintStream;
 import java.util.Iterator;
+import java.util.Properties;
+import java.util.Map.Entry;
 
 import junit.framework.TestCase;
 
@@ -110,8 +112,13 @@ public class TestMapReduce extends TestC
     public void testBZip2Aligned() throws Throwable {
         int offsets[] = { 219642, 219643, 219644, 552019, 552020 };
         for(int i = 1; i < offsets.length; i ++) {
-            System.setProperty("pig.overrideBlockSize", Integer.toString(offsets[i]));
-            PigContext pigContext = new PigContext(ExecType.MAPREDUCE, cluster.getProperties());
+            
+            Properties props = new Properties();
+            for (Entry<Object, Object> entry : cluster.getProperties().entrySet()) {
+                props.put(entry.getKey(), entry.getValue());
+            }
+            props.setProperty("mapred.max.split.size", Integer.toString(offsets[i]));
+            PigContext pigContext = new PigContext(ExecType.MAPREDUCE, props);
             PigServer pig = new PigServer(pigContext);
             pig.registerQuery("a = load '"
                     + Util.generateURI(

Modified: hadoop/pig/trunk/test/org/apache/pig/test/Util.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/Util.java?rev=924034&r1=924033&r2=924034&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/Util.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/Util.java Tue Mar 16 22:44:16 2010
@@ -32,6 +32,7 @@ import java.io.OutputStream;
 import java.io.OutputStreamWriter;
 import java.io.PrintStream;
 import java.io.PrintWriter;
+import java.io.StringReader;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -67,6 +68,8 @@ import org.apache.pig.impl.logicalLayer.
 import org.apache.pig.impl.logicalLayer.parser.ParseException;
 import org.apache.pig.impl.logicalLayer.parser.QueryParser;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.tools.grunt.Grunt;
+import org.apache.pig.tools.grunt.GruntParser;
 
 public class Util {
     private static BagFactory mBagFactory = BagFactory.getInstance();
@@ -343,13 +346,17 @@ public class Util {
 	 * @throws IOException
 	 */
 	static public void copyFromLocalToCluster(MiniCluster cluster, String localFileName, String fileNameOnCluster) throws IOException {
-	    BufferedReader reader = new BufferedReader(new FileReader(localFileName));
-	    String line = null;
-	    List<String> contents = new ArrayList<String>();
-	    while((line = reader.readLine()) != null) {
-	        contents.add(line);
-	    }
-	    Util.createInputFile(cluster, fileNameOnCluster, contents.toArray(new String[0]));
+        PigServer ps = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        String script = "fs -put " + localFileName + " " + fileNameOnCluster;
+
+	    GruntParser parser = new GruntParser(new StringReader(script));
+        parser.setInteractive(false);
+        parser.setParams(ps);
+        try {
+            parser.parseStopOnError();
+        } catch (org.apache.pig.tools.pigscript.parser.ParseException e) {
+            throw new IOException(e);
+        }
 	}
 	
 	static public void copyFromClusterToLocal(MiniCluster cluster, String fileNameOnCluster, String localFileName) throws IOException {

Added: hadoop/pig/trunk/test/org/apache/pig/test/data/blockEndingInCR.txt.bz2
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/data/blockEndingInCR.txt.bz2?rev=924034&view=auto
==============================================================================
Binary file - no diff available.

Propchange: hadoop/pig/trunk/test/org/apache/pig/test/data/blockEndingInCR.txt.bz2
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: hadoop/pig/trunk/test/org/apache/pig/test/data/blockHeaderEndsAt136500.txt.bz2
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/data/blockHeaderEndsAt136500.txt.bz2?rev=924034&view=auto
==============================================================================
Binary file - no diff available.

Propchange: hadoop/pig/trunk/test/org/apache/pig/test/data/blockHeaderEndsAt136500.txt.bz2
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: hadoop/pig/trunk/test/org/apache/pig/test/data/recordLossblockHeaderEndsAt136500.txt.bz2
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/data/recordLossblockHeaderEndsAt136500.txt.bz2?rev=924034&view=auto
==============================================================================
Binary file - no diff available.

Propchange: hadoop/pig/trunk/test/org/apache/pig/test/data/recordLossblockHeaderEndsAt136500.txt.bz2
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream