Posted to common-commits@hadoop.apache.org by cd...@apache.org on 2008/06/06 20:18:29 UTC
svn commit: r664067 - in /hadoop/core/trunk: ./
src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/mapred/lib/
src/test/org/apache/hadoop/mapred/
Author: cdouglas
Date: Fri Jun 6 11:18:29 2008
New Revision: 664067
URL: http://svn.apache.org/viewvc?rev=664067&view=rev
Log:
HADOOP-3460. Add SequenceFileAsBinaryOutputFormat to permit direct
writes of serialized data. Contributed by Koji Noguchi.
Added:
hadoop/core/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsBinaryOutputFormat.java
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileAsBinaryOutputFormat.java
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/java/org/apache/hadoop/mapred/SequenceFileOutputFormat.java
hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/MultipleSequenceFileOutputFormat.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=664067&r1=664066&r2=664067&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Fri Jun 6 11:18:29 2008
@@ -143,6 +143,9 @@
HADOOP-3307. Support for Archives in Hadoop. (Mahadev Konar via ddas)
+ HADOOP-3460. Add SequenceFileAsBinaryOutputFormat to permit direct
+ writes of serialized data. (Koji Noguchi via cdouglas)
+
IMPROVEMENTS
HADOOP-2928. Remove deprecated FileSystem.getContentLength().
Added: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsBinaryOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsBinaryOutputFormat.java?rev=664067&view=auto
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsBinaryOutputFormat.java (added)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsBinaryOutputFormat.java Fri Jun 6 11:18:29 2008
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.io.DataOutputStream;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.SequenceFile.ValueBytes;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * An {@link OutputFormat} that writes keys and values to
+ * {@link SequenceFile}s in binary (raw) format.
+ */
+public class SequenceFileAsBinaryOutputFormat
+ extends SequenceFileOutputFormat <BytesWritable,BytesWritable> {
+
+ /**
+ * Inner class used for appendRaw; wraps a {@link BytesWritable} as a {@link ValueBytes}
+ */
+ static protected class WritableValueBytes implements ValueBytes {
+ private BytesWritable value;
+
+ public WritableValueBytes() {
+ this.value = null;
+ }
+ public WritableValueBytes(BytesWritable value) {
+ this.value = value;
+ }
+
+ public void reset(BytesWritable value) {
+ this.value = value;
+ }
+
+ public void writeUncompressedBytes(DataOutputStream outStream)
+ throws IOException {
+ outStream.write(value.get(), 0, value.getSize());
+ }
+
+ public void writeCompressedBytes(DataOutputStream outStream)
+ throws IllegalArgumentException, IOException {
+ throw
+ new UnsupportedOperationException("WritableValueBytes doesn't support "
+ + "RECORD compression");
+ }
+ public int getSize(){
+ return value.getSize();
+ }
+ }
+
+ /**
+ * Set the key class for the {@link SequenceFile}
+ * <p>This allows the user to specify the key class to be different
+ * from the actual class ({@link BytesWritable}) used for writing.</p>
+ *
+ * @param conf the {@link JobConf} to modify
+ * @param theClass the SequenceFile output key class.
+ */
+ static public void setSequenceFileOutputKeyClass(JobConf conf,
+ Class<?> theClass) {
+ conf.setClass("mapred.seqbinary.output.key.class", theClass, Object.class);
+ }
+
+ /**
+ * Set the value class for the {@link SequenceFile}
+ * <p>This allows the user to specify the value class to be different
+ * from the actual class ({@link BytesWritable}) used for writing.</p>
+ *
+ * @param conf the {@link JobConf} to modify
+ * @param theClass the SequenceFile output value class.
+ */
+ static public void setSequenceFileOutputValueClass(JobConf conf,
+ Class<?> theClass) {
+ conf.setClass("mapred.seqbinary.output.value.class",
+ theClass, Object.class);
+ }
+
+ /**
+ * Get the key class for the {@link SequenceFile}
+ *
+ * @return the key class of the {@link SequenceFile}
+ */
+ static public Class<?> getSequenceFileOutputKeyClass(JobConf conf) {
+ return conf.getClass("mapred.seqbinary.output.key.class",
+ conf.getOutputKeyClass(), Object.class);
+ }
+
+ /**
+ * Get the value class for the {@link SequenceFile}
+ *
+ * @return the value class of the {@link SequenceFile}
+ */
+ static public Class<?> getSequenceFileOutputValueClass(JobConf conf) {
+ return conf.getClass("mapred.seqbinary.output.value.class",
+ conf.getOutputValueClass(), Object.class);
+ }
+
+
+
+
+ @Override
+ public RecordWriter <BytesWritable, BytesWritable>
+ getRecordWriter(FileSystem ignored, JobConf job,
+ String name, Progressable progress)
+ throws IOException {
+
+ Path outputPath = getWorkOutputPath(job);
+ FileSystem fs = outputPath.getFileSystem(job);
+ if (!fs.exists(outputPath)) {
+ throw new IOException("Output directory doesn't exist");
+ }
+ Path file = new Path(outputPath, name);
+ CompressionCodec codec = null;
+ CompressionType compressionType = CompressionType.NONE;
+ if (getCompressOutput(job)) {
+ // find the kind of compression to do
+ compressionType = getOutputCompressionType(job);
+
+ // find the right codec
+ Class codecClass = getOutputCompressorClass(job, DefaultCodec.class);
+ codec = (CompressionCodec)
+ ReflectionUtils.newInstance(codecClass, job);
+ }
+ final SequenceFile.Writer out =
+ SequenceFile.createWriter(fs, job, file,
+ getSequenceFileOutputKeyClass(job),
+ getSequenceFileOutputValueClass(job),
+ compressionType,
+ codec,
+ progress);
+
+ return new RecordWriter<BytesWritable, BytesWritable>() {
+
+ private WritableValueBytes wvaluebytes = new WritableValueBytes();
+
+ public void write(BytesWritable bkey, BytesWritable bvalue)
+ throws IOException {
+
+ wvaluebytes.reset(bvalue);
+ out.appendRaw(bkey.get(), 0, bkey.getSize(), wvaluebytes);
+ wvaluebytes.reset(null);
+ }
+
+ public void close(Reporter reporter) throws IOException {
+ out.close();
+ }
+
+ };
+
+ }
+
+ @Override
+ public void checkOutputSpecs(FileSystem ignored, JobConf job)
+ throws IOException {
+ super.checkOutputSpecs(ignored, job);
+ if (getCompressOutput(job) &&
+ getOutputCompressionType(job) == CompressionType.RECORD ){
+ throw new InvalidJobConfException("SequenceFileAsBinaryOutputFormat "
+ + "doesn't support Record Compression" );
+ }
+
+ }
+
+}
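
For readers following the new class above, here is a minimal sketch (not part of this commit) of how a job driver might configure SequenceFileAsBinaryOutputFormat; the driver class name, paths, and the assumption that the job's map/reduce tasks emit pre-serialized BytesWritable pairs are hypothetical.

    // Minimal sketch, assuming the job's mapper/reducer (omitted) already emit
    // their keys and values serialized into BytesWritable.
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.SequenceFile.CompressionType;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.FileOutputFormat;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.SequenceFileAsBinaryOutputFormat;

    public class BinaryWriteExample {
      public static void main(String[] args) throws Exception {
        JobConf job = new JobConf(BinaryWriteExample.class);

        // Tasks hand the format raw bytes for both key and value.
        job.setOutputKeyClass(BytesWritable.class);
        job.setOutputValueClass(BytesWritable.class);
        job.setOutputFormat(SequenceFileAsBinaryOutputFormat.class);

        // Record the logical key/value types in the SequenceFile header so the
        // output can later be read back as IntWritable/Text.
        SequenceFileAsBinaryOutputFormat.setSequenceFileOutputKeyClass(job, IntWritable.class);
        SequenceFileAsBinaryOutputFormat.setSequenceFileOutputValueClass(job, Text.class);

        // RECORD compression is rejected by checkOutputSpecs; BLOCK is allowed.
        SequenceFileAsBinaryOutputFormat.setCompressOutput(job, true);
        SequenceFileAsBinaryOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        JobClient.runJob(job);
      }
    }

Serializing the real keys and values into BytesWritable can be done with a DataOutputBuffer, the same pattern the test added below uses.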
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/SequenceFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/SequenceFileOutputFormat.java?rev=664067&r1=664066&r2=664067&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/SequenceFileOutputFormat.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/SequenceFileOutputFormat.java Fri Jun 6 11:18:29 2008
@@ -35,10 +35,11 @@
import org.apache.hadoop.util.*;
/** An {@link OutputFormat} that writes {@link SequenceFile}s. */
-public class SequenceFileOutputFormat
-extends FileOutputFormat<WritableComparable, Writable> {
+public class SequenceFileOutputFormat <K extends WritableComparable,
+ V extends Writable>
+extends FileOutputFormat<K, V> {
- public RecordWriter<WritableComparable, Writable> getRecordWriter(
+ public RecordWriter<K, V> getRecordWriter(
FileSystem ignored, JobConf job,
String name, Progressable progress)
throws IOException {
@@ -68,9 +69,9 @@
codec,
progress);
- return new RecordWriter<WritableComparable, Writable>() {
+ return new RecordWriter<K, V>() {
- public void write(WritableComparable key, Writable value)
+ public void write(K key, V value)
throws IOException {
out.append(key, value);
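
The change above parameterizes SequenceFileOutputFormat over its key and value types, which is what lets the new binary format extend it as SequenceFileOutputFormat<BytesWritable, BytesWritable>. As an illustration only (not part of the commit), any other subclass can now bind concrete types the same way:

    // Hypothetical illustration: with the generified base class, a subclass
    // fixes its key/value types once and the inherited getRecordWriter returns
    // a correctly typed RecordWriter<IntWritable, Text>.
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.SequenceFileOutputFormat;

    public class IntTextSequenceFileOutputFormat
        extends SequenceFileOutputFormat<IntWritable, Text> {
      // No overrides needed; the type parameters flow through getRecordWriter.
    }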
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/MultipleSequenceFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/MultipleSequenceFileOutputFormat.java?rev=664067&r1=664066&r2=664067&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/MultipleSequenceFileOutputFormat.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/MultipleSequenceFileOutputFormat.java Fri Jun 6 11:18:29 2008
@@ -38,6 +38,7 @@
private SequenceFileOutputFormat theSequenceFileOutputFormat = null;
@Override
+ @SuppressWarnings("unchecked")
protected RecordWriter<WritableComparable, Writable> getBaseRecordWriter(
FileSystem fs,
JobConf job,
Added: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileAsBinaryOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileAsBinaryOutputFormat.java?rev=664067&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileAsBinaryOutputFormat.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileAsBinaryOutputFormat.java Fri Jun 6 11:18:29 2008
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+
+import junit.framework.TestCase;
+import org.apache.commons.logging.*;
+
+public class TestSequenceFileAsBinaryOutputFormat extends TestCase {
+ private static final Log LOG =
+ LogFactory.getLog(TestSequenceFileAsBinaryOutputFormat.class.getName());
+
+ private static final int RECORDS = 10000;
+
+ public void testBinary() throws IOException {
+ JobConf job = new JobConf();
+ FileSystem fs = FileSystem.getLocal(job);
+ Path dir = new Path(System.getProperty("test.build.data",".") + "/mapred");
+ Path file = new Path(dir, "testbinary.seq");
+ Random r = new Random();
+ long seed = r.nextLong();
+ r.setSeed(seed);
+
+ fs.delete(dir, true);
+ if (!fs.mkdirs(dir)) {
+ fail("Failed to create output directory");
+ }
+
+ FileOutputFormat.setWorkOutputPath(job, dir);
+
+ SequenceFileAsBinaryOutputFormat.setSequenceFileOutputKeyClass(job,
+ IntWritable.class );
+ SequenceFileAsBinaryOutputFormat.setSequenceFileOutputValueClass(job,
+ DoubleWritable.class );
+
+ SequenceFileAsBinaryOutputFormat.setCompressOutput(job, true);
+ SequenceFileAsBinaryOutputFormat.setOutputCompressionType(job,
+ CompressionType.BLOCK);
+
+ BytesWritable bkey = new BytesWritable();
+ BytesWritable bval = new BytesWritable();
+
+
+ RecordWriter <BytesWritable, BytesWritable> writer =
+ new SequenceFileAsBinaryOutputFormat().getRecordWriter(fs,
+ job, file.toString(),
+ Reporter.NULL);
+
+ IntWritable iwritable = new IntWritable();
+ DoubleWritable dwritable = new DoubleWritable();
+ DataOutputBuffer outbuf = new DataOutputBuffer();
+ LOG.info("Creating data by SequenceFileAsBinaryOutputFormat");
+ try {
+ for (int i = 0; i < RECORDS; ++i) {
+ iwritable = new IntWritable(r.nextInt());
+ iwritable.write(outbuf);
+ bkey.set(outbuf.getData(), 0, outbuf.getLength());
+ outbuf.reset();
+ dwritable = new DoubleWritable(r.nextDouble());
+ dwritable.write(outbuf);
+ bval.set(outbuf.getData(), 0, outbuf.getLength());
+ outbuf.reset();
+ writer.write(bkey, bval);
+ }
+ } finally {
+ writer.close(Reporter.NULL);
+ }
+
+ InputFormat<IntWritable,DoubleWritable> iformat =
+ new SequenceFileInputFormat<IntWritable,DoubleWritable>();
+ int count = 0;
+ r.setSeed(seed);
+ DataInputBuffer buf = new DataInputBuffer();
+ final int NUM_SPLITS = 3;
+ SequenceFileInputFormat.addInputPath(job, file);
+ LOG.info("Reading data by SequenceFileInputFormat");
+ for (InputSplit split : iformat.getSplits(job, NUM_SPLITS)) {
+ RecordReader<IntWritable,DoubleWritable> reader =
+ iformat.getRecordReader(split, job, Reporter.NULL);
+ try {
+ int sourceInt;
+ double sourceDouble;
+ while (reader.next(iwritable, dwritable)) {
+ sourceInt = r.nextInt();
+ sourceDouble = r.nextDouble();
+ assertEquals(
+ "Keys don't match: " + "*" + iwritable.get() + ":" +
+ sourceInt + "*",
+ sourceInt, iwritable.get());
+ assertTrue(
+ "Vals don't match: " + "*" + dwritable.get() + ":" +
+ sourceDouble + "*",
+ Double.compare(dwritable.get(), sourceDouble) == 0 );
+ ++count;
+ }
+ } finally {
+ reader.close();
+ }
+ }
+ assertEquals("Some records not found", RECORDS, count);
+ }
+
+ public void testSequenceOutputClassDefaultsToMapRedOutputClass()
+ throws IOException {
+ JobConf job = new JobConf();
+ FileSystem fs = FileSystem.getLocal(job);
+
+ // Set arbitrary key/value classes to test getSequenceFileOutput{Key,Value}Class defaults
+ job.setOutputKeyClass(FloatWritable.class);
+ job.setOutputValueClass(BooleanWritable.class);
+
+ assertEquals("SequenceFileOutputKeyClass should default to ouputKeyClass",
+ FloatWritable.class,
+ SequenceFileAsBinaryOutputFormat.getSequenceFileOutputKeyClass(
+ job));
+ assertEquals("SequenceFileOutputValueClass should default to "
+ + "ouputValueClass",
+ BooleanWritable.class,
+ SequenceFileAsBinaryOutputFormat.getSequenceFileOutputValueClass(
+ job));
+
+ SequenceFileAsBinaryOutputFormat.setSequenceFileOutputKeyClass(job,
+ IntWritable.class );
+ SequenceFileAsBinaryOutputFormat.setSequenceFileOutputValueClass(job,
+ DoubleWritable.class );
+
+ assertEquals("SequenceFileOutputKeyClass not updated",
+ IntWritable.class,
+ SequenceFileAsBinaryOutputFormat.getSequenceFileOutputKeyClass(
+ job));
+ assertEquals("SequenceFileOutputValueClass not updated",
+ DoubleWritable.class,
+ SequenceFileAsBinaryOutputFormat.getSequenceFileOutputValueClass(
+ job));
+ }
+
+ public void testcheckOutputSpecsForbidRecordCompression() throws IOException {
+ JobConf job = new JobConf();
+ FileSystem fs = FileSystem.getLocal(job);
+ Path dir = new Path(System.getProperty("test.build.data",".") + "/mapred");
+ Path outputdir = new Path(System.getProperty("test.build.data",".")
+ + "/output");
+
+ fs.delete(dir, true);
+ fs.delete(outputdir, true);
+ if (!fs.mkdirs(dir)) {
+ fail("Failed to create output directory");
+ }
+
+ FileOutputFormat.setWorkOutputPath(job, dir);
+
+ // Without an output path, FileOutputFormat.checkOutputSpecs will throw
+ // InvalidJobConfException
+ FileOutputFormat.setOutputPath(job, outputdir);
+
+ // SequenceFileAsBinaryOutputFormat doesn't support record compression
+ // It should throw an exception when checked by checkOutputSpecs
+ SequenceFileAsBinaryOutputFormat.setCompressOutput(job, true);
+
+ SequenceFileAsBinaryOutputFormat.setOutputCompressionType(job,
+ CompressionType.BLOCK);
+ try {
+ new SequenceFileAsBinaryOutputFormat().checkOutputSpecs(fs, job);
+ } catch (Exception e) {
+ fail("Block compression should be allowed for "
+ + "SequenceFileAsBinaryOutputFormat:"
+ + "Caught " + e.getClass().getName());
+ }
+
+ SequenceFileAsBinaryOutputFormat.setOutputCompressionType(job,
+ CompressionType.RECORD);
+ try {
+ new SequenceFileAsBinaryOutputFormat().checkOutputSpecs(fs, job);
+ fail("Record compression should not be allowed for "
+ +"SequenceFileAsBinaryOutputFormat");
+ } catch (InvalidJobConfException ie) {
+ // expected
+ } catch (Exception e) {
+ fail("Expected " + InvalidJobConfException.class.getName()
+ + "but caught " + e.getClass().getName() );
+ }
+ }
+}