You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cd...@apache.org on 2008/03/06 04:00:35 UTC

svn commit: r634136 - in /hadoop/core/trunk: ./ src/java/org/apache/hadoop/mapred/lib/ src/test/org/apache/hadoop/mapred/

Author: cdouglas
Date: Wed Mar  5 19:00:31 2008
New Revision: 634136

URL: http://svn.apache.org/viewvc?rev=634136&view=rev
Log:
HADOOP-2906. Add an OutputFormat capable of using keys, values, and config
params to map records to different output files. Contributed by Runping Qi.


Added:
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/MultipleOutputFormat.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/MultipleSequenceFileOutputFormat.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/MultipleTextOutputFormat.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMultipleTextOutputFormat.java
Modified:
    hadoop/core/trunk/CHANGES.txt

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=634136&r1=634135&r2=634136&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Wed Mar  5 19:00:31 2008
@@ -38,6 +38,10 @@
     HADOOP-2219. A new command "df -count" that counts the number of
     files and directories.  (Tsz Wo (Nicholas), SZE via dhruba)
 
+    HADOOP-2906. Add an OutputFormat capable of using keys, values, and
+    config params to map records to different output files.
+    (Runping Qi via cdouglas)
+
   IMPROVEMENTS
 
     HADOOP-2655. Copy on write for data and metadata files in the 

