You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cd...@apache.org on 2008/06/06 20:18:29 UTC

svn commit: r664067 - in /hadoop/core/trunk: ./ src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/mapred/lib/ src/test/org/apache/hadoop/mapred/

Author: cdouglas
Date: Fri Jun  6 11:18:29 2008
New Revision: 664067

URL: http://svn.apache.org/viewvc?rev=664067&view=rev
Log:
HADOOP-3460. Add SequenceFileAsBinaryOutputFormat to permit direct
writes of serialized data. Contributed by Koji Noguchi.


Added:
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsBinaryOutputFormat.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileAsBinaryOutputFormat.java
Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/SequenceFileOutputFormat.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/MultipleSequenceFileOutputFormat.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=664067&r1=664066&r2=664067&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Fri Jun  6 11:18:29 2008
@@ -143,6 +143,9 @@
 
     HADOOP-3307. Support for Archives in Hadoop. (Mahadev Konar via ddas)
 
+    HADOOP-3460. Add SequenceFileAsBinaryOutputFormat to permit direct
+    writes of serialized data. (Koji Noguchi via cdouglas)
+
   IMPROVEMENTS
    
     HADOOP-2928. Remove deprecated FileSystem.getContentLength().

Added: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsBinaryOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsBinaryOutputFormat.java?rev=664067&view=auto
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsBinaryOutputFormat.java (added)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsBinaryOutputFormat.java Fri Jun  6 11:18:29 2008
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.io.DataOutputStream;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.SequenceFile.ValueBytes;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.Progressable;
+
+/** 
+ * An {@link OutputFormat} that writes keys and values to 
+ * {@link SequenceFile}s in binary (raw) format.
+ */
+public class SequenceFileAsBinaryOutputFormat 
+ extends SequenceFileOutputFormat <BytesWritable,BytesWritable> {
+
+  /** 
+   * {@link ValueBytes} over a {@link BytesWritable}, for use with appendRaw.
+   */
+  protected static class WritableValueBytes implements ValueBytes {
+    private BytesWritable value;
+
+    public WritableValueBytes() {
+      this(null);
+    }
+    public WritableValueBytes(BytesWritable value) {
+      this.value = value;
+    }
+
+    public void reset(BytesWritable value) {
+      this.value = value;
+    }
+
+    public void writeUncompressedBytes(DataOutputStream outStream)
+      throws IOException {
+      final int length = value.getSize();
+      outStream.write(value.get(), 0, length);
+    }
+
+    public void writeCompressedBytes(DataOutputStream outStream)
+      throws IllegalArgumentException, IOException {
+      throw new UnsupportedOperationException(
+          "WritableValueBytes doesn't support " + "RECORD compression");
+    }
+    public int getSize(){
+      return value.getSize();
+    }
+  }
+
+  /**
+   * Declare the key class of the {@link SequenceFile} being written.
+   * <p>The declared class may differ from the class actually used for
+   * writing, which is {@link BytesWritable}.</p>
+   * @param conf the {@link JobConf} to modify
+   * @param theClass the SequenceFile output key class.
+   */
+  public static void setSequenceFileOutputKeyClass(JobConf conf, 
+                                                   Class<?> theClass) {
+    final String property = "mapred.seqbinary.output.key.class";
+    conf.setClass(property, theClass, Object.class);
+  }
+
+  /**
+   * Set the value class for the {@link SequenceFile}
+   * <p>This allows the user to specify the value class to be different 
+   * from the actual class ({@link BytesWritable}) used for writing </p>
+   * 
+   * @param conf the {@link JobConf} to modify
+   * @param theClass the SequenceFile output value class.
+   */
+  public static void setSequenceFileOutputValueClass(JobConf conf, 
+                                                     Class<?> theClass) {
+    conf.setClass("mapred.seqbinary.output.value.class", 
+                  theClass, Object.class);
+  }
+
+  /**
+   * Get the {@link SequenceFile} key class (job output key class by default).
+   * @param conf the {@link JobConf} to read
+   * @return the key class of the {@link SequenceFile}
+   */
+  public static Class<?> getSequenceFileOutputKeyClass(JobConf conf) { 
+    return conf.getClass("mapred.seqbinary.output.key.class",
+        conf.getOutputKeyClass(), Object.class);
+  }
+
+  /**
+   * Get the {@link SequenceFile} value class (job output value by default).
+   * @param conf the {@link JobConf} to read
+   * @return the value class of the {@link SequenceFile}
+   */
+  public static Class<?> getSequenceFileOutputValueClass(JobConf conf) { 
+    return conf.getClass("mapred.seqbinary.output.value.class",
+        conf.getOutputValueClass(), Object.class);
+  }
+
+
+
+  
+  /**
+   * Create a writer that appends raw key/value bytes to a {@link SequenceFile}.
+   * @param ignored unused; the file system is taken from the work output path
+   * @param job the job configuration
+   * @param name name of the file to create under the work output path
+   * @param progress handed to the {@link SequenceFile} writer
+   * @throws IOException if the work output directory does not exist
+   */
+  @Override 
+  public RecordWriter <BytesWritable, BytesWritable> 
+             getRecordWriter(FileSystem ignored, JobConf job,
+                             String name, Progressable progress)
+    throws IOException {
+    Path outputPath = getWorkOutputPath(job);
+    FileSystem fs = outputPath.getFileSystem(job);
+    if (!fs.exists(outputPath)) {
+      throw new IOException("Output directory doesn't exist: " + outputPath);
+    }
+    Path file = new Path(outputPath, name);
+    CompressionCodec codec = null;
+    CompressionType compressionType = CompressionType.NONE;
+    if (getCompressOutput(job)) {
+      // find the kind of compression to do
+      compressionType = getOutputCompressionType(job);
+      // find the right codec
+      Class<? extends CompressionCodec> codecClass =
+        getOutputCompressorClass(job, DefaultCodec.class);
+      codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, job);
+    }
+    // the declared key/value classes may differ from BytesWritable
+    final SequenceFile.Writer out = 
+      SequenceFile.createWriter(fs, job, file,
+                    getSequenceFileOutputKeyClass(job),
+                    getSequenceFileOutputValueClass(job),
+                    compressionType, codec, progress);
+
+    return new RecordWriter<BytesWritable, BytesWritable>() {
+        private final WritableValueBytes wvaluebytes = new WritableValueBytes();
+        public void write(BytesWritable bkey, BytesWritable bvalue)
+          throws IOException {
+          wvaluebytes.reset(bvalue);
+          out.appendRaw(bkey.get(), 0, bkey.getSize(), wvaluebytes);
+          // clear the reference once the record has been appended
+          wvaluebytes.reset(null);
+        }
+        public void close(Reporter reporter) throws IOException { 
+          out.close();
+        }
+      };
+  }
+
+  /** Runs the inherited checks, then rejects RECORD-compressed output. */
+  @Override 
+  public void checkOutputSpecs(FileSystem ignored, JobConf job) 
+            throws IOException {
+    super.checkOutputSpecs(ignored, job);
+    if (getCompressOutput(job)
+        && CompressionType.RECORD == getOutputCompressionType(job)) {
+      throw new InvalidJobConfException("SequenceFileAsBinaryOutputFormat "
+          + "doesn't support Record Compression");
+    }
+  }
+
+}

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/SequenceFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/SequenceFileOutputFormat.java?rev=664067&r1=664066&r2=664067&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/SequenceFileOutputFormat.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/SequenceFileOutputFormat.java Fri Jun  6 11:18:29 2008
@@ -35,10 +35,11 @@
 import org.apache.hadoop.util.*;
 
 /** An {@link OutputFormat} that writes {@link SequenceFile}s. */
