You are viewing a plain text version of this content. The canonical (hyperlinked) version is available in the original mailing-list archive.
Posted to common-commits@hadoop.apache.org by cd...@apache.org on 2008/03/09 05:06:08 UTC
svn commit: r635156 - in /hadoop/core/trunk: CHANGES.txt
src/java/org/apache/hadoop/io/SequenceFile.java
src/test/org/apache/hadoop/mapred/TestMapRed.java
Author: cdouglas
Date: Sat Mar 8 20:06:08 2008
New Revision: 635156
URL: http://svn.apache.org/viewvc?rev=635156&view=rev
Log:
HADOOP-2943. Compression of intermediate map output causes failures in the
merge.
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/java/org/apache/hadoop/io/SequenceFile.java
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMapRed.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=635156&r1=635155&r2=635156&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Sat Mar 8 20:06:08 2008
@@ -150,7 +150,10 @@
HADOOP-2938. Some fs commands did not glob paths.
(Tsz Wo (Nicholas), SZE via rangadi)
-
+
+ HADOOP-2943. Compression of intermediate map output causes failures
+ in the merge. (cdouglas)
+
Release 0.16.1 - 2008-03-13
INCOMPATIBLE CHANGES
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/io/SequenceFile.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/io/SequenceFile.java?rev=635156&r1=635155&r2=635156&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/io/SequenceFile.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/io/SequenceFile.java Sat Mar 8 20:06:08 2008
@@ -795,9 +795,9 @@
Metadata metadata = null;
Compressor compressor = null;
- private Serializer keySerializer;
- private Serializer uncompressedValSerializer;
- private Serializer compressedValSerializer;
+ protected Serializer keySerializer;
+ protected Serializer uncompressedValSerializer;
+ protected Serializer compressedValSerializer;
// Insert a globally unique 16-byte value every few entries, so that one
// can seek into the middle of a file and then synchronize with record
@@ -1111,7 +1111,8 @@
boolean isBlockCompressed() { return false; }
/** Append a key/value pair. */
- public synchronized void append(Writable key, Writable val)
+ @SuppressWarnings("unchecked")
+ public synchronized void append(Object key, Object val)
throws IOException {
if (key.getClass() != keyClass)
throw new IOException("wrong key class: "+key.getClass().getName()
@@ -1123,14 +1124,14 @@
buffer.reset();
// Append the 'key'
- key.write(buffer);
+ keySerializer.serialize(key);
int keyLength = buffer.getLength();
if (keyLength == 0)
throw new IOException("zero length keys not allowed: " + key);
// Compress 'value' and append it
deflateFilter.resetState();
- val.write(deflateOut);
+ compressedValSerializer.serialize(val);
deflateOut.flush();
deflateFilter.finish();
@@ -1238,8 +1239,12 @@
boolean isBlockCompressed() { return true; }
/** Initialize */
- void init(int compressionBlockSize) {
+ void init(int compressionBlockSize) throws IOException {
this.compressionBlockSize = compressionBlockSize;
+ keySerializer.close();
+ keySerializer.open(keyBuffer);
+ uncompressedValSerializer.close();
+ uncompressedValSerializer.open(valBuffer);
}
/** Workhorse to check and write out compressed data/lengths */
@@ -1295,7 +1300,8 @@
}
/** Append a key/value pair. */
- public synchronized void append(Writable key, Writable val)
+ @SuppressWarnings("unchecked")
+ public synchronized void append(Object key, Object val)
throws IOException {
if (key.getClass() != keyClass)
throw new IOException("wrong key class: "+key+" is not "+keyClass);
@@ -1304,14 +1310,14 @@
// Save key/value into respective buffers
int oldKeyLength = keyBuffer.getLength();
- key.write(keyBuffer);
+ keySerializer.serialize(key);
int keyLength = keyBuffer.getLength() - oldKeyLength;
if (keyLength == 0)
throw new IOException("zero length keys not allowed: " + key);
WritableUtils.writeVInt(keyLenBuffer, keyLength);
int oldValLength = valBuffer.getLength();
- val.write(valBuffer);
+ uncompressedValSerializer.serialize(val);
int valLength = valBuffer.getLength() - oldValLength;
WritableUtils.writeVInt(valLenBuffer, valLength);
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMapRed.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMapRed.java?rev=635156&r1=635155&r2=635156&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMapRed.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMapRed.java Sat Mar 8 20:06:08 2008
@@ -24,6 +24,7 @@
import java.io.*;
import java.util.*;
+import static org.apache.hadoop.io.SequenceFile.CompressionType;
/**********************************************************
* MapredLoadTest generates a bunch of work that exercises
@@ -282,8 +283,8 @@
}
- private void checkCompression(boolean compressMapOutput,
- boolean compressReduceOutput,
+ private void checkCompression(CompressionType mapCompression,
+ CompressionType redCompression,
boolean includeCombine
) throws Exception {
JobConf conf = new JobConf(TestMapRed.class);
@@ -302,12 +303,9 @@
if (includeCombine) {
conf.setCombinerClass(IdentityReducer.class);
}
- if (compressMapOutput) {
- conf.setCompressMapOutput(true);
- }
- if (compressReduceOutput) {
- SequenceFileOutputFormat.setCompressOutput(conf, true);
- }
+ conf.setMapOutputCompressionType(mapCompression);
+ conf.setCompressMapOutput(mapCompression != CompressionType.NONE);
+ SequenceFileOutputFormat.setOutputCompressionType(conf, redCompression);
try {
if (!fs.mkdirs(testdir)) {
throw new IOException("Mkdirs failed to create " + testdir.toString());
@@ -330,7 +328,7 @@
SequenceFile.Reader rdr =
new SequenceFile.Reader(fs, output, conf);
assertEquals("is reduce output compressed " + output,
- compressReduceOutput,
+ redCompression != CompressionType.NONE,
rdr.isCompressed());
rdr.close();
} finally {
@@ -339,10 +337,12 @@
}
public void testCompression() throws Exception {
- for(int compressMap=0; compressMap < 2; ++compressMap) {
- for(int compressOut=0; compressOut < 2; ++compressOut) {
+ EnumSet<SequenceFile.CompressionType> seq =
+ EnumSet.allOf(SequenceFile.CompressionType.class);
+ for (CompressionType mapCompression : seq) {
+ for (CompressionType redCompression : seq) {
for(int combine=0; combine < 2; ++combine) {
- checkCompression(compressMap == 1, compressOut == 1,
+ checkCompression(mapCompression, redCompression,
combine == 1);
}
}