You are viewing a plain text version of this content; the hyperlink to the canonical version is not available in this plain text rendering.
Posted to common-commits@hadoop.apache.org by cd...@apache.org on 2018/01/26 17:19:40 UTC
hadoop git commit: MapFile.fix creates a wrong index file in case of
block-compressed data file. Contributed by Grigori Rybkine
Repository: hadoop
Updated Branches:
refs/heads/trunk 8b5b045bd -> 56872cff9
MapFile.fix creates a wrong index file in case of block-compressed data file. Contributed by Grigori Rybkine
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/56872cff
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/56872cff
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/56872cff
Branch: refs/heads/trunk
Commit: 56872cff92f543bf77206a1324968559dceb7bc2
Parents: 8b5b045
Author: Chris Douglas <cd...@apache.org>
Authored: Fri Jan 26 09:06:48 2018 -0800
Committer: Chris Douglas <cd...@apache.org>
Committed: Fri Jan 26 09:18:30 2018 -0800
----------------------------------------------------------------------
.../main/java/org/apache/hadoop/io/MapFile.java | 35 ++++++++++--
.../java/org/apache/hadoop/io/TestMapFile.java | 59 +++++++++++++++++++-
2 files changed, 88 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/56872cff/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/MapFile.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/MapFile.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/MapFile.java
index d56822f..51db0b3 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/MapFile.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/MapFile.java
@@ -811,15 +811,40 @@ public class MapFile {
(LongWritable.class));
}
try {
- long pos = 0L;
+ /** What's the position (in bytes) we wrote when we got the last index */
+ long lastIndexPos = -1;
+ /**
+ * What was size when we last wrote an index. Set to MIN_VALUE to ensure
+ * that we have an index at position zero - midKey will throw an exception
+ * if this is not the case
+ */
+ long lastIndexKeyCount = Long.MIN_VALUE;
+ long pos = dataReader.getPosition();
LongWritable position = new LongWritable();
+ long nextBlock = pos;
+ boolean blockCompressed = dataReader.isBlockCompressed();
while(dataReader.next(key, value)) {
- cnt++;
- if (cnt % indexInterval == 0) {
+ if (blockCompressed) {
+ long curPos = dataReader.getPosition();
+ if (curPos > nextBlock) {
+ pos = nextBlock; // current block position
+ nextBlock = curPos;
+ }
+ }
+ // Follow the same logic as in
+ // {@link MapFile.Writer#append(WritableComparable, Writable)}
+ if (cnt >= lastIndexKeyCount + indexInterval && pos > lastIndexPos) {
position.set(pos);
- if (!dryrun) indexWriter.append(key, position);
+ if (!dryrun) {
+ indexWriter.append(key, position);
+ }
+ lastIndexPos = pos;
+ lastIndexKeyCount = cnt;
+ }
+ if (!blockCompressed) {
+ pos = dataReader.getPosition(); // next record position
}
- pos = dataReader.getPosition();
+ cnt++;
}
} catch(Throwable t) {
// truncated data file. swallow it.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/56872cff/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestMapFile.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestMapFile.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestMapFile.java
index ff8df7c..7ec4227 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestMapFile.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestMapFile.java
@@ -485,6 +485,63 @@ public class TestMapFile {
IOUtils.cleanup(null, writer);
}
}
+
+ /**
+ * test {@link MapFile#fix(FileSystem, Path, Class<? extends Writable>,
+ * Class<? extends Writable>, boolean, Configuration)}
+ * method in case of BLOCK compression
+ */
+ @Test
+ public void testFixBlockCompress() throws Exception {
+ final String indexLessMapFile = "testFixBlockCompress.mapfile";
+ final int compressBlocksize = 100;
+ final int indexInterval = 4;
+ final int noBlocks = 4;
+ final String value = "value-";
+ final int size = noBlocks * compressBlocksize / (4 + value.length());
+
+ conf.setInt("io.seqfile.compress.blocksize", compressBlocksize);
+ MapFile.Writer.setIndexInterval(conf, indexInterval);
+ FileSystem fs = FileSystem.getLocal(conf);
+ Path dir = new Path(TEST_DIR, indexLessMapFile);
+ MapFile.Writer writer = null;
+ MapFile.Reader reader = null;
+ try {
+ writer =
+ new MapFile.Writer(conf, dir,
+ MapFile.Writer.keyClass(IntWritable.class),
+ MapFile.Writer.valueClass(Text.class),
+ MapFile.Writer.compression(CompressionType.BLOCK));
+ for (int i = 0; i < size; i++) {
+ writer.append(new IntWritable(i), new Text(value + i));
+ }
+ writer.close();
+ Path index = new Path(dir, MapFile.INDEX_FILE_NAME);
+ fs.rename(index, index.suffix(".orig"));
+
+ assertEquals("No of valid MapFile entries wrong", size,
+ MapFile.fix(fs, dir, IntWritable.class, Text.class,
+ false, conf));
+ reader = new MapFile.Reader(dir, conf);
+ IntWritable key;
+ Text val = new Text();
+ int notFound = 0;
+ for (int i = 0; i < size; i++) {
+ key = new IntWritable(i);
+ if (null == reader.get(key, val)) {
+ notFound++;
+ }
+ }
+ assertEquals("With MapFile.fix-ed index, could not get entries # ",
+ 0, notFound);
+ } finally {
+ IOUtils.cleanupWithLogger(null, writer, reader);
+ if (fs.exists(dir)) {
+ fs.delete(dir, true);
+ }
+ }
+ }
+
/**
* test all available constructor for {@code MapFile.Writer}
*/
@@ -619,7 +676,7 @@ public class TestMapFile {
} catch (Exception ex) {
fail("testMainMethodMapFile error !!!");
} finally {
- IOUtils.cleanup(null, writer);
+ IOUtils.cleanupWithLogger(null, writer);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org