You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ha...@apache.org on 2018/02/16 22:00:43 UTC
[21/21] hadoop git commit: HADOOP-15206. BZip2 drops and duplicates records when input split size is small. Contributed by Aki Tanaka
HADOOP-15206. BZip2 drops and duplicates records when input split size is small. Contributed by Aki Tanaka
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0898ff42
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0898ff42
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0898ff42
Branch: refs/heads/HDFS-12996
Commit: 0898ff42e9e5c53f2fce7ccdeb4e1cd7d0f123b3
Parents: 4c2119f
Author: Jason Lowe <jl...@apache.org>
Authored: Fri Feb 16 14:49:00 2018 -0600
Committer: Jason Lowe <jl...@apache.org>
Committed: Fri Feb 16 14:49:00 2018 -0600
----------------------------------------------------------------------
.../apache/hadoop/io/compress/BZip2Codec.java | 30 +++++++++++++++++++-
.../hadoop/mapred/TestTextInputFormat.java | 8 ++++++
2 files changed, 37 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0898ff42/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java
index db78118..3c78cfc 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java
@@ -362,9 +362,29 @@ public class BZip2Codec implements Configurable, SplittableCompressionCodec {
bufferedIn = new BufferedInputStream(super.in);
this.startingPos = super.getPos();
this.readMode = readMode;
+ long numSkipped = 0;
if (this.startingPos == 0) {
// We only strip header if it is start of file
bufferedIn = readStreamHeader();
+ } else if (this.readMode == READ_MODE.BYBLOCK &&
+ this.startingPos <= HEADER_LEN + SUB_HEADER_LEN) {
+ // When we're in BYBLOCK mode and the start position is >=0
+ // and < HEADER_LEN + SUB_HEADER_LEN, we should skip to after
+ // start of the first bz2 block to avoid duplicated records
+ numSkipped = HEADER_LEN + SUB_HEADER_LEN + 1 - this.startingPos;
+ long skipBytes = numSkipped;
+ while (skipBytes > 0) {
+ long s = bufferedIn.skip(skipBytes);
+ if (s > 0) {
+ skipBytes -= s;
+ } else {
+ if (bufferedIn.read() == -1) {
+ break; // end of the split
+ } else {
+ skipBytes--;
+ }
+ }
+ }
}
input = new CBZip2InputStream(bufferedIn, readMode);
if (this.isHeaderStripped) {
@@ -375,7 +395,15 @@ public class BZip2Codec implements Configurable, SplittableCompressionCodec {
input.updateReportedByteCount(SUB_HEADER_LEN);
}
- this.updatePos(false);
+ if (numSkipped > 0) {
+ input.updateReportedByteCount((int) numSkipped);
+ }
+
+ // To avoid dropped records, not advertising a new byte position
+ // when we are in BYBLOCK mode and the start position is 0
+ if (!(this.readMode == READ_MODE.BYBLOCK && this.startingPos == 0)) {
+ this.updatePos(false);
+ }
}
private BufferedInputStream readStreamHeader() throws IOException {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0898ff42/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTextInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTextInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTextInputFormat.java
index 0ea1d6d..22d9a57 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTextInputFormat.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTextInputFormat.java
@@ -183,6 +183,14 @@ public class TestTextInputFormat {
// corner case when we have byte alignment and position of stream are same
verifyPartitions(471507, 218, file, codec, conf);
verifyPartitions(473608, 110, file, codec, conf);
+
+ // corner case when split size is small and position of stream is before
+ // the first BZip2 block
+ verifyPartitions(100, 20, file, codec, conf);
+ verifyPartitions(100, 25, file, codec, conf);
+ verifyPartitions(100, 30, file, codec, conf);
+ verifyPartitions(100, 50, file, codec, conf);
+ verifyPartitions(100, 100, file, codec, conf);
}
// Test a corner case when position of stream is right after BZip2 marker
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org