You are viewing a plain-text version of this message. The canonical version is available in the mailing-list archive (the original hyperlink was not preserved in this copy).
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2008/06/07 00:18:49 UTC
svn commit: r664159 - in /hadoop/core/trunk: ./
src/java/org/apache/hadoop/io/ src/java/org/apache/hadoop/mapred/
src/java/org/apache/hadoop/mapred/lib/
Author: omalley
Date: Fri Jun 6 15:18:48 2008
New Revision: 664159
URL: http://svn.apache.org/viewvc?rev=664159&view=rev
Log:
HADOOP-3413. Allow SequenceFile.Reader to use serialization framework. Contributed by Tom White.
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/java/org/apache/hadoop/io/SequenceFile.java
hadoop/core/trunk/src/java/org/apache/hadoop/mapred/SequenceFileInputFilter.java
hadoop/core/trunk/src/java/org/apache/hadoop/mapred/SequenceFileInputFormat.java
hadoop/core/trunk/src/java/org/apache/hadoop/mapred/SequenceFileOutputFormat.java
hadoop/core/trunk/src/java/org/apache/hadoop/mapred/SequenceFileRecordReader.java
hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/MultipleOutputFormat.java
hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/MultipleSequenceFileOutputFormat.java
hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/MultipleTextOutputFormat.java
hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=664159&r1=664158&r2=664159&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Fri Jun 6 15:18:48 2008
@@ -72,11 +72,12 @@
(hairong)
HADOOP-3459. Change in the output format of dfs -ls to more closely match
- /bin/ls. New format is: perm repl owner group size date name (Mukund Madhugiri via omally)
+ /bin/ls. New format is: perm repl owner group size date name
+ (Mukund Madhugiri via omally)
- HADOOP-3113. An fsync invoked on a HDFS file really really persists data! The datanode
- moves blocks in the tmp directory to the real block directory on a datanode-restart.
- (dhruba)
+ HADOOP-3113. An fsync invoked on a HDFS file really really
+ persists data! The datanode moves blocks in the tmp directory to
+ the real block directory on a datanode-restart. (dhruba)
HADOOP-3452. Change fsck to return non-zero status for a corrupt
FileSystem. (lohit vijayarenu via cdouglas)
@@ -157,6 +158,9 @@
HADOOP-3502. Quota API needs documentation in Forrest. (hairong)
+ HADOOP-3413. Allow SequenceFile.Reader to use serialization
+ framework. (tomwhite via omalley)
+
IMPROVEMENTS
HADOOP-2928. Remove deprecated FileSystem.getContentLength().
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/io/SequenceFile.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/io/SequenceFile.java?rev=664159&r1=664158&r2=664159&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/io/SequenceFile.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/io/SequenceFile.java Fri Jun 6 15:18:48 2008
@@ -33,6 +33,7 @@
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.io.compress.zlib.ZlibFactory;
+import org.apache.hadoop.io.serializer.Deserializer;
import org.apache.hadoop.io.serializer.SerializationFactory;
import org.apache.hadoop.io.serializer.Serializer;
import org.apache.hadoop.conf.*;
@@ -1403,6 +1404,9 @@
private CompressionInputStream valInFilter = null;
private DataInputStream valIn = null;
private Decompressor valDecompressor = null;
+
+ private Deserializer keyDeserializer;
+ private Deserializer valDeserializer;
/** Open the named file. */
public Reader(FileSystem fs, Path file, Configuration conf)
@@ -1540,9 +1544,27 @@
valLenDecompressor);
valLenIn = new DataInputStream(valLenInFilter);
}
+
+ SerializationFactory serializationFactory =
+ new SerializationFactory(conf);
+ this.keyDeserializer =
+ getDeserializer(serializationFactory, getKeyClass());
+ if (!blockCompressed) {
+ this.keyDeserializer.open(valBuffer);
+ } else {
+ this.keyDeserializer.open(keyIn);
+ }
+ this.valDeserializer =
+ getDeserializer(serializationFactory, getValueClass());
+ this.valDeserializer.open(valIn);
}
}
+ @SuppressWarnings("unchecked")
+ private Deserializer getDeserializer(SerializationFactory sf, Class c) {
+ return sf.getDeserializer(c);
+ }
+
/** Close the file. */
public synchronized void close() throws IOException {
// Return the decompressors to the pool
@@ -1551,6 +1573,13 @@
CodecPool.returnDecompressor(valLenDecompressor);
CodecPool.returnDecompressor(valDecompressor);
+ if (keyDeserializer != null) {
+ keyDeserializer.close();
+ }
+ if (valDeserializer != null) {
+ valDeserializer.close();
+ }
+
// Close the input-stream
in.close();
}
@@ -1743,6 +1772,51 @@
}
+ /**
+ * Get the 'value' corresponding to the last read 'key'.
+ * @param val : The 'value' to be read.
+ * @throws IOException
+ */
+ public synchronized Object getCurrentValue(Object val)
+ throws IOException {
+ if (val instanceof Configurable) {
+ ((Configurable) val).setConf(this.conf);
+ }
+
+ // Position stream to 'current' value
+ seekToCurrentValue();
+
+ if (!blockCompressed) {
+ val = deserializeValue(val);
+
+ if (valIn.read() > 0) {
+ LOG.info("available bytes: " + valIn.available());
+ throw new IOException(val+" read "+(valBuffer.getPosition()-keyLength)
+ + " bytes, should read " +
+ (valBuffer.getLength()-keyLength));
+ }
+ } else {
+ // Get the value
+ int valLength = WritableUtils.readVInt(valLenIn);
+ val = deserializeValue(val);
+
+ // Read another compressed 'value'
+ --noBufferedValues;
+
+ // Sanity check
+ if (valLength < 0) {
+ LOG.debug(val + " is a zero-length value");
+ }
+ }
+ return val;
+
+ }
+
+ @SuppressWarnings("unchecked")
+ private Object deserializeValue(Object val) throws IOException {
+ return valDeserializer.deserialize(val);
+ }
+
/** Read the next key in the file into <code>key</code>, skipping its
* value. True if another entry exists, and false at end of file. */
public synchronized boolean next(Writable key) throws IOException {
@@ -1974,6 +2048,60 @@
}
+ /** Read the next key in the file, skipping its
+ * value. Return null at end of file. */
+ public synchronized Object next(Object key) throws IOException {
+ if (key != null && key.getClass() != getKeyClass()) {
+ throw new IOException("wrong key class: "+key.getClass().getName()
+ +" is not "+keyClass);
+ }
+
+ if (!blockCompressed) {
+ outBuf.reset();
+
+ keyLength = next(outBuf);
+ if (keyLength < 0)
+ return null;
+
+ valBuffer.reset(outBuf.getData(), outBuf.getLength());
+
+ key = deserializeKey(key);
+ valBuffer.mark(0);
+ if (valBuffer.getPosition() != keyLength)
+ throw new IOException(key + " read " + valBuffer.getPosition()
+ + " bytes, should read " + keyLength);
+ } else {
+ //Reset syncSeen
+ syncSeen = false;
+
+ if (noBufferedKeys == 0) {
+ try {
+ readBlock();
+ } catch (EOFException eof) {
+ return null;
+ }
+ }
+
+ int keyLength = WritableUtils.readVInt(keyLenIn);
+
+ // Sanity check
+ if (keyLength < 0) {
+ return null;
+ }
+
+ //Read another compressed 'key'
+ key = deserializeKey(key);
+ --noBufferedKeys;
+ }
+
+ return key;
+ }
+
+ @SuppressWarnings("unchecked")
+ private Object deserializeKey(Object key) throws IOException {
+ return keyDeserializer.deserialize(key);
+ }
+
/**
* Read 'raw' values.
* @param val - The 'raw' value
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/SequenceFileInputFilter.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/SequenceFileInputFilter.java?rev=664159&r1=664158&r2=664159&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/SequenceFileInputFilter.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/SequenceFileInputFilter.java Fri Jun 6 15:18:48 2008
@@ -20,7 +20,6 @@
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.nio.charset.CharacterCodingException;
import java.security.DigestException;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
@@ -29,11 +28,8 @@
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.util.ReflectionUtils;
/**
@@ -42,8 +38,7 @@
*
*/
-public class SequenceFileInputFilter<K extends WritableComparable,
- V extends Writable>
+public class SequenceFileInputFilter<K, V>
extends SequenceFileInputFormat<K, V> {
final private static String FILTER_CLASS = "sequencefile.filter.class";
@@ -89,11 +84,11 @@
* @param key record key
* @return true if a record is accepted; return false otherwise
*/
- public abstract boolean accept(Writable key);
+ public abstract boolean accept(Object key);
}
/**
- * base calss for Filters
+ * base class for Filters
*/
public static abstract class FilterBase implements Filter {
Configuration conf;
@@ -136,9 +131,9 @@
/** Filtering method
* If key matches the regex, return true; otherwise return false
- * @see org.apache.hadoop.mapred.SequenceFileInputFilter.Filter#accept(org.apache.hadoop.io.Writable)
+ * @see org.apache.hadoop.mapred.SequenceFileInputFilter.Filter#accept(Object)
*/
- public boolean accept(Writable key) {
+ public boolean accept(Object key) {
return p.matcher(key.toString()).matches();
}
}
@@ -180,9 +175,9 @@
/** Filtering method
* If record# % frequency==0, return true; otherwise return false
- * @see org.apache.hadoop.mapred.SequenceFileInputFilter.Filter#accept(org.apache.hadoop.io.Writable)
+ * @see org.apache.hadoop.mapred.SequenceFileInputFilter.Filter#accept(Object)
*/
- public boolean accept(Writable key) {
+ public boolean accept(Object key) {
boolean accepted = false;
if (count == 0)
accepted = true;
@@ -241,9 +236,9 @@
/** Filtering method
* If MD5(key) % frequency==0, return true; otherwise return false
- * @see org.apache.hadoop.mapred.SequenceFileInputFilter.Filter#accept(org.apache.hadoop.io.Writable)
+ * @see org.apache.hadoop.mapred.SequenceFileInputFilter.Filter#accept(Object)
*/
- public boolean accept(Writable key) {
+ public boolean accept(Object key) {
try {
long hashcode;
if (key instanceof Text) {
@@ -282,8 +277,7 @@
}
}
- private static class FilterRecordReader<K extends WritableComparable,
- V extends Writable>
+ private static class FilterRecordReader<K, V>
extends SequenceFileRecordReader<K, V> {
private Filter filter;
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/SequenceFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/SequenceFileInputFormat.java?rev=664159&r1=664158&r2=664159&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/SequenceFileInputFormat.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/SequenceFileInputFormat.java Fri Jun 6 15:18:48 2008
@@ -24,13 +24,9 @@
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.MapFile;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
/** An {@link InputFormat} for {@link SequenceFile}s. */
-public class SequenceFileInputFormat<K extends WritableComparable,
- V extends Writable>
- extends FileInputFormat<K, V> {
+public class SequenceFileInputFormat<K, V> extends FileInputFormat<K, V> {
public SequenceFileInputFormat() {
setMinSplitSize(SequenceFile.SYNC_INTERVAL);
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/SequenceFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/SequenceFileOutputFormat.java?rev=664159&r1=664158&r2=664159&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/SequenceFileOutputFormat.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/SequenceFileOutputFormat.java Fri Jun 6 15:18:48 2008
@@ -26,8 +26,6 @@
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
@@ -35,9 +33,7 @@
import org.apache.hadoop.util.*;
/** An {@link OutputFormat} that writes {@link SequenceFile}s. */
-public class SequenceFileOutputFormat <K extends WritableComparable,
- V extends Writable>
-extends FileOutputFormat<K, V> {
+public class SequenceFileOutputFormat <K,V> extends FileOutputFormat<K, V> {
public RecordWriter<K, V> getRecordWriter(
FileSystem ignored, JobConf job,
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/SequenceFileRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/SequenceFileRecordReader.java?rev=664159&r1=664158&r2=664159&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/SequenceFileRecordReader.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/SequenceFileRecordReader.java Fri Jun 6 15:18:48 2008
@@ -28,9 +28,7 @@
import org.apache.hadoop.util.ReflectionUtils;
/** An {@link RecordReader} for {@link SequenceFile}s. */
-public class SequenceFileRecordReader<K extends WritableComparable,
- V extends Writable>
- implements RecordReader<K, V> {
+public class SequenceFileRecordReader<K, V> implements RecordReader<K, V> {
private SequenceFile.Reader in;
private long start;
@@ -55,11 +53,11 @@
/** The class of key that must be passed to {@link
- * #next(WritableComparable,Writable)}.. */
+ * #next(Object, Object)}.. */
public Class getKeyClass() { return in.getKeyClass(); }
/** The class of value that must be passed to {@link
- * #next(WritableComparable,Writable)}.. */
+ * #next(Object, Object)}.. */
public Class getValueClass() { return in.getValueClass(); }
@SuppressWarnings("unchecked")
@@ -76,11 +74,14 @@
public synchronized boolean next(K key, V value) throws IOException {
if (!more) return false;
long pos = in.getPosition();
- boolean eof = in.next(key, value);
+ boolean remaining = (in.next(key) != null);
+ if (remaining) {
+ getCurrentValue(value);
+ }
if (pos >= end && in.syncSeen()) {
more = false;
} else {
- more = eof;
+ more = remaining;
}
return more;
}
@@ -89,11 +90,11 @@
throws IOException {
if (!more) return false;
long pos = in.getPosition();
- boolean eof = in.next(key);
+ boolean remaining = (in.next(key) != null);
if (pos >= end && in.syncSeen()) {
more = false;
} else {
- more = eof;
+ more = remaining;
}
return more;
}
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/MultipleOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/MultipleOutputFormat.java?rev=664159&r1=664158&r2=664159&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/MultipleOutputFormat.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/MultipleOutputFormat.java Fri Jun 6 15:18:48 2008
@@ -24,8 +24,6 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.RecordWriter;
@@ -39,10 +37,10 @@
*
* Case one: This class is used for a map reduce job with at least one reducer.
* The reducer wants to write data to different files depending on the actual
- * keys. It is assumed that a key (or value) enocodes the actual key (value)
+ * keys. It is assumed that a key (or value) encodes the actual key (value)
* and the desired location for the actual key (value).
*
- * Case two: Tis class is used for a map only job. The job wants to use an
+ * Case two: This class is used for a map only job. The job wants to use an
* output file name that is either a part of the input file name of the input
* data, or some derivation of it.
*
@@ -50,8 +48,7 @@
* output file name that depends on both the keys and the input file name,
*
*/
-public abstract class MultipleOutputFormat<K extends WritableComparable,
- V extends Writable>
+public abstract class MultipleOutputFormat<K, V>
extends FileOutputFormat<K, V> {
/**
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/MultipleSequenceFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/MultipleSequenceFileOutputFormat.java?rev=664159&r1=664158&r2=664159&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/MultipleSequenceFileOutputFormat.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/MultipleSequenceFileOutputFormat.java Fri Jun 6 15:18:48 2008
@@ -32,21 +32,20 @@
* This class extends the MultipleOutputFormat, allowing to write the output data
* to different output files in sequence file output format.
*/
-public class MultipleSequenceFileOutputFormat
-extends MultipleOutputFormat<WritableComparable, Writable> {
+public class MultipleSequenceFileOutputFormat <K,V>
+extends MultipleOutputFormat<K, V> {
- private SequenceFileOutputFormat theSequenceFileOutputFormat = null;
+ private SequenceFileOutputFormat<K,V> theSequenceFileOutputFormat = null;
@Override
@SuppressWarnings("unchecked")
- protected RecordWriter<WritableComparable, Writable> getBaseRecordWriter(
- FileSystem fs,
- JobConf job,
- String name,
- Progressable arg3)
+ protected RecordWriter<K, V> getBaseRecordWriter(FileSystem fs,
+ JobConf job,
+ String name,
+ Progressable arg3)
throws IOException {
if (theSequenceFileOutputFormat == null) {
- theSequenceFileOutputFormat = new SequenceFileOutputFormat();
+ theSequenceFileOutputFormat = new SequenceFileOutputFormat<K,V>();
}
return theSequenceFileOutputFormat.getRecordWriter(fs, job, name, arg3);
}
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/MultipleTextOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/MultipleTextOutputFormat.java?rev=664159&r1=664158&r2=664159&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/MultipleTextOutputFormat.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/MultipleTextOutputFormat.java Fri Jun 6 15:18:48 2008
@@ -21,8 +21,6 @@
import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.TextOutputFormat;
@@ -32,7 +30,7 @@
* This class extends the MultipleOutputFormat, allowing to write the output
* data to different output files in Text output format.
*/
-public class MultipleTextOutputFormat<K extends WritableComparable, V extends Writable>
+public class MultipleTextOutputFormat<K, V>
extends MultipleOutputFormat<K, V> {
private TextOutputFormat<K, V> theTextOutputFormat = null;
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java?rev=664159&r1=664158&r2=664159&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java Fri Jun 6 15:18:48 2008
@@ -19,8 +19,6 @@
package org.apache.hadoop.mapred.lib;
import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.MapRunnable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
@@ -49,10 +47,7 @@
* value is 10 threads.
* <p>
*/
-public class MultithreadedMapRunner<K1 extends WritableComparable,
- V1 extends Writable,
- K2 extends WritableComparable,
- V2 extends Writable>
+public class MultithreadedMapRunner<K1, V1, K2, V2>
implements MapRunnable<K1, V1, K2, V2> {
private static final Log LOG =