You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by zh...@apache.org on 2015/03/02 18:15:40 UTC

[24/50] [abbrv] hadoop git commit: HADOOP-11569. Provide Merge API for MapFile to merge multiple similar MapFiles to one MapFile. Contributed by Vinayakumar B.

HADOOP-11569. Provide Merge API for MapFile to merge multiple similar MapFiles to one MapFile. Contributed by Vinayakumar B.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/645ebb96
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/645ebb96
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/645ebb96

Branch: refs/heads/HDFS-7285
Commit: 645ebb965b88cb3018fb1588268cfaf8db837431
Parents: cc02446
Author: Tsuyoshi Ozawa <oz...@apache.org>
Authored: Fri Feb 27 17:46:07 2015 +0900
Committer: Zhe Zhang <zh...@cloudera.com>
Committed: Mon Mar 2 09:13:53 2015 -0800

----------------------------------------------------------------------
 hadoop-common-project/hadoop-common/CHANGES.txt |   3 +
 .../main/java/org/apache/hadoop/io/MapFile.java | 143 +++++++++++++++++++
 .../java/org/apache/hadoop/io/TestMapFile.java  |  56 ++++++++
 3 files changed, 202 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/645ebb96/hadoop-common-project/hadoop-common/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index 1d9a6d4..6d4da77 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -445,6 +445,9 @@ Release 2.7.0 - UNRELEASED
 
     HADOOP-11510. Expose truncate API via FileContext. (yliu)
 
+    HADOOP-11569. Provide Merge API for MapFile to merge multiple similar MapFiles
+    to one MapFile. (Vinayakumar B via ozawa)
+
   IMPROVEMENTS
 
     HADOOP-11483. HardLink.java should use the jdk7 createLink method (aajisaka)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/645ebb96/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/MapFile.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/MapFile.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/MapFile.java
index 84c9dcc..ee76458 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/MapFile.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/MapFile.java
@@ -25,6 +25,7 @@ import java.util.Arrays;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
@@ -824,6 +825,148 @@ public class MapFile {
     return cnt;
   }
 