Added: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/MultipleOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/MultipleOutputFormat.java?rev=634136&view=auto
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/MultipleOutputFormat.java (added)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/MultipleOutputFormat.java Wed Mar  5 19:00:31 2008
@@ -0,0 +1,229 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.lib;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.TreeMap;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputFormatBase;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * This abstract class extends the OutputFormatBase, allowing to write the
+ * output data to different output files. There are three basic use cases for
+ * this class.
+ * 
+ * Case one: This class is used for a map reduce job with at least one reducer.
+ * The reducer wants to write data to different files depending on the actual
+ * keys. It is assumed that a key (or value) enocodes the actual key (value)
+ * and the desired location for the actual key (value).
+ * 
+ * Case two: Tis class is used for a map only job. The job wants to use an
+ * output file name that is either a part of the input file name of the input
+ * data, or some derivation of it.
+ * 
+ * Case three: This class is used for a map only job. The job wants to use an
+ * output file name that depends on both the keys and the input file name,
+ * 
+ */
+public abstract class MultipleOutputFormat<K extends WritableComparable, V extends Writable>
+    extends OutputFormatBase<K, V> {
+
+  /**
+   * Create a composite record writer that can write key/value data to different
+   * output files
+   * 
+   * @param fs
+   *          the file system to use
+   * @param job
+   *          the job conf for the job
+   * @param name
+   *          the leaf file name for the output file (such as part-00000")
+   * @param arg3
+   *          a progressable for reporting progress.
+   * @return a composite record writer
+   * @throws IOException
+   */
+  public RecordWriter<K, V> getRecordWriter(FileSystem fs, JobConf job,
+      String name, Progressable arg3) throws IOException {
+
+    final FileSystem myFS = fs;
+    final String myName = generateLeafFileName(name);
+    final JobConf myJob = job;
+    final Progressable myProgressable = arg3;
+
+    return new RecordWriter<K, V>() {
+
+      // a cache storing the record writers for different output files.
+      TreeMap<String, RecordWriter<K, V>> recordWriters = new TreeMap<String, RecordWriter<K, V>>();
+
+      public void write(K key, V value) throws IOException {
+
+        // get the file name based on the key
+        String keyBasedPath = generateFileNameForKeyValue(key, value, myName);
+
+        // get the file name based on the input file name
+        String finalPath = getInputFileBasedOutputFileName(myJob, keyBasedPath);
+
+        // get the actual key
+        K actualKey = generateActualKey(key, value);
+        V actualValue = generateActualValue(key, value);
+
+        RecordWriter<K, V> rw = this.recordWriters.get(finalPath);
+        if (rw == null) {
+          // if we don't have the record writer yet for the final path, create
+          // one
+          // and add it to the cache
+          rw = getBaseRecordWriter(myFS, myJob, finalPath, myProgressable);
+          this.recordWriters.put(finalPath, rw);
+        }
+        rw.write(actualKey, actualValue);
+      };
+
+      public void close(Reporter reporter) throws IOException {
+        Iterator<String> keys = this.recordWriters.keySet().iterator();
+        while (keys.hasNext()) {
+          RecordWriter<K, V> rw = this.recordWriters.get(keys.next());
+          rw.close(reporter);
+        }
+        this.recordWriters.clear();
+      };
+    };
+  }
+
+  /**
+   * Generate the leaf name for the output file name. The default behavior does
+   * not change the leaf file name (such as part-00000)
+   * 
+   * @param name
+   *          the leaf file name for the output file
+   * @return the given leaf file name
+   */
+  protected String generateLeafFileName(String name) {
+    return name;
+  }
+
+  /**
+   * Generate the file output file name based on the given key and the leaf file
+   * name. The default behavior is that the file name does not depend on the
+   * key.
+   * 
+   * @param key
+   *          the key of the output data
+   * @param name
+   *          the leaf file name
+   * @return generated file name
+   */
+  protected String generateFileNameForKeyValue(K key, V value, String name) {
+    return name;
+  }
+
+  /**
+   * Generate the actual key from the given key/value. The default behavior is that
+   * the actual key is equal to the given key
+   * 
+   * @param key
+   *          the key of the output data
+   * @param value
+   *          the value of the output data
+   * @return the actual key derived from the given key/value
+   */
+  protected K generateActualKey(K key, V value) {
+    return key;
+  }
+  
+  /**
+   * Generate the actual value from the given key and value. The default behavior is that
+   * the actual value is equal to the given value
+   * 
+   * @param key
+   *          the key of the output data
+   * @param value
+   *          the value of the output data
+   * @return the actual value derived from the given key/value
+   */
+  protected V generateActualValue(K key, V value) {
+    return value;
+  }
+  
+
+  /**
+   * Generate the outfile name based on a given anme and the input file name. If
+   * the map input file does not exists (i.e. this is not for a map only job),
+   * the given name is returned unchanged. If the config value for
+   * "num.of.trailing.legs.to.use" is not set, or set 0 or negative, the given
+   * name is returned unchanged. Otherwise, return a file name consisting of the
+   * N trailing legs of the input file name where N is the config value for
+   * "num.of.trailing.legs.to.use".
+   * 
+   * @param job
+   *          the job config
+   * @param name
+   *          the output file name
+   * @return the outfile name based on a given anme and the input file name.
+   */
+  protected String getInputFileBasedOutputFileName(JobConf job, String name) {
+    String infilepath = job.get("map.input.file");
+    if (infilepath == null) {
+      // if the map input file does not exists, then return the given name
+      return name;
+    }
+    int numOfTrailingLegsToUse = job.getInt("mapred.outputformat.numOfTrailingLegs", 0);
+    if (numOfTrailingLegsToUse <= 0) {
+      return name;
+    }
+    Path infile = new Path(infilepath);
+    Path parent = infile.getParent();
+    String midName = infile.getName();
+    Path outPath = new Path(midName);
+    for (int i = 1; i < numOfTrailingLegsToUse; i++) {
+      if (parent == null) break;
+      midName = parent.getName();
+      if (midName.length() == 0) break;
+      parent = parent.getParent();
+      outPath = new Path(midName, outPath);
+    }
+    return outPath.toString();
+  }
+
+  /**
+   * 
+   * @param fs
+   *          the file system to use
+   * @param job
+   *          a job conf object
+   * @param name
+   *          the name of the file over which a record writer object will be
+   *          constructed
+   * @param arg3
+   *          a progressable object
+   * @return A RecordWriter object over the given file
+   * @throws IOException
+   */
+  abstract protected RecordWriter<K, V> getBaseRecordWriter(FileSystem fs,
+      JobConf job, String name, Progressable arg3) throws IOException;
+}

