You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by zh...@apache.org on 2015/03/02 18:15:40 UTC
[24/50] [abbrv] hadoop git commit: HADOOP-11569. Provide Merge API
for MapFile to merge multiple similar MapFiles to one MapFile. Contributed by
Vinayakumar B.
HADOOP-11569. Provide Merge API for MapFile to merge multiple similar MapFiles to one MapFile. Contributed by Vinayakumar B.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/645ebb96
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/645ebb96
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/645ebb96
Branch: refs/heads/HDFS-7285
Commit: 645ebb965b88cb3018fb1588268cfaf8db837431
Parents: cc02446
Author: Tsuyoshi Ozawa <oz...@apache.org>
Authored: Fri Feb 27 17:46:07 2015 +0900
Committer: Zhe Zhang <zh...@cloudera.com>
Committed: Mon Mar 2 09:13:53 2015 -0800
----------------------------------------------------------------------
hadoop-common-project/hadoop-common/CHANGES.txt | 3 +
.../main/java/org/apache/hadoop/io/MapFile.java | 143 +++++++++++++++++++
.../java/org/apache/hadoop/io/TestMapFile.java | 56 ++++++++
3 files changed, 202 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/645ebb96/hadoop-common-project/hadoop-common/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index 1d9a6d4..6d4da77 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -445,6 +445,9 @@ Release 2.7.0 - UNRELEASED
HADOOP-11510. Expose truncate API via FileContext. (yliu)
+ HADOOP-11569. Provide Merge API for MapFile to merge multiple similar MapFiles
+ to one MapFile. (Vinayakumar B via ozawa)
+
IMPROVEMENTS
HADOOP-11483. HardLink.java should use the jdk7 createLink method (aajisaka)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/645ebb96/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/MapFile.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/MapFile.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/MapFile.java
index 84c9dcc..ee76458 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/MapFile.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/MapFile.java
@@ -25,6 +25,7 @@ import java.util.Arrays;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
@@ -824,6 +825,148 @@ public class MapFile {
return cnt;
}
+ /**
+  * Merges multiple MapFiles that share the same key and value classes into
+  * one MapFile.
+  *
+  * <p>The key and value classes are taken from the first input that is opened
+  * and every further input must report exactly the same classes. The
+  * comparator is obtained from {@link WritableComparator#get} for the key
+  * class on the first merge. These settings are kept in instance fields, so
+  * they carry over to later {@code merge} calls on the same instance.
+  */
+ public static class Merger {
+   private Configuration conf;
+   private WritableComparator comparator = null;
+   private Reader[] inReaders;
+   private Writer outWriter;
+   private Class<Writable> valueClass = null;
+   private Class<WritableComparable> keyClass = null;
+
+   /**
+    * Creates a merger.
+    *
+    * @param conf configuration used to open the inputs, create the output
+    *          and resolve the file systems of deleted inputs
+    * @throws IOException declared for callers; this constructor itself only
+    *           stores {@code conf}
+    */
+   public Merger(Configuration conf) throws IOException {
+     this.conf = conf;
+   }
+
+   /**
+    * Merge multiple MapFiles to one MapFile.
+    *
+    * <p>All readers and the writer are closed before this method returns,
+    * whether or not the merge succeeded. Inputs are deleted only after the
+    * merge and the close completed without throwing.
+    *
+    * @param inMapFiles the MapFiles to merge
+    * @param deleteInputs if {@code true}, delete every input MapFile once the
+    *          merge has completed successfully
+    * @param outMapFile the MapFile to write the merged entries to
+    * @throws IOException if opening, reading, writing, closing or deleting
+    *           fails
+    * @throws HadoopIllegalArgumentException if the inputs do not all have the
+    *           same key and value classes
+    */
+   public void merge(Path[] inMapFiles, boolean deleteInputs,
+       Path outMapFile) throws IOException {
+     try {
+       open(inMapFiles, outMapFile);
+       mergePass();
+     } finally {
+       close();
+     }
+     if (deleteInputs) {
+       for (int i = 0; i < inMapFiles.length; i++) {
+         Path path = inMapFiles[i];
+         delete(path.getFileSystem(conf), path.toString());
+       }
+     }
+   }
+
+   /*
+    * Open all input files for reading and verify the key and value types. And
+    * open Output file for writing
+    */
+   @SuppressWarnings("unchecked")
+   private void open(Path[] inMapFiles, Path outMapFile) throws IOException {
+     inReaders = new Reader[inMapFiles.length];
+     for (int i = 0; i < inMapFiles.length; i++) {
+       Reader reader = new Reader(inMapFiles[i], conf);
+       // Register the reader before validating it, so that close() releases
+       // it even when the type check below throws.
+       inReaders[i] = reader;
+       if (keyClass == null || valueClass == null) {
+         keyClass = (Class<WritableComparable>) reader.getKeyClass();
+         valueClass = (Class<Writable>) reader.getValueClass();
+       } else if (keyClass != reader.getKeyClass()
+           || valueClass != reader.getValueClass()) {
+         throw new HadoopIllegalArgumentException(
+             "Input files cannot be merged as they"
+                 + " have different Key and Value classes");
+       }
+     }
+
+     if (comparator == null) {
+       Class<? extends WritableComparable> cls;
+       cls = keyClass.asSubclass(WritableComparable.class);
+       this.comparator = WritableComparator.get(cls, conf);
+     } else if (comparator.getKeyClass() != keyClass) {
+       throw new HadoopIllegalArgumentException(
+           "Input files cannot be merged as they"
+               + " have different Key class compared to"
+               + " specified comparator");
+     }
+
+     outWriter = new MapFile.Writer(conf, outMapFile,
+         MapFile.Writer.keyClass(keyClass),
+         MapFile.Writer.valueClass(valueClass));
+   }
+
+   /**
+    * Merge all input files to output map file.<br>
+    * 1. Read first key/value from all input files to keys/values array. <br>
+    * 2. Select the least key and corresponding value. <br>
+    * 3. Write the selected key and value to output file. <br>
+    * 4. Replace the already written key/value in keys/values arrays with the
+    * next key/value from the selected input <br>
+    * 5. Repeat step 2-4 till all keys are read. <br>
+    * When several inputs hold an equal least key, the input with the lowest
+    * index is selected first, because a later key only replaces the current
+    * choice when it compares strictly smaller.
+    */
+   private void mergePass() throws IOException {
+     // re-usable array
+     WritableComparable[] keys = new WritableComparable[inReaders.length];
+     Writable[] values = new Writable[inReaders.length];
+     // Read first key/value from all inputs
+     for (int i = 0; i < inReaders.length; i++) {
+       keys[i] = ReflectionUtils.newInstance(keyClass, null);
+       values[i] = ReflectionUtils.newInstance(valueClass, null);
+       if (!inReaders[i].next(keys[i], values[i])) {
+         // Handle empty files
+         keys[i] = null;
+         values[i] = null;
+       }
+     }
+
+     do {
+       int currentEntry = -1;
+       WritableComparable currentKey = null;
+       Writable currentValue = null;
+       for (int i = 0; i < keys.length; i++) {
+         if (keys[i] == null) {
+           // Skip Readers reached EOF
+           continue;
+         }
+         if (currentKey == null || comparator.compare(currentKey, keys[i]) > 0) {
+           currentEntry = i;
+           currentKey = keys[i];
+           currentValue = values[i];
+         }
+       }
+       if (currentKey == null) {
+         // Merge Complete
+         break;
+       }
+       // Write the selected key/value to merge stream
+       outWriter.append(currentKey, currentValue);
+       // Replace the already written key/value in keys/values arrays with the
+       // next key/value from the selected input
+       if (!inReaders[currentEntry].next(keys[currentEntry],
+           values[currentEntry])) {
+         // EOF for this file
+         keys[currentEntry] = null;
+         values[currentEntry] = null;
+       }
+     } while (true);
+   }
+
+   /*
+    * Close every opened reader and the output writer, and clear the
+    * references so that a later merge starts from a clean state.
+    */
+   private void close() throws IOException {
+     // inReaders is still null when open() failed before allocating it; the
+     // guard keeps this method from masking that failure with an NPE.
+     if (inReaders != null) {
+       for (int i = 0; i < inReaders.length; i++) {
+         IOUtils.closeStream(inReaders[i]);
+         inReaders[i] = null;
+       }
+     }
+     if (outWriter != null) {
+       try {
+         outWriter.close();
+       } finally {
+         // Drop the reference even if close() throws, so the same writer is
+         // never closed again by a later merge on this instance.
+         outWriter = null;
+       }
+     }
+   }
+ }
public static void main(String[] args) throws Exception {
String usage = "Usage: MapFile inFile outFile";
http://git-wip-us.apache.org/repos/asf/hadoop/blob/645ebb96/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestMapFile.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestMapFile.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestMapFile.java
index ced74fb..3f14de0 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestMapFile.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestMapFile.java
@@ -21,6 +21,10 @@ import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -730,4 +734,56 @@ public class TestMapFile {
reader.close();
}
}
+
+ /**
+  * Writes several MapFiles with overlapping integer keys, merges them with
+  * {@link MapFile.Merger} and checks that the merged file yields exactly the
+  * sorted expected keys with their matching values on every pass, and that
+  * the inputs were deleted.
+  */
+ @Test
+ public void testMerge() throws Exception {
+   final String TEST_METHOD_KEY = "testMerge.mapfile";
+   int SIZE = 10;
+   int ITERATIONS = 5;
+   Path[] in = new Path[5];
+   List<Integer> expected = new ArrayList<Integer>();
+   for (int j = 0; j < in.length; j++) {
+     try (MapFile.Writer writer = createWriter(TEST_METHOD_KEY + "." + j,
+         IntWritable.class, Text.class)) {
+       in[j] = new Path(TEST_DIR, TEST_METHOD_KEY + "." + j);
+       for (int i = 0; i < SIZE; i++) {
+         // Keys of input j are j..j+SIZE-1, so neighbouring inputs overlap
+         // and the merged file contains duplicate keys.
+         expected.add(i + j);
+         writer.append(new IntWritable(i + j), new Text("Value:" + (i + j)));
+       }
+     }
+   }
+   // Sort expected values
+   Collections.sort(expected);
+   // Merge all input files
+   MapFile.Merger merger = new MapFile.Merger(conf);
+   merger.merge(in, true, new Path(TEST_DIR, TEST_METHOD_KEY));
+
+   try (MapFile.Reader reader = createReader(TEST_METHOD_KEY,
+       IntWritable.class)) {
+     int start = 0;
+     // test iteration
+     Text startValue = new Text("Value:" + start);
+     int i = 0;
+     while (i++ < ITERATIONS) {
+       Iterator<Integer> expectedIterator = expected.iterator();
+       IntWritable key = new IntWritable(start);
+       Text value = startValue;
+       IntWritable prev = new IntWritable(start);
+       while (reader.next(key, value)) {
+         assertTrue("Next key should be always equal or more",
+             prev.get() <= key.get());
+         assertEquals(expectedIterator.next().intValue(), key.get());
+         // Each value was written as "Value:" + key, so it must survive the
+         // merge unchanged.
+         assertEquals("Value:" + key.get(), value.toString());
+         prev.set(key.get());
+       }
+       // Without this check an empty or truncated merged file would pass.
+       assertFalse("merged file should contain all expected keys",
+           expectedIterator.hasNext());
+       reader.reset();
+     }
+   }
+
+   // inputs should be deleted
+   for (int j = 0; j < in.length; j++) {
+     Path path = in[j];
+     assertFalse("inputs should be deleted",
+         path.getFileSystem(conf).exists(path));
+   }
+ }
}