You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cd...@apache.org on 2018/01/26 17:17:20 UTC
[1/3] hadoop git commit: MapFile.fix creates a wrong index file in case of block-compressed data file. Contributed by Grigori Rybkine
Repository: hadoop
Updated Branches:
refs/heads/branch-2 5f44bd3eb -> 9bd439e2c
refs/heads/branch-2.9 98499bb09 -> e062e2b08
refs/heads/branch-3.0 e6c66baba -> 7f3548778
MapFile.fix creates a wrong index file in case of block-compressed data file. Contributed by Grigori Rybkine
(cherry picked from commit 91db424c4360d7556660e8c57ac9a266e6688e01)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/7f354877
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/7f354877
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/7f354877
Branch: refs/heads/branch-3.0
Commit: 7f354877889b343878a8a09792d5cec8d2846a50
Parents: e6c66ba
Author: Chris Douglas <cd...@apache.org>
Authored: Fri Jan 26 09:06:48 2018 -0800
Committer: Chris Douglas <cd...@apache.org>
Committed: Fri Jan 26 09:15:03 2018 -0800
----------------------------------------------------------------------
.../main/java/org/apache/hadoop/io/MapFile.java | 35 ++++++++++--
.../java/org/apache/hadoop/io/TestMapFile.java | 59 +++++++++++++++++++-
2 files changed, 88 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f354877/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/MapFile.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/MapFile.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/MapFile.java
index d56822f..51db0b3 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/MapFile.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/MapFile.java
@@ -811,15 +811,40 @@ public class MapFile {
(LongWritable.class));
}
try {
- long pos = 0L;
+ /** What's the position (in bytes) we wrote when we got the last index */
+ long lastIndexPos = -1;
+ /**
+ * What was size when we last wrote an index. Set to MIN_VALUE to ensure
+ * that we have an index at position zero - midKey will throw an exception
+ * if this is not the case
+ */
+ long lastIndexKeyCount = Long.MIN_VALUE;
+ long pos = dataReader.getPosition();
LongWritable position = new LongWritable();
+ long nextBlock = pos;
+ boolean blockCompressed = dataReader.isBlockCompressed();
while(dataReader.next(key, value)) {
- cnt++;
- if (cnt % indexInterval == 0) {
+ if (blockCompressed) {
+ long curPos = dataReader.getPosition();
+ if (curPos > nextBlock) {
+ pos = nextBlock; // current block position
+ nextBlock = curPos;
+ }
+ }
+ // Follow the same logic as in
+ // {@link MapFile.Writer#append(WritableComparable, Writable)}
+ if (cnt >= lastIndexKeyCount + indexInterval && pos > lastIndexPos) {
position.set(pos);
- if (!dryrun) indexWriter.append(key, position);
+ if (!dryrun) {
+ indexWriter.append(key, position);
+ }
+ lastIndexPos = pos;
+ lastIndexKeyCount = cnt;
+ }
+ if (!blockCompressed) {
+ pos = dataReader.getPosition(); // next record position
}
- pos = dataReader.getPosition();
+ cnt++;
}
} catch(Throwable t) {
// truncated data file. swallow it.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f354877/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestMapFile.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestMapFile.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestMapFile.java
index ff8df7c..7ec4227 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestMapFile.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestMapFile.java
@@ -485,6 +485,63 @@ public class TestMapFile {
IOUtils.cleanup(null, writer);
}
}
+
+ /**
+ * Tests {@link MapFile#fix(FileSystem, Path, Class, Class, boolean, Configuration)}
+ * on a BLOCK-compressed MapFile: the index file is moved aside, rebuilt with
+ * fix(), and every key written must still be found through the rebuilt index.
+ */
+ @Test
+ public void testFixBlockCompress() throws Exception {
+ final String indexLessMapFile = "testFixBlockCompress.mapfile";
+ final int compressBlocksize = 100; // deliberately tiny compression block size
+ final int indexInterval = 4;
+ final int noBlocks = 4; // intended number of compressed blocks in the data
+ final String value = "value-";
+ final int size = noBlocks * compressBlocksize / (4 + value.length()); // presumably ~bytes/record
+
+ conf.setInt("io.seqfile.compress.blocksize", compressBlocksize);
+ MapFile.Writer.setIndexInterval(conf, indexInterval);
+ FileSystem fs = FileSystem.getLocal(conf);
+ Path dir = new Path(TEST_DIR, indexLessMapFile);
+ MapFile.Writer writer = null;
+ MapFile.Reader reader = null;
+ try {
+ writer =
+ new MapFile.Writer(conf, dir,
+ MapFile.Writer.keyClass(IntWritable.class),
+ MapFile.Writer.valueClass(Text.class),
+ MapFile.Writer.compression(CompressionType.BLOCK));
+ for (int i = 0; i < size; i++) {
+ writer.append(new IntWritable(i), new Text(value + i));
+ }
+ writer.close();
+ Path index = new Path(dir, MapFile.INDEX_FILE_NAME);
+ fs.rename(index, index.suffix(".orig")); // move index aside so fix() rebuilds it
+
+ assertEquals("No of valid MapFile entries wrong", size,
+ MapFile.fix(fs, dir, IntWritable.class, Text.class,
+ false, conf)); // dryrun = false: actually write the new index
+ reader = new MapFile.Reader(dir, conf);
+ IntWritable key;
+ Text val = new Text();
+ int notFound = 0;
+ for (int i = 0; i < size; i++) {
+ key = new IntWritable(i);
+ if (null == reader.get(key, val)) { // null: key not reachable via index
+ notFound++;
+ }
+ }
+ assertEquals("With MapFile.fix-ed index, could not get entries # ",
+ 0, notFound);
+ } finally {
+ IOUtils.cleanupWithLogger(null, writer, reader);
+ if (fs.exists(dir)) {
+ fs.delete(dir, true);
+ }
+ }
+ }
+
/**
* test all available constructor for {@code MapFile.Writer}
*/
@@ -619,7 +676,7 @@ public class TestMapFile {
} catch (Exception ex) {
fail("testMainMethodMapFile error !!!");
} finally {
- IOUtils.cleanup(null, writer);
+ IOUtils.cleanupWithLogger(null, writer);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org
[2/3] hadoop git commit: MapFile.fix creates a wrong index file in case of block-compressed data file. Contributed by Grigori Rybkine
Posted by cd...@apache.org.
MapFile.fix creates a wrong index file in case of block-compressed data file. Contributed by Grigori Rybkine
(cherry picked from commit 91db424c4360d7556660e8c57ac9a266e6688e01)
(cherry picked from commit 7f354877889b343878a8a09792d5cec8d2846a50)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/9bd439e2
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/9bd439e2
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/9bd439e2
Branch: refs/heads/branch-2
Commit: 9bd439e2c535b95ff0d2b5767b05a7ef43479298
Parents: 5f44bd3
Author: Chris Douglas <cd...@apache.org>
Authored: Fri Jan 26 09:06:48 2018 -0800
Committer: Chris Douglas <cd...@apache.org>
Committed: Fri Jan 26 09:15:53 2018 -0800
----------------------------------------------------------------------
.../main/java/org/apache/hadoop/io/MapFile.java | 35 ++++++++++--
.../java/org/apache/hadoop/io/TestMapFile.java | 59 +++++++++++++++++++-
2 files changed, 88 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9bd439e2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/MapFile.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/MapFile.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/MapFile.java
index f9e0145..8373e01 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/MapFile.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/MapFile.java
@@ -811,15 +811,40 @@ public class MapFile {
(LongWritable.class));
}
try {
- long pos = 0L;
+ /** What's the position (in bytes) we wrote when we got the last index */
+ long lastIndexPos = -1;
+ /**
+ * What was size when we last wrote an index. Set to MIN_VALUE to ensure
+ * that we have an index at position zero - midKey will throw an exception
+ * if this is not the case
+ */
+ long lastIndexKeyCount = Long.MIN_VALUE;
+ long pos = dataReader.getPosition();
LongWritable position = new LongWritable();
+ long nextBlock = pos;
+ boolean blockCompressed = dataReader.isBlockCompressed();
while(dataReader.next(key, value)) {
- cnt++;
- if (cnt % indexInterval == 0) {
+ if (blockCompressed) {
+ long curPos = dataReader.getPosition();
+ if (curPos > nextBlock) {
+ pos = nextBlock; // current block position
+ nextBlock = curPos;
+ }
+ }
+ // Follow the same logic as in
+ // {@link MapFile.Writer#append(WritableComparable, Writable)}
+ if (cnt >= lastIndexKeyCount + indexInterval && pos > lastIndexPos) {
position.set(pos);
- if (!dryrun) indexWriter.append(key, position);
+ if (!dryrun) {
+ indexWriter.append(key, position);
+ }
+ lastIndexPos = pos;
+ lastIndexKeyCount = cnt;
+ }
+ if (!blockCompressed) {
+ pos = dataReader.getPosition(); // next record position
}
- pos = dataReader.getPosition();
+ cnt++;
}
} catch(Throwable t) {
// truncated data file. swallow it.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9bd439e2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestMapFile.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestMapFile.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestMapFile.java
index ff8df7c..7ec4227 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestMapFile.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestMapFile.java
@@ -485,6 +485,63 @@ public class TestMapFile {
IOUtils.cleanup(null, writer);
}
}
+
+ /**
+ * Tests {@link MapFile#fix(FileSystem, Path, Class, Class, boolean, Configuration)}
+ * on a BLOCK-compressed MapFile: the index file is moved aside, rebuilt with
+ * fix(), and every key written must still be found through the rebuilt index.
+ */
+ @Test
+ public void testFixBlockCompress() throws Exception {
+ final String indexLessMapFile = "testFixBlockCompress.mapfile";
+ final int compressBlocksize = 100; // deliberately tiny compression block size
+ final int indexInterval = 4;
+ final int noBlocks = 4; // intended number of compressed blocks in the data
+ final String value = "value-";
+ final int size = noBlocks * compressBlocksize / (4 + value.length()); // presumably ~bytes/record
+
+ conf.setInt("io.seqfile.compress.blocksize", compressBlocksize);
+ MapFile.Writer.setIndexInterval(conf, indexInterval);
+ FileSystem fs = FileSystem.getLocal(conf);
+ Path dir = new Path(TEST_DIR, indexLessMapFile);
+ MapFile.Writer writer = null;
+ MapFile.Reader reader = null;
+ try {
+ writer =
+ new MapFile.Writer(conf, dir,
+ MapFile.Writer.keyClass(IntWritable.class),
+ MapFile.Writer.valueClass(Text.class),
+ MapFile.Writer.compression(CompressionType.BLOCK));
+ for (int i = 0; i < size; i++) {
+ writer.append(new IntWritable(i), new Text(value + i));
+ }
+ writer.close();
+ Path index = new Path(dir, MapFile.INDEX_FILE_NAME);
+ fs.rename(index, index.suffix(".orig")); // move index aside so fix() rebuilds it
+
+ assertEquals("No of valid MapFile entries wrong", size,
+ MapFile.fix(fs, dir, IntWritable.class, Text.class,
+ false, conf)); // dryrun = false: actually write the new index
+ reader = new MapFile.Reader(dir, conf);
+ IntWritable key;
+ Text val = new Text();
+ int notFound = 0;
+ for (int i = 0; i < size; i++) {
+ key = new IntWritable(i);
+ if (null == reader.get(key, val)) { // null: key not reachable via index
+ notFound++;
+ }
+ }
+ assertEquals("With MapFile.fix-ed index, could not get entries # ",
+ 0, notFound);
+ } finally {
+ IOUtils.cleanupWithLogger(null, writer, reader);
+ if (fs.exists(dir)) {
+ fs.delete(dir, true);
+ }
+ }
+ }
+
/**
* test all available constructor for {@code MapFile.Writer}
*/
@@ -619,7 +676,7 @@ public class TestMapFile {
} catch (Exception ex) {
fail("testMainMethodMapFile error !!!");
} finally {
- IOUtils.cleanup(null, writer);
+ IOUtils.cleanupWithLogger(null, writer);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org
[3/3] hadoop git commit: MapFile.fix creates a wrong index file in case of block-compressed data file. Contributed by Grigori Rybkine
Posted by cd...@apache.org.
MapFile.fix creates a wrong index file in case of block-compressed data file. Contributed by Grigori Rybkine
(cherry picked from commit 91db424c4360d7556660e8c57ac9a266e6688e01)
(cherry picked from commit 7f354877889b343878a8a09792d5cec8d2846a50)
(cherry picked from commit 9bd439e2c535b95ff0d2b5767b05a7ef43479298)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e062e2b0
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e062e2b0
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e062e2b0
Branch: refs/heads/branch-2.9
Commit: e062e2b08c56adb1fa7965a7de543810df6f5a91
Parents: 98499bb
Author: Chris Douglas <cd...@apache.org>
Authored: Fri Jan 26 09:06:48 2018 -0800
Committer: Chris Douglas <cd...@apache.org>
Committed: Fri Jan 26 09:16:08 2018 -0800
----------------------------------------------------------------------
.../main/java/org/apache/hadoop/io/MapFile.java | 35 ++++++++++--
.../java/org/apache/hadoop/io/TestMapFile.java | 59 +++++++++++++++++++-
2 files changed, 88 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e062e2b0/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/MapFile.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/MapFile.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/MapFile.java
index f9e0145..8373e01 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/MapFile.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/MapFile.java
@@ -811,15 +811,40 @@ public class MapFile {
(LongWritable.class));
}
try {
- long pos = 0L;
+ /** What's the position (in bytes) we wrote when we got the last index */
+ long lastIndexPos = -1;
+ /**
+ * What was size when we last wrote an index. Set to MIN_VALUE to ensure
+ * that we have an index at position zero - midKey will throw an exception
+ * if this is not the case
+ */
+ long lastIndexKeyCount = Long.MIN_VALUE;
+ long pos = dataReader.getPosition();
LongWritable position = new LongWritable();
+ long nextBlock = pos;
+ boolean blockCompressed = dataReader.isBlockCompressed();
while(dataReader.next(key, value)) {
- cnt++;
- if (cnt % indexInterval == 0) {
+ if (blockCompressed) {
+ long curPos = dataReader.getPosition();
+ if (curPos > nextBlock) {
+ pos = nextBlock; // current block position
+ nextBlock = curPos;
+ }
+ }
+ // Follow the same logic as in
+ // {@link MapFile.Writer#append(WritableComparable, Writable)}
+ if (cnt >= lastIndexKeyCount + indexInterval && pos > lastIndexPos) {
position.set(pos);
- if (!dryrun) indexWriter.append(key, position);
+ if (!dryrun) {
+ indexWriter.append(key, position);
+ }
+ lastIndexPos = pos;
+ lastIndexKeyCount = cnt;
+ }
+ if (!blockCompressed) {
+ pos = dataReader.getPosition(); // next record position
}
- pos = dataReader.getPosition();
+ cnt++;
}
} catch(Throwable t) {
// truncated data file. swallow it.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e062e2b0/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestMapFile.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestMapFile.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestMapFile.java
index ff8df7c..7ec4227 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestMapFile.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestMapFile.java
@@ -485,6 +485,63 @@ public class TestMapFile {
IOUtils.cleanup(null, writer);
}
}
+
+ /**
+ * Tests {@link MapFile#fix(FileSystem, Path, Class, Class, boolean, Configuration)}
+ * on a BLOCK-compressed MapFile: the index file is moved aside, rebuilt with
+ * fix(), and every key written must still be found through the rebuilt index.
+ */
+ @Test
+ public void testFixBlockCompress() throws Exception {
+ final String indexLessMapFile = "testFixBlockCompress.mapfile";
+ final int compressBlocksize = 100; // deliberately tiny compression block size
+ final int indexInterval = 4;
+ final int noBlocks = 4; // intended number of compressed blocks in the data
+ final String value = "value-";
+ final int size = noBlocks * compressBlocksize / (4 + value.length()); // presumably ~bytes/record
+
+ conf.setInt("io.seqfile.compress.blocksize", compressBlocksize);
+ MapFile.Writer.setIndexInterval(conf, indexInterval);
+ FileSystem fs = FileSystem.getLocal(conf);
+ Path dir = new Path(TEST_DIR, indexLessMapFile);
+ MapFile.Writer writer = null;
+ MapFile.Reader reader = null;
+ try {
+ writer =
+ new MapFile.Writer(conf, dir,
+ MapFile.Writer.keyClass(IntWritable.class),
+ MapFile.Writer.valueClass(Text.class),
+ MapFile.Writer.compression(CompressionType.BLOCK));
+ for (int i = 0; i < size; i++) {
+ writer.append(new IntWritable(i), new Text(value + i));
+ }
+ writer.close();
+ Path index = new Path(dir, MapFile.INDEX_FILE_NAME);
+ fs.rename(index, index.suffix(".orig")); // move index aside so fix() rebuilds it
+
+ assertEquals("No of valid MapFile entries wrong", size,
+ MapFile.fix(fs, dir, IntWritable.class, Text.class,
+ false, conf)); // dryrun = false: actually write the new index
+ reader = new MapFile.Reader(dir, conf);
+ IntWritable key;
+ Text val = new Text();
+ int notFound = 0;
+ for (int i = 0; i < size; i++) {
+ key = new IntWritable(i);
+ if (null == reader.get(key, val)) { // null: key not reachable via index
+ notFound++;
+ }
+ }
+ assertEquals("With MapFile.fix-ed index, could not get entries # ",
+ 0, notFound);
+ } finally {
+ IOUtils.cleanupWithLogger(null, writer, reader);
+ if (fs.exists(dir)) {
+ fs.delete(dir, true);
+ }
+ }
+ }
+
/**
* test all available constructor for {@code MapFile.Writer}
*/
@@ -619,7 +676,7 @@ public class TestMapFile {
} catch (Exception ex) {
fail("testMainMethodMapFile error !!!");
} finally {
- IOUtils.cleanup(null, writer);
+ IOUtils.cleanupWithLogger(null, writer);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org