Posted to hdfs-commits@hadoop.apache.org by sz...@apache.org on 2011/10/27 22:00:22 UTC

svn commit: r1189981 - in /hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/ src/main/java/org/apache/hadoop/hdfs/server/datanode/

Author: szetszwo
Date: Thu Oct 27 20:00:21 2011
New Revision: 1189981

URL: http://svn.apache.org/viewvc?rev=1189981&view=rev
Log:
svn merge -c 1177161 from trunk for HDFS-2371.

Modified:
    hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/   (props changed)
    hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/   (props changed)
    hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java
    hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
    hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
    hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java

Propchange: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Oct 27 20:00:21 2011
@@ -1,4 +1,4 @@
-/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs:1161777,1161781,1162188,1162421,1162491,1162499,1162613,1162928,1162954,1162979,1163050,1163069,1163081,1163490,1163768,1164255,1164301,1164339,1166402,1167383,1167662,1170085,1170379,1170459,1170996,1171136,1171297,1171379,1171611,1172916,1173402,1173468,1175113,1176178,1176550,1176719,1176729,1176733,1177100,1177487,1177531,1177859,1177864,1177905,1179169,1179856,1179861,1180757,1183081,1183098,1183175,1183554,1186508,1187140,1189028,1189355,1189360,1189546,1189932
+/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs:1161777,1161781,1162188,1162421,1162491,1162499,1162613,1162928,1162954,1162979,1163050,1163069,1163081,1163490,1163768,1164255,1164301,1164339,1166402,1167383,1167662,1170085,1170379,1170459,1170996,1171136,1171297,1171379,1171611,1172916,1173402,1173468,1175113,1176178,1176550,1176719,1176729,1176733,1177100,1177161,1177487,1177531,1177859,1177864,1177905,1179169,1179856,1179861,1180757,1183081,1183098,1183175,1183554,1186508,1187140,1189028,1189355,1189360,1189546,1189932
 /hadoop/core/branches/branch-0.19/hdfs:713112
 /hadoop/hdfs/branches/HDFS-1052:987665-1095512
 /hadoop/hdfs/branches/HDFS-265:796829-820463

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1189981&r1=1189980&r2=1189981&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Thu Oct 27 20:00:21 2011
@@ -742,6 +742,8 @@ Release 0.23.0 - Unreleased
     HDFS-2355. Federation: enable using the same configuration file across 
     all the nodes in the cluster. (suresh)
 
+    HDFS-2371. Refactor BlockSender.java for better readability. (suresh)
+
   OPTIMIZATIONS
 
     HDFS-1458. Improve checkpoint performance by avoiding unnecessary image

Propchange: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Oct 27 20:00:21 2011
@@ -1,4 +1,4 @@
-/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java:1161777,1161781,1162188,1162421,1162491,1162499,1162613,1162928,1162954,1162979,1163050,1163069,1163081,1163490,1163768,1164255,1164301,1164339,1166402,1167383,1167662,1170085,1170379,1170459,1170996,1171136,1171297,1171379,1171611,1172916,1173402,1173468,1175113,1176178,1176550,1176719,1176729,1176733,1177100,1177487,1177531,1177859,1177864,1177905,1179169,1179856,1179861,1180757,1183081,1183098,1183175,1183554,1186508,1187140,1189028,1189355,1189360,1189546,1189932
+/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java:1161777,1161781,1162188,1162421,1162491,1162499,1162613,1162928,1162954,1162979,1163050,1163069,1163081,1163490,1163768,1164255,1164301,1164339,1166402,1167383,1167662,1170085,1170379,1170459,1170996,1171136,1171297,1171379,1171611,1172916,1173402,1173468,1175113,1176178,1176550,1176719,1176729,1176733,1177100,1177161,1177487,1177531,1177859,1177864,1177905,1179169,1179856,1179861,1180757,1183081,1183098,1183175,1183554,1186508,1187140,1189028,1189355,1189360,1189546,1189932
 /hadoop/core/branches/branch-0.19/hdfs/src/java:713112
 /hadoop/core/trunk/src/hdfs:776175-785643,785929-786278
 /hadoop/hdfs/branches/HDFS-1052/src/java:987665-1095512

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java?rev=1189981&r1=1189980&r2=1189981&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java Thu Oct 27 20:00:21 2011
@@ -404,7 +404,7 @@ class BlockPoolSliceScanner {
         adjustThrottler();
         
         blockSender = new BlockSender(block, 0, -1, false, false, true,
-            datanode);
+            datanode, null);
 
         DataOutputStream out = 
                 new DataOutputStream(new IOUtils.NullOutputStream());

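The call-site change above follows from this merge removing BlockSender's seven-argument convenience constructor: every caller now passes the clientTraceFmt argument explicitly, with null meaning no client trace logging. For readability, here is the same updated call with each positional argument annotated; the comments are illustrative, based on the constructor javadoc in the BlockSender diff below, and are not part of the commit:

    blockSender = new BlockSender(
        block,     // block that is being read
        0,         // startOffset: read from the beginning of the block
        -1,        // length: a negative length means the replica's visible length
        false,     // corruptChecksumOk: do not ignore checksum read failures
        false,     // chunkOffsetOK: no chunk offset in the checksum header
        true,      // verifyChecksum: verify checksums while reading
        datanode,  // datanode from which the block is being read
        null);     // clientTraceFmt: no client trace logging
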
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java?rev=1189981&r1=1189980&r2=1189981&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java Thu Oct 27 20:00:21 2011
@@ -41,191 +41,230 @@ import org.apache.hadoop.util.DataChecks
 
 /**
  * Reads a block from the disk and sends it to a recipient.
+ * 
+ * Data is sent from the BlockSender in the following format:
+ * <br><b>Data format:</b> <pre>
+ *    +--------------------------------------------------+
+ *    | ChecksumHeader | Sequence of data PACKETS...     |
+ *    +--------------------------------------------------+ 
+ * </pre>   
+ * <b>ChecksumHeader format:</b> <pre>
+ *    +--------------------------------------------------+
+ *    | 1 byte CHECKSUM_TYPE | 4 byte BYTES_PER_CHECKSUM |
+ *    +--------------------------------------------------+ 
+ * </pre>   
+ * An empty packet is sent to mark the end of the block and completion of the read.
+ * 
+ *  A PACKET contains a packet header, checksums and data. The amount of
+ *  data carried is set by BUFFER_SIZE.
+ *  <pre>
+ *    +-----------------------------------------------------+
+ *    | 4 byte packet length (excluding packet header)      |
+ *    +-----------------------------------------------------+
+ *    | 8 byte offset in the block | 8 byte sequence number |
+ *    +-----------------------------------------------------+
+ *    | 1 byte isLastPacketInBlock                          |
+ *    +-----------------------------------------------------+
+ *    | 4 byte Length of actual data                        |
+ *    +-----------------------------------------------------+
+ *    | x byte checksum data. x is defined below            |
+ *    +-----------------------------------------------------+
+ *    | actual data ......                                  |
+ *    +-----------------------------------------------------+
+ *    
+ *    The data is split into chunks, each of length <= BYTES_PER_CHECKSUM.
+ *    A checksum is calculated for each chunk.
+ *    
+ *    x = (length of data + BYTES_PER_CHECKSUM - 1)/BYTES_PER_CHECKSUM *
+ *        CHECKSUM_SIZE
+ *        
+ *    CHECKSUM_SIZE depends on CHECKSUM_TYPE (usually 4, for CRC32)
+ *    </pre>
+ *  
+ *  The client reads data until it receives a packet with 
+ *  "LastPacketInBlock" set to true or with a zero length. If there is 
+ *  no checksum error, it replies to DataNode with OP_STATUS_CHECKSUM_OK: 
+ *  <pre>
+ *    +------------------------------+
+ *    | 2 byte OP_STATUS_CHECKSUM_OK |
+ *    +------------------------------+
+ *  </pre>
  */
 class BlockSender implements java.io.Closeable {
-  public static final Log LOG = DataNode.LOG;
+  static final Log LOG = DataNode.LOG;
   static final Log ClientTraceLog = DataNode.ClientTraceLog;
-  
-  private ExtendedBlock block; // the block to read from
-
-  /** the replica to read from */
-  private final Replica replica;
-  /** The visible length of a replica. */
-  private final long replicaVisibleLength;
-
-  private static final boolean is32Bit = System.getProperty("sun.arch.data.model").equals("32");
-
-  private InputStream blockIn; // data stream
-  private long blockInPosition = -1; // updated while using transferTo().
-  private DataInputStream checksumIn; // checksum datastream
-  private DataChecksum checksum; // checksum stream
-  private long offset; // starting position to read
-  private long endOffset; // ending position
-  private int bytesPerChecksum; // chunk size
-  private int checksumSize; // checksum size
-  private boolean corruptChecksumOk; // if need to verify checksum
-  private boolean chunkOffsetOK; // if need to send chunk offset
-  private long seqno; // sequence number of packet
-
-  private boolean transferToAllowed = true;
-  // set once entire requested byte range has been sent to the client
-  private boolean sentEntireByteRange;
-  private boolean verifyChecksum; //if true, check is verified while reading
-  private DataTransferThrottler throttler;
-  private final String clientTraceFmt; // format of client trace log message
-
+  private static final boolean is32Bit = 
+      System.getProperty("sun.arch.data.model").equals("32");
   /**
    * Minimum buffer used while sending data to clients. Used only if
    * transferTo() is enabled. 64KB is not that large. It could be larger, but
    * not sure if there will be much more improvement.
    */
   private static final int MIN_BUFFER_WITH_TRANSFERTO = 64*1024;
+  private static final int TRANSFERTO_BUFFER_SIZE = Math.max(
+      HdfsConstants.IO_FILE_BUFFER_SIZE, MIN_BUFFER_WITH_TRANSFERTO);
+  
+  /** the block to read from */
+  private final ExtendedBlock block;
+  /** the replica to read from */
+  private final Replica replica;
+  /** The visible length of a replica. */
+  private final long replicaVisibleLength;
+  /** Stream to read block data from */
+  private InputStream blockIn;
+  /** updated while using transferTo() */
+  private long blockInPosition = -1;
+  /** Stream to read checksum */
+  private DataInputStream checksumIn;
+  /** Checksum utility */
+  private final DataChecksum checksum;
+  /** Starting position to read */
+  private long offset;
+  /** Position of last byte to read from block file */
+  private final long endOffset;
+  /** Number of bytes in chunk used for computing checksum */
+  private final int chunkSize;
+  /** Number of bytes of checksum computed for a chunk */
+  private final int checksumSize;
+  /** If true, failure to read checksum is ignored */
+  private final boolean corruptChecksumOk;
+  /** true if the chunk offset needs to be sent in the checksum header */
+  private final boolean chunkOffsetOK;
+  /** Sequence number of packet being sent */
+  private long seqno;
+  /** Set to true if transferTo is allowed for sending data to the client */
+  private final boolean transferToAllowed;
+  /** Set to true once entire requested byte range has been sent to the client */
+  private boolean sentEntireByteRange;
+  /** When true, verify checksum while reading from checksum file */
+  private final boolean verifyChecksum;
+  /** Format used to print client trace log messages */
+  private final String clientTraceFmt;
   private volatile ChunkChecksum lastChunkChecksum = null;
-
   
-  BlockSender(ExtendedBlock block, long startOffset, long length,
-              boolean corruptChecksumOk, boolean chunkOffsetOK,
-              boolean verifyChecksum, DataNode datanode) throws IOException {
-    this(block, startOffset, length, corruptChecksumOk, chunkOffsetOK,
-         verifyChecksum, datanode, null);
-  }
-
+  /**
+   * Constructor
+   * 
+   * @param block Block that is being read
+   * @param startOffset starting offset to read from
+   * @param length length of data to read
+   * @param corruptChecksumOk if true, failure to read the checksum is ignored
+   * @param chunkOffsetOK whether to send the chunk offset in the checksum header
+   * @param verifyChecksum verify checksum while reading the data
+   * @param datanode datanode from which the block is being read
+   * @param clientTraceFmt format string used to print client trace logs
+   * @throws IOException
+   */
   BlockSender(ExtendedBlock block, long startOffset, long length,
               boolean corruptChecksumOk, boolean chunkOffsetOK,
               boolean verifyChecksum, DataNode datanode, String clientTraceFmt)
       throws IOException {
     try {
       this.block = block;
+      this.chunkOffsetOK = chunkOffsetOK;
+      this.corruptChecksumOk = corruptChecksumOk;
+      this.verifyChecksum = verifyChecksum;
+      this.clientTraceFmt = clientTraceFmt;
+      
       synchronized(datanode.data) { 
-        this.replica = datanode.data.getReplica(block.getBlockPoolId(), 
-            block.getBlockId());
-        if (replica == null) {
-          throw new ReplicaNotFoundException(block);
-        }
+        this.replica = getReplica(block, datanode);
         this.replicaVisibleLength = replica.getVisibleLength();
       }
-      long minEndOffset = startOffset + length;
-      // if this is a write in progress
+      // if there is a write in progress
       ChunkChecksum chunkChecksum = null;
       if (replica instanceof ReplicaBeingWritten) {
-        for (int i = 0; i < 30 && replica.getBytesOnDisk() < minEndOffset; i++) {
-          try {
-            Thread.sleep(100);
-          } catch (InterruptedException ie) {
-            throw new IOException(ie);
-          }
-        }
-
-        long currentBytesOnDisk = replica.getBytesOnDisk();
-        
-        if (currentBytesOnDisk < minEndOffset) {
-          throw new IOException(String.format(
-            "need %d bytes, but only %d bytes available",
-            minEndOffset,
-            currentBytesOnDisk
-          ));
-        }
-
+        long minEndOffset = startOffset + length;
+        waitForMinLength((ReplicaBeingWritten)replica, minEndOffset);
         ReplicaInPipeline rip = (ReplicaInPipeline) replica;
         chunkChecksum = rip.getLastChecksumAndDataLen();
       }
 
       if (replica.getGenerationStamp() < block.getGenerationStamp()) {
-        throw new IOException(
-            "replica.getGenerationStamp() < block.getGenerationStamp(), block="
+        throw new IOException("Replica gen stamp < block genstamp, block="
             + block + ", replica=" + replica);
       }
       if (replicaVisibleLength < 0) {
-        throw new IOException("The replica is not readable, block="
+        throw new IOException("Replica is not readable, block="
             + block + ", replica=" + replica);
       }
       if (DataNode.LOG.isDebugEnabled()) {
         DataNode.LOG.debug("block=" + block + ", replica=" + replica);
       }
-      
-      this.chunkOffsetOK = chunkOffsetOK;
-      this.corruptChecksumOk = corruptChecksumOk;
-      this.verifyChecksum = verifyChecksum;
 
       // transferToFully() fails on 32 bit platforms for block sizes >= 2GB,
       // use normal transfer in those cases
       this.transferToAllowed = datanode.transferToAllowed &&
-        (!is32Bit || length < (long) Integer.MAX_VALUE);
-      this.clientTraceFmt = clientTraceFmt;
+        (!is32Bit || length <= Integer.MAX_VALUE);
 
-      if ( !corruptChecksumOk || datanode.data.metaFileExists(block) ) {
+      DataChecksum csum;
+      if (!corruptChecksumOk || datanode.data.metaFileExists(block)) {
         checksumIn = new DataInputStream(new BufferedInputStream(datanode.data
             .getMetaDataInputStream(block), HdfsConstants.IO_FILE_BUFFER_SIZE));
 
         // read and handle the common header here. For now just a version
-       BlockMetadataHeader header = BlockMetadataHeader.readHeader(checksumIn);
-       short version = header.getVersion();
-
+        BlockMetadataHeader header = BlockMetadataHeader.readHeader(checksumIn);
+        short version = header.getVersion();
         if (version != FSDataset.METADATA_VERSION) {
           LOG.warn("Wrong version (" + version + ") for metadata file for "
               + block + " ignoring ...");
         }
-        checksum = header.getChecksum();
+        csum = header.getChecksum();
       } else {
         LOG.warn("Could not find metadata file for " + block);
         // This only decides the buffer size. Use BUFFER_SIZE?
-        checksum = DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_NULL,
+        csum = DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_NULL,
             16 * 1024);
       }
 
-      /* If bytesPerChecksum is very large, then the metadata file
-       * is mostly corrupted. For now just truncate bytesPerchecksum to
-       * blockLength.
-       */        
-      bytesPerChecksum = checksum.getBytesPerChecksum();
-      if (bytesPerChecksum > 10*1024*1024 && bytesPerChecksum > replicaVisibleLength) {
-        checksum = DataChecksum.newDataChecksum(checksum.getChecksumType(),
+      /*
+       * If chunkSize is very large, then the metadata file is most likely
+       * corrupted. For now just truncate bytesPerChecksum to blockLength.
+       */
+      int size = csum.getBytesPerChecksum();
+      if (size > 10*1024*1024 && size > replicaVisibleLength) {
+        csum = DataChecksum.newDataChecksum(csum.getChecksumType(),
             Math.max((int)replicaVisibleLength, 10*1024*1024));
-        bytesPerChecksum = checksum.getBytesPerChecksum();        
+        size = csum.getBytesPerChecksum();        
       }
+      chunkSize = size;
+      checksum = csum;
       checksumSize = checksum.getChecksumSize();
-
-      if (length < 0) {
-        length = replicaVisibleLength;
-      }
+      length = length < 0 ? replicaVisibleLength : length;
 
       // end is either last byte on disk or the length for which we have a 
       // checksum
-      if (chunkChecksum != null) {
-        endOffset = chunkChecksum.getDataLength();
-      } else {
-        endOffset = replica.getBytesOnDisk();
-      }
-      
-      if (startOffset < 0 || startOffset > endOffset
-          || (length + startOffset) > endOffset) {
+      long end = chunkChecksum != null ? chunkChecksum.getDataLength()
+          : replica.getBytesOnDisk();
+      if (startOffset < 0 || startOffset > end
+          || (length + startOffset) > end) {
         String msg = " Offset " + startOffset + " and length " + length
-        + " don't match block " + block + " ( blockLen " + endOffset + " )";
+        + " don't match block " + block + " ( blockLen " + end + " )";
         LOG.warn(datanode.getDNRegistrationForBP(block.getBlockPoolId()) +
             ":sendBlock() : " + msg);
         throw new IOException(msg);
       }
       
-      offset = (startOffset - (startOffset % bytesPerChecksum));
+      // Ensure the read offset is positioned at the beginning of a chunk
+      offset = startOffset - (startOffset % chunkSize);
       if (length >= 0) {
-        // Make sure endOffset points to end of a checksumed chunk.
+        // Ensure endOffset points to end of chunk.
         long tmpLen = startOffset + length;
-        if (tmpLen % bytesPerChecksum != 0) {
-          tmpLen += (bytesPerChecksum - tmpLen % bytesPerChecksum);
+        if (tmpLen % chunkSize != 0) {
+          tmpLen += (chunkSize - tmpLen % chunkSize);
         }
-        if (tmpLen < endOffset) {
+        if (tmpLen < end) {
           // will use on-disk checksum here since the end is a stable chunk
-          endOffset = tmpLen;
+          end = tmpLen;
         } else if (chunkChecksum != null) {
-          //in last chunk which is changing. flag that we need to use in-memory 
-          // checksum 
+          // last chunk is changing. flag that we need to use in-memory checksum 
           this.lastChunkChecksum = chunkChecksum;
         }
       }
+      endOffset = end;
 
       // seek to the right offsets
       if (offset > 0) {
-        long checksumSkip = (offset / bytesPerChecksum) * checksumSize;
+        long checksumSkip = (offset / chunkSize) * checksumSize;
         // note blockInStream is seeked when created below
         if (checksumSkip > 0) {
           // Should we use seek() for checksum file as well?
@@ -237,7 +276,6 @@ class BlockSender implements java.io.Clo
       if (DataNode.LOG.isDebugEnabled()) {
         DataNode.LOG.debug("replica=" + replica);
       }
-
       blockIn = datanode.data.getBlockInputStream(block, offset); // seek to offset
     } catch (IOException ioe) {
       IOUtils.closeStream(this);
@@ -251,19 +289,17 @@ class BlockSender implements java.io.Clo
    */
   public void close() throws IOException {
     IOException ioe = null;
-    // close checksum file
     if(checksumIn!=null) {
       try {
-        checksumIn.close();
+        checksumIn.close(); // close checksum file
       } catch (IOException e) {
         ioe = e;
       }
       checksumIn = null;
-    }
-    // close data file
+    }   
     if(blockIn!=null) {
       try {
-        blockIn.close();
+        blockIn.close(); // close data file
       } catch (IOException e) {
         ioe = e;
       }
@@ -274,7 +310,41 @@ class BlockSender implements java.io.Clo
       throw ioe;
     }
   }
-
+  
+  private static Replica getReplica(ExtendedBlock block, DataNode datanode)
+      throws ReplicaNotFoundException {
+    Replica replica = datanode.data.getReplica(block.getBlockPoolId(),
+        block.getBlockId());
+    if (replica == null) {
+      throw new ReplicaNotFoundException(block);
+    }
+    return replica;
+  }
+  
+  /**
+   * Wait for rbw replica to reach the length
+   * @param rbw replica that is being written to
+   * @param len minimum length to reach
+   * @throws IOException on failing to reach len within the given wait time
+   */
+  private static void waitForMinLength(ReplicaBeingWritten rbw, long len)
+      throws IOException {
+    // Wait up to 3 seconds for the rbw replica to reach the minimum length
+    for (int i = 0; i < 30 && rbw.getBytesOnDisk() < len; i++) {
+      try {
+        Thread.sleep(100);
+      } catch (InterruptedException ie) {
+        throw new IOException(ie);
+      }
+    }
+    long bytesOnDisk = rbw.getBytesOnDisk();
+    if (bytesOnDisk < len) {
+      throw new IOException(
+          String.format("Need %d bytes, but only %d bytes available", len,
+              bytesOnDisk));
+    }
+  }
+  
   /**
  * Converts an IOException (not subclasses) to SocketException.
    * This is typically done to indicate to upper layers that the error 
@@ -296,54 +366,43 @@ class BlockSender implements java.io.Clo
   }
 
   /**
-   * Sends upto maxChunks chunks of data.
+   * @param datalen Length of data 
+   * @return number of chunks for data of given size
+   */
+  private int numberOfChunks(long datalen) {
+    return (int) ((datalen + chunkSize - 1)/chunkSize);
+  }
+  
+  /**
+   * Sends a packet with up to maxChunks chunks of data.
    * 
-   * When blockInPosition is >= 0, assumes 'out' is a 
-   * {@link SocketOutputStream} and tries 
-   * {@link SocketOutputStream#transferToFully(FileChannel, long, int)} to
-   * send data (and updates blockInPosition).
-   */
-  private int sendChunks(ByteBuffer pkt, int maxChunks, OutputStream out) 
-                         throws IOException {
-    // Sends multiple chunks in one packet with a single write().
-
-    int len = (int) Math.min(endOffset - offset,
-                             (((long) bytesPerChecksum) * ((long) maxChunks)));
-    int numChunks = (len + bytesPerChecksum - 1)/bytesPerChecksum;
-    int packetLen = len + numChunks*checksumSize + 4;
-    boolean lastDataPacket = offset + len == endOffset && len > 0;
-    pkt.clear();
-
+   * @param pkt buffer used for writing packet data
+   * @param maxChunks maximum number of chunks to send
+   * @param out stream to send data to
+   * @param transferTo use transferTo to send data
+   * @param throttler used for throttling data transfer bandwidth
+   */
+  private int sendPacket(ByteBuffer pkt, int maxChunks, OutputStream out,
+      boolean transferTo, DataTransferThrottler throttler) throws IOException {
+    int dataLen = (int) Math.min(endOffset - offset,
+                             (chunkSize * (long) maxChunks));
+    
+    int numChunks = numberOfChunks(dataLen); // Number of chunks to be sent in the packet
+    int checksumDataLen = numChunks * checksumSize;
+    int packetLen = dataLen + checksumDataLen + 4;
+    boolean lastDataPacket = offset + dataLen == endOffset && dataLen > 0;
 
-    PacketHeader header = new PacketHeader(
-      packetLen, offset, seqno, (len == 0), len);
-    header.putInBuffer(pkt);
+    writePacketHeader(pkt, dataLen, packetLen);
 
     int checksumOff = pkt.position();
-    int checksumLen = numChunks * checksumSize;
     byte[] buf = pkt.array();
     
     if (checksumSize > 0 && checksumIn != null) {
-      try {
-        checksumIn.readFully(buf, checksumOff, checksumLen);
-      } catch (IOException e) {
-        LOG.warn(" Could not read or failed to veirfy checksum for data"
-            + " at offset " + offset + " for block " + block, e);
-        IOUtils.closeStream(checksumIn);
-        checksumIn = null;
-        if (corruptChecksumOk) {
-          if (checksumOff < checksumLen) {
-            // Just fill the array with zeros.
-            Arrays.fill(buf, checksumOff, checksumLen, (byte) 0);
-          }
-        } else {
-          throw e;
-        }
-      }
+      readChecksum(buf, checksumOff, checksumDataLen);
 
       // write in progress that we need to use to get last checksum
       if (lastDataPacket && lastChunkChecksum != null) {
-        int start = checksumOff + checksumLen - checksumSize;
+        int start = checksumOff + checksumDataLen - checksumSize;
         byte[] updatedChecksum = lastChunkChecksum.getChecksum();
         
         if (updatedChecksum != null) {
@@ -352,52 +411,28 @@ class BlockSender implements java.io.Clo
       }
     }
     
-    int dataOff = checksumOff + checksumLen;
-    
-    if (blockInPosition < 0) {
-      //normal transfer
-      IOUtils.readFully(blockIn, buf, dataOff, len);
+    int dataOff = checksumOff + checksumDataLen;
+    if (!transferTo) { // normal transfer
+      IOUtils.readFully(blockIn, buf, dataOff, dataLen);
 
       if (verifyChecksum) {
-        int dOff = dataOff;
-        int cOff = checksumOff;
-        int dLeft = len;
-
-        for (int i=0; i<numChunks; i++) {
-          checksum.reset();
-          int dLen = Math.min(dLeft, bytesPerChecksum);
-          checksum.update(buf, dOff, dLen);
-          if (!checksum.compare(buf, cOff)) {
-            long failedPos = offset + len -dLeft;
-            throw new ChecksumException("Checksum failed at " + 
-                                        failedPos, failedPos);
-          }
-          dLeft -= dLen;
-          dOff += dLen;
-          cOff += checksumSize;
-        }
+        verifyChecksum(buf, dataOff, dataLen, numChunks, checksumOff);
       }
-      //writing is done below (mainly to handle IOException)
     }
     
     try {
-      if (blockInPosition >= 0) {
-        //use transferTo(). Checks on out and blockIn are already done. 
-
+      if (transferTo) {
         SocketOutputStream sockOut = (SocketOutputStream)out;
-        //first write the packet
-        sockOut.write(buf, 0, dataOff);
+        sockOut.write(buf, 0, dataOff); // First write header and checksums
+        
         // no need to flush. since we know out is not a buffered stream. 
-
         sockOut.transferToFully(((FileInputStream)blockIn).getChannel(), 
-                                blockInPosition, len);
-
-        blockInPosition += len;
-      } else {
+                                blockInPosition, dataLen);
+        blockInPosition += dataLen;
+      } else { 
         // normal transfer
-        out.write(buf, 0, dataOff + len);
+        out.write(buf, 0, dataOff + dataLen);
       }
-      
     } catch (IOException e) {
       /* Exception while writing to the client. Connection closure from
        * the other end is mostly the case and we do not care much about
@@ -419,9 +454,72 @@ class BlockSender implements java.io.Clo
       throttler.throttle(packetLen);
     }
 
-    return len;
+    return dataLen;
   }
-
+  
+  /**
+   * Read checksum into given buffer
+   * @param buf buffer to read the checksum into
+   * @param checksumOffset offset at which to write the checksum into buf
+   * @param checksumLen length of checksum to write
+   * @throws IOException on error
+   */
+  private void readChecksum(byte[] buf, final int checksumOffset,
+      final int checksumLen) throws IOException {
+    if (checksumSize <= 0 || checksumIn == null) {
+      return;
+    }
+    try {
+      checksumIn.readFully(buf, checksumOffset, checksumLen);
+    } catch (IOException e) {
+      LOG.warn("Could not read or failed to verify checksum for data"
+          + " at offset " + offset + " for block " + block, e);
+      IOUtils.closeStream(checksumIn);
+      checksumIn = null;
+      if (corruptChecksumOk) {
+        if (checksumOffset < checksumLen) {
+          // Just fill the array with zeros.
+          Arrays.fill(buf, checksumOffset, checksumLen, (byte) 0);
+        }
+      } else {
+        throw e;
+      }
+    }
+  }
+  
+  /**
+   * Compute checksum for chunks and verify the checksum that is read from
+   * the metadata file is correct.
+   * 
+   * @param buf buffer that has checksum and data
+   * @param dataOffset position where data is written in the buf
+   * @param datalen length of data
+   * @param numChunks number of chunks corresponding to data
+   * @param checksumOffset offset where checksum is written in the buf
+   * @throws ChecksumException on failed checksum verification
+   */
+  public void verifyChecksum(final byte[] buf, final int dataOffset,
+      final int datalen, final int numChunks, final int checksumOffset)
+      throws ChecksumException {
+    int dOff = dataOffset;
+    int cOff = checksumOffset;
+    int dLeft = datalen;
+
+    for (int i = 0; i < numChunks; i++) {
+      checksum.reset();
+      int dLen = Math.min(dLeft, chunkSize);
+      checksum.update(buf, dOff, dLen);
+      if (!checksum.compare(buf, cOff)) {
+        long failedPos = offset + datalen - dLeft;
+        throw new ChecksumException("Checksum failed at " + failedPos,
+            failedPos);
+      }
+      dLeft -= dLen;
+      dOff += dLen;
+      cOff += checksumSize;
+    }
+  }
+  
   /**
    * sendBlock() is used to read block and its metadata and stream the data to
    * either a client or to another datanode. 
@@ -433,70 +531,54 @@ class BlockSender implements java.io.Clo
    *        {@link SocketOutputStream#transferToFully(FileChannel, 
    *        long, int)}.
    * @param throttler for sending data.
-   * @return total bytes reads, including crc.
+   * @return total bytes read, including checksum data.
    */
   long sendBlock(DataOutputStream out, OutputStream baseStream, 
                  DataTransferThrottler throttler) throws IOException {
-    if( out == null ) {
+    if (out == null) {
       throw new IOException( "out stream is null" );
     }
-    this.throttler = throttler;
-
-    long initialOffset = offset;
+    final long initialOffset = offset;
     long totalRead = 0;
     OutputStream streamForSendChunks = out;
     
     final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
     try {
-      try {
-        checksum.writeHeader(out);
-        if ( chunkOffsetOK ) {
-          out.writeLong( offset );
-        }
-        out.flush();
-      } catch (IOException e) { //socket error
-        throw ioeToSocketException(e);
-      }
+      writeChecksumHeader(out);
       
       int maxChunksPerPacket;
       int pktSize = PacketHeader.PKT_HEADER_LEN;
-      
-      if (transferToAllowed && !verifyChecksum && 
-          baseStream instanceof SocketOutputStream && 
-          blockIn instanceof FileInputStream) {
-        
+      boolean transferTo = transferToAllowed && !verifyChecksum
+          && baseStream instanceof SocketOutputStream
+          && blockIn instanceof FileInputStream;
+      if (transferTo) {
         FileChannel fileChannel = ((FileInputStream)blockIn).getChannel();
-        
-        // blockInPosition also indicates sendChunks() uses transferTo.
         blockInPosition = fileChannel.position();
         streamForSendChunks = baseStream;
+        maxChunksPerPacket = numberOfChunks(TRANSFERTO_BUFFER_SIZE);
         
-        // assure a mininum buffer size.
-        maxChunksPerPacket = (Math.max(HdfsConstants.IO_FILE_BUFFER_SIZE, 
-                                       MIN_BUFFER_WITH_TRANSFERTO)
-                              + bytesPerChecksum - 1)/bytesPerChecksum;
-        
-        // allocate smaller buffer while using transferTo(). 
+        // Smaller packet size, to hold only the checksums when doing transferTo
         pktSize += checksumSize * maxChunksPerPacket;
       } else {
-        maxChunksPerPacket = Math.max(1, (HdfsConstants.IO_FILE_BUFFER_SIZE
-            + bytesPerChecksum - 1) / bytesPerChecksum);
-        pktSize += (bytesPerChecksum + checksumSize) * maxChunksPerPacket;
+        maxChunksPerPacket = Math.max(1,
+            numberOfChunks(HdfsConstants.IO_FILE_BUFFER_SIZE));
+        // Packet size includes both checksum and data
+        pktSize += (chunkSize + checksumSize) * maxChunksPerPacket;
       }
 
       ByteBuffer pktBuf = ByteBuffer.allocate(pktSize);
 
       while (endOffset > offset) {
-        long len = sendChunks(pktBuf, maxChunksPerPacket, 
-                              streamForSendChunks);
+        long len = sendPacket(pktBuf, maxChunksPerPacket, streamForSendChunks,
+            transferTo, throttler);
         offset += len;
-        totalRead += len + ((len + bytesPerChecksum - 1)/bytesPerChecksum*
-                            checksumSize);
+        totalRead += len + (numberOfChunks(len) * checksumSize);
         seqno++;
       }
       try {
         // send an empty packet to mark the end of the block
-        sendChunks(pktBuf, maxChunksPerPacket, streamForSendChunks);        
+        sendPacket(pktBuf, maxChunksPerPacket, streamForSendChunks, transferTo,
+            throttler);
         out.flush();
       } catch (IOException e) { //socket error
         throw ioeToSocketException(e);
@@ -506,14 +588,39 @@ class BlockSender implements java.io.Clo
     } finally {
       if (clientTraceFmt != null) {
         final long endTime = System.nanoTime();
-        ClientTraceLog.info(String.format(clientTraceFmt, totalRead, initialOffset, endTime - startTime));
+        ClientTraceLog.info(String.format(clientTraceFmt, totalRead,
+            initialOffset, endTime - startTime));
       }
       close();
     }
-
     return totalRead;
   }
   
+  /**
+   * Write checksum header to the output stream
+   */
+  private void writeChecksumHeader(DataOutputStream out) throws IOException {
+    try {
+      checksum.writeHeader(out);
+      if (chunkOffsetOK) {
+        out.writeLong(offset);
+      }
+      out.flush();
+    } catch (IOException e) { //socket error
+      throw ioeToSocketException(e);
+    }
+  }
+    
+  /**
+   * Write packet header into {@code pkt}
+   */
+  private void writePacketHeader(ByteBuffer pkt, int dataLen, int packetLen) {
+    pkt.clear();
+    PacketHeader header = new PacketHeader(packetLen, offset, seqno,
+        (dataLen == 0), dataLen);
+    header.putInBuffer(pkt);
+  }
+  
   boolean didSendEntireByteRange() {
     return sentEntireByteRange;
   }

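To make the packet layout documented in the new BlockSender javadoc concrete, the sizing arithmetic that sendBlock() and sendPacket() now share through numberOfChunks() can be sketched as below. The constants are assumed typical values (512-byte chunks, 4-byte CRC32 checksums), not values fixed by this commit:

    // Illustrative sketch of the packet sizing arithmetic in BlockSender.
    public class PacketSizing {
      public static void main(String[] args) {
        int bytesPerChecksum = 512; // BYTES_PER_CHECKSUM (assumed)
        int checksumSize = 4;       // CHECKSUM_SIZE for CRC32
        int dataLen = 64 * 1024;    // data carried by one packet (assumed)

        // numberOfChunks(): chunks needed to cover dataLen, rounding up
        int numChunks = (dataLen + bytesPerChecksum - 1) / bytesPerChecksum;

        // "x" in the javadoc formula: checksum bytes carried by the packet
        int checksumDataLen = numChunks * checksumSize;

        // packetLen as computed in sendPacket(): data + checksums + 4
        int packetLen = dataLen + checksumDataLen + 4;

        // Prints: 128 chunks, 512 checksum bytes, packetLen=66052
        System.out.println(numChunks + " chunks, " + checksumDataLen
            + " checksum bytes, packetLen=" + packetLen);
      }
    }

On the transferTo path, sendBlock() allocates the packet buffer for only the header and checksums (pktSize += checksumSize * maxChunksPerPacket), since the block data is sent straight from the file channel to the socket by transferToFully(); on the normal path the buffer must also hold the data itself.
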
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1189981&r1=1189980&r2=1189981&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Thu Oct 27 20:00:21 2011
@@ -2026,7 +2026,7 @@ public class DataNode extends Configured
         out = new DataOutputStream(new BufferedOutputStream(baseStream,
             HdfsConstants.SMALL_BUFFER_SIZE));
         blockSender = new BlockSender(b, 0, b.getNumBytes(), 
-            false, false, false, DataNode.this);
+            false, false, false, DataNode.this, null);
         DatanodeInfo srcNode = new DatanodeInfo(bpReg);
 
         //

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=1189981&r1=1189980&r2=1189981&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Thu Oct 27 20:00:21 2011
@@ -597,7 +597,7 @@ class DataXceiver extends Receiver imple
     try {
       // check if the block exists or not
       blockSender = new BlockSender(block, 0, -1, false, false, false, 
-          datanode);
+          datanode, null);
 
       // set up response stream
       OutputStream baseStream = NetUtils.getOutputStream(