You are viewing a plain-text version of this content; the link to the canonical version was lost in the plain-text conversion.
Posted to common-commits@hadoop.apache.org by cd...@apache.org on 2018/01/26 17:17:22 UTC

[3/3] hadoop git commit: MapFile.fix creates a wrong index file in case of block-compressed data file. Contributed by Grigori Rybkine

MapFile.fix creates an incorrect index file for block-compressed data files. Contributed by Grigori Rybkine

(cherry picked from commit 91db424c4360d7556660e8c57ac9a266e6688e01)
(cherry picked from commit 7f354877889b343878a8a09792d5cec8d2846a50)
(cherry picked from commit 9bd439e2c535b95ff0d2b5767b05a7ef43479298)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e062e2b0
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e062e2b0
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e062e2b0

Branch: refs/heads/branch-2.9
Commit: e062e2b08c56adb1fa7965a7de543810df6f5a91
Parents: 98499bb
Author: Chris Douglas <cd...@apache.org>
Authored: Fri Jan 26 09:06:48 2018 -0800
Committer: Chris Douglas <cd...@apache.org>
Committed: Fri Jan 26 09:16:08 2018 -0800

----------------------------------------------------------------------
 .../main/java/org/apache/hadoop/io/MapFile.java | 35 ++++++++++--
 .../java/org/apache/hadoop/io/TestMapFile.java  | 59 +++++++++++++++++++-
 2 files changed, 88 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/e062e2b0/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/MapFile.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/MapFile.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/MapFile.java
index f9e0145..8373e01 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/MapFile.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/MapFile.java
@@ -811,15 +811,40 @@ public class MapFile {
                                     (LongWritable.class));
     }
     try {
-      long pos = 0L;
+      /** What's the position (in bytes) we wrote when we got the last index */
+      long lastIndexPos = -1;
+      /**
+       * What was size when we last wrote an index. Set to MIN_VALUE to ensure
+       * that we have an index at position zero - midKey will throw an exception
+       * if this is not the case
+       */
+      long lastIndexKeyCount = Long.MIN_VALUE;
+      long pos = dataReader.getPosition();
       LongWritable position = new LongWritable();
+      long nextBlock = pos;
+      boolean blockCompressed = dataReader.isBlockCompressed();
       while(dataReader.next(key, value)) {
-        cnt++;
-        if (cnt % indexInterval == 0) {
+        if (blockCompressed) {
+          long curPos = dataReader.getPosition();
+          if (curPos > nextBlock) {
+            pos = nextBlock;                       // current block position
+            nextBlock = curPos;
+          }
+        }
+        // Follow the same logic as in
+        // {@link MapFile.Writer#append(WritableComparable, Writable)}
+        if (cnt >= lastIndexKeyCount + indexInterval && pos > lastIndexPos) {
           position.set(pos);
-          if (!dryrun) indexWriter.append(key, position);
+          if (!dryrun) {
+            indexWriter.append(key, position);
+          }
+          lastIndexPos = pos;
+          lastIndexKeyCount = cnt;
+        }
+        if (!blockCompressed) {
+          pos = dataReader.getPosition();         // next record position
         }
-        pos = dataReader.getPosition();
+        cnt++;
       }
     } catch(Throwable t) {
       // truncated data file. swallow it.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e062e2b0/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestMapFile.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestMapFile.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestMapFile.java
index ff8df7c..7ec4227 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestMapFile.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestMapFile.java
@@ -485,6 +485,63 @@ public class TestMapFile {
       IOUtils.cleanup(null, writer);
     }
   }
+
+  /**
+   * test {@link MapFile#fix(FileSystem, Path, Class<? extends Writable>,
+   *                         Class<? extends Writable>, boolean, Configuration)}
+   * method in case of BLOCK compression
+   */
+  @Test
+  public void testFixBlockCompress() throws Exception {
+    final String indexLessMapFile = "testFixBlockCompress.mapfile";
+    final int compressBlocksize = 100;
+    final int indexInterval = 4;
+    final int noBlocks = 4;
+    final String value = "value-";
+    final int size = noBlocks * compressBlocksize / (4 + value.length());
+
+    conf.setInt("io.seqfile.compress.blocksize", compressBlocksize);
+    MapFile.Writer.setIndexInterval(conf, indexInterval);
+    FileSystem fs = FileSystem.getLocal(conf);
+    Path dir = new Path(TEST_DIR, indexLessMapFile);
+    MapFile.Writer writer = null;
+    MapFile.Reader reader = null;
+    try {
+      writer =
+          new MapFile.Writer(conf, dir,
+          MapFile.Writer.keyClass(IntWritable.class),
+          MapFile.Writer.valueClass(Text.class),
+          MapFile.Writer.compression(CompressionType.BLOCK));
+      for (int i = 0; i < size; i++) {
+        writer.append(new IntWritable(i), new Text(value + i));
+      }
+      writer.close();
+      Path index = new Path(dir, MapFile.INDEX_FILE_NAME);
+      fs.rename(index, index.suffix(".orig"));
+
+      assertEquals("No of valid MapFile entries wrong", size,
+                   MapFile.fix(fs, dir, IntWritable.class, Text.class,
+                               false, conf));
+      reader = new MapFile.Reader(dir, conf);
+      IntWritable key;
+      Text val = new Text();
+      int notFound = 0;
+      for (int i = 0; i < size; i++) {
+        key = new IntWritable(i);
+        if (null == reader.get(key, val)) {
+          notFound++;
+        }
+      }
+      assertEquals("With MapFile.fix-ed index, could not get entries # ",
+                   0, notFound);
+    } finally {
+      IOUtils.cleanupWithLogger(null, writer, reader);
+      if (fs.exists(dir)) {
+        fs.delete(dir, true);
+      }
+    }
+  }
+
   /**
    * test all available constructor for {@code MapFile.Writer}
    */
@@ -619,7 +676,7 @@ public class TestMapFile {
     } catch (Exception ex) {
       fail("testMainMethodMapFile error !!!");
     } finally {
-      IOUtils.cleanup(null, writer);
+      IOUtils.cleanupWithLogger(null, writer);
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org