Posted to common-commits@hadoop.apache.org by aa...@apache.org on 2016/06/14 01:23:55 UTC

hadoop git commit: HADOOP-13270. BZip2CompressionInputStream finds the same compression marker twice in corner case, causing duplicate data blocks. Contributed by Kai Sasaki.

Repository: hadoop
Updated Branches:
  refs/heads/trunk 709a814fe -> e3ba9ad3f


HADOOP-13270. BZip2CompressionInputStream finds the same compression marker twice in corner case, causing duplicate data blocks. Contributed by Kai Sasaki.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e3ba9ad3
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e3ba9ad3
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e3ba9ad3

Branch: refs/heads/trunk
Commit: e3ba9ad3f116306910f74645ded91506345b9f6e
Parents: 709a814
Author: Akira Ajisaka <aa...@apache.org>
Authored: Tue Jun 14 10:18:17 2016 +0900
Committer: Akira Ajisaka <aa...@apache.org>
Committed: Tue Jun 14 10:18:41 2016 +0900

----------------------------------------------------------------------
 .../apache/hadoop/io/compress/BZip2Codec.java   |   7 +-
 .../hadoop/mapred/TestTextInputFormat.java      | 108 ++++++++++---------
 2 files changed, 63 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/e3ba9ad3/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java
index 2c5a7be..49dd9c1 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java
@@ -207,7 +207,12 @@ public class BZip2Codec implements Configurable, SplittableCompressionCodec {
     // time stream might start without a leading BZ.
     final long FIRST_BZIP2_BLOCK_MARKER_POSITION =
       CBZip2InputStream.numberOfBytesTillNextMarker(seekableIn);
-    long adjStart = Math.max(0L, start - FIRST_BZIP2_BLOCK_MARKER_POSITION);
+    long adjStart = 0L;
+    if (start != 0) {
+      // For splits other than the first, back up only by the 6-byte block marker.
+      adjStart = Math.max(0L, start - (FIRST_BZIP2_BLOCK_MARKER_POSITION
+          - (HEADER_LEN + SUB_HEADER_LEN)));
+    }
 
     ((Seekable)seekableIn).seek(adjStart);
     SplitCompressionInputStream in =
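
A minimal, standalone sketch of the adjusted-start arithmetic introduced by this hunk, not part of the patch itself. It assumes HEADER_LEN = 2 ("BZ") and SUB_HEADER_LEN = 2 ("h9") as defined in BZip2Codec, and a 6-byte bzip2 block delimiter; the class name, helper name and example values are illustrative only.

class AdjustedStartSketch {
  static final int HEADER_LEN = 2;      // "BZ"
  static final int SUB_HEADER_LEN = 2;  // "h9"

  // bytesTillNextMarker stands in for FIRST_BZIP2_BLOCK_MARKER_POSITION,
  // i.e. CBZip2InputStream.numberOfBytesTillNextMarker(seekableIn).
  static long adjustedStart(long start, long bytesTillNextMarker) {
    if (start == 0) {
      // The first split always starts reading at the beginning of the file.
      return 0L;
    }
    // Later splits back up only by the 6-byte block marker; the stream header
    // bytes that numberOfBytesTillNextMarker counted are not present there.
    return Math.max(0L,
        start - (bytesTillNextMarker - (HEADER_LEN + SUB_HEADER_LEN)));
  }

  public static void main(String[] args) {
    // With a typical marker position of 10 (4 header bytes + 6 marker bytes),
    // a split starting at byte 1000 now seeks back to 994 instead of 990, so
    // the same compression marker cannot be found a second time.
    System.out.println(adjustedStart(0L, 10L));    // 0
    System.out.println(adjustedStart(1000L, 10L)); // 994
  }
}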

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e3ba9ad3/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTextInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTextInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTextInputFormat.java
index b833b60..5106c38 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTextInputFormat.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTextInputFormat.java
@@ -175,69 +175,75 @@ public class TestTextInputFormat {
     for (int length = MAX_LENGTH / 2; length < MAX_LENGTH;
         length += random.nextInt(MAX_LENGTH / 4)+1) {
 
-      LOG.info("creating; entries = " + length);
-
-
-      // create a file with length entries
-      Writer writer =
-        new OutputStreamWriter(codec.createOutputStream(localFs.create(file)));
-      try {
-        for (int i = 0; i < length; i++) {
-          writer.write(Integer.toString(i));
-          writer.write("\n");
-        }
-      } finally {
-        writer.close();
+      for (int i = 0; i < 3; i++) {
+        int numSplits = random.nextInt(MAX_LENGTH / 2000) + 1;
+        verifyPartitions(length, numSplits, file, codec, conf);
       }
+    }
 
-      // try splitting the file in a variety of sizes
-      TextInputFormat format = new TextInputFormat();
-      format.configure(conf);
-      LongWritable key = new LongWritable();
-      Text value = new Text();
-      for (int i = 0; i < 3; i++) {
-        int numSplits = random.nextInt(MAX_LENGTH/2000)+1;
-        LOG.info("splitting: requesting = " + numSplits);
-        InputSplit[] splits = format.getSplits(conf, numSplits);
-        LOG.info("splitting: got =        " + splits.length);
+    // corner case where the byte alignment and the stream position are the same
+    verifyPartitions(471507, 218, file, codec, conf);
+    verifyPartitions(473608, 110, file, codec, conf);
+  }
 
+  private void verifyPartitions(int length, int numSplits, Path file,
+      CompressionCodec codec, JobConf conf) throws IOException {
 
+    LOG.info("creating; entries = " + length);
 
-        // check each split
-        BitSet bits = new BitSet(length);
-        for (int j = 0; j < splits.length; j++) {
-          LOG.debug("split["+j+"]= " + splits[j]);
-          RecordReader<LongWritable, Text> reader =
-            format.getRecordReader(splits[j], conf, reporter);
-          try {
-            int counter = 0;
-            while (reader.next(key, value)) {
-              int v = Integer.parseInt(value.toString());
-              LOG.debug("read " + v);
 
-              if (bits.get(v)) {
-                LOG.warn("conflict with " + v +
+    // create a file with length entries
+    Writer writer =
+        new OutputStreamWriter(codec.createOutputStream(localFs.create(file)));
+    try {
+      for (int i = 0; i < length; i++) {
+        writer.write(Integer.toString(i));
+        writer.write("\n");
+      }
+    } finally {
+      writer.close();
+    }
+
+    // try splitting the file in a variety of sizes
+    TextInputFormat format = new TextInputFormat();
+    format.configure(conf);
+    LongWritable key = new LongWritable();
+    Text value = new Text();
+    LOG.info("splitting: requesting = " + numSplits);
+    InputSplit[] splits = format.getSplits(conf, numSplits);
+    LOG.info("splitting: got =        " + splits.length);
+
+
+    // check each split
+    BitSet bits = new BitSet(length);
+    for (int j = 0; j < splits.length; j++) {
+      LOG.debug("split["+j+"]= " + splits[j]);
+      RecordReader<LongWritable, Text> reader =
+              format.getRecordReader(splits[j], conf, Reporter.NULL);
+      try {
+        int counter = 0;
+        while (reader.next(key, value)) {
+          int v = Integer.parseInt(value.toString());
+          LOG.debug("read " + v);
+          if (bits.get(v)) {
+            LOG.warn("conflict with " + v +
                     " in split " + j +
                     " at position "+reader.getPos());
-              }
-              assertFalse("Key in multiple partitions.", bits.get(v));
-              bits.set(v);
-              counter++;
-            }
-            if (counter > 0) {
-              LOG.info("splits["+j+"]="+splits[j]+" count=" + counter);
-            } else {
-              LOG.debug("splits["+j+"]="+splits[j]+" count=" + counter);
-            }
-          } finally {
-            reader.close();
           }
+          assertFalse("Key in multiple partitions.", bits.get(v));
+          bits.set(v);
+          counter++;
         }
-        assertEquals("Some keys in no partition.", length, bits.cardinality());
+        if (counter > 0) {
+          LOG.info("splits["+j+"]="+splits[j]+" count=" + counter);
+        } else {
+          LOG.debug("splits["+j+"]="+splits[j]+" count=" + counter);
+        }
+      } finally {
+        reader.close();
       }
-
     }
-
+    assertEquals("Some keys in no partition.", length, bits.cardinality());
   }
 
   private static LineReader makeStream(String str) throws IOException {
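
For context, a rough usage sketch of the SplittableCompressionCodec path that TestTextInputFormat exercises indirectly through TextInputFormat. It is not taken from the patch; the file path, split offsets and configuration are illustrative assumptions only.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.SplitCompressionInputStream;
import org.apache.hadoop.io.compress.SplittableCompressionCodec;

public class BZip2SplitReadSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    BZip2Codec codec = new BZip2Codec();
    codec.setConf(conf);

    FileSystem fs = FileSystem.getLocal(conf);
    Path file = new Path("/tmp/part-00000.bz2");  // illustrative path
    long splitStart = 1000000L;                   // illustrative split range;
    long splitEnd = 2000000L;                     // real values come from an InputSplit

    FSDataInputStream raw = fs.open(file);
    // This call runs the patched code above: for splitStart != 0 the stream
    // now seeks back only past the 6-byte block marker before decompressing.
    SplitCompressionInputStream in = codec.createInputStream(
        raw, codec.createDecompressor(), splitStart, splitEnd,
        SplittableCompressionCodec.READ_MODE.BYBLOCK);
    try {
      System.out.println("adjusted range: " + in.getAdjustedStart()
          + " - " + in.getAdjustedEnd());
      // Records would normally be read here via LineRecordReader.
    } finally {
      in.close();
    }
  }
}

TextInputFormat's record reader goes through the same createInputStream call when a split falls inside a bzip2 file, which is how a duplicated block would surface as repeated keys in the test above.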

