You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by aa...@apache.org on 2016/06/14 01:23:55 UTC
hadoop git commit: HADOOP-13270. BZip2CompressionInputStream finds
the same compression marker twice in corner case,
causing duplicate data blocks. Contributed by Kai Sasaki.
Repository: hadoop
Updated Branches:
refs/heads/trunk 709a814fe -> e3ba9ad3f
HADOOP-13270. BZip2CompressionInputStream finds the same compression marker twice in corner case, causing duplicate data blocks. Contributed by Kai Sasaki.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e3ba9ad3
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e3ba9ad3
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e3ba9ad3
Branch: refs/heads/trunk
Commit: e3ba9ad3f116306910f74645ded91506345b9f6e
Parents: 709a814
Author: Akira Ajisaka <aa...@apache.org>
Authored: Tue Jun 14 10:18:17 2016 +0900
Committer: Akira Ajisaka <aa...@apache.org>
Committed: Tue Jun 14 10:18:41 2016 +0900
----------------------------------------------------------------------
.../apache/hadoop/io/compress/BZip2Codec.java | 7 +-
.../hadoop/mapred/TestTextInputFormat.java | 108 ++++++++++---------
2 files changed, 63 insertions(+), 52 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e3ba9ad3/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java
index 2c5a7be..49dd9c1 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java
@@ -207,7 +207,12 @@ public class BZip2Codec implements Configurable, SplittableCompressionCodec {
// time stream might start without a leading BZ.
final long FIRST_BZIP2_BLOCK_MARKER_POSITION =
CBZip2InputStream.numberOfBytesTillNextMarker(seekableIn);
- long adjStart = Math.max(0L, start - FIRST_BZIP2_BLOCK_MARKER_POSITION);
+ long adjStart = 0L;
+ if (start != 0) {
+ // For any split other than the first in the file, the marker size is 6 bytes.
+ adjStart = Math.max(0L, start - (FIRST_BZIP2_BLOCK_MARKER_POSITION
+ - (HEADER_LEN + SUB_HEADER_LEN)));
+ }
((Seekable)seekableIn).seek(adjStart);
SplitCompressionInputStream in =
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e3ba9ad3/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTextInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTextInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTextInputFormat.java
index b833b60..5106c38 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTextInputFormat.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTextInputFormat.java
@@ -175,69 +175,75 @@ public class TestTextInputFormat {
for (int length = MAX_LENGTH / 2; length < MAX_LENGTH;
length += random.nextInt(MAX_LENGTH / 4)+1) {
- LOG.info("creating; entries = " + length);
-
-
- // create a file with length entries
- Writer writer =
- new OutputStreamWriter(codec.createOutputStream(localFs.create(file)));
- try {
- for (int i = 0; i < length; i++) {
- writer.write(Integer.toString(i));
- writer.write("\n");
- }
- } finally {
- writer.close();
+ for (int i = 0; i < 3; i++) {
+ int numSplits = random.nextInt(MAX_LENGTH / 2000) + 1;
+ verifyPartitions(length, numSplits, file, codec, conf);
}
+ }
- // try splitting the file in a variety of sizes
- TextInputFormat format = new TextInputFormat();
- format.configure(conf);
- LongWritable key = new LongWritable();
- Text value = new Text();
- for (int i = 0; i < 3; i++) {
- int numSplits = random.nextInt(MAX_LENGTH/2000)+1;
- LOG.info("splitting: requesting = " + numSplits);
- InputSplit[] splits = format.getSplits(conf, numSplits);
- LOG.info("splitting: got = " + splits.length);
+ // Corner case where the byte alignment and the stream position are the same.
+ verifyPartitions(471507, 218, file, codec, conf);
+ verifyPartitions(473608, 110, file, codec, conf);
+ }
+ private void verifyPartitions(int length, int numSplits, Path file,
+ CompressionCodec codec, JobConf conf) throws IOException {
+ LOG.info("creating; entries = " + length);
- // check each split
- BitSet bits = new BitSet(length);
- for (int j = 0; j < splits.length; j++) {
- LOG.debug("split["+j+"]= " + splits[j]);
- RecordReader<LongWritable, Text> reader =
- format.getRecordReader(splits[j], conf, reporter);
- try {
- int counter = 0;
- while (reader.next(key, value)) {
- int v = Integer.parseInt(value.toString());
- LOG.debug("read " + v);
- if (bits.get(v)) {
- LOG.warn("conflict with " + v +
+ // create a file with length entries
+ Writer writer =
+ new OutputStreamWriter(codec.createOutputStream(localFs.create(file)));
+ try {
+ for (int i = 0; i < length; i++) {
+ writer.write(Integer.toString(i));
+ writer.write("\n");
+ }
+ } finally {
+ writer.close();
+ }
+
+ // try splitting the file in a variety of sizes
+ TextInputFormat format = new TextInputFormat();
+ format.configure(conf);
+ LongWritable key = new LongWritable();
+ Text value = new Text();
+ LOG.info("splitting: requesting = " + numSplits);
+ InputSplit[] splits = format.getSplits(conf, numSplits);
+ LOG.info("splitting: got = " + splits.length);
+
+
+ // check each split
+ BitSet bits = new BitSet(length);
+ for (int j = 0; j < splits.length; j++) {
+ LOG.debug("split["+j+"]= " + splits[j]);
+ RecordReader<LongWritable, Text> reader =
+ format.getRecordReader(splits[j], conf, Reporter.NULL);
+ try {
+ int counter = 0;
+ while (reader.next(key, value)) {
+ int v = Integer.parseInt(value.toString());
+ LOG.debug("read " + v);
+ if (bits.get(v)) {
+ LOG.warn("conflict with " + v +
" in split " + j +
" at position "+reader.getPos());
- }
- assertFalse("Key in multiple partitions.", bits.get(v));
- bits.set(v);
- counter++;
- }
- if (counter > 0) {
- LOG.info("splits["+j+"]="+splits[j]+" count=" + counter);
- } else {
- LOG.debug("splits["+j+"]="+splits[j]+" count=" + counter);
- }
- } finally {
- reader.close();
}
+ assertFalse("Key in multiple partitions.", bits.get(v));
+ bits.set(v);
+ counter++;
}
- assertEquals("Some keys in no partition.", length, bits.cardinality());
+ if (counter > 0) {
+ LOG.info("splits["+j+"]="+splits[j]+" count=" + counter);
+ } else {
+ LOG.debug("splits["+j+"]="+splits[j]+" count=" + counter);
+ }
+ } finally {
+ reader.close();
}
-
}
-
+ assertEquals("Some keys in no partition.", length, bits.cardinality());
}
private static LineReader makeStream(String str) throws IOException {
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org