You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this copy.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2010/12/04 08:13:12 UTC
svn commit: r1042107 [3/6] - in /hadoop/common/branches/HADOOP-6685: ./ ivy/
src/java/ src/java/org/apache/hadoop/fs/ src/java/org/apache/hadoop/io/
src/java/org/apache/hadoop/io/file/tfile/
src/java/org/apache/hadoop/io/serial/ src/java/org/apache/had...
Modified: hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/SetFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/SetFile.java?rev=1042107&r1=1042106&r2=1042107&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/SetFile.java (original)
+++ hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/SetFile.java Sat Dec 4 07:13:10 2010
@@ -31,6 +31,7 @@ import org.apache.hadoop.conf.*;
public class SetFile extends MapFile {
protected SetFile() {} // no public ctor
+ private static final NullWritable NULL_WRITABLE = NullWritable.get();
/**
* Write a new set file.
@@ -41,8 +42,10 @@ public class SetFile extends MapFile {
* @deprecated pass a Configuration too
*/
public Writer(FileSystem fs, String dirName,
- Class<? extends WritableComparable> keyClass) throws IOException {
- super(new Configuration(), fs, dirName, keyClass, NullWritable.class);
+ Class<? extends WritableComparable> keyClass
+ ) throws IOException {
+ super(new Configuration(), new Path(dirName),
+ keyClass(keyClass), valueClass(NullWritable.class));
}
/** Create a set naming the element class and compression type. */
@@ -59,6 +62,7 @@ public class SetFile extends MapFile {
SequenceFile.CompressionType compress) throws IOException {
super(conf, new Path(dirName),
comparator(comparator),
+ keyClass(comparator.getKeyClass()),
valueClass(NullWritable.class),
compression(compress));
}
@@ -66,7 +70,7 @@ public class SetFile extends MapFile {
/** Append a key to a set. The key must be strictly greater than the
* previous key added to the set. */
public void append(WritableComparable key) throws IOException{
- append(key, NullWritable.get());
+ append(key, NULL_WRITABLE);
}
}
@@ -94,7 +98,7 @@ public class SetFile extends MapFile {
* true if such a key exists and false when at the end of the set. */
public boolean next(WritableComparable key)
throws IOException {
- return next(key, NullWritable.get());
+ return next(key, NULL_WRITABLE);
}
/** Read the matching key from a set into <code>key</code>.
Modified: hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/file/tfile/BCFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/file/tfile/BCFile.java?rev=1042107&r1=1042106&r2=1042107&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/file/tfile/BCFile.java (original)
+++ hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/file/tfile/BCFile.java Sat Dec 4 07:13:10 2010
@@ -198,7 +198,6 @@ final class BCFile {
public class BlockAppender extends DataOutputStream {
private final BlockRegister blockRegister;
private final WBlockState wBlkState;
- @SuppressWarnings("hiding")
private boolean closed = false;
/**
@@ -282,15 +281,32 @@ final class BCFile {
* @throws IOException
* @see Compression#getSupportedAlgorithms
*/
+ @Deprecated
public Writer(FSDataOutputStream fout, String compressionName,
Configuration conf) throws IOException {
+ this(fout, Compression.getCompressionAlgorithmByName(compressionName),
+ conf);
+ }
+
+ /**
+ * Constructor
+ *
+ * @param fout
+ * FS output stream.
+ * @param compression
+ * The compression algorithm, which will be used for all
+ * data blocks.
+ * @throws IOException
+ */
+ public Writer(FSDataOutputStream fout, Algorithm compression,
+ Configuration conf) throws IOException {
if (fout.getPos() != 0) {
throw new IOException("Output file not at zero offset.");
}
this.out = fout;
this.conf = conf;
- dataIndex = new DataIndex(compressionName);
+ dataIndex = new DataIndex(compression);
metaIndex = new MetaIndex();
fsOutputBuffer = new BytesWritable();
Magic.write(fout);
@@ -651,6 +667,14 @@ final class BCFile {
}
/**
+ * Get the default compression algorithm.
+ * @return the default compression algorithm
+ */
+ public Algorithm getDefaultCompression() {
+ return dataIndex.getDefaultCompressionAlgorithm();
+ }
+
+ /**
* Get version of BCFile file being read.
*
* @return version of BCFile file being read.
@@ -870,12 +894,16 @@ final class BCFile {
}
}
+ public DataIndex(Algorithm defaultCompression) {
+ this.defaultCompressionAlgorithm = defaultCompression;
+ listRegions = new ArrayList<BlockRegion>();
+ }
+
// for write
+ @Deprecated
public DataIndex(String defaultCompressionAlgorithmName) {
- this.defaultCompressionAlgorithm =
- Compression
- .getCompressionAlgorithmByName(defaultCompressionAlgorithmName);
- listRegions = new ArrayList<BlockRegion>();
+ this(Compression
+ .getCompressionAlgorithmByName(defaultCompressionAlgorithmName));
}
public Algorithm getDefaultCompressionAlgorithm() {
Modified: hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/file/tfile/CompareUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/file/tfile/CompareUtils.java?rev=1042107&r1=1042106&r2=1042107&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/file/tfile/CompareUtils.java (original)
+++ hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/file/tfile/CompareUtils.java Sat Dec 4 07:13:10 2010
@@ -19,8 +19,7 @@ package org.apache.hadoop.io.file.tfile;
import java.io.Serializable;
import java.util.Comparator;
-import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.serial.RawComparator;
class CompareUtils {
/**
@@ -36,9 +35,9 @@ class CompareUtils {
*/
public static final class BytesComparator implements
Comparator<RawComparable> {
- private RawComparator<Object> cmp;
+ private RawComparator cmp;
- public BytesComparator(RawComparator<Object> cmp) {
+ public BytesComparator(RawComparator cmp) {
this.cmp = cmp;
}
@@ -73,7 +72,9 @@ class CompareUtils {
}
}
- public static final class ScalarComparator implements Comparator<Scalar>, Serializable {
+ @SuppressWarnings("serial")
+ public static final class ScalarComparator
+ implements Comparator<Scalar>, Serializable {
@Override
public int compare(Scalar o1, Scalar o2) {
long diff = o1.magnitude() - o2.magnitude();
@@ -83,16 +84,4 @@ class CompareUtils {
}
}
- public static final class MemcmpRawComparator implements
- RawComparator<Object>, Serializable {
- @Override
- public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
- return WritableComparator.compareBytes(b1, s1, l1, b2, s2, l2);
- }
-
- @Override
- public int compare(Object o1, Object o2) {
- throw new RuntimeException("Object comparison not supported");
- }
- }
}
Modified: hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/file/tfile/Compression.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/file/tfile/Compression.java?rev=1042107&r1=1042106&r2=1042107&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/file/tfile/Compression.java (original)
+++ hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/file/tfile/Compression.java Sat Dec 4 07:13:10 2010
@@ -39,7 +39,7 @@ import org.apache.hadoop.util.Reflection
/**
* Compression related stuff.
*/
-final class Compression {
+final public class Compression {
static final Log LOG = LogFactory.getLog(Compression.class);
/**
@@ -71,7 +71,7 @@ final class Compression {
/**
* Compression algorithms.
*/
- static enum Algorithm {
+ public static enum Algorithm {
LZO(TFile.COMPRESSION_LZO) {
private transient boolean checked = false;
private static final String defaultClazz =
@@ -99,7 +99,7 @@ final class Compression {
}
@Override
- CompressionCodec getCodec() throws IOException {
+ synchronized CompressionCodec getCodec() throws IOException {
if (!isSupported()) {
throw new IOException(
"LZO codec class not specified. Did you forget to set property "
@@ -160,7 +160,7 @@ final class Compression {
private transient DefaultCodec codec;
@Override
- CompressionCodec getCodec() {
+ synchronized CompressionCodec getCodec() {
if (codec == null) {
codec = new DefaultCodec();
codec.setConf(conf);
Modified: hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/file/tfile/RawComparable.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/file/tfile/RawComparable.java?rev=1042107&r1=1042106&r2=1042107&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/file/tfile/RawComparable.java (original)
+++ hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/file/tfile/RawComparable.java Sat Dec 4 07:13:10 2010
@@ -22,7 +22,7 @@ import java.util.Comparator;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.serial.RawComparator;
/**
* Interface for objects that can be compared through {@link RawComparator}.
Modified: hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/file/tfile/TFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/file/tfile/TFile.java?rev=1042107&r1=1042106&r2=1042107&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/file/tfile/TFile.java (original)
+++ hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/file/tfile/TFile.java Sat Dec 4 07:13:10 2010
@@ -41,16 +41,18 @@ import org.apache.hadoop.io.BytesWritabl
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.file.tfile.BCFile.Reader.BlockReader;
import org.apache.hadoop.io.file.tfile.BCFile.Writer.BlockAppender;
import org.apache.hadoop.io.file.tfile.Chunk.ChunkDecoder;
import org.apache.hadoop.io.file.tfile.Chunk.ChunkEncoder;
-import org.apache.hadoop.io.file.tfile.CompareUtils.BytesComparator;
-import org.apache.hadoop.io.file.tfile.CompareUtils.MemcmpRawComparator;
+import org.apache.hadoop.io.file.tfile.Compression.Algorithm;
+import org.apache.hadoop.io.serial.lib.MemcmpRawComparator;
import org.apache.hadoop.io.file.tfile.Utils.Version;
-import org.apache.hadoop.io.serializer.JavaSerializationComparator;
+import org.apache.hadoop.io.serial.RawComparator;
+import org.apache.hadoop.io.serial.lib.DeserializationRawComparator;
+import org.apache.hadoop.util.Options;
+import org.apache.hadoop.util.ReflectionUtils;
/**
* A TFile is a container of key-value pairs. Both keys and values are type-less
@@ -165,16 +167,56 @@ public class TFile {
public static final String COMPARATOR_MEMCMP = "memcmp";
/** comparator prefix: java class */
public static final String COMPARATOR_JCLASS = "jclass:";
+ /** user-managed comparator */
+ public static final String COMPARATOR_USER_MANAGED = "user";
/**
- * Make a raw comparator from a string name.
- *
- * @param name
- * Comparator name
- * @return A RawComparable comparator.
+ * A constant that is used to represent memcmp sort order in the tfile.
*/
- static public Comparator<RawComparable> makeComparator(String name) {
- return TFileMeta.makeComparator(name);
+ public static final RawComparator MEMCMP = new MemcmpRawComparator();
+
+ /**
+ * The kinds of comparators that tfile supports.
+ */
+ public static enum ComparatorKind {
+ NONE(""), MEMCMP(COMPARATOR_MEMCMP), USER_MANAGED(COMPARATOR_USER_MANAGED);
+
+ private String name;
+
+ ComparatorKind(String name) {
+ this.name = name;
+ }
+
+ @Override
+ public String toString() {
+ return name;
+ }
+
+ public static ComparatorKind fromString(String val) {
+ if (val == null || val.length() == 0) {
+ return NONE;
+ }
+ for (ComparatorKind kind: values()) {
+ if (kind.name.equals(val)) {
+ return kind;
+ }
+ }
+ if (val.startsWith(COMPARATOR_JCLASS)) {
+ return USER_MANAGED;
+ }
+ throw new IllegalArgumentException("Comparator kind " + val +
+ " unknown.");
+ }
+
+ static ComparatorKind fromComparator(RawComparator comparator) {
+ if (comparator == null) {
+ return NONE;
+ } else if (comparator.getClass() == MemcmpRawComparator.class){
+ return MEMCMP;
+ } else {
+ return USER_MANAGED;
+ }
+ }
}
// Prevent the instantiation of TFiles
@@ -242,9 +284,10 @@ public class TFile {
State state = State.READY;
Configuration conf;
long errorCount = 0;
+ private final RawComparator comparator;
/**
- * Constructor
+ * Constructor for a TFile Writer.
*
* @param fsdos
* output stream for writing. Must be at position 0.
@@ -255,7 +298,7 @@ public class TFile {
* @param compressName
* Name of the compression algorithm. Must be one of the strings
* returned by {@link TFile#getSupportedCompressionAlgorithms()}.
- * @param comparator
+ * @param comparatorName
* Leave comparator as null or empty string if TFile is not sorted.
* Otherwise, provide the string name for the comparison algorithm
* for keys. Two kinds of comparators are supported.
@@ -269,7 +312,7 @@ public class TFile {
* constructed through the default constructor (with no
* parameters). Parameterized RawComparators such as
* {@link WritableComparator} or
- * {@link JavaSerializationComparator} may not be directly used.
+ * {@link DeserializationRawComparator} may not be directly used.
* One should write a wrapper class that inherits from such classes
* and use its default constructor to perform proper
* initialization.
@@ -277,15 +320,156 @@ public class TFile {
* @param conf
* The configuration object.
* @throws IOException
+ * @deprecated Use Writer(Configuration,Option...) instead.
*/
public Writer(FSDataOutputStream fsdos, int minBlockSize,
- String compressName, String comparator, Configuration conf)
- throws IOException {
- sizeMinBlock = minBlockSize;
- tfileMeta = new TFileMeta(comparator);
- tfileIndex = new TFileIndex(tfileMeta.getComparator());
+ String compressName, String comparatorName,
+ Configuration conf) throws IOException {
+ this(conf, stream(fsdos), blockSize(minBlockSize),
+ compress(Compression.getCompressionAlgorithmByName(compressName)),
+ comparatorName(comparatorName));
+ }
+
+ /**
+ * Marker class for all of the Writer options.
+ */
+ public static interface Option {}
+
+ /**
+ * Create an option with an output stream.
+ * @param value output stream for writing. Must be at position 0.
+ * @return the new option
+ */
+ public static Option stream(FSDataOutputStream value) {
+ return new StreamOption(value);
+ }
+
+ /**
+ * Create an option for the compression algorithm.
+ * @param value the compression algorithm to use.
+ * @return the new option
+ */
+ public static Option compress(Algorithm value) {
+ return new CompressOption(value);
+ }
+
+ /**
+ * Create an option for the minimum block size.
+ * @param value the minimum number of bytes that a compression block will
+ * contain.
+ * @return the new option
+ */
+ public static Option blockSize(int value) {
+ return new BlockSizeOption(value);
+ }
+
+ /**
+ * Create an option for specifying the comparator.
+ * @param value the comparator for indexing and searching the file
+ * @return the new option
+ */
+ public static Option comparator(RawComparator value) {
+ return new ComparatorOption(value);
+ }
+
+ /**
+ * Create an option for the comparator from a string. This is intended to
+ * support old clients that specified the comparator name and expected
+ * the reader to be able to read it.
+ * @param value the comparator name, such as {@code memcmp} or {@code jclass:<classname>}
+ * @return the new option
+ */
+ public static Option comparatorName(String value) {
+ return new ComparatorNameOption(value);
+ }
+
+ private static class StreamOption extends Options.FSDataOutputStreamOption
+ implements Option {
+ StreamOption(FSDataOutputStream value) {
+ super(value);
+ }
+ }
+
+ private static class CompressOption implements Option {
+ private Algorithm value;
+ CompressOption(Algorithm value) {
+ this.value = value;
+ }
+ Algorithm getValue() {
+ return value;
+ }
+ }
+
+ private static class BlockSizeOption extends Options.IntegerOption
+ implements Option {
+ BlockSizeOption(int value) {
+ super(value);
+ }
+ }
+
+ private static class ComparatorOption implements Option {
+ private RawComparator value;
+ ComparatorOption(RawComparator value) {
+ this.value = value;
+ }
+ RawComparator getValue() {
+ return value;
+ }
+ }
+
+ private static class ComparatorNameOption extends Options.StringOption
+ implements Option {
+ ComparatorNameOption(String value) {
+ super(value);
+ }
+ }
- writerBCF = new BCFile.Writer(fsdos, compressName, conf);
+ /**
+ * Constructor
+ *
+ * @param conf
+ * The configuration object.
+ * @param options
+ * the options for controlling the file.
+ * @throws IOException
+ */
+ public Writer(Configuration conf, Option... options) throws IOException {
+ BlockSizeOption blockSize = Options.getOption(BlockSizeOption.class,
+ options);
+ ComparatorOption comparatorOpt = Options.getOption(ComparatorOption.class,
+ options);
+ ComparatorNameOption comparatorNameOpt =
+ Options.getOption(ComparatorNameOption.class, options);
+ CompressOption compressOpt = Options.getOption(CompressOption.class,
+ options);
+ StreamOption stream = Options.getOption(StreamOption.class, options);
+
+ if (stream == null) {
+ throw new IllegalArgumentException("Must provide a stream");
+ }
+ if (comparatorOpt != null && comparatorNameOpt != null) {
+ throw new IllegalArgumentException("Can only provide one comparator" +
+ " option");
+ }
+
+ sizeMinBlock = blockSize == null ? 1048576 : blockSize.getValue();
+ String comparatorName;
+ if (comparatorOpt != null) {
+ comparator = comparatorOpt.getValue();
+ comparatorName = ComparatorKind.fromComparator(comparator).toString();
+ } else if (comparatorNameOpt != null) {
+ comparatorName = comparatorNameOpt.getValue();
+ comparator = makeComparator(comparatorName);
+ } else {
+ comparator = null;
+ comparatorName = null;
+ }
+ tfileMeta = new TFileMeta(comparatorName);
+ tfileIndex = new TFileIndex(comparator);
+ Algorithm compress =
+ compressOpt == null ? Algorithm.NONE : compressOpt.getValue();
+
+ writerBCF = new BCFile.Writer(stream.getValue(), compress, conf);
currentKeyBufferOS = new BoundedByteArrayOutputStream(MAX_KEY_SIZE);
lastKeyBufferOS = new BoundedByteArrayOutputStream(MAX_KEY_SIZE);
this.conf = conf;
@@ -455,8 +639,8 @@ public class TFile {
if (tfileMeta.isSorted() && tfileMeta.getRecordCount()>0) {
byte[] lastKey = lastKeyBufferOS.getBuffer();
int lastLen = lastKeyBufferOS.size();
- if (tfileMeta.getComparator().compare(key, 0, len, lastKey, 0,
- lastLen) < 0) {
+ // check sort order unless this is the first key
+ if (comparator.compare(key, 0, len, lastKey, 0, lastLen) < 0) {
throw new IOException("Keys are not added in sorted order");
}
}
@@ -687,7 +871,7 @@ public class TFile {
// TFile index, it is loaded lazily.
TFileIndex tfileIndex = null;
final TFileMeta tfileMeta;
- final BytesComparator comparator;
+ private RawComparator comparator = null;
// global begin and end locations.
private final Location begin;
@@ -784,6 +968,17 @@ public class TFile {
if (recordIndex != other.recordIndex) return false;
return true;
}
+
+ @Override
+ public String toString() {
+ StringBuilder builder = new StringBuilder();
+ builder.append("Location(");
+ builder.append(blockIndex);
+ builder.append(", ");
+ builder.append(recordIndex);
+ builder.append(")");
+ return builder.toString();
+ }
}
/**
@@ -798,8 +993,8 @@ public class TFile {
* @param conf
* @throws IOException
*/
- public Reader(FSDataInputStream fsdis, long fileLength, Configuration conf)
- throws IOException {
+ public Reader(FSDataInputStream fsdis, long fileLength, Configuration conf
+ ) throws IOException {
readerBCF = new BCFile.Reader(fsdis, fileLength, conf);
// first, read TFile meta
@@ -809,14 +1004,30 @@ public class TFile {
} finally {
brMeta.close();
}
+ comparator = makeComparator(tfileMeta.getComparatorName());
- comparator = tfileMeta.getComparator();
// Set begin and end locations.
begin = new Location(0, 0);
end = new Location(readerBCF.getBlockCount(), 0);
}
/**
+ * Set the comparator for reading this file. May only be called once for
+ * each Reader.
+ * @param comparator a comparator for this file.
+ */
+ public void setComparator(RawComparator comparator) {
+ ComparatorKind kind = ComparatorKind.fromComparator(comparator);
+ if (kind != tfileMeta.getComparatorKind()) {
+ throw new IllegalArgumentException("Illegal comparator for this tfile: "
+ + kind +
+ " instead of " +
+ tfileMeta.getComparatorKind());
+ }
+ this.comparator = comparator;
+ }
+
+ /**
* Close the reader. The state of the Reader object is undefined after
* close. Calling close() for multiple times has no effect.
*/
@@ -844,6 +1055,14 @@ public class TFile {
}
/**
+ * Get the version of the tfile format.
+ * @return the version of the file
+ */
+ public Version getFileVersion() {
+ return tfileMeta.getVersion();
+ }
+
+ /**
* Get the string representation of the comparator.
*
* @return If the TFile is not sorted by keys, an empty string will be
@@ -851,7 +1070,15 @@ public class TFile {
* provided during the TFile creation time will be returned.
*/
public String getComparatorName() {
- return tfileMeta.getComparatorString();
+ return tfileMeta.getComparatorKind().toString();
+ }
+
+ /**
+ * Get the compression algorithm.
+ * @return the compression algorithm used
+ */
+ public Algorithm getCompression() {
+ return readerBCF.getDefaultCompression();
}
/**
@@ -882,8 +1109,7 @@ public class TFile {
BlockReader brIndex = readerBCF.getMetaBlock(TFileIndex.BLOCK_NAME);
try {
tfileIndex =
- new TFileIndex(readerBCF.getBlockCount(), brIndex, tfileMeta
- .getComparator());
+ new TFileIndex(readerBCF.getBlockCount(), brIndex, comparator);
} finally {
brIndex.close();
}
@@ -947,7 +1173,7 @@ public class TFile {
*
* @return a Comparator that can compare RawComparable's.
*/
- public Comparator<RawComparable> getComparator() {
+ public RawComparator getComparator() {
return comparator;
}
@@ -1006,6 +1232,10 @@ public class TFile {
if (!isSorted()) {
throw new RuntimeException("Cannot compare keys for unsorted TFiles.");
}
+ if (comparator == null) {
+ throw new
+ RuntimeException("Cannot compare keys until comparator is set");
+ }
return comparator.compare(a, o1, l1, b, o2, l2);
}
@@ -1013,7 +1243,12 @@ public class TFile {
if (!isSorted()) {
throw new RuntimeException("Cannot compare keys for unsorted TFiles.");
}
- return comparator.compare(a, b);
+ if (comparator == null) {
+ throw new
+ RuntimeException("Cannot compare keys until comparator is set");
+ }
+ return comparator.compare(a.buffer(), a.offset(), a.size(),
+ b.buffer(), b.offset(), b.size());
}
/**
@@ -1028,7 +1263,9 @@ public class TFile {
*/
Location getLocationNear(long offset) {
int blockIndex = readerBCF.getBlockIndexNear(offset);
- if (blockIndex == -1) return end;
+ if (blockIndex == -1) {
+ return end;
+ }
return new Location(blockIndex, 0);
}
@@ -1089,7 +1326,8 @@ public class TFile {
* contains zero key-value pairs even if length is positive.
* @throws IOException
*/
- public Scanner createScannerByByteRange(long offset, long length) throws IOException {
+ public Scanner createScannerByByteRange(long offset,
+ long length) throws IOException {
return new Scanner(this, offset, offset + length);
}
@@ -2032,20 +2270,20 @@ public class TFile {
/**
* Data structure representing "TFile.meta" meta block.
*/
- static final class TFileMeta {
+ private static final class TFileMeta {
final static String BLOCK_NAME = "TFile.meta";
- final Version version;
+ private final Version version;
private long recordCount;
- private final String strComparator;
- private final BytesComparator comparator;
+ private final ComparatorKind comparatorKind;
+ private final String comparatorName;
// ctor for writes
- public TFileMeta(String comparator) {
+ public TFileMeta(String comparatorName) {
// set fileVersion to API version when we create it.
version = TFile.API_VERSION;
recordCount = 0;
- strComparator = (comparator == null) ? "" : comparator;
- this.comparator = makeComparator(strComparator);
+ this.comparatorKind = ComparatorKind.fromString(comparatorName);
+ this.comparatorName = comparatorName;
}
// ctor for reads
@@ -2055,42 +2293,14 @@ public class TFile {
throw new RuntimeException("Incompatible TFile fileVersion.");
}
recordCount = Utils.readVLong(in);
- strComparator = Utils.readString(in);
- comparator = makeComparator(strComparator);
- }
-
- @SuppressWarnings("unchecked")
- static BytesComparator makeComparator(String comparator) {
- if (comparator.length() == 0) {
- // unsorted keys
- return null;
- }
- if (comparator.equals(COMPARATOR_MEMCMP)) {
- // default comparator
- return new BytesComparator(new MemcmpRawComparator());
- } else if (comparator.startsWith(COMPARATOR_JCLASS)) {
- String compClassName =
- comparator.substring(COMPARATOR_JCLASS.length()).trim();
- try {
- Class compClass = Class.forName(compClassName);
- // use its default ctor to create an instance
- return new BytesComparator((RawComparator<Object>) compClass
- .newInstance());
- } catch (Exception e) {
- throw new IllegalArgumentException(
- "Failed to instantiate comparator: " + comparator + "("
- + e.toString() + ")");
- }
- } else {
- throw new IllegalArgumentException("Unsupported comparator: "
- + comparator);
- }
+ comparatorName = Utils.readString(in);
+ comparatorKind = ComparatorKind.fromString(comparatorName);
}
public void write(DataOutput out) throws IOException {
TFile.API_VERSION.write(out);
Utils.writeVLong(out, recordCount);
- Utils.writeString(out, strComparator);
+ Utils.writeString(out, comparatorName);
}
public long getRecordCount() {
@@ -2102,20 +2312,20 @@ public class TFile {
}
public boolean isSorted() {
- return !strComparator.equals("");
- }
-
- public String getComparatorString() {
- return strComparator;
+ return comparatorKind != ComparatorKind.NONE;
}
- public BytesComparator getComparator() {
- return comparator;
+ public ComparatorKind getComparatorKind() {
+ return comparatorKind;
}
public Version getVersion() {
return version;
}
+
+ public String getComparatorName() {
+ return comparatorName;
+ }
} // END: class MetaTFileMeta
/**
@@ -2126,7 +2336,7 @@ public class TFile {
private ByteArray firstKey;
private final ArrayList<TFileIndexEntry> index;
private final ArrayList<Long> recordNumIndex;
- private final BytesComparator comparator;
+ private final RawComparator comparator;
private long sum = 0;
/**
@@ -2134,7 +2344,7 @@ public class TFile {
*
* @throws IOException
*/
- public TFileIndex(int entryCount, DataInput in, BytesComparator comparator)
+ public TFileIndex(int entryCount, DataInput in, RawComparator comparator)
throws IOException {
index = new ArrayList<TFileIndexEntry>(entryCount);
recordNumIndex = new ArrayList<Long>(entryCount);
@@ -2217,7 +2427,7 @@ public class TFile {
/**
* For writing to file.
*/
- public TFileIndex(BytesComparator comparator) {
+ public TFileIndex(RawComparator comparator) {
index = new ArrayList<TFileIndexEntry>();
recordNumIndex = new ArrayList<Long>();
this.comparator = comparator;
@@ -2332,6 +2542,58 @@ public class TFile {
}
/**
+ * Make a raw comparator from a string name.
+ *
+ * @param comparator
+ * Comparator name
+ * @return the matching RawComparator, or null if the keys are unsorted or the comparator is user-managed.
+ */
+ static RawComparator makeComparator(String comparator) {
+ if (comparator == null || comparator.length() == 0) {
+ // unsorted keys
+ return null;
+ }
+ if (comparator.equals(COMPARATOR_MEMCMP)) {
+ // default comparator
+ return MEMCMP;
+ } else if (comparator.equals(COMPARATOR_USER_MANAGED)) {
+ // the user needs to set it explicitly
+ return null;
+ } else if (comparator.startsWith(COMPARATOR_JCLASS)) {
+ // if it is a jclass string, we try to create it for them
+ // this only happens in old tfiles
+ String compClassName =
+ comparator.substring(COMPARATOR_JCLASS.length()).trim();
+ try {
+ Class<?> compClass = Class.forName(compClassName);
+ // use its default ctor to create an instance
+ return (RawComparator) ReflectionUtils.newInstance(compClass, null);
+ } catch (ClassNotFoundException cnfe) {
+ throw new IllegalArgumentException("Comparator class " + compClassName
+ + " not found.");
+ }
+ } else {
+ throw new IllegalArgumentException("Unsupported comparator: "
+ + comparator);
+ }
+ }
+
+ /**
+ * Create a stringification of a given comparator
+ * @param comparator the comparator to stringify, may be null
+ * @return the string identifying this comparator
+ */
+ static String stringifyComparator(RawComparator comparator) {
+ if (comparator == null) {
+ return "";
+ } else if (comparator.getClass() == MemcmpRawComparator.class){
+ return COMPARATOR_MEMCMP;
+ } else {
+ return COMPARATOR_USER_MANAGED;
+ }
+ }
+
+ /**
* Dumping the TFile information.
*
* @param args
Modified: hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/file/tfile/TFileDumper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/file/tfile/TFileDumper.java?rev=1042107&r1=1042106&r2=1042107&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/file/tfile/TFileDumper.java (original)
+++ hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/file/tfile/TFileDumper.java Sat Dec 4 07:13:10 2010
@@ -106,7 +106,7 @@ class TFileDumper {
int blockCnt = reader.readerBCF.getBlockCount();
int metaBlkCnt = reader.readerBCF.metaIndex.index.size();
properties.put("BCFile Version", reader.readerBCF.version.toString());
- properties.put("TFile Version", reader.tfileMeta.version.toString());
+ properties.put("TFile Version", reader.getFileVersion().toString());
properties.put("File Length", Long.toString(length));
properties.put("Data Compression", reader.readerBCF
.getDefaultCompressionName());
Modified: hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/file/tfile/Utils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/file/tfile/Utils.java?rev=1042107&r1=1042106&r2=1042107&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/file/tfile/Utils.java (original)
+++ hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/file/tfile/Utils.java Sat Dec 4 07:13:10 2010
@@ -26,6 +26,7 @@ import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.serial.RawComparator;
/**
* Supporting Utility classes used by TFile, and shared by users of TFile.
@@ -414,15 +415,16 @@ public final class Utils {
* @return The index to the desired element if it exists; or list.size()
* otherwise.
*/
- public static <T> int lowerBound(List<? extends T> list, T key,
- Comparator<? super T> cmp) {
+ public static <T extends RawComparable>
+ int lowerBound(List<? extends T> list, T key, RawComparator cmp) {
int low = 0;
int high = list.size();
while (low < high) {
int mid = (low + high) >>> 1;
T midVal = list.get(mid);
- int ret = cmp.compare(midVal, key);
+ int ret = cmp.compare(midVal.buffer(), midVal.offset(), midVal.size(),
+ key.buffer(), key.offset(), key.size());
if (ret < 0)
low = mid + 1;
else high = mid;
@@ -445,15 +447,16 @@ public final class Utils {
* @return The index to the desired element if it exists; or list.size()
* otherwise.
*/
- public static <T> int upperBound(List<? extends T> list, T key,
- Comparator<? super T> cmp) {
+ public static <T extends RawComparable>
+ int upperBound(List<? extends T> list, T key, RawComparator cmp) {
int low = 0;
int high = list.size();
while (low < high) {
int mid = (low + high) >>> 1;
T midVal = list.get(mid);
- int ret = cmp.compare(midVal, key);
+ int ret = cmp.compare(midVal.buffer(), midVal.offset(), midVal.size(),
+ key.buffer(), key.offset(), key.size());
if (ret <= 0)
low = mid + 1;
else high = mid;
@@ -491,6 +494,35 @@ public final class Utils {
}
/**
+   * Lower bound binary search. Find the index to the first element in the list
+   * that compares greater than or equal to key.
+   *
+   * @param <T>
+   *          Type of the input key.
+   * @param list
+   *          The list, which must be sorted in ascending order according to
+   *          {@code cmp} for the result to be meaningful.
+   * @param key
+   *          The input key.
+   * @param cmp
+   *          Comparator used to compare list elements against {@code key}.
+   * @return The index to the desired element if it exists; or list.size()
+   *         otherwise.
+   */
+  public static <T> int lowerBound(List<? extends T> list,
+      T key, Comparator<? super T> cmp) {
+    int low = 0;
+    int high = list.size();
+
+    while (low < high) {
+      // Unsigned shift keeps the midpoint correct even if low + high overflows.
+      int mid = (low + high) >>> 1;
+      T midVal = list.get(mid);
+      if (cmp.compare(midVal, key) < 0) {
+        // midVal < key: the first element >= key lies strictly right of mid.
+        low = mid + 1;
+      } else {
+        // midVal >= key: mid is a candidate; keep looking to its left.
+        high = mid;
+      }
+    }
+    return low;
+  }
+
+ /**
* Upper bound binary search. Find the index to the first element in the list
* that compares greater than the input key.
*
Added: hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/RawComparator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/RawComparator.java?rev=1042107&view=auto
==============================================================================
--- hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/RawComparator.java (added)
+++ hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/RawComparator.java Sat Dec 4 07:13:10 2010
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.serial;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * A compare function that compares two ranges of bytes, each given as a
+ * buffer, a start offset and a length.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public interface RawComparator {
+
+  /**
+   * Compare the two serialized keys. Implementations must be antisymmetric:
+   * {@code sgn(compare(b1,s1,l1,b2,s2,l2)) == -sgn(compare(b2,s2,l2,b1,s1,l1))}
+   * for all buffers.
+   * @param b1 the left data buffer to compare
+   * @param s1 the first index in b1 to compare
+   * @param l1 the number of bytes in b1 to compare
+   * @param b2 the right data buffer to compare
+   * @param s2 the first index in b2 to compare
+   * @param l2 the number of bytes in b2 to compare
+   * @return negative if b1 is less than b2, 0 if they are equal, positive if
+   *         b1 is greater than b2.
+   */
+  public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2);
+
+}
Added: hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/Serialization.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/Serialization.java?rev=1042107&view=auto
==============================================================================
--- hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/Serialization.java (added)
+++ hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/Serialization.java Sat Dec 4 07:13:10 2010
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.serial;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * The primary interface to provide serialization.
+ * @param <T> the parent type that it will serialize
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public abstract class Serialization<T> implements Cloneable {
+
+  /**
+   * Serialize the given object to the OutputStream.
+   * @param stream the stream to serialize to
+   * @param object the object to serialize
+   * @throws IOException if the serialization fails
+   */
+  public abstract void serialize(OutputStream stream,
+                                 T object) throws IOException;
+
+  /**
+   * Deserialize the given object from the InputStream.
+   * @param stream the stream to deserialize from
+   * @param reusableObject an object (or null) that may be reused by the
+   *        serializer
+   * @param conf the user's configuration
+   * @return the object that was created or reused with the data in it
+   * @throws IOException if the deserialization fails
+   */
+  public abstract T deserialize(InputStream stream,
+                                T reusableObject,
+                                Configuration conf) throws IOException;
+
+  /**
+   * Get the default raw comparator for values written by this serialization.
+   * @return a comparator that will compare the serialized bytes
+   */
+  public abstract RawComparator getRawComparator();
+
+  /**
+   * Serialize the serializer's configuration to the output stream.
+   * @param out the stream to serialize to
+   * @throws IOException if the serialization fails
+   */
+  public abstract void serializeSelf(OutputStream out) throws IOException;
+
+  /**
+   * Modify the serialization's configuration to reflect the contents of the
+   * input stream.
+   * @param in the stream to read from
+   * @param conf the configuration
+   * @throws IOException if the deserialization fails
+   */
+  public abstract void deserializeSelf(InputStream in,
+                                       Configuration conf) throws IOException;
+
+  /**
+   * Generate the state of the serialization in a human-friendly string.
+   * @return the textual representation of the serialization state
+   */
+  @Override
+  public abstract String toString();
+
+  /**
+   * Restore the state of the serialization from a human-friendly string.
+   * @param metadata the string that was generated by {@link #toString()}
+   * @throws IOException if the metadata cannot be parsed or applied
+   */
+  public abstract void fromString(String metadata) throws IOException;
+
+  /**
+   * Get the name for this kind of serialization, which must be unique. This
+   * name is used to identify the serialization that was used to write a
+   * particular file.
+   * @return the unique name; by default, the fully qualified class name
+   */
+  public String getName() {
+    return getClass().getName();
+  }
+
+  /**
+   * Ensure the InputStream is a DataInput, wrapping it if necessary.
+   * @param in the input stream to wrap
+   * @return the stream itself if it already is a DataInput, otherwise a
+   *         DataInputStream wrapping it
+   */
+  protected DataInput ensureDataInput(InputStream in) {
+    if (in instanceof DataInput) {
+      return (DataInput) in;
+    } else {
+      return new DataInputStream(in);
+    }
+  }
+
+  /**
+   * Ensure the OutputStream is a DataOutput, wrapping it if necessary.
+   * @param out the output stream to wrap
+   * @return the stream itself if it already is a DataOutput, otherwise a
+   *         DataOutputStream wrapping it
+   */
+  protected DataOutput ensureDataOutput(OutputStream out) {
+    if (out instanceof DataOutput) {
+      return (DataOutput) out;
+    } else {
+      return new DataOutputStream(out);
+    }
+  }
+
+  /**
+   * Make a shallow copy of this serialization.
+   * @return the copy
+   */
+  @SuppressWarnings("unchecked")
+  @Override
+  public Serialization<T> clone() {
+    try {
+      return (Serialization<T>) super.clone();
+    } catch (CloneNotSupportedException e) {
+      // Not expected, because this class implements Cloneable.
+      throw new IllegalArgumentException("Can't clone object " + this, e);
+    }
+  }
+}
Added: hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/SerializationFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/SerializationFactory.java?rev=1042107&view=auto
==============================================================================
--- hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/SerializationFactory.java (added)
+++ hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/SerializationFactory.java Sat Dec 4 07:13:10 2010
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.serial;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SERIALIZATIONS_KEY;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_SERIALIZATIONS_KEY;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.serial.lib.CompatibilitySerialization;
+import org.apache.hadoop.io.serial.lib.WritableSerialization;
+import org.apache.hadoop.io.serial.lib.avro.AvroSerialization;
+import org.apache.hadoop.io.serial.lib.protobuf.ProtoBufSerialization;
+import org.apache.hadoop.io.serial.lib.thrift.ThriftSerialization;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * A factory that finds and creates Serializations.
+ *
+ * There are two lookups. The first finds a Serialization by its name (e.g.
+ * avro, writable, thrift). The second finds a TypedSerialization based
+ * on the type that needs to be serialized.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class SerializationFactory {
+  private static final Log LOG = LogFactory.getLog(SerializationFactory.class);
+
+  /** Typed serializations, in configuration order (first match wins). */
+  private final List<TypedSerialization<?>> typedSerializations =
+    new ArrayList<TypedSerialization<?>>();
+  /** All serializations, keyed by their unique name. */
+  private final Map<String, Serialization<?>> serializations =
+    new HashMap<String, Serialization<?>>();
+
+  /**
+   * Instantiate every serialization class listed under
+   * {@code HADOOP_SERIALIZATIONS_KEY}, or the built-in defaults.
+   * @param conf the configuration to read and to configure instances with
+   * @throws IllegalArgumentException if a listed class is not a Serialization
+   *         or two serializations share a name
+   */
+  public SerializationFactory(Configuration conf) {
+    Class<?>[] classes =
+      conf.getClasses(HADOOP_SERIALIZATIONS_KEY,
+                      new Class<?>[]{WritableSerialization.class,
+                                     ProtoBufSerialization.class,
+                                     ThriftSerialization.class,
+                                     AvroSerialization.class,
+                                     CompatibilitySerialization.class});
+    for (Class<?> cls : classes) {
+      if (!Serialization.class.isAssignableFrom(cls)) {
+        throw new IllegalArgumentException("Unknown serialization class " +
+                                           cls.getName());
+      }
+      Serialization<?> serial =
+        (Serialization<?>) ReflectionUtils.newInstance(cls, conf);
+      if (serial instanceof TypedSerialization<?>) {
+        typedSerializations.add((TypedSerialization<?>) serial);
+      }
+      String name = serial.getName();
+      if (serializations.containsKey(name)) {
+        throw new IllegalArgumentException("Two serializations have the" +
+                                           " same name: " + name);
+      }
+      serializations.put(name, serial);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Adding serialization " + name);
+      }
+    }
+  }
+
+  private static final Map<String, SerializationFactory> FACTORY_CACHE =
+    new HashMap<String, SerializationFactory>();
+
+  /**
+   * Get the cached factory for the given configuration. Two configurations
+   * that have the same values for {@code HADOOP_SERIALIZATIONS_KEY} and
+   * {@code IO_SERIALIZATIONS_KEY} are considered identical and share one
+   * factory, because we can't keep a reference to the Configuration without
+   * locking it in memory.
+   * @param conf the configuration
+   * @return the factory for a given configuration
+   */
+  public static synchronized
+  SerializationFactory getInstance(Configuration conf) {
+    String serializerNames = conf.get(HADOOP_SERIALIZATIONS_KEY, "*default*");
+    String obsoleteSerializerNames = conf.get(IO_SERIALIZATIONS_KEY, "*default*");
+    String key = serializerNames + " " + obsoleteSerializerNames;
+    SerializationFactory result = FACTORY_CACHE.get(key);
+    if (result == null) {
+      result = new SerializationFactory(conf);
+      FACTORY_CACHE.put(key, result);
+    }
+    return result;
+  }
+
+  /**
+   * Look up a serialization by name and return a clone of it.
+   * @param name the unique name, as returned by
+   *        {@link Serialization#getName()}
+   * @return a newly cloned serialization of the right name
+   * @throws IllegalArgumentException if no serialization has that name
+   */
+  public Serialization<?> getSerialization(String name) {
+    Serialization<?> prototype = serializations.get(name);
+    if (prototype == null) {
+      throw new IllegalArgumentException("Could not find a serialization" +
+                                         " named " + name);
+    }
+    return prototype.clone();
+  }
+
+  /**
+   * Find the first acceptable serialization for a given type.
+   * @param cls the class that should be serialized
+   * @return a serialization that should be used to serialize the class
+   * @throws IllegalArgumentException if no serialization accepts the class
+   */
+  @SuppressWarnings("unchecked")
+  public <T> TypedSerialization<? super T> getSerializationByType(Class<T> cls){
+    for (TypedSerialization<?> serial: typedSerializations) {
+      if (serial.accept(cls)) {
+        TypedSerialization<? super T> result =
+          (TypedSerialization<? super T>) serial.clone();
+        result.setSpecificType(cls);
+        return result;
+      }
+    }
+    throw new IllegalArgumentException("Could not find a serialization to"+
+                                       " accept " + cls.getName());
+  }
+
+}
Added: hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/TypedSerialization.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/TypedSerialization.java?rev=1042107&view=auto
==============================================================================
--- hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/TypedSerialization.java (added)
+++ hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/TypedSerialization.java Sat Dec 4 07:13:10 2010
@@ -0,0 +1,140 @@
+package org.apache.hadoop.io.serial;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.serial.lib.SerializationMetadata.TypedSerializationMetadata;
+import org.yaml.snakeyaml.Yaml;
+
+/**
+ * An abstract base class for serializers that handle types under a given
+ * parent type. Generally, their metadata consists of the class name of the
+ * specific type that is being serialized.
+ * <p>
+ * Typically, TypedSerializations have two types. The first is the base type,
+ * which is the static parent type that it can serialize. The other is the
+ * specific type that this instance is currently serializing.
+ * @param <T> the base type that a given class of Serializers will serialize.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public abstract class TypedSerialization<T> extends Serialization<T> {
+  /** The concrete type being serialized, or null if none has been set. */
+  protected Class<? extends T> specificType;
+
+  protected TypedSerialization() {
+  }
+
+  protected TypedSerialization(Class<? extends T> specificType) {
+    this.specificType = specificType;
+  }
+
+  /**
+   * Get the base class that this method of serialization can handle.
+   * @return the base class
+   */
+  public abstract Class<T> getBaseType();
+
+  /**
+   * Set the specific type that this instance serializes.
+   * @param cls the concrete type, normally one for which
+   *        {@link #accept(Class)} returned true
+   */
+  public void setSpecificType(Class<? extends T> cls) {
+    specificType = cls;
+  }
+
+  /**
+   * @return the specific type being serialized, or null if none is set
+   */
+  public Class<? extends T> getSpecificType() {
+    return specificType;
+  }
+
+  /**
+   * Can this serialization serialize/deserialize a given class.
+   * @param candidateClass the class in question
+   * @return true if the class can be serialized
+   */
+  public boolean accept(Class<?> candidateClass) {
+    return getBaseType().isAssignableFrom(candidateClass);
+  }
+
+  /**
+   * Read the specific class as the metadata.
+   * @throws IOException when class not found or the deserialization fails
+   */
+  @Override
+  public void deserializeSelf(InputStream in,
+                              Configuration conf) throws IOException {
+    TypedSerializationMetadata data = TypedSerializationMetadata.parseFrom(in);
+    if (data.hasTypename()) {
+      setSpecificTypeByName(data.getTypename());
+    }
+  }
+
+  /**
+   * Write the specific class name as the metadata. An unset specific type is
+   * written as the empty string.
+   */
+  @Override
+  public void serializeSelf(OutputStream out) throws IOException {
+    TypedSerializationMetadata.newBuilder().
+      setTypename(specificType == null ? "" : specificType.getName()).
+      build().writeTo(out);
+  }
+
+  private static final String CLASS_ATTRIBUTE = "class";
+
+  /**
+   * Restore the specific type from YAML metadata produced by
+   * {@link #toString()}. An empty document, or a mapping without a "class"
+   * entry, clears the specific type.
+   * @param meta the YAML metadata
+   * @throws IOException if the metadata is not a YAML mapping or the named
+   *         class cannot be found
+   */
+  @Override
+  public void fromString(String meta) throws IOException {
+    // NOTE(review): Yaml.load can construct arbitrary objects from YAML tags.
+    // If this metadata may come from untrusted files, consider loading it
+    // with a SafeConstructor instead.
+    Object parsed = new Yaml().load(meta);
+    String cls = null;
+    if (parsed instanceof Map<?, ?>) {
+      Object value = ((Map<?, ?>) parsed).get(CLASS_ATTRIBUTE);
+      cls = value == null ? null : value.toString();
+    } else if (parsed != null) {
+      throw new IOException("serialization metadata is not a map: " + meta);
+    }
+    setSpecificTypeByName(cls);
+  }
+
+  /**
+   * Set the specific type from a class name; null or empty clears it.
+   * @throws IOException if the class cannot be found
+   */
+  @SuppressWarnings("unchecked")
+  private void setSpecificTypeByName(String name) throws IOException {
+    if (name == null || name.length() == 0) {
+      specificType = null;
+    } else {
+      try {
+        setSpecificType((Class<? extends T>) Class.forName(name));
+      } catch (ClassNotFoundException e) {
+        throw new IOException("serializer class not found " + name, e);
+      }
+    }
+  }
+
+  /**
+   * Render the specific type as a YAML mapping with a "class" entry (empty
+   * mapping if no specific type is set).
+   */
+  @Override
+  public String toString() {
+    Yaml yaml = new Yaml();
+    Map<String,String> map = new HashMap<String,String>();
+    if (specificType != null) {
+      map.put(CLASS_ATTRIBUTE, specificType.getName());
+    }
+    return yaml.dump(map);
+  }
+
+  @Override
+  public boolean equals(Object right) {
+    if (this == right) {
+      return true;
+    } else if (right == null || right.getClass() != getClass()) {
+      return false;
+    } else {
+      TypedSerialization<?> rightTyped = (TypedSerialization<?>) right;
+      return specificType == rightTyped.specificType;
+    }
+  }
+
+  @Override
+  public int hashCode() {
+    return specificType == null ? 42 : specificType.hashCode();
+  }
+
+  @Override
+  public TypedSerialization<T> clone() {
+    TypedSerialization<T> result = (TypedSerialization<T>) super.clone();
+    result.specificType = specificType;
+    return result;
+  }
+}
Added: hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/lib/CompatibilitySerialization.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/lib/CompatibilitySerialization.java?rev=1042107&view=auto
==============================================================================
--- hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/lib/CompatibilitySerialization.java (added)
+++ hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/lib/CompatibilitySerialization.java Sat Dec 4 07:13:10 2010
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.serial.lib;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.avro.Schema;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.specific.SpecificData;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.serial.RawComparator;
+import org.apache.hadoop.io.serial.TypedSerialization;
+import org.apache.hadoop.io.serial.lib.avro.AvroComparator;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * This class allows user-defined old style serializers to run inside the new
+ * framework. This will only be used for user serializations that haven't been
+ * ported yet.
+ */
+@SuppressWarnings("deprecation")
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class CompatibilitySerialization extends TypedSerialization<Object>
+                                        implements Configurable {
+  /** Old-style factory, built from the configuration in setConf. */
+  private org.apache.hadoop.io.serializer.SerializationFactory factory;
+
+  /** The configuration last passed to {@link #setConf(Configuration)}. */
+  private Configuration conf;
+
+  /** Old-style serialization selected for the current specific type. */
+  @SuppressWarnings("unchecked")
+  private org.apache.hadoop.io.serializer.Serialization
+    serialization = null;
+
+  public CompatibilitySerialization() {
+    // NOTHING
+  }
+
+  @Override
+  public CompatibilitySerialization clone() {
+    CompatibilitySerialization result =
+      (CompatibilitySerialization) super.clone();
+    result.factory = factory;
+    result.conf = conf;
+    result.serialization = serialization;
+    return result;
+  }
+
+  @Override
+  public Class<Object> getBaseType() {
+    return Object.class;
+  }
+
+  /**
+   * Accept any class for which the old-style factory has a serialization.
+   */
+  @Override
+  public boolean accept(Class<? extends Object> candidateClass) {
+    return factory.getSerialization(candidateClass) != null;
+  }
+
+  /**
+   * Set the specific type and select the matching old-style serialization.
+   */
+  @Override
+  public void setSpecificType(Class<?> cls) {
+    super.setSpecificType(cls);
+    serialization = factory.getSerialization(cls);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public Object deserialize(InputStream stream, Object reusableObject,
+                            Configuration conf) throws IOException {
+    org.apache.hadoop.io.serializer.Deserializer deserializer =
+      serialization.getDeserializer(specificType);
+    deserializer.open(stream);
+    Object result = deserializer.deserialize(reusableObject);
+    // if the object is new, configure it
+    if (result != reusableObject) {
+      ReflectionUtils.setConf(result, conf);
+    }
+    return result;
+  }
+
+  /**
+   * Pick a raw comparator based on which old-style serialization is in use,
+   * falling back to deserializing Comparable types, then to byte comparison.
+   * @throws IllegalArgumentException if no specific type is set
+   */
+  @SuppressWarnings("unchecked")
+  @Override
+  public RawComparator getRawComparator() {
+    if (specificType == null) {
+      throw new
+        IllegalArgumentException("Must have specific type for comparision");
+    } else if (serialization instanceof
+               org.apache.hadoop.io.serializer.WritableSerialization) {
+      return WritableComparator.get((Class) specificType);
+    } else if (serialization instanceof
+               org.apache.hadoop.io.serializer.avro.AvroReflectSerialization){
+      Schema schema = ReflectData.get().getSchema(specificType);
+      return new AvroComparator(schema);
+    } else if (serialization instanceof
+               org.apache.hadoop.io.serializer.avro.AvroSpecificSerialization){
+      Schema schema = SpecificData.get().getSchema(specificType);
+      return new AvroComparator(schema);
+    } else if (Comparable.class.isAssignableFrom(specificType)) {
+      // if the type is comparable, we can deserialize; pass our configuration
+      // so newly created objects are configured during comparison
+      return new DeserializationRawComparator(this, conf);
+    } else {
+      return new MemcmpRawComparator();
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public void serialize(OutputStream stream, Object object) throws IOException {
+    org.apache.hadoop.io.serializer.Serializer serializer =
+      serialization.getSerializer(specificType);
+    serializer.open(stream);
+    serializer.serialize(object);
+  }
+
+  @Override
+  public String getName() {
+    return "compatibility";
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+    factory = new org.apache.hadoop.io.serializer.SerializationFactory(conf);
+  }
+}
Added: hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/lib/DeserializationRawComparator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/lib/DeserializationRawComparator.java?rev=1042107&view=auto
==============================================================================
--- hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/lib/DeserializationRawComparator.java (added)
+++ hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/lib/DeserializationRawComparator.java Sat Dec 4 07:13:10 2010
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.serial.lib;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.serial.RawComparator;
+import org.apache.hadoop.io.serial.Serialization;
+
+/**
+ * <p>
+ * A {@link RawComparator} that uses a {@link Serialization}
+ * object to deserialize objects that are then compared via
+ * their {@link Comparable} interfaces.
+ * </p>
+ * @param <T> the type of the deserialized objects being compared
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class DeserializationRawComparator<T extends Comparable<T>>
+  implements RawComparator {
+  private final Serialization<T> serialization;
+  private final Configuration conf;
+
+  /** Scratch buffer and reusable left/right objects for one thread. */
+  private static final class ReusableObjects<T extends Comparable<T>> {
+    final DataInputBuffer buf = new DataInputBuffer();
+    T left = null;
+    T right = null;
+  }
+
+  /**
+   * Per-thread reusable objects for THIS comparator. This is deliberately an
+   * instance field. A static one would be shared by every comparator, so a
+   * cached object of one type could be passed as the reusable object to a
+   * serialization of a different type on the same thread.
+   */
+  private final ThreadLocal<ReusableObjects<T>> reuseFactory =
+    new ThreadLocal<ReusableObjects<T>>() {
+      @Override
+      protected ReusableObjects<T> initialValue() {
+        return new ReusableObjects<T>();
+      }
+    };
+
+  /**
+   * @param serialization the serialization used to decode both operands
+   * @param conf the configuration passed to each deserialize call (may be
+   *        null)
+   */
+  public DeserializationRawComparator(Serialization<T> serialization,
+                                      Configuration conf) {
+    this.serialization = serialization;
+    this.conf = conf;
+  }
+
+  /**
+   * Deserialize both byte ranges and compare the results with compareTo.
+   * @throws RuntimeException wrapping any IOException from deserialization
+   */
+  @Override
+  public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+    ReusableObjects<T> reuse = reuseFactory.get();
+    try {
+      reuse.buf.reset(b1, s1, l1);
+      reuse.left = serialization.deserialize(reuse.buf, reuse.left, conf);
+      reuse.buf.reset(b2, s2, l2);
+      reuse.right = serialization.deserialize(reuse.buf, reuse.right, conf);
+      return reuse.left.compareTo(reuse.right);
+    } catch (IOException e) {
+      throw new RuntimeException("Error in deserialization",e);
+    }
+  }
+
+}
Added: hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/lib/JavaSerialization.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/lib/JavaSerialization.java?rev=1042107&view=auto
==============================================================================
--- hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/lib/JavaSerialization.java (added)
+++ hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/lib/JavaSerialization.java Sat Dec 4 07:13:10 2010
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.serial.lib;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.serial.RawComparator;
+import org.apache.hadoop.io.serial.TypedSerialization;
+
+/**
+ * A serialization binding for Java serialization. It has the advantage of
+ * handling all serializable Java types, but is not space or time efficient. In
+ * particular, the type information is repeated in each record.
+ * It is not enabled by default.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class JavaSerialization extends TypedSerialization<Serializable> {
+
+  /**
+   * Read one object from the stream. The reusable object is ignored and a
+   * new object is always returned.
+   * <p>
+   * NOTE(review): this is native Java deserialization, which is unsafe on
+   * untrusted input. Only read data from trusted sources.
+   * @throws IOException if reading fails or the object's class is not found
+   *         (the ClassNotFoundException is kept as the cause)
+   */
+  @Override
+  public Serializable deserialize(InputStream stream,
+                                  Serializable reusableObject,
+                                  Configuration conf) throws IOException {
+    // serialize() suppresses the stream header, so don't expect one here.
+    ObjectInputStream ois = new ObjectInputStream(stream) {
+      @Override protected void readStreamHeader() {
+        // no header
+      }
+    };
+    try {
+      // ignore passed-in object
+      return (Serializable) ois.readObject();
+    } catch (ClassNotFoundException e) {
+      throw new IOException(e.toString(), e);
+    }
+  }
+
+  @Override
+  public void deserializeSelf(InputStream in, Configuration conf) {
+    // nothing
+  }
+
+  /**
+   * Compare by deserializing both values and using Comparable.
+   */
+  @SuppressWarnings("unchecked")
+  @Override
+  public RawComparator getRawComparator() {
+    return new DeserializationRawComparator(this, null);
+  }
+
+  /**
+   * Write one object without a stream header.
+   */
+  @Override
+  public void serialize(OutputStream stream, Serializable object
+                        ) throws IOException {
+    ObjectOutputStream oos = new ObjectOutputStream(stream) {
+      @Override protected void writeStreamHeader() {
+        // no header
+      }
+    };
+    oos.reset(); // clear (class) back-references
+    oos.writeObject(object);
+    oos.flush();
+  }
+
+  @Override
+  public void serializeSelf(OutputStream out) throws IOException {
+    // nothing
+  }
+
+  @Override
+  public Class<Serializable> getBaseType() {
+    return Serializable.class;
+  }
+
+  @Override
+  public String getName() {
+    return "java";
+  }
+
+  @Override
+  public void fromString(String metadata) {
+    // NOTHING
+  }
+
+  @Override
+  public String toString() {
+    return "<Java Serialization>";
+  }
+}
Added: hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/lib/MemcmpRawComparator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/lib/MemcmpRawComparator.java?rev=1042107&view=auto
==============================================================================
--- hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/lib/MemcmpRawComparator.java (added)
+++ hadoop/common/branches/HADOOP-6685/src/java/org/apache/hadoop/io/serial/lib/MemcmpRawComparator.java Sat Dec 4 07:13:10 2010
@@ -0,0 +1,19 @@
+package org.apache.hadoop.io.serial.lib;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.io.serial.RawComparator;
+import org.apache.hadoop.io.WritableComparator;
+
+/**
+ * A {@link RawComparator} that orders byte ranges lexicographically by
+ * delegating to {@link WritableComparator#compareBytes}.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public final class MemcmpRawComparator implements RawComparator {
+
+  /**
+   * Compare {@code l1} bytes of {@code b1} starting at {@code s1} with
+   * {@code l2} bytes of {@code b2} starting at {@code s2}.
+   */
+  @Override
+  public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+    final int order = WritableComparator.compareBytes(b1, s1, l1, b2, s2, l2);
+    return order;
+  }
+}