You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2008/03/02 08:13:22 UTC
svn commit: r632717 - in /hadoop/core/trunk: CHANGES.txt
src/java/org/apache/hadoop/mapred/SequenceFileAsBinaryInputFormat.java
src/test/org/apache/hadoop/mapred/TestSequenceFileAsBinaryInputFormat.java
Author: omalley
Date: Sat Mar 1 23:13:21 2008
New Revision: 632717
URL: http://svn.apache.org/viewvc?rev=632717&view=rev
Log:
HADOOP-2923. Check in SequenceFileAsBinaryInputFormat, which was left out of
commit for HADOOP-2603. Contributed by Chris Douglas.
Added:
hadoop/core/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsBinaryInputFormat.java
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileAsBinaryInputFormat.java
Modified:
hadoop/core/trunk/CHANGES.txt
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=632717&r1=632716&r2=632717&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Sat Mar 1 23:13:21 2008
@@ -191,6 +191,9 @@
HADOOP-2918. Improve error logging so that dfs writes failure with
"No lease on file" can be diagnosed. (dhruba)
+ HADOOP-2923. Add SequenceFileAsBinaryInputFormat, which was
+ missed in the commit for HADOOP-2603. (cdouglas via omalley)
+
Release 0.16.0 - 2008-02-07
INCOMPATIBLE CHANGES
Added: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsBinaryInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsBinaryInputFormat.java?rev=632717&view=auto
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsBinaryInputFormat.java (added)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsBinaryInputFormat.java Sat Mar 1 23:13:21 2008
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+
+/**
+ * InputFormat reading keys, values from SequenceFiles in binary (raw)
+ * format.
+ */
+public class SequenceFileAsBinaryInputFormat
+    extends SequenceFileInputFormat<BytesWritable,BytesWritable> {
+
+  public SequenceFileAsBinaryInputFormat() {
+    super();
+  }
+
+  /**
+   * Create a reader returning the raw key and value bytes of each record in
+   * the given split.
+   * @param split the portion of the SequenceFile to read; must be a FileSplit
+   * @param job configuration used to open the file
+   * @param reporter not used by this implementation
+   * @throws IOException if the underlying SequenceFile cannot be opened
+   */
+  public RecordReader<BytesWritable,BytesWritable> getRecordReader(
+      InputSplit split, JobConf job, Reporter reporter)
+      throws IOException {
+    return new SequenceFileAsBinaryRecordReader(job, (FileSplit)split);
+  }
+
+  /**
+   * Read records from a SequenceFile as binary (raw) bytes.
+   */
+  public static class SequenceFileAsBinaryRecordReader
+      implements RecordReader<BytesWritable,BytesWritable> {
+    private SequenceFile.Reader in;
+    // Position of the first record owned by this split (after syncing).
+    private long start;
+    // Byte offset at which this split ends: split start + split length.
+    private long end;
+    private boolean done = false;
+    // Scratch buffer reused for both the key and the value of every record.
+    private DataOutputBuffer buffer = new DataOutputBuffer();
+    private SequenceFile.ValueBytes vbytes;
+
+    /**
+     * Open the file backing the split and position the reader on the first
+     * record that belongs to the split.
+     * @param conf configuration used to resolve the FileSystem and open the
+     *             SequenceFile
+     * @param split byte range of the file this reader is responsible for
+     * @throws IOException if the file cannot be opened or positioned
+     */
+    public SequenceFileAsBinaryRecordReader(Configuration conf, FileSplit split)
+        throws IOException {
+      Path path = split.getPath();
+      FileSystem fs = path.getFileSystem(conf);
+      this.in = new SequenceFile.Reader(fs, path, conf);
+      this.end = split.getStart() + split.getLength();
+      // A split rarely begins on a record boundary: skip ahead to the first
+      // sync mark at or after the split start so that records preceding it
+      // are left to the reader of the previous split instead of being
+      // returned a second time.
+      if (split.getStart() > in.getPosition()) {
+        in.sync(split.getStart());
+      }
+      // Record the real starting position; getProgress() measures from here.
+      this.start = in.getPosition();
+      vbytes = in.createValueBytes();
+      // Nothing to read when the first usable position is past the split end.
+      done = start >= end;
+    }
+
+    public BytesWritable createKey() {
+      return new BytesWritable();
+    }
+
+    public BytesWritable createValue() {
+      return new BytesWritable();
+    }
+
+    /**
+     * Retrieve the name of the key class for this SequenceFile.
+     * @see org.apache.hadoop.io.SequenceFile.Reader#getKeyClassName
+     */
+    public String getKeyClassName() {
+      return in.getKeyClassName();
+    }
+
+    /**
+     * Retrieve the name of the value class for this SequenceFile.
+     * @see org.apache.hadoop.io.SequenceFile.Reader#getValueClassName
+     */
+    public String getValueClassName() {
+      return in.getValueClassName();
+    }
+
+    /**
+     * Read raw bytes from a SequenceFile.
+     * @param key receives the serialized bytes of the next key
+     * @param val receives the uncompressed serialized bytes of the next value
+     * @return true if a record belonging to this split was stored in key and
+     *         val; false once the end of the file or of the split is reached
+     */
+    public synchronized boolean next(BytesWritable key, BytesWritable val)
+        throws IOException {
+      if (done) return false;
+      // Remember where this record begins; compared against 'end' below.
+      long pos = in.getPosition();
+      boolean eof = -1 == in.nextRawKey(buffer);
+      if (!eof) {
+        key.set(buffer.getData(), 0, buffer.getLength());
+        buffer.reset();
+        in.nextRawValue(vbytes);
+        vbytes.writeUncompressedBytes(buffer);
+        val.set(buffer.getData(), 0, buffer.getLength());
+        buffer.reset();
+      }
+      // Stop at end of file, or when a sync mark was crossed for a record
+      // that began at or beyond the split end: that record is not reported
+      // by this reader.
+      done = eof || (pos >= end && in.syncSeen());
+      return !done;
+    }
+
+    public long getPos() throws IOException {
+      return in.getPosition();
+    }
+
+    public void close() throws IOException {
+      in.close();
+    }
+
+    /**
+     * Return the progress within the input split
+     * @return 0.0 to 1.0 of the input byte range
+     */
+    public float getProgress() throws IOException {
+      if (end == start) {
+        return 0.0f;
+      } else {
+        return Math.min(1.0f, (float)((in.getPosition() - start) /
+                                      (double)(end - start)));
+      }
+    }
+  }
+}
Added: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileAsBinaryInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileAsBinaryInputFormat.java?rev=632717&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileAsBinaryInputFormat.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileAsBinaryInputFormat.java Sat Mar 1 23:13:21 2008
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.mapred.*;
+
+import junit.framework.TestCase;
+import org.apache.commons.logging.*;
+
+/**
+ * Checks that SequenceFileAsBinaryInputFormat hands back the serialized form
+ * of every key and value written to a SequenceFile.
+ */
+public class TestSequenceFileAsBinaryInputFormat extends TestCase {
+  private static final Log LOG = InputFormatBase.LOG;
+  private static final int RECORDS = 10000;
+
+  /**
+   * Write RECORDS pseudo-random Text pairs to a local SequenceFile, read the
+   * file back as a single split of raw bytes, deserialize each pair and
+   * compare it with the sequence regenerated from the same seed.
+   */
+  public void testBinary() throws IOException {
+    JobConf job = new JobConf();
+    FileSystem fs = FileSystem.getLocal(job);
+    Path dir = new Path(System.getProperty("test.build.data",".") + "/mapred");
+    Path file = new Path(dir, "testbinary.seq");
+    Random r = new Random();
+    long seed = r.nextLong();
+    r.setSeed(seed);
+    // The data is random; report the seed so a failing run can be replayed.
+    LOG.info("seed = " + seed);
+
+    fs.delete(dir);
+    job.setInputPath(dir);
+
+    Text tkey = new Text();
+    Text tval = new Text();
+
+    SequenceFile.Writer writer =
+      new SequenceFile.Writer(fs, job, file, Text.class, Text.class);
+    try {
+      for (int i = 0; i < RECORDS; ++i) {
+        tkey.set(Integer.toString(r.nextInt(), 36));
+        tval.set(Long.toString(r.nextLong(), 36));
+        writer.append(tkey, tval);
+      }
+    } finally {
+      writer.close();
+    }
+
+    InputFormat<BytesWritable,BytesWritable> bformat =
+      new SequenceFileAsBinaryInputFormat();
+
+    int count = 0;
+    // Rewind the generator so the expected records can be regenerated in
+    // the order they were written.
+    r.setSeed(seed);
+    BytesWritable bkey = new BytesWritable();
+    BytesWritable bval = new BytesWritable();
+    Text cmpkey = new Text();
+    Text cmpval = new Text();
+    DataInputBuffer buf = new DataInputBuffer();
+    // One split spanning the whole file.
+    RecordReader<BytesWritable,BytesWritable> reader =
+      bformat.getRecordReader(new FileSplit(file, 0,
+                              fs.getFileStatus(file).getLen(), job),
+                              job, Reporter.NULL);
+    try {
+      while (reader.next(bkey, bval)) {
+        tkey.set(Integer.toString(r.nextInt(), 36));
+        tval.set(Long.toString(r.nextLong(), 36));
+        // Deserialize the raw bytes back into Text for comparison.
+        buf.reset(bkey.get(), bkey.getSize());
+        cmpkey.readFields(buf);
+        buf.reset(bval.get(), bval.getSize());
+        cmpval.readFields(buf);
+        assertTrue(
+            "Keys don't match: " + "*" + cmpkey.toString() + ":" +
+            tkey.toString() + "*",
+            cmpkey.toString().equals(tkey.toString()));
+        assertTrue(
+            "Vals don't match: " + "*" + cmpval.toString() + ":" +
+            tval.toString() + "*",
+            cmpval.toString().equals(tval.toString()));
+        ++count;
+      }
+    } finally {
+      reader.close();
+    }
+    assertEquals("Some records not found", RECORDS, count);
+  }
+
+}