You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2008/09/09 15:11:07 UTC

svn commit: r693455 - in /hadoop/core/trunk: ./ src/core/org/apache/hadoop/fs/ src/core/org/apache/hadoop/util/ src/mapred/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/

Author: ddas
Date: Tue Sep  9 06:11:05 2008
New Revision: 693455

URL: http://svn.apache.org/viewvc?rev=693455&view=rev
Log:
HADOOP-3514. Inline the CRCs in intermediate files as opposed to reading it from a different .crc files. Contributed by Jothi Padmanabhan.

Added:
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFileInputStream.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFileOutputStream.java
Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/core/org/apache/hadoop/fs/LocalFileSystem.java
    hadoop/core/trunk/src/core/org/apache/hadoop/util/DataChecksum.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFile.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Merger.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestReduceTask.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=693455&r1=693454&r2=693455&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Tue Sep  9 06:11:05 2008
@@ -316,6 +316,9 @@
     GenericMRLoadGenerator public, so they can be used in other contexts. 
     (Lingyun Yang via omalley)
 
+    HADOOP-3514. Inline the CRCs in intermediate files as opposed to reading
+    it from a different .crc file. (Jothi Padmanabhan via ddas)
+
   BUG FIXES
 
     HADOOP-3563.  Refactor the distributed upgrade code so that it is 

Modified: hadoop/core/trunk/src/core/org/apache/hadoop/fs/LocalFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/fs/LocalFileSystem.java?rev=693455&r1=693454&r2=693455&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/fs/LocalFileSystem.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/fs/LocalFileSystem.java Tue Sep  9 06:11:05 2008
@@ -29,13 +29,19 @@
 public class LocalFileSystem extends ChecksumFileSystem {
   static final URI NAME = URI.create("file:///");
   static private Random rand = new Random();
-
+  FileSystem rfs;
+  
   public LocalFileSystem() {
-    super(new RawLocalFileSystem());
+    this(new RawLocalFileSystem());
+  }
+  
+  public FileSystem getRaw() {
+    return rfs;
   }
     
   public LocalFileSystem(FileSystem rawLocalFileSystem) {
     super(rawLocalFileSystem);
+    rfs = rawLocalFileSystem;
   }
     
   /** Convert a path to a File. */

Modified: hadoop/core/trunk/src/core/org/apache/hadoop/util/DataChecksum.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/util/DataChecksum.java?rev=693455&r1=693454&r2=693455&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/util/DataChecksum.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/util/DataChecksum.java Tue Sep  9 06:11:05 2008
@@ -223,9 +223,6 @@
       summer.update( b, off, len );
       inSum += len;
     }
