You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2008/03/02 08:13:22 UTC

svn commit: r632717 - in /hadoop/core/trunk: CHANGES.txt src/java/org/apache/hadoop/mapred/SequenceFileAsBinaryInputFormat.java src/test/org/apache/hadoop/mapred/TestSequenceFileAsBinaryInputFormat.java

Author: omalley
Date: Sat Mar  1 23:13:21 2008
New Revision: 632717

URL: http://svn.apache.org/viewvc?rev=632717&view=rev
Log:
HADOOP-2923. Check in SequenceFileAsBinaryInputFormat, which was left out of
commit for HADOOP-2603. Contributed by Chris Douglas.

Added:
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsBinaryInputFormat.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileAsBinaryInputFormat.java
Modified:
    hadoop/core/trunk/CHANGES.txt

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=632717&r1=632716&r2=632717&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Sat Mar  1 23:13:21 2008
@@ -191,6 +191,9 @@
     HADOOP-2918.  Improve error logging so that dfs writes failure with
     "No lease on file" can be diagnosed. (dhruba)
 
+    HADOOP-2923.  Add SequenceFileAsBinaryInputFormat, which was
+    missed in the commit for HADOOP-2603. (cdouglas via omalley)
+
 Release 0.16.0 - 2008-02-07
 
   INCOMPATIBLE CHANGES

Added: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsBinaryInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsBinaryInputFormat.java?rev=632717&view=auto
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsBinaryInputFormat.java (added)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsBinaryInputFormat.java Sat Mar  1 23:13:21 2008
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+
+/**
+ * InputFormat reading keys, values from SequenceFiles in binary (raw)
+ * format.
+ */
+public class SequenceFileAsBinaryInputFormat
+    extends SequenceFileInputFormat<BytesWritable,BytesWritable> {
+
+  public SequenceFileAsBinaryInputFormat() {
+    super();
+  }
+
+  public RecordReader<BytesWritable,BytesWritable> getRecordReader(
+      InputSplit split, JobConf job, Reporter reporter)
+      throws IOException {
+    return new SequenceFileAsBinaryRecordReader(job, (FileSplit)split);
+  }
+
+  /**
+   * Read records from a SequenceFile as binary (raw) bytes.
+   */
+  public static class SequenceFileAsBinaryRecordReader
+      implements RecordReader<BytesWritable,BytesWritable> {
+    private SequenceFile.Reader in;
+    private long start;
+    private long end;
+    private boolean done = false;
+    private DataOutputBuffer buffer = new DataOutputBuffer();
+    private SequenceFile.ValueBytes vbytes;
+
+    public SequenceFileAsBinaryRecordReader(Configuration conf, FileSplit split)
+        throws IOException {
+      Path path = split.getPath();
+      FileSystem fs = path.getFileSystem(conf);
+      this.in = new SequenceFile.Reader(fs, path, conf);
+      this.end = split.getStart() + split.getLength();
+      vbytes = in.createValueBytes();
+    }
+
+    public BytesWritable createKey() {
+      return new BytesWritable();
+    }
+
+    public BytesWritable createValue() {
+      return new BytesWritable();
+    }
+
+    /**
+     * Retrieve the name of the key class for this SequenceFile.
+     * @see org.apache.hadoop.io.SequenceFile.Reader#getKeyClassName
+     */
+    public String getKeyClassName() {
+      return in.getKeyClassName();
+    }
+
+    /**
+     * Retrieve the name of the value class for this SequenceFile.
+     * @see org.apache.hadoop.io.SequenceFile.Reader#getValueClassName
+     */
+    public String getValueClassName() {
+      return in.getValueClassName();
+    }
+
+    /**
+     * Read raw bytes from a SequenceFile.
+     */
+    public synchronized boolean next(BytesWritable key, BytesWritable val)
+        throws IOException {
+      if (done) return false;
+      long pos = in.getPosition();
+      boolean eof = -1 == in.nextRawKey(buffer);
+      if (!eof) {
+        key.set(buffer.getData(), 0, buffer.getLength());
+        buffer.reset();
+        in.nextRawValue(vbytes);
+        vbytes.writeUncompressedBytes(buffer);
+        val.set(buffer.getData(), 0, buffer.getLength());
+        buffer.reset();
+      }
+      return !(done = (eof || (pos >= end && in.syncSeen())));
+    }
+
+    public long getPos() throws IOException {
+      return in.getPosition();
+    }
+
+    public void close() throws IOException {
+      in.close();
+    }
+
+    /**
+     * Return the progress within the input split
+     * @return 0.0 to 1.0 of the input byte range
+     */
+    public float getProgress() throws IOException {
+      if (end == start) {
+        return 0.0f;
+      } else {
+        return Math.min(1.0f, (float)((in.getPosition() - start) /
+                                      (double)(end - start)));
+      }
+    }
+  }
+}