Added: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/MultipleSequenceFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/MultipleSequenceFileOutputFormat.java?rev=634136&view=auto
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/MultipleSequenceFileOutputFormat.java (added)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/MultipleSequenceFileOutputFormat.java Wed Mar  5 19:00:31 2008
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.lib;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * This class extends the MultipleOutputFormat, allowing to write the output data 
+ * to different output files in sequence file output format. 
+ */
+public class MultipleSequenceFileOutputFormat extends MultipleOutputFormat {
+
+  private SequenceFileOutputFormat theSequenceFileOutputFormat = null;
+  
+  @Override
+  protected RecordWriter getBaseRecordWriter(FileSystem fs, JobConf job, String name, Progressable arg3) throws IOException {
+    if (theSequenceFileOutputFormat == null) {
+      theSequenceFileOutputFormat = new SequenceFileOutputFormat();
+    }
+    return theSequenceFileOutputFormat.getRecordWriter(fs, job, name, arg3);
+  }
+}

Added: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/MultipleTextOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/MultipleTextOutputFormat.java?rev=634136&view=auto
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/MultipleTextOutputFormat.java (added)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/MultipleTextOutputFormat.java Wed Mar  5 19:00:31 2008
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.lib;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.TextOutputFormat;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * This class extends the MultipleOutputFormat, allowing to write the output
+ * data to different output files in Text output format.
+ */
+public class MultipleTextOutputFormat<K extends WritableComparable, V extends Writable>
+    extends MultipleOutputFormat<K, V> {
+
+  private TextOutputFormat<K, V> theTextOutputFormat = null;
+
+  @Override
+  protected RecordWriter<K, V> getBaseRecordWriter(FileSystem fs, JobConf job,
+      String name, Progressable arg3) throws IOException {
+    if (theTextOutputFormat == null) {
+      theTextOutputFormat = new TextOutputFormat<K, V>();
+    }
+    return theTextOutputFormat.getRecordWriter(fs, job, name, arg3);
+  }
+}