-    // Can be removed.
-    assert inSum <= bytesPerChecksum : "DataChecksum.update() : inSum " + 
-                inSum + " > " + " bytesPerChecksum " + bytesPerChecksum ; 
   }
   public void update( int b ) {
     summer.update( b );

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFile.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFile.java?rev=693455&r1=693454&r2=693455&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFile.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFile.java Tue Sep  9 06:11:05 2008
@@ -66,6 +66,8 @@
     long decompressedBytesWritten = 0;
     long compressedBytesWritten = 0;
     
+    IFileOutputStream checksumOut;
+
     Class<K> keyClass;
     Class<V> valueClass;
     Serializer<K> keySerializer;
@@ -83,17 +85,18 @@
     public Writer(Configuration conf, FSDataOutputStream out, 
         Class<K> keyClass, Class<V> valueClass,
         CompressionCodec codec) throws IOException {
+      this.checksumOut = new IFileOutputStream(out);
       this.rawOut = out;
       this.start = this.rawOut.getPos();
       
       if (codec != null) {
         this.compressor = CodecPool.getCompressor(codec);
         this.compressor.reset();
-        this.compressedOut = codec.createOutputStream(out, compressor);
+        this.compressedOut = codec.createOutputStream(checksumOut, compressor);
         this.out = new FSDataOutputStream(this.compressedOut,  null);
         this.compressOutput = true;
       } else {
-        this.out = out;
+        this.out = new FSDataOutputStream(checksumOut,null);
       }
       
       this.keyClass = keyClass;
@@ -106,6 +109,7 @@
     }
     
     public void close() throws IOException {
+
       // Close the serializers
       keySerializer.close();
       valueSerializer.close();
@@ -115,24 +119,25 @@
       WritableUtils.writeVInt(out, EOF_MARKER);
       decompressedBytesWritten += 2 * WritableUtils.getVIntSize(EOF_MARKER);
       
+      //Flush the stream
+      out.flush();
+  
       if (compressOutput) {
-        // Flush data from buffers into the compressor
-        out.flush();
-        
         // Flush & return the compressor
         compressedOut.finish();
         compressedOut.resetState();
         CodecPool.returnCompressor(compressor);
         compressor = null;
       }
-
+      
       // Close the stream
-      rawOut.flush();
+      checksumOut.close();
+      
       compressedBytesWritten = rawOut.getPos() - start;
-
+    
       // Close the underlying stream iff we own it...
       if (ownOutputStream) {
-        out.close();
+        rawOut.close();
       }
       out = null;
     }
@@ -216,43 +221,71 @@
     private static final int DEFAULT_BUFFER_SIZE = 128*1024;
     private static final int MAX_VINT_SIZE = 9;
 
-    FSDataInputStream rawIn;   // Raw InputStream from file
     InputStream in;            // Possibly decompressed stream that we read
     Decompressor decompressor;
     long bytesRead = 0;
     long fileLength = 0;
     boolean eof = false;
+    IFileInputStream checksumIn;
     
     byte[] buffer = null;
     int bufferSize = DEFAULT_BUFFER_SIZE;
     DataInputBuffer dataIn = new DataInputBuffer();
 
     int recNo = 1;
-
+    
+    /**
+     * Construct an IFile Reader.
+     * 
+     * @param conf Configuration File 
+     * @param fs  FileSystem
+     * @param file Path of the file to be opened. This file should have
+     *             checksum bytes for the data at the end of the file.
+     * @param codec codec
+     * @throws IOException
+     */
+    
     public Reader(Configuration conf, FileSystem fs, Path file,
                   CompressionCodec codec) throws IOException {
-      this(conf, fs.open(file), fs.getFileStatus(file).getLen(), codec);
+      this(conf, fs.open(file), 
+           fs.getFileStatus(file).getLen(),
+           codec);
     }
     
     protected Reader() {}
+
+    /**
+     * Construct an IFile Reader.
+     * 
+     * @param conf Configuration File 
+     * @param in   The input stream
+     * @param length Length of the data in the stream, including the checksum
+     *               bytes.
+     * @param codec codec
+     * @throws IOException
+     */
     
     public Reader(Configuration conf, FSDataInputStream in, long length, 
                   CompressionCodec codec) throws IOException {
-      this.rawIn = in;
+      checksumIn = new IFileInputStream(in,length);
       if (codec != null) {
         decompressor = CodecPool.getDecompressor(codec);
-        this.in = codec.createInputStream(in, decompressor);
+        this.in = codec.createInputStream(checksumIn, decompressor);
       } else {
-        this.in = in;
+        this.in = checksumIn;
       }
       this.fileLength = length;
       
       this.bufferSize = conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE);
     }
     
-    public long getLength() { return fileLength; }
+    public long getLength() { 
+      return fileLength - checksumIn.getSize();
+    }
     
