You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ha...@apache.org on 2018/02/16 22:00:43 UTC

[21/21] hadoop git commit: HADOOP-15206. BZip2 drops and duplicates records when input split size is small. Contributed by Aki Tanaka

HADOOP-15206. BZip2 drops and duplicates records when input split size is small. Contributed by Aki Tanaka


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0898ff42
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0898ff42
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0898ff42

Branch: refs/heads/HDFS-12996
Commit: 0898ff42e9e5c53f2fce7ccdeb4e1cd7d0f123b3
Parents: 4c2119f
Author: Jason Lowe <jl...@apache.org>
Authored: Fri Feb 16 14:49:00 2018 -0600
Committer: Jason Lowe <jl...@apache.org>
Committed: Fri Feb 16 14:49:00 2018 -0600

----------------------------------------------------------------------
 .../apache/hadoop/io/compress/BZip2Codec.java   | 30 +++++++++++++++++++-
 .../hadoop/mapred/TestTextInputFormat.java      |  8 ++++++
 2 files changed, 37 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/0898ff42/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java
index db78118..3c78cfc 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java
@@ -362,9 +362,29 @@ public class BZip2Codec implements Configurable, SplittableCompressionCodec {
       bufferedIn = new BufferedInputStream(super.in);
       this.startingPos = super.getPos();
       this.readMode = readMode;
+      long numSkipped = 0;
       if (this.startingPos == 0) {
         // We only strip header if it is start of file
         bufferedIn = readStreamHeader();
+      } else if (this.readMode == READ_MODE.BYBLOCK  &&
+          this.startingPos <= HEADER_LEN + SUB_HEADER_LEN) {
+        // When we're in BYBLOCK mode and the start position is >=0
+        // and < HEADER_LEN + SUB_HEADER_LEN, we should skip to after
+        // start of the first bz2 block to avoid duplicated records
+        numSkipped = HEADER_LEN + SUB_HEADER_LEN + 1 - this.startingPos;
+        long skipBytes = numSkipped;
+        while (skipBytes > 0) {
+          long s = bufferedIn.skip(skipBytes);
+          if (s > 0) {
+            skipBytes -= s;
+          } else {
+            if (bufferedIn.read() == -1) {
+              break; // end of the split
+            } else {
+              skipBytes--;
+            }
+          }
+        }
       }
       input = new CBZip2InputStream(bufferedIn, readMode);
       if (this.isHeaderStripped) {
@@ -375,7 +395,15 @@ public class BZip2Codec implements Configurable, SplittableCompressionCodec {
         input.updateReportedByteCount(SUB_HEADER_LEN);
       }
 
-      this.updatePos(false);
+      if (numSkipped > 0) {
+        input.updateReportedByteCount((int) numSkipped);
+      }
+
+      // To avoid dropped records, not advertising a new byte position
+      // when we are in BYBLOCK mode and the start position is 0
+      if (!(this.readMode == READ_MODE.BYBLOCK && this.startingPos == 0)) {
+        this.updatePos(false);
+      }
     }
 
     private BufferedInputStream readStreamHeader() throws IOException {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0898ff42/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTextInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTextInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTextInputFormat.java
index 0ea1d6d..22d9a57 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTextInputFormat.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTextInputFormat.java
@@ -183,6 +183,14 @@ public class TestTextInputFormat {
     // corner case when we have byte alignment and position of stream are same
     verifyPartitions(471507, 218, file, codec, conf);
     verifyPartitions(473608, 110, file, codec, conf);
+
+    // corner case when split size is small and position of stream is before
+    // the first BZip2 block
+    verifyPartitions(100, 20, file, codec, conf);
+    verifyPartitions(100, 25, file, codec, conf);
+    verifyPartitions(100, 30, file, codec, conf);
+    verifyPartitions(100, 50, file, codec, conf);
+    verifyPartitions(100, 100, file, codec, conf);
   }
 
   // Test a corner case when position of stream is right after BZip2 marker


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org