Added: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileAsBinaryInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileAsBinaryInputFormat.java?rev=632717&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileAsBinaryInputFormat.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileAsBinaryInputFormat.java Sat Mar  1 23:13:21 2008
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.mapred.*;
+
+import junit.framework.TestCase;
+import org.apache.commons.logging.*;
+
+public class TestSequenceFileAsBinaryInputFormat extends TestCase {
+  private static final Log LOG = InputFormatBase.LOG;
+  private static final int RECORDS = 10000;
+
+  public void testBinary() throws IOException {
+    JobConf job = new JobConf();
+    FileSystem fs = FileSystem.getLocal(job);
+    Path dir = new Path(System.getProperty("test.build.data",".") + "/mapred");
+    Path file = new Path(dir, "testbinary.seq");
+    Random r = new Random();
+    long seed = r.nextLong();
+    r.setSeed(seed);
+
+    fs.delete(dir);
+    job.setInputPath(dir);
+
+    Text tkey = new Text();
+    Text tval = new Text();
+
+    SequenceFile.Writer writer =
+      new SequenceFile.Writer(fs, job, file, Text.class, Text.class);
+    try {
+      for (int i = 0; i < RECORDS; ++i) {
+        tkey.set(Integer.toString(r.nextInt(), 36));
+        tval.set(Long.toString(r.nextLong(), 36));
+        writer.append(tkey, tval);
+      }
+    } finally {
+      writer.close();
+    }
+
+    InputFormat<BytesWritable,BytesWritable> bformat =
+      new SequenceFileAsBinaryInputFormat();
+
+    int count = 0;
+    r.setSeed(seed);
+    BytesWritable bkey = new BytesWritable();
+    BytesWritable bval = new BytesWritable();
+    Text cmpkey = new Text();
+    Text cmpval = new Text();
+    DataInputBuffer buf = new DataInputBuffer();
+    RecordReader<BytesWritable,BytesWritable> reader =
+      bformat.getRecordReader(new FileSplit(file, 0,
+            fs.getFileStatus(file).getLen(), job), job, Reporter.NULL);
+    try {
+      while (reader.next(bkey, bval)) {
+        tkey.set(Integer.toString(r.nextInt(), 36));
+        tval.set(Long.toString(r.nextLong(), 36));
+        buf.reset(bkey.get(), bkey.getSize());
+        cmpkey.readFields(buf);
+        buf.reset(bval.get(), bval.getSize());
+        cmpval.readFields(buf);
+        assertTrue(
+            "Keys don't match: " + "*" + cmpkey.toString() + ":" +
+                                         tkey.toString() + "*",
+            cmpkey.toString().equals(tkey.toString()));
+        assertTrue(
+            "Vals don't match: " + "*" + cmpval.toString() + ":" +
+                                         tval.toString() + "*",
+            cmpval.toString().equals(tval.toString()));
+        ++count;
+      }
+    } finally {
+      reader.close();
+    }
+    assertEquals("Some records not found", RECORDS, count);
+  }
+
+}