-    public long getPosition() throws IOException { return rawIn.getPos(); }
+    public long getPosition() throws IOException {    
+      return checksumIn.getPosition(); 
+    }
     
     /**
      * Read upto len bytes into buf starting at offset off.
@@ -414,6 +447,11 @@
       return bytesRead;
     }
     
+    @Override
+    public long getLength() { 
+      return fileLength;
+    }
+    
     private void dumpOnError() {
       File dumpFile = new File("../output/" + taskAttemptId + ".dump");
       System.err.println("Dumping corrupt map-output of " + taskAttemptId + 

Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFileInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFileInputStream.java?rev=693455&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFileInputStream.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFileInputStream.java Tue Sep  9 06:11:05 2008
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.fs.ChecksumException;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.util.DataChecksum;
+/**
+ * A checksum input stream, used for IFiles.
+ * Used to validate the checksum of files created by {@link IFileOutputStream}. 
+ */
+
+class IFileInputStream extends InputStream {
+  
+  private final InputStream in; //The input stream to be verified for checksum. 
+  private final long length; //The total length of the input file
+  private final long dataLength;
+  private DataChecksum sum;
+  private long currentOffset = 0;
+  private byte b[]; 
+  private byte csum[] = null;
+  private int checksumSize;
+  
+  /**
+   * Create a checksum input stream that reads
+   * @param in The input stream to be verified for checksum.
+   * @param len The length of the input stream including checksum bytes.
+   */
+  public IFileInputStream(InputStream in, long len) {
+    this.in = in;
+    sum = DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_CRC32, 
+        Integer.MAX_VALUE);
+    checksumSize = sum.getChecksumSize();
+    length = len;
+    dataLength = length - checksumSize;
+    b = new byte[1];
+  }
+
+  /**
+   * Close the input stream.
+   */
+  @Override
+  public void close() throws IOException {
+    in.close();
+  }
+  
+  @Override
+  public long skip(long n) throws IOException {
+   throw new IOException("Skip not supported for IFileInputStream");
+  }
+  
+  public long getPosition() {
+    return (currentOffset >= dataLength) ? dataLength : currentOffset;
+  }
+  
+  public long getSize() {
+    return checksumSize;
+  }
+  
+  /**
+   * Read bytes from the stream.
+   * At EOF, checksum is validated, but the checksum
+   * bytes are not passed back in the buffer. 
+   */
+  public int read(byte[] b, int off, int len) throws IOException {
+
+    if (currentOffset >= dataLength) {
+      return -1;
+    }
+    
+    return doRead(b,off,len);
+  }
+
+  /**
+   * Read bytes from the stream.
+   * At EOF, checksum is validated and sent back
+   * as the last four bytes of the buffer. The caller should handle
+   * these bytes appropriately
+   */
+  public int readWithChecksum(byte[] b, int off, int len) throws IOException {
+
+    if (currentOffset == length) {
+      return -1;
+    }
+    else if (currentOffset >= dataLength) {
+      // If the previous read drained off all the data, then just return
+      // the checksum now. Note that checksum validation would have 
+      // happened in the earlier read
+      int lenToCopy = (int) (checksumSize - (currentOffset - dataLength));
+      if (len < lenToCopy) {
+        lenToCopy = len;
+      }
+      System.arraycopy(csum, (int) (currentOffset - dataLength), b, off, 
+          lenToCopy);
+      currentOffset += lenToCopy;
+      return lenToCopy;
+    }
+
+    int bytesRead = doRead(b,off,len);
+
+    if (currentOffset == dataLength) {
+      if (len >= bytesRead + checksumSize) {
+        System.arraycopy(csum, 0, b, off + bytesRead, checksumSize);
+        bytesRead += checksumSize;
+        currentOffset += checksumSize;
+      }
+    }
+    return bytesRead;
+  }
+
+  private int doRead(byte[]b, int off, int len) throws IOException {
+    
+    // If we are trying to read past the end of data, just read
+    // the left over data
+    if (currentOffset + len > dataLength) {
+      len = (int) dataLength - (int)currentOffset;
+    }
+    
+    int bytesRead = in.read(b, off, len);
+
+    if (bytesRead < 0) {
+      throw new ChecksumException("Checksum Error", 0);
+    }
+    
+    sum.update(b,off,bytesRead);
+
+    currentOffset += bytesRead;
+    
+    if (currentOffset == dataLength) {
+      // The last four bytes are checksum. Strip them and verify
+      csum = new byte[checksumSize];
+      IOUtils.readFully(in, csum, 0, checksumSize);
+      if (!sum.compare(csum, 0)) {
+        throw new ChecksumException("Checksum Error", 0);
+      }
+    }
+    return bytesRead;
+  }
+
+
+  @Override
+  public int read() throws IOException {    
+    b[0] = 0;
+    int l = read(b,0,1);
+    if (l < 0)  return l;
+    
+    // Upgrade the b[0] to an int so as not to misinterpret the
+    // first bit of the byte as a sign bit
+    int result = 0xFF & b[0];
+    return result;
+  }
+
+  public byte[] getChecksum() {
+    return csum;
+  }
+}

Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFileOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFileOutputStream.java?rev=693455&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFileOutputStream.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFileOutputStream.java Tue Sep  9 06:11:05 2008
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.io.OutputStream; 
+import java.io.FilterOutputStream;
+
+import org.apache.hadoop.util.DataChecksum;
+/**
+ * A Checksum output stream.
+ * Checksum for the contents of the file is calculated and
+ * appended to the end of the file on close of the stream.
+ * Used for IFiles
+ */
+class IFileOutputStream extends FilterOutputStream {
+  /**
+   * The output stream to be checksummed. 
+   */
+  private final DataChecksum sum;
+  private byte[] barray;
+  private boolean closed = false;
+
+  /**
+   * Create a checksum output stream that writes
+   * the bytes to the given stream.
+   * @param out
+   */
+  public IFileOutputStream(OutputStream out) {
+    super(out);
+    sum = DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_CRC32,
+        Integer.MAX_VALUE);
+    barray = new byte[sum.getChecksumSize()];
+  }
+  
+  @Override
+  public void close() throws IOException {
+    if (closed) {
+      return;
+    }
+    closed = true;
+    sum.writeValue(barray, 0, false);
+    out.write (barray, 0, sum.getChecksumSize());
+    out.flush();
+  }
+  
+  /**
+   * Write bytes to the stream.
+   */
+  @Override
+  public void write(byte[] b, int off, int len) throws IOException {
+    sum.update(b, off,len);
+    out.write(b,off,len);
+  }
+ 
+  @Override
+  public void write(int b) throws IOException {
+    barray[0] = (byte) (b & 0xFF);
+    write(barray,0,1);
+  }
+
+}

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java?rev=693455&r1=693454&r2=693455&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java Tue Sep  9 06:11:05 2008
@@ -40,6 +40,7 @@
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.DataInputBuffer;
@@ -425,7 +426,8 @@
     private final SpillThread spillThread = new SpillThread();
 
     private final FileSystem localFs;
