You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by pr...@apache.org on 2010/03/16 23:44:17 UTC
svn commit: r924034 - in /hadoop/pig/trunk: ./ lib-src/bzip2/org/apache/pig/
lib-src/bzip2/org/apache/pig/bzip2r/ lib-src/bzip2/org/apache/tools/bzip2r/
src/org/apache/pig/builtin/ test/org/apache/pig/test/
test/org/apache/pig/test/data/
Author: pradeepkth
Date: Tue Mar 16 22:44:16 2010
New Revision: 924034
URL: http://svn.apache.org/viewvc?rev=924034&view=rev
Log:
PIG-1257: PigStorage per the new load-store redesign should support splitting of bzip files (pradeepkth)
Added:
hadoop/pig/trunk/lib-src/bzip2/org/apache/pig/
hadoop/pig/trunk/lib-src/bzip2/org/apache/pig/bzip2r/
hadoop/pig/trunk/lib-src/bzip2/org/apache/pig/bzip2r/Bzip2TextInputFormat.java
hadoop/pig/trunk/test/org/apache/pig/test/data/blockEndingInCR.txt.bz2 (with props)
hadoop/pig/trunk/test/org/apache/pig/test/data/blockHeaderEndsAt136500.txt.bz2 (with props)
hadoop/pig/trunk/test/org/apache/pig/test/data/recordLossblockHeaderEndsAt136500.txt.bz2 (with props)
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/lib-src/bzip2/org/apache/tools/bzip2r/CBZip2InputStream.java
hadoop/pig/trunk/src/org/apache/pig/builtin/PigStorage.java
hadoop/pig/trunk/test/org/apache/pig/test/TestBZip.java
hadoop/pig/trunk/test/org/apache/pig/test/TestMapReduce.java
hadoop/pig/trunk/test/org/apache/pig/test/Util.java
Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=924034&r1=924033&r2=924034&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Tue Mar 16 22:44:16 2010
@@ -68,6 +68,9 @@ manner (rding via pradeepkth)
IMPROVEMENTS
+PIG-1257: PigStorage per the new load-store redesign should support splitting
+of bzip files (pradeepkth)
+
PIG-1290: WeightedRangePartitioner should not check if input is empty if
quantile file is empty (pradeepkth)
Added: hadoop/pig/trunk/lib-src/bzip2/org/apache/pig/bzip2r/Bzip2TextInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/lib-src/bzip2/org/apache/pig/bzip2r/Bzip2TextInputFormat.java?rev=924034&view=auto
==============================================================================
--- hadoop/pig/trunk/lib-src/bzip2/org/apache/pig/bzip2r/Bzip2TextInputFormat.java (added)
+++ hadoop/pig/trunk/lib-src/bzip2/org/apache/pig/bzip2r/Bzip2TextInputFormat.java Tue Mar 16 22:44:16 2010
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.bzip2r;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.tools.bzip2r.CBZip2InputStream;
+
+/**
+ * A splittable {@link FileInputFormat} for bzip2-compressed text. Each split
+ * is decompressed by its own {@link CBZip2InputStream}, and records are lines
+ * terminated by '\n', '\r' or "\r\n".
+ */
+public class Bzip2TextInputFormat extends FileInputFormat<LongWritable, Text> {
+
+    /**
+     * Reads the lines of one {@link FileSplit} of a bzip2 file. The value is
+     * the line without its terminator. The key is the position reported by
+     * {@link CBZip2InputStream#getPos()} before the line was read; since the
+     * input is compressed this is not a per-line byte offset.
+     */
+    private static class BZip2LineRecordReader extends RecordReader<LongWritable, Text> {
+
+        private long start;
+
+        private long end;
+
+        private long pos;
+
+        private CBZip2InputStream in;
+
+        private ByteArrayOutputStream buffer = new ByteArrayOutputStream(256);
+
+        // true when the previous readLine() call saw a Carriage Return ('\r')
+        // followed by a byte other than Line Feed ('\n'); that byte has already
+        // been consumed from the stream and is held in nonLFChar
+        private boolean CRFollowedByNonLF = false;
+
+        // the byte read after a '\r' that was not a '\n'. Kept as an int in the
+        // range 0-255 (as returned by InputStream.read()) so that a 0xFF data
+        // byte can never be mistaken for the end-of-stream value -1.
+        private int nonLFChar;
+
+        /**
+         * Provide a bridge to get the bytes from the ByteArrayOutputStream
+         * without creating a new byte array.
+         */
+        private static class TextStuffer extends OutputStream {
+            public Text target;
+
+            @Override
+            public void write(int b) {
+                throw new UnsupportedOperationException("write(byte) not supported");
+            }
+
+            @Override
+            public void write(byte[] data, int offset, int len) throws IOException {
+                target.clear();
+                target.set(data, offset, len);
+            }
+        }
+
+        private TextStuffer bridge = new TextStuffer();
+
+        private LongWritable key = new LongWritable();
+        private Text value = new Text();
+
+        /**
+         * Opens the split's file, seeks to the split start and wraps it in a
+         * {@link CBZip2InputStream} bounded by the split end. For any split
+         * but the first, the partial first line is skipped.
+         *
+         * @throws IOException if the file cannot be opened or read; once the
+         *         file has been opened it is closed before the exception
+         *         propagates
+         */
+        public BZip2LineRecordReader(Configuration job, FileSplit split)
+                throws IOException {
+            start = split.getStart();
+            end = start + split.getLength();
+            final Path file = split.getPath();
+
+            // open the file and seek to the start of the split
+            FileSystem fs = file.getFileSystem(job);
+            FSDataInputStream fileIn = fs.open(file);
+            boolean opened = false;
+            try {
+                fileIn.seek(start);
+                in = new CBZip2InputStream(fileIn, 9, end);
+                if (start != 0) {
+                    // skip first line and re-establish "start"; next() keeps
+                    // reading whole lines until pos > end, so the previous
+                    // split's reader consumes the line this split starts in
+                    readLine(this.in, null);
+                    start = in.getPos();
+                }
+                pos = in.getPos();
+                opened = true;
+            } finally {
+                if (!opened) {
+                    // don't leak the file handle when setup fails
+                    fileIn.close();
+                }
+            }
+        }
+
+        public LongWritable createKey() {
+            return new LongWritable();
+        }
+
+        public Text createValue() {
+            return new Text();
+        }
+
+        /**
+         * Reads one line from {@code in}, writing its bytes (without the
+         * terminator) to {@code out} when {@code out} is non-null. A line ends
+         * at '\n', '\r' or "\r\n". This is a local replacement for
+         * LineRecordReader.readLine(), which is deprecated in Hadoop 0.17.
+         *
+         * @return the number of bytes belonging to this line, terminator
+         *         included; 0 when the stream was already exhausted
+         */
+        private long readLine(InputStream in, OutputStream out) throws IOException {
+            long bytes = 0;
+            while (true) {
+                int b;
+                if (CRFollowedByNonLF) {
+                    // the previous call read one byte past a lone '\r'; that
+                    // byte is the first byte of this line
+                    b = nonLFChar;
+                    CRFollowedByNonLF = false;
+                } else {
+                    b = in.read();
+                }
+                if (b == -1) {
+                    break;
+                }
+                bytes += 1;
+
+                if (b == '\n') {
+                    break;
+                }
+
+                if (b == '\r') {
+                    int nextB = in.read();
+                    if (nextB == '\n') {
+                        bytes += 1;
+                    } else if (nextB != -1) {
+                        // lone '\r': remember the byte we read ahead so the
+                        // next call starts with it
+                        CRFollowedByNonLF = true;
+                        nonLFChar = nextB;
+                    }
+                    break;
+                }
+
+                if (out != null) {
+                    out.write(b);
+                }
+            }
+            return bytes;
+        }
+
+        /**
+         * Reads the next line into {@code value} and its key into {@code key}.
+         *
+         * @return false once pos has moved past the end of the split or the
+         *         stream is exhausted
+         */
+        public boolean next(LongWritable key, Text value) throws IOException {
+            if (pos > end) {
+                return false;
+            }
+
+            key.set(pos); // key is position
+            buffer.reset();
+            long bytesRead = readLine(in, buffer);
+            if (bytesRead == 0) {
+                return false;
+            }
+            pos = in.getPos();
+            // if we have read ahead because we encountered a carriage return
+            // char followed by a non line feed char, decrement the pos
+            if (CRFollowedByNonLF) {
+                pos--;
+            }
+
+            bridge.target = value;
+            buffer.writeTo(bridge);
+            return true;
+        }
+
+        /**
+         * Get the progress within the split
+         */
+        @Override
+        public float getProgress() {
+            if (start == end) {
+                return 0.0f;
+            } else {
+                return Math.min(1.0f, (pos - start) / (float) (end - start));
+            }
+        }
+
+        public long getPos() throws IOException {
+            return pos;
+        }
+
+        @Override
+        public void close() throws IOException {
+            in.close();
+        }
+
+        @Override
+        public LongWritable getCurrentKey() throws IOException,
+                InterruptedException {
+            return key;
+        }
+
+        @Override
+        public Text getCurrentValue() throws IOException, InterruptedException {
+            return value;
+        }
+
+        @Override
+        public void initialize(InputSplit split, TaskAttemptContext context)
+                throws IOException, InterruptedException {
+            // no op: all setup is done in the constructor, which
+            // createRecordReader() calls with the split and configuration
+        }
+
+        @Override
+        public boolean nextKeyValue() throws IOException, InterruptedException {
+            return next(key, value);
+        }
+
+    }
+
+    @Override
+    public RecordReader<LongWritable, Text> createRecordReader(InputSplit split,
+            TaskAttemptContext context) throws IOException, InterruptedException {
+        return new BZip2LineRecordReader(context.getConfiguration(),
+                (FileSplit) split);
+    }
+
+}
Modified: hadoop/pig/trunk/lib-src/bzip2/org/apache/tools/bzip2r/CBZip2InputStream.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/lib-src/bzip2/org/apache/tools/bzip2r/CBZip2InputStream.java?rev=924034&r1=924033&r2=924034&view=diff
==============================================================================
--- hadoop/pig/trunk/lib-src/bzip2/org/apache/tools/bzip2r/CBZip2InputStream.java (original)
+++ hadoop/pig/trunk/lib-src/bzip2/org/apache/tools/bzip2r/CBZip2InputStream.java Tue Mar 16 22:44:16 2010
@@ -76,10 +76,10 @@
*/
package org.apache.tools.bzip2r;
-import java.io.IOException;
import java.io.InputStream;
-
-import org.apache.pig.backend.datastorage.SeekableInputStream;
+import java.io.IOException;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.mapreduce.InputSplit;
/**
* An input stream that decompresses from the BZip2 format (without the file
@@ -127,7 +127,13 @@ public class CBZip2InputStream extends I
private boolean blockRandomised;
+ // a buffer to keep the read byte
private int bsBuff;
+
+ // since bzip is bit-aligned at block boundaries there can be a case wherein
+ // only few bits out of a read byte are consumed and the remaining bits
+ // need to be consumed while processing the next block.
+ // indicate how many bits in bsBuff have not been processed yet
private int bsLive;
private CRC mCrc = new CRC();
@@ -154,7 +160,7 @@ public class CBZip2InputStream extends I
private int[][] perm = new int[N_GROUPS][MAX_ALPHA_SIZE];
private int[] minLens = new int[N_GROUPS];
- private SeekableInputStream innerBsStream;
+ private FSDataInputStream innerBsStream;
long readLimit = Long.MAX_VALUE;
public long getReadLimit() {
return readLimit;
@@ -192,13 +198,20 @@ public class CBZip2InputStream extends I
int j2;
char z;
- // The positioning is a bit tricky. we set newPos when we start reading a new block
- // and we set retPos to newPos once we have read a character from that block.
- // see getPos() for more detail
- private long retPos, newPos = -1;
-
- public CBZip2InputStream(SeekableInputStream zStream, int blockSize) throws IOException {
- retPos = newPos = zStream.tell();
+ // see comment in getPos()
+ private long retPos = -1;
+ // the position offset which corresponds to the end of the InputSplit that
+ // will be processed by this instance
+ private long endOffsetOfSplit;
+
+ private boolean signalToStopReading;
+
+ public CBZip2InputStream(FSDataInputStream zStream, int blockSize, long end)
+ throws IOException {
+ endOffsetOfSplit = end;
+ // initialize retPos to the beginning of the current InputSplit
+ // see comments in getPos() to understand how this is used.
+ retPos = zStream.getPos();
ll8 = null;
tt = null;
checkComputedCombinedCRC = blockSize == -1;
@@ -208,19 +221,26 @@ public class CBZip2InputStream extends I
setupBlock();
}
- public CBZip2InputStream(SeekableInputStream zStream) throws IOException {
- this(zStream, -1);
+ /**
+ * Convenience constructor for reading a stream that is not limited by an
+ * InputSplit: the split end offset is Long.MAX_VALUE, so the
+ * "pos >= endOffsetOfSplit" stop signal is never raised. Passing a
+ * blockSize of -1 sets checkComputedCombinedCRC.
+ */
+ public CBZip2InputStream(FSDataInputStream zStream) throws IOException {
+ this(zStream, -1, Long.MAX_VALUE);
}
+ @Override
public int read() throws IOException {
if (streamEnd) {
return -1;
} else {
- if (retPos < newPos) {
- retPos = newPos;
- } else {
- retPos = newPos+1;
+
+ // if we just started reading a bzip block which starts at a position
+ // >= end of current split, then we should set up retpos such that
+ // after a record is read, future getPos() calls will get a value
+ // > end of current split - this way we will read only one record out
+ // of this bzip block - the rest of the records from this bzip block
+ // should be read by the next map task while processing the next split
+ if(signalToStopReading) {
+ retPos = endOffsetOfSplit + 1;
}
+
int retChar = currentChar;
switch(currentState) {
case START_BLOCK_STATE:
@@ -249,12 +269,17 @@ public class CBZip2InputStream extends I
}
/**
- * This is supposed to approximate the position in the underlying stream. However,
- * with compression, the underlying stream position is very vague. One position may
- * have multiple positions and visa versa. So we do something very subtle:
- * The position of the first byte of a compressed block will have the position of
- * the block header at the start of the block. Every byte after the first byte will
- * be one plus the position of the block header.
+ * getPos is used by the caller to know when the processing of the current
+ * {@link InputSplit} is complete. In this method, as we read each bzip
+ * block, we keep returning the beginning of the {@link InputSplit} as the
+ * return value until we hit a block which starts at a position >= end of
+ * current split. At that point we should set up retpos such that after a
+ * record is read, future getPos() calls will get a value > end of current
+ * split - this way we will read only one record out of that bzip block -
+ * the rest of the records from that bzip block should be read by the next
+ * map task while processing the next split
+ * @return
+ * @throws IOException
*/
public long getPos() throws IOException{
return retPos;
@@ -291,8 +316,9 @@ public class CBZip2InputStream extends I
streamEnd = true;
return;
}
-
- newPos = innerBsStream.tell();
+
+ // position before beginning of bzip block header
+ long pos = innerBsStream.getPos();
if (!searchForMagic) {
char magic1, magic2, magic3, magic4;
char magic5, magic6;
@@ -324,13 +350,31 @@ public class CBZip2InputStream extends I
magic <<= 1;
magic &= mask;
magic |= bsR(1);
+ // if we just found the block header, the beginning of the bzip
+ // header would be 6 bytes before the current stream position
+ // when we eventually break from this while(), if it is because
+ // we found a block header then pos will have the correct start
+ // of header position
+ pos = innerBsStream.getPos() - 6;
}
if (magic == eos) {
complete();
return;
}
+
+ }
+ // if the previous block finished a few bits into the previous byte,
+ // then we will first be reading the remaining bits from the previous
+ // byte - so logically pos needs to be one behind
+ if(bsLive > 0) {
+ pos--;
}
+ if(pos >= endOffsetOfSplit) {
+ // we have reached a block which begins exactly at the next InputSplit
+ // or >1 byte into the next InputSplit - lets record this fact
+ signalToStopReading = true;
+ }
storedBlockCRC = bsGetInt32();
if (bsR(1) == 1) {
@@ -389,7 +433,7 @@ public class CBZip2InputStream extends I
}
}
- private void bsSetStream(SeekableInputStream f) {
+ private void bsSetStream(FSDataInputStream f) {
innerBsStream = f;
bsLive = 0;
bsBuff = 0;
Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/PigStorage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/PigStorage.java?rev=924034&r1=924033&r2=924034&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/PigStorage.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/PigStorage.java Tue Mar 16 22:44:16 2010
@@ -32,6 +32,7 @@ import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.pig.bzip2r.Bzip2TextInputFormat;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputFormat;
@@ -74,6 +75,7 @@ LoadPushDown {
private ArrayList<Object> mProtoTuple = null;
private TupleFactory mTupleFactory = TupleFactory.getInstance();
private static final int BUFFER_SIZE = 1024;
+ // location passed to setLocation(); getInputFormat() checks its suffix
+ // to decide whether to return Bzip2TextInputFormat
+ private String loadLocation;
public PigStorage() {
}
@@ -226,7 +228,11 @@ LoadPushDown {
@Override
public InputFormat getInputFormat() {
- return new TextInputFormat();
+        // bzip2 files get a dedicated, splittable input format; everything
+        // else is plain text. The suffixes include the dot so that names such
+        // as "foobz" are not mistaken for bzip2 files.
+        // NOTE(review): assumes setLocation() ran first; loadLocation is null
+        // otherwise.
+        if (loadLocation.endsWith(".bz2") || loadLocation.endsWith(".bz")) {
+            return new Bzip2TextInputFormat();
+        } else {
+            return new TextInputFormat();
+        }
}
@Override
@@ -237,6 +243,7 @@ LoadPushDown {
@Override
public void setLocation(String location, Job job)
throws IOException {
+ // remember the location: getInputFormat() picks the bzip2 input format
+ // from its suffix
+ loadLocation = location;
FileInputFormat.setInputPaths(job, location);
}
@@ -254,7 +261,7 @@ LoadPushDown {
public void setStoreLocation(String location, Job job) throws IOException {
job.getConfiguration().set("mapred.textoutputformat.separator", "");
FileOutputFormat.setOutputPath(job, new Path(location));
- if (location.endsWith(".bz2")) {
+ if (location.endsWith(".bz2") || location.endsWith("bz")) {
FileOutputFormat.setCompressOutput(job, true);
FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class);
} else if (location.endsWith(".gz")) {
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestBZip.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestBZip.java?rev=924034&r1=924033&r2=924034&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestBZip.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestBZip.java Tue Mar 16 22:44:16 2010
@@ -25,13 +25,22 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.Properties;
+import java.util.Map.Entry;
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
-import org.apache.pig.test.utils.LocalSeekableInputStream;
-import org.apache.pig.backend.datastorage.ElementDescriptor;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
import org.apache.tools.bzip2r.CBZip2InputStream;
import org.apache.tools.bzip2r.CBZip2OutputStream;
import org.junit.Test;
@@ -68,15 +77,10 @@ public class TestBZip {
+ "';");
pig.registerQuery("A = foreach (group (filter AA by $0 > 0) all) generate flatten($1);");
pig.registerQuery("store A into '" + out.getAbsolutePath() + "';");
-
- File dir = new File("testbzip");
- deleteFiles(dir);
-
- processCopyToLocal(pig, out.getAbsolutePath(), dir.getAbsolutePath());
-
- LocalSeekableInputStream is = new LocalSeekableInputStream(
- new File(dir.getAbsolutePath() + "/part-r-00000.bz2"));
-
+ FileSystem fs = FileSystem.get(ConfigurationUtil.toConfiguration(
+ pig.getPigContext().getProperties()));
+ FSDataInputStream is = fs.open(new Path(out.getAbsolutePath() +
+ "/part-r-00000.bz2"));
CBZip2InputStream cis = new CBZip2InputStream(is);
// Just a sanity check, to make sure it was a bzip file; we
@@ -101,8 +105,67 @@ public class TestBZip {
in.delete();
out.delete();
+ }
+
+    /**
+     * Tests that '\n', '\r' and '\r\n' are treated as record delimiters when
+     * reading bzip2 input, just as they are for uncompressed text.
+     */
+    @Test
+    public void testRecordDelims() throws Exception {
+        String[] inputData = new String[] {
+                "1\t2\r3\t4", // '\r' case - this will be split into two tuples
+                "5\t6\r", // '\r\n' case
+                "7\t8", // '\n' case
+                "9\t10\r" // '\r\n' at the end of file
+        };
+
+        // bzip compressed input
+        File in = File.createTempFile("junit", ".bz2");
+        String compressedInputFileName = in.getAbsolutePath();
+        in.deleteOnExit();
+        String unCompressedInputFileName = "testRecordDelims-uncomp.txt";
+        Util.createInputFile(cluster, unCompressedInputFileName, inputData);
+        boolean compressedCopiedToCluster = false;
+
+        try {
+            // each input string is written followed by '\n'
+            CBZip2OutputStream cos =
+                    new CBZip2OutputStream(new FileOutputStream(in));
+            try {
+                for (int i = 0; i < inputData.length; i++) {
+                    cos.write((inputData[i] + "\n").getBytes());
+                }
+            } finally {
+                cos.close();
+            }
+
+            Util.copyFromLocalToCluster(cluster, compressedInputFileName,
+                    compressedInputFileName);
+            compressedCopiedToCluster = true;
+
+            // pig script to read uncompressed input
+            String script = "a = load '" + unCompressedInputFileName + "';";
+            PigServer pig = new PigServer(ExecType.MAPREDUCE, cluster
+                    .getProperties());
+            pig.registerQuery(script);
+            Iterator<Tuple> it1 = pig.openIterator("a");
+
+            // pig script to read compressed input
+            script = "a = load '" + compressedInputFileName + "';";
+            pig.registerQuery(script);
+            Iterator<Tuple> it2 = pig.openIterator("a");
+
+            while (it1.hasNext()) {
+                Tuple t1 = it1.next();
+                // fail with a clear message rather than NoSuchElementException
+                // when the bzip input yields fewer records
+                Assert.assertTrue("bzip input has fewer records than plain text input",
+                        it2.hasNext());
+                Tuple t2 = it2.next();
+                Assert.assertEquals(t1, t2);
+            }
+
+            Assert.assertFalse(it2.hasNext());
+
+        } finally {
+            in.delete();
+            Util.deleteFile(cluster, unCompressedInputFileName);
+            if (compressedCopiedToCluster) {
+                // also remove the compressed copy from the mini cluster
+                Util.deleteFile(cluster, compressedInputFileName);
+            }
+        }
- deleteFiles(dir);
}
/**
@@ -130,15 +193,10 @@ public class TestBZip {
+ "';");
pig.registerQuery("A=foreach (group (filter AA by $0 < '0') all) generate flatten($1);");
pig.registerQuery("store A into '" + out.getAbsolutePath() + "';");
-
- File dir = new File("testbzip2");
- deleteFiles(dir);
-
- processCopyToLocal(pig, out.getAbsolutePath(), dir.getAbsolutePath());
-
- LocalSeekableInputStream is = new LocalSeekableInputStream(
- new File(dir.getAbsolutePath() + "/part-r-00000.bz2"));
-
+ FileSystem fs = FileSystem.get(ConfigurationUtil.toConfiguration(
+ pig.getPigContext().getProperties()));
+ FSDataInputStream is = fs.open(new Path(out.getAbsolutePath() +
+ "/part-r-00000.bz2"));
CBZip2InputStream cis = new CBZip2InputStream(is);
// Just a sanity check, to make sure it was a bzip file; we
@@ -152,7 +210,6 @@ public class TestBZip {
in.delete();
out.delete();
- deleteFiles(dir);
}
/**
@@ -166,32 +223,86 @@ public class TestBZip {
tmp));
cos.close();
assertNotSame(0, tmp.length());
+ FileSystem fs = FileSystem.getLocal(new Configuration(false));
CBZip2InputStream cis = new CBZip2InputStream(
- new LocalSeekableInputStream(tmp));
+ fs.open(new Path(tmp.getAbsolutePath())));
assertEquals(-1, cis.read(new byte[100]));
cis.close();
tmp.delete();
}
- private void processCopyToLocal(PigServer pig, String src, String dst)
- throws IOException {
-
- ElementDescriptor srcPath = pig.getPigContext().getDfs().asElement(src);
- ElementDescriptor dstPath = pig.getPigContext().getLfs().asElement(dst);
-
- srcPath.copy(dstPath, false);
+ /**
+ * Tests the case where a bzip block ends exactly at the end of the {@link InputSplit}
+ * with the block header ending a few bits into the last byte of current
+ * InputSplit. This case results in dropped records in Pig 0.6 release
+ */
+ @Test
+ public void testBlockHeaderEndingAtSplitNotByteAligned() throws IOException {
+ String inputFileName =
+ "test/org/apache/pig/test/data/recordLossblockHeaderEndsAt136500.txt.bz2";
+ Long expectedCount = 74999L; // number of lines in above file
+ // the first block in the above file exactly ends a few bits into the
+ // byte at position 136500
+ int splitSize = 136500;
+ Util.copyFromLocalToCluster(cluster, inputFileName, inputFileName);
+ testCount(inputFileName, expectedCount, splitSize);
}
- private void deleteFiles(File file) {
- if (!file.exists()) return;
-
- if (file.isDirectory()) {
- File[] files = file.listFiles();
- for (File f : files) {
- deleteFiles(f);
+    /**
+     * Covers a bzip block that ends exactly at the end of the input split,
+     * byte aligned with the split's last byte, where that last byte is a
+     * carriage return ('\r').
+     */
+    @Test
+    public void testBlockHeaderEndingWithCR() throws IOException {
+        final String dataFile =
+                "test/org/apache/pig/test/data/blockEndingInCR.txt.bz2";
+        // the first block of this file ends exactly at byte 136498, and that
+        // byte is a carriage return ('\r')
+        final int splitBytes = 136498;
+        // one more than "bzcat | wc -l" reports, because a '\r' in the file
+        // is also treated as a record delimiter
+        final Long expectedLines = Long.valueOf(82094L);
+        Util.copyFromLocalToCluster(cluster, dataFile, dataFile);
+        testCount(dataFile, expectedLines, splitBytes);
+    }
+
+    /**
+     * Regression test for record duplication (overcounting seen in Pig 0.6):
+     * a bzip block ends exactly at the end of the input split and more data
+     * follows it.
+     */
+    @Test
+    public void testBlockHeaderEndingAtSplitOverCounting() throws IOException {
+        // the first bzip block of this file ends a few bits into the byte at
+        // position 136500; the file holds 1041046 lines
+        final String dataFile =
+                "test/org/apache/pig/test/data/blockHeaderEndsAt136500.txt.bz2";
+        final int splitBytes = 136500;
+        final Long expectedLines = Long.valueOf(1041046L);
+        Util.copyFromLocalToCluster(cluster, dataFile, dataFile);
+        testCount(dataFile, expectedLines, splitBytes);
+    }
+
+ /**
+ * Runs COUNT_STAR over every record of inputFileName (which the caller has
+ * already copied to the mini cluster) with mapred.max.split.size set to
+ * splitSize, asserts the count equals expectedCount, and finally deletes
+ * inputFileName from the cluster.
+ */
+ private void testCount(String inputFileName, Long expectedCount,
+ int splitSize) throws IOException {
+ try {
+ String script = "a = load '" + inputFileName + "';" +
+ "b = group a all;" +
+ "c = foreach b generate COUNT_STAR(a);";
+ // copy the cluster properties so the split-size override below does
+ // not modify the shared cluster configuration
+ Properties props = new Properties();
+ for (Entry<Object, Object> entry : cluster.getProperties().entrySet()) {
+ props.put(entry.getKey(), entry.getValue());
}
+ // cap the split size so the file is divided at splitSize-byte boundaries
+ props.setProperty("mapred.max.split.size", Integer.toString(splitSize));
+ PigContext pigContext = new PigContext(ExecType.MAPREDUCE, props);
+ PigServer pig = new PigServer(pigContext);
+ Util.registerMultiLineQuery(pig, script);
+ Iterator<Tuple> it = pig.openIterator("c");
+ Long result = (Long) it.next().get(0);
+ assertEquals(expectedCount, result);
+ } finally {
+ // remove the input from the cluster whether or not the assertion passed
+ Util.deleteFile(cluster, inputFileName);
}
- System.out.println("delete file: " + file.getAbsolutePath()
- + " : " + file.delete());
}
}
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestMapReduce.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestMapReduce.java?rev=924034&r1=924033&r2=924034&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestMapReduce.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestMapReduce.java Tue Mar 16 22:44:16 2010
@@ -26,6 +26,8 @@ import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PrintStream;
import java.util.Iterator;
+import java.util.Properties;
+import java.util.Map.Entry;
import junit.framework.TestCase;
@@ -110,8 +112,13 @@ public class TestMapReduce extends TestC
public void testBZip2Aligned() throws Throwable {
int offsets[] = { 219642, 219643, 219644, 552019, 552020 };
for(int i = 1; i < offsets.length; i ++) {
- System.setProperty("pig.overrideBlockSize", Integer.toString(offsets[i]));
- PigContext pigContext = new PigContext(ExecType.MAPREDUCE, cluster.getProperties());
+
+ Properties props = new Properties();
+ for (Entry<Object, Object> entry : cluster.getProperties().entrySet()) {
+ props.put(entry.getKey(), entry.getValue());
+ }
+ props.setProperty("mapred.max.split.size", Integer.toString(offsets[i]));
+ PigContext pigContext = new PigContext(ExecType.MAPREDUCE, props);
PigServer pig = new PigServer(pigContext);
pig.registerQuery("a = load '"
+ Util.generateURI(
Modified: hadoop/pig/trunk/test/org/apache/pig/test/Util.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/Util.java?rev=924034&r1=924033&r2=924034&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/Util.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/Util.java Tue Mar 16 22:44:16 2010
@@ -32,6 +32,7 @@ import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.PrintStream;
import java.io.PrintWriter;
+import java.io.StringReader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
@@ -67,6 +68,8 @@ import org.apache.pig.impl.logicalLayer.
import org.apache.pig.impl.logicalLayer.parser.ParseException;
import org.apache.pig.impl.logicalLayer.parser.QueryParser;
import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.tools.grunt.Grunt;
+import org.apache.pig.tools.grunt.GruntParser;
public class Util {
private static BagFactory mBagFactory = BagFactory.getInstance();
@@ -343,13 +346,17 @@ public class Util {
* @throws IOException
*/
static public void copyFromLocalToCluster(MiniCluster cluster, String localFileName, String fileNameOnCluster) throws IOException {
- BufferedReader reader = new BufferedReader(new FileReader(localFileName));
- String line = null;
- List<String> contents = new ArrayList<String>();
- while((line = reader.readLine()) != null) {
- contents.add(line);
- }
- Util.createInputFile(cluster, fileNameOnCluster, contents.toArray(new String[0]));
+        // copy with Grunt's "fs -put" instead of re-writing the file line by
+        // line as text, so binary inputs (e.g. the .bz2 test data) survive
+        PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        GruntParser gruntParser = new GruntParser(new StringReader(
+                "fs -put " + localFileName + " " + fileNameOnCluster));
+        gruntParser.setInteractive(false);
+        gruntParser.setParams(pigServer);
+        try {
+            gruntParser.parseStopOnError();
+        } catch (org.apache.pig.tools.pigscript.parser.ParseException pe) {
+            throw new IOException(pe);
+        }
}
static public void copyFromClusterToLocal(MiniCluster cluster, String fileNameOnCluster, String localFileName) throws IOException {
Added: hadoop/pig/trunk/test/org/apache/pig/test/data/blockEndingInCR.txt.bz2
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/data/blockEndingInCR.txt.bz2?rev=924034&view=auto
==============================================================================
Binary file - no diff available.
Propchange: hadoop/pig/trunk/test/org/apache/pig/test/data/blockEndingInCR.txt.bz2
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream
Added: hadoop/pig/trunk/test/org/apache/pig/test/data/blockHeaderEndsAt136500.txt.bz2
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/data/blockHeaderEndsAt136500.txt.bz2?rev=924034&view=auto
==============================================================================
Binary file - no diff available.
Propchange: hadoop/pig/trunk/test/org/apache/pig/test/data/blockHeaderEndsAt136500.txt.bz2
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream
Added: hadoop/pig/trunk/test/org/apache/pig/test/data/recordLossblockHeaderEndsAt136500.txt.bz2
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/data/recordLossblockHeaderEndsAt136500.txt.bz2?rev=924034&view=auto
==============================================================================
Binary file - no diff available.
Propchange: hadoop/pig/trunk/test/org/apache/pig/test/data/recordLossblockHeaderEndsAt136500.txt.bz2
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream