You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cd...@apache.org on 2008/03/06 04:00:35 UTC
svn commit: r634136 - in /hadoop/core/trunk: ./
src/java/org/apache/hadoop/mapred/lib/ src/test/org/apache/hadoop/mapred/
Author: cdouglas
Date: Wed Mar 5 19:00:31 2008
New Revision: 634136
URL: http://svn.apache.org/viewvc?rev=634136&view=rev
Log:
HADOOP-2906. Add an OutputFormat capable of using keys, values, and config
params to map records to different output files. Contributed by Runping Qi.
Added:
hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/MultipleOutputFormat.java
hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/MultipleSequenceFileOutputFormat.java
hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/MultipleTextOutputFormat.java
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMultipleTextOutputFormat.java
Modified:
hadoop/core/trunk/CHANGES.txt
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=634136&r1=634135&r2=634136&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Wed Mar 5 19:00:31 2008
@@ -38,6 +38,10 @@
HADOOP-2219. A new command "df -count" that counts the number of
files and directories. (Tsz Wo (Nicholas), SZE via dhruba)
+ HADOOP-2906. Add an OutputFormat capable of using keys, values, and
+ config params to map records to different output files.
+ (Runping Qi via cdouglas)
+
IMPROVEMENTS
HADOOP-2655. Copy on write for data and metadata files in the
Added: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/MultipleOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/MultipleOutputFormat.java?rev=634136&view=auto
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/MultipleOutputFormat.java (added)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/MultipleOutputFormat.java Wed Mar 5 19:00:31 2008
@@ -0,0 +1,229 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.lib;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.TreeMap;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputFormatBase;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * This abstract class extends the OutputFormatBase, allowing the output data
+ * to be written to different output files. There are three basic use cases
+ * for this class.
+ *
+ * Case one: This class is used for a map reduce job with at least one reducer.
+ * The reducer wants to write data to different files depending on the actual
+ * keys. It is assumed that a key (or value) encodes the actual key (value)
+ * and the desired location for the actual key (value).
+ *
+ * Case two: This class is used for a map only job. The job wants to use an
+ * output file name that is either a part of the input file name of the input
+ * data, or some derivation of it.
+ *
+ * Case three: This class is used for a map only job. The job wants to use an
+ * output file name that depends on both the keys and the input file name.
+ */
+public abstract class MultipleOutputFormat<K extends WritableComparable, V extends Writable>
+    extends OutputFormatBase<K, V> {
+
+  /**
+   * Create a composite record writer that can write key/value data to
+   * different output files. The returned writer lazily creates one underlying
+   * writer per distinct output file name (via
+   * {@link #getBaseRecordWriter(FileSystem, JobConf, String, Progressable)})
+   * and keeps it cached until the composite writer is closed.
+   *
+   * @param fs
+   *          the file system to use
+   * @param job
+   *          the job conf for the job
+   * @param name
+   *          the leaf file name for the output file (such as "part-00000")
+   * @param arg3
+   *          a progressable for reporting progress.
+   * @return a composite record writer
+   * @throws IOException
+   *           declared for implementations and callers; the per-file writers
+   *           are only created later, on the first write to each file
+   */
+  public RecordWriter<K, V> getRecordWriter(FileSystem fs, JobConf job,
+      String name, Progressable arg3) throws IOException {
+
+    final FileSystem myFS = fs;
+    // the leaf name is derived once, not once per record
+    final String myName = generateLeafFileName(name);
+    final JobConf myJob = job;
+    final Progressable myProgressable = arg3;
+
+    return new RecordWriter<K, V>() {
+
+      // a cache storing the record writers for different output files.
+      private final TreeMap<String, RecordWriter<K, V>> recordWriters =
+          new TreeMap<String, RecordWriter<K, V>>();
+
+      public void write(K key, V value) throws IOException {
+
+        // get the file name based on the key
+        String keyBasedPath = generateFileNameForKeyValue(key, value, myName);
+
+        // get the file name based on the input file name
+        String finalPath = getInputFileBasedOutputFileName(myJob, keyBasedPath);
+
+        // get the actual key and value to be written
+        K actualKey = generateActualKey(key, value);
+        V actualValue = generateActualValue(key, value);
+
+        RecordWriter<K, V> rw = this.recordWriters.get(finalPath);
+        if (rw == null) {
+          // if we don't have the record writer yet for the final path,
+          // create one and add it to the cache
+          rw = getBaseRecordWriter(myFS, myJob, finalPath, myProgressable);
+          this.recordWriters.put(finalPath, rw);
+        }
+        rw.write(actualKey, actualValue);
+      }
+
+      public void close(Reporter reporter) throws IOException {
+        // Close every cached writer even if one of them fails, so that a
+        // single bad file does not leave the remaining writers open. The
+        // first failure is remembered and rethrown once all are processed.
+        IOException firstFailure = null;
+        for (RecordWriter<K, V> rw : this.recordWriters.values()) {
+          try {
+            rw.close(reporter);
+          } catch (IOException e) {
+            if (firstFailure == null) {
+              firstFailure = e;
+            }
+          }
+        }
+        this.recordWriters.clear();
+        if (firstFailure != null) {
+          throw firstFailure;
+        }
+      }
+    };
+  }
+
+  /**
+   * Generate the leaf name for the output file name. The default behavior does
+   * not change the leaf file name (such as part-00000)
+   *
+   * @param name
+   *          the leaf file name for the output file
+   * @return the given leaf file name
+   */
+  protected String generateLeafFileName(String name) {
+    return name;
+  }
+
+  /**
+   * Generate the output file name based on the given key/value and the leaf
+   * file name. The default behavior is that the file name does not depend on
+   * the key or the value.
+   *
+   * @param key
+   *          the key of the output data
+   * @param value
+   *          the value of the output data
+   * @param name
+   *          the leaf file name
+   * @return generated file name
+   */
+  protected String generateFileNameForKeyValue(K key, V value, String name) {
+    return name;
+  }
+
+  /**
+   * Generate the actual key from the given key/value. The default behavior is
+   * that the actual key is equal to the given key
+   *
+   * @param key
+   *          the key of the output data
+   * @param value
+   *          the value of the output data
+   * @return the actual key derived from the given key/value
+   */
+  protected K generateActualKey(K key, V value) {
+    return key;
+  }
+
+  /**
+   * Generate the actual value from the given key and value. The default
+   * behavior is that the actual value is equal to the given value
+   *
+   * @param key
+   *          the key of the output data
+   * @param value
+   *          the value of the output data
+   * @return the actual value derived from the given key/value
+   */
+  protected V generateActualValue(K key, V value) {
+    return value;
+  }
+
+  /**
+   * Generate the outfile name based on a given name and the input file name.
+   * If the map input file does not exist (i.e. the "map.input.file" config
+   * value is not set, so this is not for a map only job), the given name is
+   * returned unchanged. If the config value for
+   * "mapred.outputformat.numOfTrailingLegs" is not set, or set to 0 or a
+   * negative number, the given name is returned unchanged. Otherwise, return
+   * a file name consisting of the N trailing legs of the input file name,
+   * where N is the config value for "mapred.outputformat.numOfTrailingLegs".
+   * In that last case the given name is not part of the result, and fewer
+   * than N legs are used when the input path does not have that many.
+   *
+   * @param job
+   *          the job config
+   * @param name
+   *          the output file name
+   * @return the outfile name based on a given name and the input file name.
+   */
+  protected String getInputFileBasedOutputFileName(JobConf job, String name) {
+    String infilepath = job.get("map.input.file");
+    if (infilepath == null) {
+      // if the map input file does not exist, then return the given name
+      return name;
+    }
+    int numOfTrailingLegsToUse = job.getInt("mapred.outputformat.numOfTrailingLegs", 0);
+    if (numOfTrailingLegsToUse <= 0) {
+      return name;
+    }
+    Path infile = new Path(infilepath);
+    Path parent = infile.getParent();
+    String midName = infile.getName();
+    // the last leg is always used; the loop prepends up to N-1 parent legs
+    Path outPath = new Path(midName);
+    for (int i = 1; i < numOfTrailingLegsToUse; i++) {
+      if (parent == null) {
+        break;
+      }
+      midName = parent.getName();
+      if (midName.length() == 0) {
+        // an empty leg name ends the walk up the input path
+        break;
+      }
+      parent = parent.getParent();
+      outPath = new Path(midName, outPath);
+    }
+    return outPath.toString();
+  }
+
+  /**
+   * Create the record writer that actually writes to a single output file.
+   * Called at most once per distinct file name by the composite writer
+   * returned from
+   * {@link #getRecordWriter(FileSystem, JobConf, String, Progressable)}.
+   *
+   * @param fs
+   *          the file system to use
+   * @param job
+   *          a job conf object
+   * @param name
+   *          the name of the file over which a record writer object will be
+   *          constructed
+   * @param arg3
+   *          a progressable object
+   * @return A RecordWriter object over the given file
+   * @throws IOException
+   *           if the implementation fails to create the writer
+   */
+  abstract protected RecordWriter<K, V> getBaseRecordWriter(FileSystem fs,
+      JobConf job, String name, Progressable arg3) throws IOException;
+}
Added: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/MultipleSequenceFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/MultipleSequenceFileOutputFormat.java?rev=634136&view=auto
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/MultipleSequenceFileOutputFormat.java (added)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/MultipleSequenceFileOutputFormat.java Wed Mar 5 19:00:31 2008
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.lib;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * This class extends the MultipleOutputFormat, allowing to write the output data
+ * to different output files in sequence file output format.
+ */
+public class MultipleSequenceFileOutputFormat extends MultipleOutputFormat {
+
+ private SequenceFileOutputFormat theSequenceFileOutputFormat = null;
+
+ @Override
+ protected RecordWriter getBaseRecordWriter(FileSystem fs, JobConf job, String name, Progressable arg3) throws IOException {
+ if (theSequenceFileOutputFormat == null) {
+ theSequenceFileOutputFormat = new SequenceFileOutputFormat();
+ }
+ return theSequenceFileOutputFormat.getRecordWriter(fs, job, name, arg3);
+ }
+}
Added: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/MultipleTextOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/MultipleTextOutputFormat.java?rev=634136&view=auto
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/MultipleTextOutputFormat.java (added)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/MultipleTextOutputFormat.java Wed Mar 5 19:00:31 2008
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.lib;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.TextOutputFormat;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * A {@link MultipleOutputFormat} that writes each of its output files through
+ * a {@link TextOutputFormat}, i.e. in Text output format.
+ */
+public class MultipleTextOutputFormat<K extends WritableComparable, V extends Writable>
+    extends MultipleOutputFormat<K, V> {
+
+  // created on the first writer request and reused for all later ones
+  private TextOutputFormat<K, V> delegate = null;
+
+  /**
+   * Obtain a record writer for one output file from the shared
+   * TextOutputFormat instance, creating that instance on first use.
+   *
+   * @param fs
+   *          the file system to use
+   * @param job
+   *          a job conf object
+   * @param name
+   *          the name of the file the writer is created for
+   * @param arg3
+   *          a progressable object
+   * @return the writer returned by the TextOutputFormat
+   * @throws IOException
+   *           if the underlying format fails to create the writer
+   */
+  @Override
+  protected RecordWriter<K, V> getBaseRecordWriter(FileSystem fs, JobConf job,
+      String name, Progressable arg3) throws IOException {
+    TextOutputFormat<K, V> format = delegate;
+    if (format == null) {
+      format = new TextOutputFormat<K, V>();
+      delegate = format;
+    }
+    return format.getRecordWriter(fs, job, name, arg3);
+  }
+}
Added: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMultipleTextOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMultipleTextOutputFormat.java?rev=634136&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMultipleTextOutputFormat.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMultipleTextOutputFormat.java Wed Mar 5 19:00:31 2008
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.*;
+import junit.framework.TestCase;
+
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.*;
+
+import org.apache.hadoop.mapred.lib.*;
+
+/**
+ * Exercises MultipleTextOutputFormat in two configurations: file names derived
+ * from the record key, and file names derived from the trailing legs of the
+ * (simulated) map input file.
+ */
+public class TestMultipleTextOutputFormat extends TestCase {
+  private static JobConf defaultConf = new JobConf();
+
+  private static FileSystem localFs = null;
+  static {
+    try {
+      localFs = FileSystem.getLocal(defaultConf);
+    } catch (IOException e) {
+      throw new RuntimeException("init failure", e);
+    }
+  }
+
+  private static Path workDir =
+    new Path(new Path(System.getProperty("test.build.data", "."), "data"),
+             "TestMultipleTextOutputFormat");
+
+  /** Writes the records ("10","10") .. ("39","39") to the given writer. */
+  private static void writeData(RecordWriter<Text, Text> rw) throws IOException {
+    for (int i = 10; i < 40; i++) {
+      String k = "" + i;
+      String v = "" + i;
+      rw.write(new Text(k), new Text(v));
+    }
+  }
+
+  /** Routes each record to a file prefixed with the first character of its key. */
+  static class KeyBasedMultipleTextOutputFormat extends MultipleTextOutputFormat<Text, Text> {
+    @Override
+    protected String generateFileNameForKeyValue(Text key, Text v, String name) {
+      return key.toString().substring(0, 1) + "-" + name;
+    }
+  }
+
+  /** Key-based case: records end up in 1-part-00000, 2-part-00000, 3-part-00000. */
+  private static void test1(JobConf job) throws IOException {
+    FileSystem fs = FileSystem.getLocal(job);
+    String name = "part-00000";
+    KeyBasedMultipleTextOutputFormat theOutputFormat = new KeyBasedMultipleTextOutputFormat();
+    RecordWriter<Text, Text> rw = theOutputFormat.getRecordWriter(fs, job, name, null);
+    writeData(rw);
+    rw.close(null);
+  }
+
+  /** Input-file-based case: all records end up in the file "2/3". */
+  private static void test2(JobConf job) throws IOException {
+    FileSystem fs = FileSystem.getLocal(job);
+    String name = "part-00000";
+    // pretend that we have input file with 1/2/3 as the suffix
+    job.set("map.input.file", "1/2/3");
+    // we use the last two legs of the input file as the output file
+    job.set("mapred.outputformat.numOfTrailingLegs", "2");
+    MultipleTextOutputFormat<Text, Text> theOutputFormat = new MultipleTextOutputFormat<Text, Text>();
+    RecordWriter<Text, Text> rw = theOutputFormat.getRecordWriter(fs, job, name, null);
+    writeData(rw);
+    rw.close(null);
+  }
+
+  /**
+   * Builds the text expected for the records from..to-1: one line per record
+   * of the form "i<TAB>i\n", matching what writeData() emits for that range.
+   */
+  private static String expectedRecords(int from, int to) {
+    StringBuilder expected = new StringBuilder();
+    for (int i = from; i < to; i++) {
+      expected.append(i).append('\t').append(i).append("\n");
+    }
+    return expected.toString();
+  }
+
+  /**
+   * Asserts that the file with the given name (relative to workDir) holds
+   * exactly the expected text.
+   */
+  private static void assertOutputFile(String relativeName, String expected)
+      throws IOException {
+    File outputFile = new File(new Path(workDir, relativeName).toString());
+    String actual = UtilsForTests.slurp(outputFile);
+    // JUnit convention is (expected, actual) so failure messages read correctly
+    assertEquals(expected, actual);
+  }
+
+  public void testFormat() throws Exception {
+    JobConf job = new JobConf();
+    job.setOutputPath(workDir);
+    FileSystem fs = workDir.getFileSystem(job);
+    if (!fs.mkdirs(workDir)) {
+      fail("Failed to create output directory");
+    }
+    TestMultipleTextOutputFormat.test1(job);
+    TestMultipleTextOutputFormat.test2(job);
+
+    // key-based output: one file per leading digit of the key
+    assertOutputFile("1-part-00000", expectedRecords(10, 20));
+    assertOutputFile("2-part-00000", expectedRecords(20, 30));
+    assertOutputFile("3-part-00000", expectedRecords(30, 40));
+
+    // input-file-based output: every record goes to the last two input legs
+    assertOutputFile("2/3", expectedRecords(10, 40));
+  }
+
+  public static void main(String[] args) throws Exception {
+    new TestMultipleTextOutputFormat().testFormat();
+  }
+}