-
+    private final FileSystem rfs;
+   
     private final Counters.Counter mapOutputByteCounter;
     private final Counters.Counter mapOutputRecordCounter;
     private final Counters.Counter combineInputCounter;
@@ -439,7 +441,10 @@
       localFs = FileSystem.getLocal(job);
       partitions = job.getNumReduceTasks();
       partitioner = ReflectionUtils.newInstance(job.getPartitionerClass(), job);
-      // sanity checks
+       
+      rfs = ((LocalFileSystem)localFs).getRaw();
+
+      //sanity checks
       final float spillper = job.getFloat("io.sort.spill.percent",(float)0.8);
       final float recper = job.getFloat("io.sort.record.percent",(float)0.05);
       final int sortmb = job.getInt("io.sort.mb", 100);
@@ -891,7 +896,7 @@
         // create spill file
         Path filename = mapOutputFile.getSpillFileForWrite(getTaskID(),
                                       numSpills, size);
-        out = localFs.create(filename);
+        out = rfs.create(filename);
         // create spill index
         Path indexFilename = mapOutputFile.getSpillIndexFileForWrite(
                              getTaskID(), numSpills,
@@ -972,7 +977,7 @@
         // create spill file
         Path filename = mapOutputFile.getSpillFileForWrite(getTaskID(),
                                       numSpills, size);
-        out = localFs.create(filename);
+        out = rfs.create(filename);
         // create spill index
         Path indexFilename = mapOutputFile.getSpillIndexFileForWrite(
                              getTaskID(), numSpills,
@@ -1107,15 +1112,15 @@
       for(int i = 0; i < numSpills; i++) {
         filename[i] = mapOutputFile.getSpillFile(getTaskID(), i);
         indexFileName[i] = mapOutputFile.getSpillIndexFile(getTaskID(), i);
-        finalOutFileSize += localFs.getFileStatus(filename[i]).getLen();
+        finalOutFileSize += rfs.getFileStatus(filename[i]).getLen();
       }
       
       if (numSpills == 1) { //the spill is the final output
-        localFs.rename(filename[0], 
-                       new Path(filename[0].getParent(), "file.out"));
-        localFs.rename(indexFileName[0], 
-                       new Path(indexFileName[0].getParent(),"file.out.index"));
-        return;
+    	  rfs.rename(filename[0],
+    			  new Path(filename[0].getParent(), "file.out"));
+    	  localFs.rename(indexFileName[0],
+    			  new Path(indexFileName[0].getParent(),"file.out.index"));
+    	  return;
       }
       //make correction in the length to include the sequence file header
       //lengths for each partition
@@ -1129,9 +1134,10 @@
                             getTaskID(), finalIndexFileSize);
       
       //The output stream for the final single output file
-      FSDataOutputStream finalOut = localFs.create(finalOutputFile, true, 
-                                                   4096);
-      
+
+      FSDataOutputStream finalOut = rfs.create(finalOutputFile, true,
+                                               4096);
+
       //The final index file output stream
       FSDataOutputStream finalIndexOut = localFs.create(finalIndexFile, true,
                                                         4096);
@@ -1160,8 +1166,9 @@
             long rawSegmentLength = indexIn.readLong();
             long segmentLength = indexIn.readLong();
             indexIn.close();
-            FSDataInputStream in = localFs.open(filename[i]);
+            FSDataInputStream in = rfs.open(filename[i]);
             in.seek(segmentOffset);
+
             Segment<K, V> s = 
               new Segment<K, V>(new Reader<K, V>(job, in, segmentLength, codec),
                                 true);
@@ -1176,7 +1183,7 @@
           //merge
           @SuppressWarnings("unchecked")
           RawKeyValueIterator kvIter = 
-            Merger.merge(job, localFs, 
+            Merger.merge(job, rfs,
                          keyClass, valClass,
                          segmentList, job.getInt("io.sort.factor", 100), 
                          new Path(getTaskID().toString()), 
@@ -1203,7 +1210,7 @@
         finalIndexOut.close();
         //cleanup
         for(int i = 0; i < numSpills; i++) {
-          localFs.delete(filename[i], true);
+          rfs.delete(filename[i],true);
           localFs.delete(indexFileName[i], true);
         }
       }
@@ -1223,7 +1230,7 @@
       //StringBuffer sb = new StringBuffer();
       indexOut.writeLong(start);
       indexOut.writeLong(writer.getRawLength());
-      long segmentLength = out.getPos() - start;
+      long segmentLength = writer.getCompressedLength();
       indexOut.writeLong(segmentLength);
       LOG.info("Index: (" + start + ", " + writer.getRawLength() + ", " + 
                segmentLength + ")");

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Merger.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Merger.java?rev=693455&r1=693454&r2=693455&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Merger.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Merger.java Tue Sep  9 06:11:05 2008
@@ -128,7 +128,10 @@
     DataInputBuffer getKey() { return key; }
     DataInputBuffer getValue() { return value; }
 
-    long getLength() { return segmentLength; }
+    long getLength() { 
+      return (reader == null) ?
+        segmentLength : reader.getLength();
+    }
     
     boolean next() throws IOException {
       return reader.next(key, value);

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java?rev=693455&r1=693454&r2=693455&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java Tue Sep  9 06:11:05 2008
@@ -24,6 +24,7 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.lang.Math;
 import java.net.URI;
 import java.net.URL;
 import java.net.URLClassLoader;
@@ -335,7 +336,8 @@
     }
     
     FileSystem lfs = FileSystem.getLocal(job);
-    
+    FileSystem rfs = ((LocalFileSystem)lfs).getRaw();
+
     // Initialize the codec
     codec = initCodec();
 
@@ -362,7 +364,7 @@
     LOG.info("Initiating final on-disk merge with " + mapFiles.length + 
              " files");
     RawKeyValueIterator rIter = 
-      Merger.merge(job, lfs,
+      Merger.merge(job,rfs,
                    job.getMapOutputKeyClass(), job.getMapOutputValueClass(),
                    codec, mapFiles, !conf.getKeepFailedTaskFiles(), 
                    job.getInt("io.sort.factor", 100), tempDir, 
@@ -509,6 +511,7 @@
      */
     private FileSystem localFileSys;
 
+    private FileSystem rfs;
     /**
      * Number of files to merge at a time
      */
@@ -1215,13 +1218,16 @@
               compressedLength + " raw bytes) " + 
               "into RAM from " + mapOutputLoc.getTaskAttemptId());
 
-          mapOutput = shuffleInMemory(mapOutputLoc, connection, input, (int)decompressedLength);
+          mapOutput = shuffleInMemory(mapOutputLoc, connection, input,
+                                      (int)decompressedLength,
+                                      (int)compressedLength);
         } else {
           LOG.info("Shuffling " + decompressedLength + " bytes (" + 
               compressedLength + " raw bytes) " + 
               "into Local-FS from " + mapOutputLoc.getTaskAttemptId());
 
-          mapOutput = shuffleToDisk(mapOutputLoc, input, filename, compressedLength);
+          mapOutput = shuffleToDisk(mapOutputLoc, input, filename, 
+              compressedLength);
         }
             
         return mapOutput;
@@ -1266,7 +1272,8 @@
       private MapOutput shuffleInMemory(MapOutputLocation mapOutputLoc,
                                         URLConnection connection, 
                                         InputStream input,
-                                        int mapOutputLength) 
+                                        int mapOutputLength,
+                                        int compressedLength)
       throws IOException, InterruptedException {
         // Reserve ram for the map-output
         boolean createdNow = ramManager.reserve(mapOutputLength, input);
@@ -1289,6 +1296,11 @@
             throw ioe;
           }
         }