Added: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMultipleTextOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMultipleTextOutputFormat.java?rev=634136&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMultipleTextOutputFormat.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMultipleTextOutputFormat.java Wed Mar  5 19:00:31 2008
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.*;
+import junit.framework.TestCase;
+
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.*;
+
+import org.apache.hadoop.mapred.lib.*;
+
+public class TestMultipleTextOutputFormat extends TestCase {
+  private static JobConf defaultConf = new JobConf();
+
+  private static FileSystem localFs = null;
+  static {
+    try {
+      localFs = FileSystem.getLocal(defaultConf);
+    } catch (IOException e) {
+      throw new RuntimeException("init failure", e);
+    }
+  }
+
+  private static Path workDir = 
+    new Path(new Path(System.getProperty("test.build.data", "."), "data"), 
+             "TestMultipleTextOutputFormat");
+
+  private static void writeData(RecordWriter<Text, Text> rw) throws IOException {
+    for (int i = 10; i < 40; i++) {
+      String k = "" + i;
+      String v = "" + i;
+      rw.write(new Text(k), new Text(v));
+    }
+  }
+  
+  static class KeyBasedMultipleTextOutputFormat extends MultipleTextOutputFormat<Text, Text> {
+    protected String generateFileNameForKeyValue(Text key, Text v, String name) {
+      
+      return key.toString().substring(0, 1) + "-" + name;
+    }
+  }
+  
+  private static void test1(JobConf job) throws IOException {
+    FileSystem fs = FileSystem.getLocal(job);
+    String name = "part-00000";
+    KeyBasedMultipleTextOutputFormat theOutputFormat = new KeyBasedMultipleTextOutputFormat();
+    RecordWriter<Text, Text> rw = theOutputFormat.getRecordWriter(fs, job, name, null);
+    writeData(rw);
+    rw.close(null);
+  }
+  
+  private static void test2(JobConf job) throws IOException {
+    FileSystem fs = FileSystem.getLocal(job);
+    String name = "part-00000";
+    //pretend that we have input file with 1/2/3 as the suffix
+    job.set("map.input.file", "1/2/3");
+    // we use the last two legs of the input file as the output file
+    job.set("mapred.outputformat.numOfTrailingLegs", "2");
+    MultipleTextOutputFormat<Text, Text> theOutputFormat = new MultipleTextOutputFormat<Text, Text>();
+    RecordWriter<Text, Text> rw = theOutputFormat.getRecordWriter(fs, job, name, null);
+    writeData(rw);
+    rw.close(null);
+  }
+  
+  public void testFormat() throws Exception {
+    JobConf job = new JobConf();
+    job.setOutputPath(workDir);
+    FileSystem fs = workDir.getFileSystem(job);
+    if (!fs.mkdirs(workDir)) {
+      fail("Failed to create output directory");
+    }
+    //System.out.printf("workdir: %s\n", workDir.toString());
+    TestMultipleTextOutputFormat.test1(job);
+    TestMultipleTextOutputFormat.test2(job);
+    String file_11 = "1-part-00000";
+    
+    File expectedFile_11 = new File(new Path(workDir, file_11).toString()); 
+
+    //System.out.printf("expectedFile_11: %s\n", new Path(workDir, file_11).toString());
+    StringBuffer expectedOutput = new StringBuffer();
+    for (int i = 10; i < 20; i++) {
+      expectedOutput.append(""+i).append('\t').append(""+i).append("\n");
+    }
+    String output = UtilsForTests.slurp(expectedFile_11);
+    //System.out.printf("File_2 output: %s\n", output);
+    assertEquals(output, expectedOutput.toString());
+    
+    String file_12 = "2-part-00000";
+    
+    File expectedFile_12 = new File(new Path(workDir, file_12).toString()); 
+    //System.out.printf("expectedFile_12: %s\n", new Path(workDir, file_12).toString());
+    expectedOutput = new StringBuffer();
+    for (int i = 20; i < 30; i++) {
+      expectedOutput.append(""+i).append('\t').append(""+i).append("\n");
+    }
+    output = UtilsForTests.slurp(expectedFile_12);
+    //System.out.printf("File_2 output: %s\n", output);
+    assertEquals(output, expectedOutput.toString());
+    
+    String file_13 = "3-part-00000";
+    
+    File expectedFile_13 = new File(new Path(workDir, file_13).toString()); 
+    //System.out.printf("expectedFile_13: %s\n", new Path(workDir, file_13).toString());
+    expectedOutput = new StringBuffer();
+    for (int i = 30; i < 40; i++) {
+      expectedOutput.append(""+i).append('\t').append(""+i).append("\n");
+    }
+    output = UtilsForTests.slurp(expectedFile_13);
+    //System.out.printf("File_2 output: %s\n", output);
+    assertEquals(output, expectedOutput.toString());
+    
+    String file_2 = "2/3";
+    
+    File expectedFile_2 = new File(new Path(workDir, file_2).toString()); 
+    //System.out.printf("expectedFile_2: %s\n", new Path(workDir, file_2).toString());
+    expectedOutput = new StringBuffer();
+    for (int i = 10; i < 40; i++) {
+      expectedOutput.append(""+i).append('\t').append(""+i).append("\n");
+    }
+    output = UtilsForTests.slurp(expectedFile_2);
+    //System.out.printf("File_2 output: %s\n", output);
+    assertEquals(output, expectedOutput.toString());
+  }
+
+  public static void main(String[] args) throws Exception {
+    new TestMultipleTextOutputFormat().testFormat();
+  }
+}