-public class SequenceFileOutputFormat 
-extends FileOutputFormat<WritableComparable, Writable> {
+public class SequenceFileOutputFormat <K extends WritableComparable,
+                                       V extends Writable>
+extends FileOutputFormat<K, V> {
 
-  public RecordWriter<WritableComparable, Writable> getRecordWriter(
+  public RecordWriter<K, V> getRecordWriter(
                                           FileSystem ignored, JobConf job,
                                           String name, Progressable progress)
     throws IOException {
@@ -68,9 +69,9 @@
                                 codec,
                                 progress);
 
-    return new RecordWriter<WritableComparable, Writable>() {
+    return new RecordWriter<K, V>() {
 
-        public void write(WritableComparable key, Writable value)
+        public void write(K key, V value)
           throws IOException {
 
           out.append(key, value);

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/MultipleSequenceFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/MultipleSequenceFileOutputFormat.java?rev=664067&r1=664066&r2=664067&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/MultipleSequenceFileOutputFormat.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/MultipleSequenceFileOutputFormat.java Fri Jun  6 11:18:29 2008
@@ -38,6 +38,7 @@
   private SequenceFileOutputFormat theSequenceFileOutputFormat = null;
   
   @Override
+  @SuppressWarnings("unchecked") 
   protected RecordWriter<WritableComparable, Writable> getBaseRecordWriter(
                                                          FileSystem fs,
                                                          JobConf job,

Added: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileAsBinaryOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileAsBinaryOutputFormat.java?rev=664067&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileAsBinaryOutputFormat.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileAsBinaryOutputFormat.java Fri Jun  6 11:18:29 2008
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+
+import junit.framework.TestCase;
+import org.apache.commons.logging.*;
+
+public class TestSequenceFileAsBinaryOutputFormat extends TestCase {
+  private static final Log LOG =
+      LogFactory.getLog(TestSequenceFileAsBinaryOutputFormat.class.getName());
+
+  private static final int RECORDS = 10000;
+
+  /**
+   * Round-trips random int/double records: raw write, typed read, compare.
+   */
+  public void testBinary() throws IOException {
+    JobConf job = new JobConf();
+    FileSystem fs = FileSystem.getLocal(job);
+    Path dir = new Path(System.getProperty("test.build.data",".") + "/mapred");
+    Path file = new Path(dir, "testbinary.seq");
+    Random r = new Random();
+    long seed = r.nextLong();
+    r.setSeed(seed);
+    LOG.info("seed = " + seed); // lets a failing run be reproduced
+
+    fs.delete(dir, true);
+    if (!fs.mkdirs(dir)) { 
+      fail("Failed to create output directory");
+    }
+
+    FileOutputFormat.setWorkOutputPath(job, dir);
+    SequenceFileAsBinaryOutputFormat.setSequenceFileOutputKeyClass(job, 
+                                          IntWritable.class );
+    SequenceFileAsBinaryOutputFormat.setSequenceFileOutputValueClass(job, 
+                                          DoubleWritable.class ); 
+    SequenceFileAsBinaryOutputFormat.setCompressOutput(job, true);
+    SequenceFileAsBinaryOutputFormat.setOutputCompressionType(job, 
+                                                       CompressionType.BLOCK);
+
+    BytesWritable bkey = new BytesWritable();
+    BytesWritable bval = new BytesWritable();
+
+    RecordWriter <BytesWritable, BytesWritable> writer = 
+      new SequenceFileAsBinaryOutputFormat().getRecordWriter(fs, 
+                                                       job, file.toString(),
+                                                       Reporter.NULL);
+    IntWritable iwritable = new IntWritable();
+    DoubleWritable dwritable = new DoubleWritable();
+    DataOutputBuffer outbuf = new DataOutputBuffer();
+    LOG.info("Creating data by SequenceFileAsBinaryOutputFormat");
+    try {
+      for (int i = 0; i < RECORDS; ++i) {
+        iwritable = new IntWritable(r.nextInt());
+        iwritable.write(outbuf);
+        bkey.set(outbuf.getData(), 0, outbuf.getLength());
+        outbuf.reset();
+        dwritable = new DoubleWritable(r.nextDouble());
+        dwritable.write(outbuf);
+        bval.set(outbuf.getData(), 0, outbuf.getLength());
+        outbuf.reset();
+        writer.write(bkey, bval);
+      }
+    } finally {
+      writer.close(Reporter.NULL);
+    }
+
+    InputFormat<IntWritable,DoubleWritable> iformat =
+                    new SequenceFileInputFormat<IntWritable,DoubleWritable>();
+    int count = 0;
+    // replay the same random sequence to regenerate the expected records
+    r.setSeed(seed);
+    final int NUM_SPLITS = 3;
+    SequenceFileInputFormat.addInputPath(job, file);
+    LOG.info("Reading data by SequenceFileInputFormat");
+    for (InputSplit split : iformat.getSplits(job, NUM_SPLITS)) {
+      RecordReader<IntWritable,DoubleWritable> reader =
+        iformat.getRecordReader(split, job, Reporter.NULL);
+      try {
+        int sourceInt;
+        double sourceDouble;
+        while (reader.next(iwritable, dwritable)) {
+          sourceInt = r.nextInt();
+          sourceDouble = r.nextDouble();
+          assertEquals(
+              "Keys don't match: " + "*" + iwritable.get() + ":" + 
+                                           sourceInt + "*",
+              sourceInt, iwritable.get());
+          assertTrue(
+              "Vals don't match: " + "*" + dwritable.get() + ":" +
+                                           sourceDouble + "*",
+              Double.compare(dwritable.get(), sourceDouble) == 0 );
+          ++count;
+        }
+      } finally {
+        reader.close();
+      }
+    }
+    assertEquals("Some records not found", RECORDS, count);
+  }
+
+  /** Unset SequenceFile classes fall back to the job's output classes. */
+  public void testSequenceOutputClassDefaultsToMapRedOutputClass() 
+         throws IOException {
+    JobConf job = new JobConf();
+
+    // Arbitrary job output classes; the SequenceFile classes are still unset
+    job.setOutputKeyClass(FloatWritable.class);
+    job.setOutputValueClass(BooleanWritable.class);
+
+    assertEquals("SequenceFileOutputKeyClass should default to outputKeyClass", 
+             FloatWritable.class,
+             SequenceFileAsBinaryOutputFormat.getSequenceFileOutputKeyClass(
+                                                                         job));
+    assertEquals("SequenceFileOutputValueClass should default to " 
+             + "outputValueClass", 
+             BooleanWritable.class,
+             SequenceFileAsBinaryOutputFormat.getSequenceFileOutputValueClass(
+                                                                         job));
+
+    SequenceFileAsBinaryOutputFormat.setSequenceFileOutputKeyClass(job, 
+                                          IntWritable.class );
+    SequenceFileAsBinaryOutputFormat.setSequenceFileOutputValueClass(job, 
+                                          DoubleWritable.class ); 
+
+    assertEquals("SequenceFileOutputKeyClass not updated", 
+             IntWritable.class,
+             SequenceFileAsBinaryOutputFormat.getSequenceFileOutputKeyClass(
+                                                                         job));
+    assertEquals("SequenceFileOutputValueClass not updated", 
+             DoubleWritable.class,
+             SequenceFileAsBinaryOutputFormat.getSequenceFileOutputValueClass(
+                                                                         job));
+  }
+
+  /** checkOutputSpecs must accept BLOCK and reject RECORD compression. */
+  public void testcheckOutputSpecsForbidRecordCompression() throws IOException {
+    JobConf job = new JobConf();
+    FileSystem fs = FileSystem.getLocal(job);
+    Path dir = new Path(System.getProperty("test.build.data",".") + "/mapred");
+    Path outputdir = new Path(System.getProperty("test.build.data",".") 
+                              + "/output");
+
+    fs.delete(dir, true);
+    fs.delete(outputdir, true);
+    if (!fs.mkdirs(dir)) { 
+      fail("Failed to create output directory");
+    }
+
+    FileOutputFormat.setWorkOutputPath(job, dir);
+
+    // Without outputpath, FileOutputFormat.checkoutputspecs will throw 
+    // InvalidJobConfException
+    FileOutputFormat.setOutputPath(job, outputdir);
+
+    // Compression stays enabled for both checks below: BLOCK must be
+    // accepted, RECORD must be rejected by checkOutputSpecs
+    SequenceFileAsBinaryOutputFormat.setCompressOutput(job, true);
+    SequenceFileAsBinaryOutputFormat.setOutputCompressionType(job, 
+                                                       CompressionType.BLOCK);
+    try {
+      new SequenceFileAsBinaryOutputFormat().checkOutputSpecs(fs, job);
+    } catch (Exception e) {
+      fail("Block compression should be allowed for " 
+                       + "SequenceFileAsBinaryOutputFormat:" 
+                       + "Caught " + e.getClass().getName());
+    }
+
+    SequenceFileAsBinaryOutputFormat.setOutputCompressionType(job, 
+                                                       CompressionType.RECORD);
+    try {
+      new SequenceFileAsBinaryOutputFormat().checkOutputSpecs(fs, job);
+      fail("Record compression should not be allowed for " 
+                           +"SequenceFileAsBinaryOutputFormat");
+    } catch (InvalidJobConfException ie) {
+      // expected
+    } catch (Exception e) {
+      fail("Expected " + InvalidJobConfException.class.getName() 
+                       + " but caught " + e.getClass().getName() );
+    }
+  }
+}