+
+        IFileInputStream checksumIn = 
+          new IFileInputStream(input,compressedLength);
+
+        input = checksumIn;       
       
         // Are map-outputs compressed?
         if (codec != null) {
@@ -1402,7 +1414,7 @@
         OutputStream output = null;
         long bytesRead = 0;
         try {
-          output = localFileSys.create(localFilename);
+          output = rfs.create(localFilename);
           
           byte[] buf = new byte[64 * 1024];
           int n = input.read(buf, 0, buf.length);
@@ -1541,7 +1553,9 @@
         (long)(MAX_INMEM_FILESYS_USE * ramManager.getMemoryLimit());
       
       localFileSys = FileSystem.getLocal(conf);
-      
+
+      rfs = ((LocalFileSystem)localFileSys).getRaw();
+
       // hosts -> next contact time
       this.penaltyBox = new LinkedHashMap<String, Long>();
       
@@ -2187,7 +2201,7 @@
                                              approxOutputSize, conf)
               .suffix(".merged");
             Writer writer = 
-              new Writer(conf, localFileSys, outputPath, 
+              new Writer(conf,rfs, outputPath, 
                          conf.getMapOutputKeyClass(), 
                          conf.getMapOutputValueClass(),
                          codec);
@@ -2195,7 +2209,7 @@
             Path tmpDir = new Path(reduceTask.getTaskID().toString());
             final Reporter reporter = getReporter(umbilical);
             try {
-              iter = Merger.merge(conf, localFileSys,
+              iter = Merger.merge(conf, rfs,
                                   conf.getMapOutputKeyClass(),
                                   conf.getMapOutputValueClass(),
                                   codec, mapFiles.toArray(new Path[mapFiles.size()]), 
@@ -2275,7 +2289,7 @@
                           reduceTask.getTaskID(), ramfsMergeOutputSize);
 
         Writer writer = 
-          new Writer(conf, localFileSys, outputPath,
+          new Writer(conf, rfs, outputPath,
                      conf.getMapOutputKeyClass(),
                      conf.getMapOutputValueClass(),
                      codec);
@@ -2289,7 +2303,7 @@
           LOG.info("Initiating in-memory merge with " + noInMemorySegments + 
                    " segments...");
           
-          rIter = Merger.merge(conf, localFileSys,
+          rIter = Merger.merge(conf, rfs,
                                (Class<K>)conf.getMapOutputKeyClass(),
                                (Class<V>)conf.getMapOutputValueClass(),
                                inMemorySegments, inMemorySegments.size(),

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=693455&r1=693454&r2=693455&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Tue Sep  9 06:11:05 2008
@@ -59,6 +59,7 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.WritableUtils;
@@ -2538,6 +2539,8 @@
       OutputStream outStream = null;
       FSDataInputStream indexIn = null;
       FSDataInputStream mapOutputIn = null;
+ 
+      IFileInputStream checksumInputStream = null;
       
       long totalRead = 0;
       ShuffleServerMetrics shuffleMetrics = (ShuffleServerMetrics)
@@ -2598,8 +2601,9 @@
          * send it to the reducer.
          */
         //open the map-output file
-        mapOutputIn = fileSys.open(mapOutputFileName);
-        
+        FileSystem rfs = ((LocalFileSystem)fileSys).getRaw();
+
+        mapOutputIn = rfs.open(mapOutputFileName);
         // TODO: Remove this after a 'fix' for HADOOP-3647
         // The clever trick here to reduce the impact of the extra seek for
         // logging the first key/value lengths is to read the lengths before
@@ -2618,8 +2622,9 @@
 
         //seek to the correct offset for the reduce
         mapOutputIn.seek(startOffset);
+        checksumInputStream = new IFileInputStream(mapOutputIn,partLength);
           
-        int len = mapOutputIn.read(buffer, 0,
+        int len = checksumInputStream.readWithChecksum(buffer, 0,
                                    partLength < MAX_BYTES_TO_READ 
                                    ? (int)partLength : MAX_BYTES_TO_READ);
         while (len > 0) {
@@ -2633,9 +2638,9 @@
           }
           totalRead += len;
           if (totalRead == partLength) break;
-          len = mapOutputIn.read(buffer, 0, 
-                                 (partLength - totalRead) < MAX_BYTES_TO_READ
-                                 ? (int)(partLength - totalRead) : MAX_BYTES_TO_READ);
+          len = checksumInputStream.readWithChecksum(buffer, 0, 
+                        (partLength - totalRead) < MAX_BYTES_TO_READ
+                          ? (int)(partLength - totalRead) : MAX_BYTES_TO_READ);
         }
         
         LOG.info("Sent out " + totalRead + " bytes for reduce: " + reduce + 
@@ -2660,8 +2665,9 @@
         if (indexIn != null) {
           indexIn.close();
         }
-        if (mapOutputIn != null) {
-          mapOutputIn.close();
+
+        if (checksumInputStream != null) {
+          checksumInputStream.close();
         }
         shuffleMetrics.serverHandlerFree();
         if (ClientTraceLog.isInfoEnabled()) {

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestReduceTask.java?rev=693455&r1=693454&r2=693455&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestReduceTask.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestReduceTask.java Tue Sep  9 06:11:05 2008
@@ -23,6 +23,7 @@
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.WritableComparator;
@@ -75,10 +76,11 @@
   public void runValueIterator(Path tmpDir, Pair[] vals, 
                                Configuration conf, 
                                CompressionCodec codec) throws IOException {
-    FileSystem fs = tmpDir.getFileSystem(conf);
+    FileSystem localFs = FileSystem.getLocal(conf);
+    FileSystem rfs = ((LocalFileSystem)localFs).getRaw();
     Path path = new Path(tmpDir, "data.in");
     IFile.Writer<Text, Text> writer = 
-      new IFile.Writer<Text, Text>(conf, fs, path, Text.class, Text.class, codec);
+      new IFile.Writer<Text, Text>(conf, rfs, path, Text.class, Text.class, codec);
     for(Pair p: vals) {
       writer.append(new Text(p.key), new Text(p.value));
     }
@@ -86,7 +88,7 @@
     
     @SuppressWarnings("unchecked")
     RawKeyValueIterator rawItr = 
-      Merger.merge(conf, fs, Text.class, Text.class, codec, new Path[]{path}, 
+      Merger.merge(conf, rfs, Text.class, Text.class, codec, new Path[]{path}, 
                    false, conf.getInt("io.sort.factor", 100), tmpDir, 
                    new Text.Comparator(), new NullProgress());
     @SuppressWarnings("unchecked") // WritableComparators are not generic