Posted to common-commits@hadoop.apache.org by cd...@apache.org on 2008/03/09 05:06:08 UTC

svn commit: r635156 - in /hadoop/core/trunk: CHANGES.txt src/java/org/apache/hadoop/io/SequenceFile.java src/test/org/apache/hadoop/mapred/TestMapRed.java

Author: cdouglas
Date: Sat Mar  8 20:06:08 2008
New Revision: 635156

URL: http://svn.apache.org/viewvc?rev=635156&view=rev
Log:
HADOOP-2943. Compression of intermediate map output causes failures in the
merge.
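For context, a minimal sketch of the job configuration that exercises the
failing path: compressed intermediate map output feeding the merge. It uses
only the JobConf calls that appear in the test changes below; the class name
is illustrative.

    import org.apache.hadoop.io.SequenceFile.CompressionType;
    import org.apache.hadoop.mapred.JobConf;

    public class CompressedMapOutputConf {
      public static JobConf configure() {
        JobConf conf = new JobConf();
        // Compress each map's intermediate output; BLOCK compression
        // groups many records into each compressed block.
        conf.setCompressMapOutput(true);
        conf.setMapOutputCompressionType(CompressionType.BLOCK);
        return conf;
      }
    }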


Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/java/org/apache/hadoop/io/SequenceFile.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMapRed.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=635156&r1=635155&r2=635156&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Sat Mar  8 20:06:08 2008
@@ -150,7 +150,10 @@
 
     HADOOP-2938. Some fs commands did not glob paths.
     (Tsz Wo (Nicholas), SZE via rangadi)
-    
+
+    HADOOP-2943. Compression of intermediate map output causes failures
+    in the merge. (cdouglas)
+
 Release 0.16.1 - 2008-03-13
 
   INCOMPATIBLE CHANGES

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/io/SequenceFile.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/io/SequenceFile.java?rev=635156&r1=635155&r2=635156&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/io/SequenceFile.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/io/SequenceFile.java Sat Mar  8 20:06:08 2008
@@ -795,9 +795,9 @@
     Metadata metadata = null;
     Compressor compressor = null;
     
-    private Serializer keySerializer;
-    private Serializer uncompressedValSerializer;
-    private Serializer compressedValSerializer;
+    protected Serializer keySerializer;
+    protected Serializer uncompressedValSerializer;
+    protected Serializer compressedValSerializer;
     
     // Insert a globally unique 16-byte value every few entries, so that one
     // can seek into the middle of a file and then synchronize with record
@@ -1111,7 +1111,8 @@
     boolean isBlockCompressed() { return false; }
 
     /** Append a key/value pair. */
-    public synchronized void append(Writable key, Writable val)
+    @SuppressWarnings("unchecked")
+    public synchronized void append(Object key, Object val)
       throws IOException {
       if (key.getClass() != keyClass)
         throw new IOException("wrong key class: "+key.getClass().getName()
@@ -1123,14 +1124,14 @@
       buffer.reset();
 
       // Append the 'key'
-      key.write(buffer);
+      keySerializer.serialize(key);
       int keyLength = buffer.getLength();
       if (keyLength == 0)
         throw new IOException("zero length keys not allowed: " + key);
 
       // Compress 'value' and append it
       deflateFilter.resetState();
-      val.write(deflateOut);
+      compressedValSerializer.serialize(val);
       deflateOut.flush();
       deflateFilter.finish();
 
@@ -1238,8 +1239,12 @@
     boolean isBlockCompressed() { return true; }
 
     /** Initialize */
-    void init(int compressionBlockSize) {
+    void init(int compressionBlockSize) throws IOException {
       this.compressionBlockSize = compressionBlockSize;
+      keySerializer.close();
+      keySerializer.open(keyBuffer);
+      uncompressedValSerializer.close();
+      uncompressedValSerializer.open(valBuffer);
     }
     
     /** Workhorse to check and write out compressed data/lengths */
@@ -1295,7 +1300,8 @@
     }
 
     /** Append a key/value pair. */
-    public synchronized void append(Writable key, Writable val)
+    @SuppressWarnings("unchecked")
+    public synchronized void append(Object key, Object val)
       throws IOException {
       if (key.getClass() != keyClass)
         throw new IOException("wrong key class: "+key+" is not "+keyClass);
@@ -1304,14 +1310,14 @@
 
       // Save key/value into respective buffers 
       int oldKeyLength = keyBuffer.getLength();
-      key.write(keyBuffer);
+      keySerializer.serialize(key);
       int keyLength = keyBuffer.getLength() - oldKeyLength;
       if (keyLength == 0)
         throw new IOException("zero length keys not allowed: " + key);
       WritableUtils.writeVInt(keyLenBuffer, keyLength);
 
       int oldValLength = valBuffer.getLength();
-      val.write(valBuffer);
+      uncompressedValSerializer.serialize(val);
       int valLength = valBuffer.getLength() - oldValLength;
       WritableUtils.writeVInt(valLenBuffer, valLength);
       

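The writer changes above move append() from Writable.write(DataOutput) to the
pluggable serialization API, so keys and values go through a registered
Serializer instead of writing themselves directly. A rough sketch of the
serializer lifecycle the patch relies on (standalone example; the
DataOutputBuffer stands in for the writer's internal key/value buffers):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.DataOutputBuffer;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.serializer.SerializationFactory;
    import org.apache.hadoop.io.serializer.Serializer;

    public class SerializerLifecycle {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        SerializationFactory factory = new SerializationFactory(conf);

        // For a Writable class this resolves to WritableSerialization.
        Serializer<IntWritable> ser =
          factory.getSerializer(IntWritable.class);

        // Bind the serializer to its output stream once, then reuse it for
        // every record. This is why the block-compressed writer's init()
        // must close and reopen the serializers on its own buffers.
        DataOutputBuffer buffer = new DataOutputBuffer();
        ser.open(buffer);
        ser.serialize(new IntWritable(42));  // replaces key.write(buffer)
        ser.close();
      }
    }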
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMapRed.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMapRed.java?rev=635156&r1=635155&r2=635156&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMapRed.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMapRed.java Sat Mar  8 20:06:08 2008
@@ -24,6 +24,7 @@
 import java.io.*;
 import java.util.*;
 
+import static org.apache.hadoop.io.SequenceFile.CompressionType;
 
 /**********************************************************
  * MapredLoadTest generates a bunch of work that exercises
@@ -282,8 +283,8 @@
       
   }
     
-  private void checkCompression(boolean compressMapOutput,
-                                boolean compressReduceOutput,
+  private void checkCompression(CompressionType mapCompression,
+                                CompressionType redCompression,
                                 boolean includeCombine
                                 ) throws Exception {
     JobConf conf = new JobConf(TestMapRed.class);
@@ -302,12 +303,9 @@
     if (includeCombine) {
       conf.setCombinerClass(IdentityReducer.class);
     }
-    if (compressMapOutput) {
-      conf.setCompressMapOutput(true);
-    }
-    if (compressReduceOutput) {
-      SequenceFileOutputFormat.setCompressOutput(conf, true);
-    }
+    conf.setMapOutputCompressionType(mapCompression);
+    conf.setCompressMapOutput(mapCompression != CompressionType.NONE);
+    SequenceFileOutputFormat.setOutputCompressionType(conf, redCompression);
     try {
       if (!fs.mkdirs(testdir)) {
         throw new IOException("Mkdirs failed to create " + testdir.toString());
@@ -330,7 +328,7 @@
       SequenceFile.Reader rdr = 
         new SequenceFile.Reader(fs, output, conf);
       assertEquals("is reduce output compressed " + output, 
-                   compressReduceOutput, 
+                   redCompression != CompressionType.NONE, 
                    rdr.isCompressed());
       rdr.close();
     } finally {
@@ -339,10 +337,12 @@
   }
     
   public void testCompression() throws Exception {
-    for(int compressMap=0; compressMap < 2; ++compressMap) {
-      for(int compressOut=0; compressOut < 2; ++compressOut) {
+    EnumSet<SequenceFile.CompressionType> seq =
+      EnumSet.allOf(SequenceFile.CompressionType.class);
+    for (CompressionType mapCompression : seq) {
+      for (CompressionType redCompression : seq) {
         for(int combine=0; combine < 2; ++combine) {
-          checkCompression(compressMap == 1, compressOut == 1,
+          checkCompression(mapCompression, redCompression,
                            combine == 1);
         }
       }
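The rewritten test now covers every CompressionType (NONE, RECORD, BLOCK) for
both the map and reduce side, rather than a boolean on/off. To inspect a
result file by hand, a small sketch built on the same Reader calls the test
uses (the path argument is illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.SequenceFile;

    public class CheckCompression {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.getLocal(conf);
        Path output = new Path(args[0]);  // e.g. a part-00000 output file

        SequenceFile.Reader rdr = new SequenceFile.Reader(fs, output, conf);
        try {
          // isCompressed() is what the test asserts; isBlockCompressed()
          // further distinguishes BLOCK from RECORD compression.
          System.out.println("compressed: " + rdr.isCompressed());
          System.out.println("block compressed: " + rdr.isBlockCompressed());
        } finally {
          rdr.close();
        }
      }
    }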