+  /**
+   * Class to merge multiple MapFiles of same Key and Value types to one MapFile
+   */
+  public static class Merger {
+    private Configuration conf;
+    private WritableComparator comparator = null;
+    private Reader[] inReaders;
+    private Writer outWriter;
+    private Class<Writable> valueClass = null;
+    private Class<WritableComparable> keyClass = null;
+
+    public Merger(Configuration conf) throws IOException {
+      this.conf = conf;
+    }
+
+    /**
+     * Merge multiple MapFiles to one Mapfile
+     *
+     * @param inMapFiles
+     * @param outMapFile
+     * @throws IOException
+     */
+    public void merge(Path[] inMapFiles, boolean deleteInputs,
+        Path outMapFile) throws IOException {
+      try {
+        open(inMapFiles, outMapFile);
+        mergePass();
+      } finally {
+        close();
+      }
+      if (deleteInputs) {
+        for (int i = 0; i < inMapFiles.length; i++) {
+          Path path = inMapFiles[i];
+          delete(path.getFileSystem(conf), path.toString());
+        }
+      }
+    }
+
+    /*
+     * Open all input files for reading and verify the key and value types. And
+     * open Output file for writing
+     */
+    @SuppressWarnings("unchecked")
+    private void open(Path[] inMapFiles, Path outMapFile) throws IOException {
+      inReaders = new Reader[inMapFiles.length];
+      for (int i = 0; i < inMapFiles.length; i++) {
+        Reader reader = new Reader(inMapFiles[i], conf);
+        if (keyClass == null || valueClass == null) {
+          keyClass = (Class<WritableComparable>) reader.getKeyClass();
+          valueClass = (Class<Writable>) reader.getValueClass();
+        } else if (keyClass != reader.getKeyClass()
+            || valueClass != reader.getValueClass()) {
+          throw new HadoopIllegalArgumentException(
+              "Input files cannot be merged as they"
+                  + " have different Key and Value classes");
+        }
+        inReaders[i] = reader;
+      }
+
+      if (comparator == null) {
+        Class<? extends WritableComparable> cls;
+        cls = keyClass.asSubclass(WritableComparable.class);
+        this.comparator = WritableComparator.get(cls, conf);
+      } else if (comparator.getKeyClass() != keyClass) {
+        throw new HadoopIllegalArgumentException(
+            "Input files cannot be merged as they"
+                + " have different Key class compared to"
+                + " specified comparator");
+      }
+
+      outWriter = new MapFile.Writer(conf, outMapFile,
+          MapFile.Writer.keyClass(keyClass),
+          MapFile.Writer.valueClass(valueClass));
+    }
+
+    /**
+     * Merge all input files to output map file.<br>
+     * 1. Read first key/value from all input files to keys/values array. <br>
+     * 2. Select the least key and corresponding value. <br>
+     * 3. Write the selected key and value to output file. <br>
+     * 4. Replace the already written key/value in keys/values arrays with the
+     * next key/value from the selected input <br>
+     * 5. Repeat step 2-4 till all keys are read. <br>
+     */
+    private void mergePass() throws IOException {
+      // re-usable array
+      WritableComparable[] keys = new WritableComparable[inReaders.length];
+      Writable[] values = new Writable[inReaders.length];
+      // Read first key/value from all inputs
+      for (int i = 0; i < inReaders.length; i++) {
+        keys[i] = ReflectionUtils.newInstance(keyClass, null);
+        values[i] = ReflectionUtils.newInstance(valueClass, null);
+        if (!inReaders[i].next(keys[i], values[i])) {
+          // Handle empty files
+          keys[i] = null;
+          values[i] = null;
+        }
+      }
+
+      do {
+        int currentEntry = -1;
+        WritableComparable currentKey = null;
+        Writable currentValue = null;
+        for (int i = 0; i < keys.length; i++) {
+          if (keys[i] == null) {
+            // Skip Readers reached EOF
+            continue;
+          }
+          if (currentKey == null || comparator.compare(currentKey, keys[i]) > 0) {
+            currentEntry = i;
+            currentKey = keys[i];
+            currentValue = values[i];
+          }
+        }
+        if (currentKey == null) {
+          // Merge Complete
+          break;
+        }
+        // Write the selected key/value to merge stream
+        outWriter.append(currentKey, currentValue);
+        // Replace the already written key/value in keys/values arrays with the
+        // next key/value from the selected input
+        if (!inReaders[currentEntry].next(keys[currentEntry],
+            values[currentEntry])) {
+          // EOF for this file
+          keys[currentEntry] = null;
+          values[currentEntry] = null;
+        }
+      } while (true);
+    }
+
+    private void close() throws IOException {
+      for (int i = 0; i < inReaders.length; i++) {
+        IOUtils.closeStream(inReaders[i]);
+        inReaders[i] = null;
+      }
+      if (outWriter != null) {
+        outWriter.close();
+        outWriter = null;
+      }
+    }
+  }
 
   public static void main(String[] args) throws Exception {
     String usage = "Usage: MapFile inFile outFile";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/645ebb96/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestMapFile.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestMapFile.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestMapFile.java
index ced74fb..3f14de0 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestMapFile.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestMapFile.java
@@ -21,6 +21,10 @@ import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -730,4 +734,56 @@ public class TestMapFile {
       reader.close();
     }
   }
+
+  @Test
+  public void testMerge() throws Exception {
+    final String TEST_METHOD_KEY = "testMerge.mapfile";
+    int SIZE = 10;
+    int ITERATIONS = 5;
+    Path[] in = new Path[5];
+    List<Integer> expected = new ArrayList<Integer>();
+    for (int j = 0; j < 5; j++) {
+      try (MapFile.Writer writer = createWriter(TEST_METHOD_KEY + "." + j,
+          IntWritable.class, Text.class)) {
+        in[j] = new Path(TEST_DIR, TEST_METHOD_KEY + "." + j);
+        for (int i = 0; i < SIZE; i++) {
+          expected.add(i + j);
+          writer.append(new IntWritable(i + j), new Text("Value:" + (i + j)));
+        }
+      }
+    }
+    // Sort expected values
+    Collections.sort(expected);
+    // Merge all 5 files
+    MapFile.Merger merger = new MapFile.Merger(conf);
+    merger.merge(in, true, new Path(TEST_DIR, TEST_METHOD_KEY));
+
+    try (MapFile.Reader reader = createReader(TEST_METHOD_KEY,
+        IntWritable.class)) {
+      int start = 0;
+      // test iteration
+      Text startValue = new Text("Value:" + start);
+      int i = 0;
+      while (i++ < ITERATIONS) {
+        Iterator<Integer> expectedIterator = expected.iterator();
+        IntWritable key = new IntWritable(start);
+        Text value = startValue;
+        IntWritable prev = new IntWritable(start);
+        while (reader.next(key, value)) {
+          assertTrue("Next key should be always equal or more",
+              prev.get() <= key.get());
+          assertEquals(expectedIterator.next().intValue(), key.get());
+          prev.set(key.get());
+        }
+        reader.reset();
+      }
+    }
+
+    // inputs should be deleted
+    for (int j = 0; j < in.length; j++) {
+      Path path = in[j];
+      assertFalse("inputs should be deleted",
+          path.getFileSystem(conf).exists(path));
+    }
+  }
 }