Posted to hdfs-commits@hadoop.apache.org by at...@apache.org on 2012/08/09 23:33:21 UTC

svn commit: r1371496 - in /hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/ src/main/java/org/apache/hadoop/hdfs/server/datanode/

Author: atm
Date: Thu Aug  9 21:33:20 2012
New Revision: 1371496

URL: http://svn.apache.org/viewvc?rev=1371496&view=rev
Log:
HDFS-3721. hsync support broke wire compatibility. Contributed by Todd Lipcon and Aaron T. Myers.

Added:
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java
Modified:
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1371496&r1=1371495&r2=1371496&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Thu Aug  9 21:33:20 2012
@@ -422,6 +422,8 @@ Release 2.0.1-alpha - UNRELEASED
 
     HDFS-3710. libhdfs misuses O_RDONLY/WRONLY/RDWR. (Andy Isaacson via atm)
 
+    HDFS-3721. hsync support broke wire compatibility. (todd and atm)
+
   BREAKDOWN OF HDFS-3042 SUBTASKS
 
     HDFS-2185. HDFS portion of ZK-based FailoverController (todd)

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java?rev=1371496&r1=1371495&r2=1371496&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java Thu Aug  9 21:33:20 2012
@@ -30,7 +30,6 @@ import java.io.OutputStream;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.nio.BufferOverflowException;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.EnumSet;
@@ -126,7 +125,7 @@ public class DFSOutputStream extends FSO
   private long lastQueuedSeqno = -1;
   private long lastAckedSeqno = -1;
  private long bytesCurBlock = 0; // bytes written in current block
-  private int packetSize = 0; // write packet size, including the header.
+  private int packetSize = 0; // write packet size, not including the header.
   private int chunksPerPacket = 0;
   private volatile IOException lastException = null;
   private long artificialSlowdown = 0;
@@ -147,28 +146,31 @@ public class DFSOutputStream extends FSO
     int     numChunks;           // number of chunks currently in packet
     int     maxChunks;           // max chunks in packet
 
-    /** buffer for accumulating packet checksum and data */
-    ByteBuffer buffer; // wraps buf, only one of these two may be non-null
     byte[]  buf;
 
     /**
      * buf is pointed into like follows:
      *  (C is checksum data, D is payload data)
      *
-     * [HHHHHCCCCC________________DDDDDDDDDDDDDDDD___]
-     *       ^    ^               ^               ^
-     *       |    checksumPos     dataStart       dataPos
-     *   checksumStart
+     * [_________CCCCCCCCC________________DDDDDDDDDDDDDDDD___]
+     *           ^        ^               ^               ^
+     *           |        checksumPos     dataStart       dataPos
+     *           checksumStart
+     * 
+     * Right before sending, we move the checksum data to immediately precede
+     * the actual data, and then insert the header into the buffer immediately
+     * preceding the checksum data, so we make sure to keep enough space in
+     * front of the checksum data to support the largest conceivable header. 
      */
     int checksumStart;
+    int checksumPos;
     int dataStart;
     int dataPos;
-    int checksumPos;
 
     private static final long HEART_BEAT_SEQNO = -1L;
 
     /**
-     *  create a heartbeat packet
+     * Create a heartbeat packet.
      */
     Packet() {
       this.lastPacketInBlock = false;
@@ -176,17 +178,19 @@ public class DFSOutputStream extends FSO
       this.offsetInBlock = 0;
       this.seqno = HEART_BEAT_SEQNO;
       
-      buffer = null;
-      int packetSize = PacketHeader.PKT_HEADER_LEN + HdfsConstants.BYTES_IN_INTEGER;
-      buf = new byte[packetSize];
+      buf = new byte[PacketHeader.PKT_MAX_HEADER_LEN];
       
-      checksumStart = dataStart = packetSize;
-      checksumPos = checksumStart;
-      dataPos = dataStart;
+      checksumStart = checksumPos = dataPos = dataStart = PacketHeader.PKT_MAX_HEADER_LEN;
       maxChunks = 0;
     }
     
-    // create a new packet
+    /**
+     * Create a new packet.
+     * 
+     * @param pktSize maximum size of the packet, including checksum data and actual data.
+     * @param chunksPerPkt maximum number of chunks per packet.
+     * @param offsetInBlock offset in bytes into the HDFS block.
+     */
     Packet(int pktSize, int chunksPerPkt, long offsetInBlock) {
       this.lastPacketInBlock = false;
       this.numChunks = 0;
@@ -194,25 +198,24 @@ public class DFSOutputStream extends FSO
       this.seqno = currentSeqno;
       currentSeqno++;
       
-      buffer = null;
-      buf = new byte[pktSize];
+      buf = new byte[PacketHeader.PKT_MAX_HEADER_LEN + pktSize];
       
-      checksumStart = PacketHeader.PKT_HEADER_LEN;
+      checksumStart = PacketHeader.PKT_MAX_HEADER_LEN;
       checksumPos = checksumStart;
-      dataStart = checksumStart + chunksPerPkt * checksum.getChecksumSize();
+      dataStart = checksumStart + (chunksPerPkt * checksum.getChecksumSize());
       dataPos = dataStart;
       maxChunks = chunksPerPkt;
     }
 
     void writeData(byte[] inarray, int off, int len) {
-      if ( dataPos + len > buf.length) {
+      if (dataPos + len > buf.length) {
         throw new BufferOverflowException();
       }
       System.arraycopy(inarray, off, buf, dataPos, len);
       dataPos += len;
     }
 
-    void  writeChecksum(byte[] inarray, int off, int len) {
+    void writeChecksum(byte[] inarray, int off, int len) {
       if (checksumPos + len > dataStart) {
         throw new BufferOverflowException();
       }
@@ -221,45 +224,38 @@ public class DFSOutputStream extends FSO
     }
     
     /**
-     * Returns ByteBuffer that contains one full packet, including header.
+     * Write the full packet, including the header, to the given output stream.
      */
-    ByteBuffer getBuffer() {
-      /* Once this is called, no more data can be added to the packet.
-       * setting 'buf' to null ensures that.
-       * This is called only when the packet is ready to be sent.
-       */
-      if (buffer != null) {
-        return buffer;
-      }
-      
-      //prepare the header and close any gap between checksum and data.
-      
-      int dataLen = dataPos - dataStart;
-      int checksumLen = checksumPos - checksumStart;
+    void writeTo(DataOutputStream stm) throws IOException {
+      final int dataLen = dataPos - dataStart;
+      final int checksumLen = checksumPos - checksumStart;
+      final int pktLen = HdfsConstants.BYTES_IN_INTEGER + dataLen + checksumLen;
+
+      PacketHeader header = new PacketHeader(
+        pktLen, offsetInBlock, seqno, lastPacketInBlock, dataLen, syncBlock);
       
       if (checksumPos != dataStart) {
-        /* move the checksum to cover the gap.
-         * This can happen for the last packet.
-         */
+        // Move the checksum to cover the gap. This can happen for the last
+        // packet or during an hflush/hsync call.
         System.arraycopy(buf, checksumStart, buf, 
                          dataStart - checksumLen , checksumLen); 
+        checksumPos = dataStart;
+        checksumStart = checksumPos - checksumLen;
       }
       
-      int pktLen = HdfsConstants.BYTES_IN_INTEGER + dataLen + checksumLen;
+      final int headerStart = checksumStart - header.getSerializedSize();
+      assert checksumStart + 1 >= header.getSerializedSize();
+      assert checksumPos == dataStart;
+      assert headerStart >= 0;
+      assert headerStart + header.getSerializedSize() == checksumStart;
       
-      //normally dataStart == checksumPos, i.e., offset is zero.
-      buffer = ByteBuffer.wrap(
-        buf, dataStart - checksumPos,
-        PacketHeader.PKT_HEADER_LEN + pktLen - HdfsConstants.BYTES_IN_INTEGER);
-      buf = null;
-      buffer.mark();
-
-      PacketHeader header = new PacketHeader(
-        pktLen, offsetInBlock, seqno, lastPacketInBlock, dataLen, syncBlock);
-      header.putInBuffer(buffer);
+      // Copy the header data into the buffer immediately preceding the checksum
+      // data.
+      System.arraycopy(header.getBytes(), 0, buf, headerStart,
+          header.getSerializedSize());
       
-      buffer.reset();
-      return buffer;
+      // Write the now contiguous full packet to the output stream.
+      stm.write(buf, headerStart, header.getSerializedSize() + checksumLen + dataLen);
     }
     
     // get the packet's last byte's offset in the block
@@ -502,8 +498,6 @@ public class DFSOutputStream extends FSO
           }
           
           // send the packet
-          ByteBuffer buf = one.getBuffer();
-
           synchronized (dataQueue) {
             // move packet from dataQueue to ackQueue
             if (!one.isHeartbeatPacket()) {
@@ -519,8 +513,8 @@ public class DFSOutputStream extends FSO
           }
 
           // write out data to remote datanode
-          try {            
-            blockStream.write(buf.array(), buf.position(), buf.remaining());
+          try {
+            one.writeTo(blockStream);
             blockStream.flush();   
           } catch (IOException e) {
            // HDFS-3398 treat primary DN as down since client is unable to 
@@ -1358,9 +1352,8 @@ public class DFSOutputStream extends FSO
 
   private void computePacketChunkSize(int psize, int csize) {
     int chunkSize = csize + checksum.getChecksumSize();
-    int n = PacketHeader.PKT_HEADER_LEN;
-    chunksPerPacket = Math.max((psize - n + chunkSize-1)/chunkSize, 1);
-    packetSize = n + chunkSize*chunksPerPacket;
+    chunksPerPacket = Math.max(psize/chunkSize, 1);
+    packetSize = chunkSize*chunksPerPacket;
     if (DFSClient.LOG.isDebugEnabled()) {
       DFSClient.LOG.debug("computePacketChunkSize: src=" + src +
                 ", chunkSize=" + chunkSize +
@@ -1474,8 +1467,7 @@ public class DFSOutputStream extends FSO
       // indicate the end of block and reset bytesCurBlock.
       //
       if (bytesCurBlock == blockSize) {
-        currentPacket = new Packet(PacketHeader.PKT_HEADER_LEN, 0, 
-            bytesCurBlock);
+        currentPacket = new Packet(0, 0, bytesCurBlock);
         currentPacket.lastPacketInBlock = true;
         currentPacket.syncBlock = shouldSyncBlock;
         waitAndQueueCurrentPacket();
@@ -1751,8 +1743,7 @@ public class DFSOutputStream extends FSO
 
       if (bytesCurBlock != 0) {
         // send an empty packet to mark the end of the block
-        currentPacket = new Packet(PacketHeader.PKT_HEADER_LEN, 0, 
-            bytesCurBlock);
+        currentPacket = new Packet(0, 0, bytesCurBlock);
         currentPacket.lastPacketInBlock = true;
         currentPacket.syncBlock = shouldSyncBlock;
       }
@@ -1805,8 +1796,7 @@ public class DFSOutputStream extends FSO
   @VisibleForTesting
   public synchronized void setChunksPerPacket(int value) {
     chunksPerPacket = Math.min(chunksPerPacket, value);
-    packetSize = PacketHeader.PKT_HEADER_LEN +
-                 (checksum.getBytesPerChecksum() + 
+    packetSize = (checksum.getBytesPerChecksum() + 
                   checksum.getChecksumSize()) * chunksPerPacket;
   }
 

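For illustration: the change above stops sizing the packet buffer for a
fixed-length header. Packet now reserves PacketHeader.PKT_MAX_HEADER_LEN
bytes up front, and writeTo() slides the checksums back against the data and
then drops the variable-length serialized header immediately in front of
them, so the whole packet goes out in a single contiguous write. A minimal
standalone sketch of that technique (simplified names and made-up sizes, not
the committed code):

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    public class PrependHeaderSketch {
      // Stand-in for PacketHeader.PKT_MAX_HEADER_LEN.
      static final int MAX_HEADER_LEN = 33;

      public static void main(String[] args) throws IOException {
        byte[] buf = new byte[MAX_HEADER_LEN + 64];
        int checksumStart = MAX_HEADER_LEN;
        int checksumPos = checksumStart + 4;  // one 4-byte checksum written
        int dataStart = MAX_HEADER_LEN + 16;  // room reserved for 4 checksums
        int dataPos = dataStart + 12;         // 12 bytes of data written

        int dataLen = dataPos - dataStart;
        int checksumLen = checksumPos - checksumStart;

        // Close the gap between checksums and data (the short-last-packet case).
        if (checksumPos != dataStart) {
          System.arraycopy(buf, checksumStart, buf,
              dataStart - checksumLen, checksumLen);
          checksumPos = dataStart;
          checksumStart = checksumPos - checksumLen;
        }

        byte[] header = new byte[21];  // pretend variable-length serialized header
        int headerStart = checksumStart - header.length;
        System.arraycopy(header, 0, buf, headerStart, header.length);

        // One contiguous write: header + checksums + data.
        DataOutputStream out = new DataOutputStream(new ByteArrayOutputStream());
        out.write(buf, headerStart, header.length + checksumLen + dataLen);
      }
    }
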
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java?rev=1371496&r1=1371495&r2=1371496&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java Thu Aug  9 21:33:20 2012
@@ -33,12 +33,12 @@ import java.nio.channels.ReadableByteCha
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
 import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
+import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto;
@@ -48,14 +48,11 @@ import org.apache.hadoop.hdfs.security.t
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
-import org.apache.hadoop.hdfs.util.DirectBufferPool;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.SocketInputWrapper;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.DataChecksum;
 
-import com.google.common.base.Preconditions;
-
 /**
  * This is a wrapper around connection to datanode
  * and understands checksum, offset etc.
@@ -93,11 +90,9 @@ public class RemoteBlockReader2  impleme
   private final ReadableByteChannel in;
   private DataChecksum checksum;
   
-  private PacketHeader curHeader;
-  private ByteBuffer curPacketBuf = null;
+  private PacketReceiver packetReceiver = new PacketReceiver(true);
   private ByteBuffer curDataSlice = null;
 
-
   /** offset in block of the last chunk received */
   private long lastSeqNo = -1;
 
@@ -105,10 +100,6 @@ public class RemoteBlockReader2  impleme
   private long startOffset;
   private final String filename;
 
-  private static DirectBufferPool bufferPool = new DirectBufferPool();
-  private final ByteBuffer headerBuf = ByteBuffer.allocate(
-      PacketHeader.PKT_HEADER_LEN);
-
   private final int bytesPerChecksum;
   private final int checksumSize;
 
@@ -132,7 +123,7 @@ public class RemoteBlockReader2  impleme
   public synchronized int read(byte[] buf, int off, int len) 
                                throws IOException {
 
-    if (curPacketBuf == null || curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) {
+    if (curDataSlice == null || curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) {
       readNextPacket();
     }
     if (curDataSlice.remaining() == 0) {
@@ -149,7 +140,7 @@ public class RemoteBlockReader2  impleme
 
   @Override
   public int read(ByteBuffer buf) throws IOException {
-    if (curPacketBuf == null || curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) {
+    if (curDataSlice == null || curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) {
       readNextPacket();
     }
     if (curDataSlice.remaining() == 0) {
@@ -167,11 +158,13 @@ public class RemoteBlockReader2  impleme
   }
 
   private void readNextPacket() throws IOException {
-    Preconditions.checkState(curHeader == null || !curHeader.isLastPacketInBlock());
-    
     //Read packet headers.
-    readPacketHeader();
+    packetReceiver.receiveNextPacket(in);
 
+    PacketHeader curHeader = packetReceiver.getHeader();
+    curDataSlice = packetReceiver.getDataSlice();
+    assert curDataSlice.capacity() == curHeader.getDataLen();
+    
     if (LOG.isTraceEnabled()) {
       LOG.trace("DFSClient readNextPacket got header " + curHeader);
     }
@@ -185,17 +178,20 @@ public class RemoteBlockReader2  impleme
     if (curHeader.getDataLen() > 0) {
       int chunks = 1 + (curHeader.getDataLen() - 1) / bytesPerChecksum;
       int checksumsLen = chunks * checksumSize;
-      int bufsize = checksumsLen + curHeader.getDataLen();
+
+      assert packetReceiver.getChecksumSlice().capacity() == checksumsLen :
+        "checksum slice capacity=" + packetReceiver.getChecksumSlice().capacity() + 
+          " checksumsLen=" + checksumsLen;
       
-      resetPacketBuffer(checksumsLen, curHeader.getDataLen());
-  
       lastSeqNo = curHeader.getSeqno();
-      if (bufsize > 0) {
-        readChannelFully(in, curPacketBuf);
-        curPacketBuf.flip();
-        if (verifyChecksum) {
-          verifyPacketChecksums();
-        }
+      if (verifyChecksum && curDataSlice.remaining() > 0) {
+        // N.B.: the checksum error offset reported here is actually
+        // relative to the start of the block, not the start of the file.
+        // This is slightly misleading, but preserves the behavior from
+        // the older BlockReader.
+        checksum.verifyChunkedSums(curDataSlice,
+            packetReceiver.getChecksumSlice(),
+            filename, curHeader.getOffsetInBlock());
       }
       bytesNeededToFinish -= curHeader.getDataLen();
     }    
@@ -218,40 +214,7 @@ public class RemoteBlockReader2  impleme
       }
     }
   }
-
-  private void verifyPacketChecksums() throws ChecksumException {
-    // N.B.: the checksum error offset reported here is actually
-    // relative to the start of the block, not the start of the file.
-    // This is slightly misleading, but preserves the behavior from
-    // the older BlockReader.
-    checksum.verifyChunkedSums(curDataSlice, curPacketBuf,
-        filename, curHeader.getOffsetInBlock());
-  }
-
-  private static void readChannelFully(ReadableByteChannel ch, ByteBuffer buf)
-  throws IOException {
-    while (buf.remaining() > 0) {
-      int n = ch.read(buf);
-      if (n < 0) {
-        throw new IOException("Premature EOF reading from " + ch);
-      }
-    }
-  }
-
-  private void resetPacketBuffer(int checksumsLen, int dataLen) {
-    int packetLen = checksumsLen + dataLen;
-    if (curPacketBuf == null ||
-        curPacketBuf.capacity() < packetLen) {
-      returnPacketBufToPool();
-      curPacketBuf = bufferPool.getBuffer(packetLen);
-    }
-    curPacketBuf.position(checksumsLen);
-    curDataSlice = curPacketBuf.slice();
-    curDataSlice.limit(dataLen);
-    curPacketBuf.clear();
-    curPacketBuf.limit(checksumsLen + dataLen);
-  }
-
+  
   @Override
   public synchronized long skip(long n) throws IOException {
     /* How can we make sure we don't throw a ChecksumException, at least
@@ -272,23 +235,14 @@ public class RemoteBlockReader2  impleme
     return nSkipped;
   }
 
-  private void readPacketHeader() throws IOException {
-    headerBuf.clear();
-    readChannelFully(in, headerBuf);
-    headerBuf.flip();
-    if (curHeader == null) curHeader = new PacketHeader();
-    curHeader.readFields(headerBuf);
-  }
-
   private void readTrailingEmptyPacket() throws IOException {
     if (LOG.isTraceEnabled()) {
       LOG.trace("Reading empty packet at end of read");
     }
-    headerBuf.clear();
-    readChannelFully(in, headerBuf);
-    headerBuf.flip();
-    PacketHeader trailer = new PacketHeader();
-    trailer.readFields(headerBuf);
+    
+    packetReceiver.receiveNextPacket(in);
+
+    PacketHeader trailer = packetReceiver.getHeader();
     if (!trailer.isLastPacketInBlock() ||
        trailer.getDataLen() != 0) {
       throw new IOException("Expected empty end-of-read packet! Header: " +
@@ -321,7 +275,7 @@ public class RemoteBlockReader2  impleme
 
   @Override
   public synchronized void close() throws IOException {
-    returnPacketBufToPool();
+    packetReceiver.close();
     
     startOffset = -1;
     checksum = null;
@@ -332,24 +286,6 @@ public class RemoteBlockReader2  impleme
     // in will be closed when its Socket is closed.
   }
   
-  @Override
-  protected void finalize() throws Throwable {
-    try {
-      // just in case it didn't get closed, we
-      // may as well still try to return the buffer
-      returnPacketBufToPool();
-    } finally {
-      super.finalize();
-    }
-  }
-  
-  private void returnPacketBufToPool() {
-    if (curPacketBuf != null) {
-      bufferPool.returnBuffer(curPacketBuf);
-      curPacketBuf = null;
-    }
-  }
-
   /**
    * Take the socket used to talk to the DN.
    */

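Condensed, the reader's per-packet flow after this change takes the shape of
the sketch below (simplified: the real readNextPacket() also tracks lastSeqNo
and bytesNeededToFinish, and handles the trailing empty packet separately):

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.ReadableByteChannel;

    import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
    import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver;
    import org.apache.hadoop.util.DataChecksum;

    class ReadLoopSketch {
      void readBlock(ReadableByteChannel in, DataChecksum checksum,
          String filename) throws IOException {
        PacketReceiver receiver = new PacketReceiver(true); // direct buffers for channel I/O
        try {
          while (true) {
            receiver.receiveNextPacket(in);
            PacketHeader header = receiver.getHeader();
            if (header.isLastPacketInBlock()) {
              break;  // the trailing empty packet ends the read
            }
            ByteBuffer data = receiver.getDataSlice();
            checksum.verifyChunkedSums(data, receiver.getChecksumSlice(),
                filename, header.getOffsetInBlock());
            // ... hand 'data' to the caller ...
          }
        } finally {
          receiver.close();  // returns any pooled direct buffer
        }
      }
    }
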
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java?rev=1371496&r1=1371495&r2=1371496&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java Thu Aug  9 21:33:20 2012
@@ -27,14 +27,31 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PacketHeaderProto;
 import org.apache.hadoop.hdfs.util.ByteBufferOutputStream;
 
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Shorts;
+import com.google.common.primitives.Ints;
+import com.google.protobuf.InvalidProtocolBufferException;
+
 /**
  * Header data for each packet that goes through the read/write pipelines.
+ * Includes all of the information about the packet, excluding checksums and
+ * actual data.
+ * 
+ * This data includes:
+ *  - the offset in bytes into the HDFS block of the data in this packet
+ *  - the sequence number of this packet in the pipeline
+ *  - whether or not this is the last packet in the pipeline
+ *  - the length of the data in this packet
+ *  - whether or not this packet should be synced by the DNs.
+ *  
+ * When serialized, this header is written out as a protocol buffer, preceded
+ * by a 4-byte integer representing the full packet length, and a 2-byte short
+ * representing the header length.
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
 public class PacketHeader {
-  /** Header size for a packet */
-  private static final int PROTO_SIZE = 
+  private static final int MAX_PROTO_SIZE = 
     PacketHeaderProto.newBuilder()
       .setOffsetInBlock(0)
       .setSeqno(0)
@@ -42,8 +59,10 @@ public class PacketHeader {
       .setDataLen(0)
       .setSyncBlock(false)
       .build().getSerializedSize();
-  public static final int PKT_HEADER_LEN =
-    6 + PROTO_SIZE;
+  public static final int PKT_LENGTHS_LEN =
+      Ints.BYTES + Shorts.BYTES;
+  public static final int PKT_MAX_HEADER_LEN =
+      PKT_LENGTHS_LEN + MAX_PROTO_SIZE;
 
   private int packetLen;
   private PacketHeaderProto proto;
@@ -54,13 +73,25 @@ public class PacketHeader {
   public PacketHeader(int packetLen, long offsetInBlock, long seqno,
                       boolean lastPacketInBlock, int dataLen, boolean syncBlock) {
     this.packetLen = packetLen;
-    proto = PacketHeaderProto.newBuilder()
+    Preconditions.checkArgument(packetLen >= Ints.BYTES,
+        "packet len %s should always be at least 4 bytes",
+        packetLen);
+    
+    PacketHeaderProto.Builder builder = PacketHeaderProto.newBuilder()
       .setOffsetInBlock(offsetInBlock)
       .setSeqno(seqno)
       .setLastPacketInBlock(lastPacketInBlock)
-      .setDataLen(dataLen)
-      .setSyncBlock(syncBlock)
-      .build();
+      .setDataLen(dataLen);
+      
+    if (syncBlock) {
+      // Only set syncBlock if it is specified.
+      // This is wire-incompatible with Hadoop 2.0.0-alpha due to HDFS-3721
+      // because it changes the length of the packet header, and BlockReceiver
+      // in that version did not support variable-length headers.
+      builder.setSyncBlock(syncBlock);
+    }
+      
+    proto = builder.build();
   }
 
   public int getDataLen() {
@@ -90,10 +121,16 @@ public class PacketHeader {
   @Override
   public String toString() {
     return "PacketHeader with packetLen=" + packetLen +
-      "Header data: " + 
+      " header data: " + 
       proto.toString();
   }
   
+  public void setFieldsFromData(
+      int packetLen, byte[] headerData) throws InvalidProtocolBufferException {
+    this.packetLen = packetLen;
+    proto = PacketHeaderProto.parseFrom(headerData);
+  }
+  
   public void readFields(ByteBuffer buf) throws IOException {
     packetLen = buf.getInt();
     short protoLen = buf.getShort();
@@ -110,14 +147,21 @@ public class PacketHeader {
     proto = PacketHeaderProto.parseFrom(data);
   }
 
+  /**
+   * @return the number of bytes necessary to write out this header,
+   * including the length-prefixing of the payload and header
+   */
+  public int getSerializedSize() {
+    return PKT_LENGTHS_LEN + proto.getSerializedSize();
+  }
 
   /**
    * Write the header into the buffer.
    * This requires that PKT_HEADER_LEN bytes are available.
    */
   public void putInBuffer(final ByteBuffer buf) {
-    assert proto.getSerializedSize() == PROTO_SIZE
-      : "Expected " + (PROTO_SIZE) + " got: " + proto.getSerializedSize();
+    assert proto.getSerializedSize() <= MAX_PROTO_SIZE
+      : "Expected " + (MAX_PROTO_SIZE) + " got: " + proto.getSerializedSize();
     try {
       buf.putInt(packetLen);
       buf.putShort((short) proto.getSerializedSize());
@@ -128,12 +172,18 @@ public class PacketHeader {
   }
   
   public void write(DataOutputStream out) throws IOException {
-    assert proto.getSerializedSize() == PROTO_SIZE
-    : "Expected " + (PROTO_SIZE) + " got: " + proto.getSerializedSize();
+    assert proto.getSerializedSize() <= MAX_PROTO_SIZE
+    : "Expected " + (MAX_PROTO_SIZE) + " got: " + proto.getSerializedSize();
     out.writeInt(packetLen);
     out.writeShort(proto.getSerializedSize());
     proto.writeTo(out);
   }
+  
+  public byte[] getBytes() {
+    ByteBuffer buf = ByteBuffer.allocate(getSerializedSize());
+    putInBuffer(buf);
+    return buf.array();
+  }
 
   /**
    * Perform a sanity check on the packet, returning true if it is sane.

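On the wire, every packet now leads with a 4-byte PLEN (which counts its own
four bytes plus the checksums and data) and a 2-byte HLEN giving the size of
the protobuf that follows. The protobuf shrinks when optional fields such as
syncBlock are unset, which is precisely the variable length that the
2.0.0-alpha BlockReceiver could not handle. For one 512-byte chunk with a
single 4-byte CRC, PLEN = 4 + 4 + 512 = 520. A sketch of decoding the
prefixes, mirroring readFields() above (illustrative, not the committed code):

    import java.nio.ByteBuffer;

    class PrefixDecodeSketch {
      // buf is positioned at the first byte of a packet.
      static void decode(ByteBuffer buf) throws Exception {
        int packetLen = buf.getInt();      // PLEN: 4 + checksumLen + dataLen
        short headerLen = buf.getShort();  // HLEN: serialized protobuf size
        byte[] headerBytes = new byte[headerLen];
        buf.get(headerBytes);
        // PacketHeaderProto.parseFrom(headerBytes) recovers the header fields.
        int checksumPlusDataLen = packetLen - 4;  // subtract PLEN's own length
      }
    }
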
Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java?rev=1371496&view=auto
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java (added)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java Thu Aug  9 21:33:20 2012
@@ -0,0 +1,292 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol.datatransfer;
+
+import java.io.Closeable;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.util.DirectBufferPool;
+import org.apache.hadoop.io.IOUtils;
+
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Ints;
+
+/**
+ * Class to handle reading packets one-at-a-time from the wire.
+ * These packets are used both for reading and writing data to/from
+ * DataNodes.
+ */
+@InterfaceAudience.Private
+public class PacketReceiver implements Closeable {
+
+  /**
+   * The max size of any single packet. This prevents OOMEs when
+   * invalid data is sent.
+   */
+  private static final int MAX_PACKET_SIZE = 16 * 1024 * 1024;
+
+  static Log LOG = LogFactory.getLog(PacketReceiver.class);
+  
+  private static final DirectBufferPool bufferPool = new DirectBufferPool();
+  private final boolean useDirectBuffers;
+
+  /**
+   * Internal buffer for reading the length prefixes at the start of
+   * the packet.
+   */
+  private final ByteBuffer lengthPrefixBuf = ByteBuffer.allocate(
+      PacketHeader.PKT_LENGTHS_LEN);
+
+  /**
+   * The entirety of the most recently read packet, excepting the
+   * length prefixes.
+   */
+  private ByteBuffer curPacketBuf = null;
+  
+  /**
+   * A slice of {@link #curPacketBuf} which contains just the checksums.
+   */
+  private ByteBuffer curChecksumSlice = null;
+  
+  /**
+   * A slice of {@link #curPacketBuf} which contains just the data.
+   */
+  private ByteBuffer curDataSlice = null;
+
+  /**
+   * The packet header of the most recently read packet.
+   */
+  private PacketHeader curHeader;
+  
+  public PacketReceiver(boolean useDirectBuffers) {
+    this.useDirectBuffers = useDirectBuffers;
+  }
+
+  public PacketHeader getHeader() {
+    return curHeader;
+  }
+
+  public ByteBuffer getDataSlice() {
+    return curDataSlice;
+  }
+  
+  public ByteBuffer getChecksumSlice() {
+    return curChecksumSlice;
+  }
+
+  /**
+   * Reads all of the data for the next packet into the appropriate buffers.
+   * 
+   * The data slice and checksum slice members will be set to point to the
+   * user data and corresponding checksums. The header will be parsed and
+   * set.
+   */
+  public void receiveNextPacket(ReadableByteChannel in) throws IOException {
+    doRead(in, null);
+  }
+
+  /**
+   * @see #receiveNextPacket(ReadableByteChannel)
+   */
+  public void receiveNextPacket(InputStream in) throws IOException {
+    doRead(null, in);
+  }
+
+  private void doRead(ReadableByteChannel ch, InputStream in)
+      throws IOException {
+    // Each packet looks like:
+    //   PLEN    HLEN      HEADER     CHECKSUMS  DATA
+    //   32-bit  16-bit   <protobuf>  <variable length>
+    //
+    // PLEN:      Payload length
+    //            = length(PLEN) + length(CHECKSUMS) + length(DATA)
+    //            This length includes its own encoded length in
+    //            the sum for historical reasons.
+    //
+    // HLEN:      Header length
+    //            = length(HEADER)
+    //
+    // HEADER:    the actual packet header fields, encoded in protobuf
+    // CHECKSUMS: the crcs for the data chunk. May be missing if
+    //            checksums were not requested
+    // DATA       the actual block data
+    Preconditions.checkState(curHeader == null || !curHeader.isLastPacketInBlock());
+    
+    lengthPrefixBuf.clear();
+    doReadFully(ch, in, lengthPrefixBuf);
+    lengthPrefixBuf.flip();
+    int payloadLen = lengthPrefixBuf.getInt();
+    
+    if (payloadLen < Ints.BYTES) {
+      // The "payload length" includes its own length. Therefore it
+      // should never be less than 4 bytes
+      throw new IOException("Invalid payload length " +
+          payloadLen);
+    }
+    int dataPlusChecksumLen = payloadLen - Ints.BYTES;
+    int headerLen = lengthPrefixBuf.getShort();
+    if (headerLen < 0) {
+      throw new IOException("Invalid header length " + headerLen);
+    }
+    
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("readNextPacket: dataPlusChecksumLen = " + dataPlusChecksumLen +
+          " headerLen = " + headerLen);
+    }
+    
+    // Sanity check the buffer size so we don't allocate too much memory
+    // and OOME.
+    int totalLen = payloadLen + headerLen;
+    if (totalLen < 0 || totalLen > MAX_PACKET_SIZE) {
+      throw new IOException("Incorrect value for packet payload size: " +
+                            payloadLen);
+    }
+
+    // Make sure we have space for the whole packet, and
+    // read it.
+    reallocPacketBuf(dataPlusChecksumLen + headerLen);
+    curPacketBuf.clear();
+    curPacketBuf.limit(dataPlusChecksumLen + headerLen);
+    doReadFully(ch, in, curPacketBuf);
+    curPacketBuf.flip();
+
+    // Extract the header from the front of the buffer.
+    byte[] headerBuf = new byte[headerLen];
+    curPacketBuf.get(headerBuf);
+    if (curHeader == null) {
+      curHeader = new PacketHeader();
+    }
+    curHeader.setFieldsFromData(dataPlusChecksumLen, headerBuf);
+    
+    // Compute the sub-slices of the packet
+    int checksumLen = dataPlusChecksumLen - curHeader.getDataLen();
+    if (checksumLen < 0) {
+      throw new IOException("Invalid packet: data length in packet header " + 
+          "exceeds data length received. dataPlusChecksumLen=" +
+          dataPlusChecksumLen + " header: " + curHeader); 
+    }
+    
+    reslicePacket(headerLen, checksumLen, curHeader.getDataLen());
+  }
+  
+  /**
+   * Rewrite the last-read packet on the wire to the given output stream.
+   */
+  public void mirrorPacketTo(DataOutputStream mirrorOut) throws IOException {
+    Preconditions.checkState(!useDirectBuffers,
+        "Currently only supported for non-direct buffers");
+    assert lengthPrefixBuf.capacity() == PacketHeader.PKT_LENGTHS_LEN;
+    mirrorOut.write(lengthPrefixBuf.array(),
+        lengthPrefixBuf.arrayOffset(),
+        lengthPrefixBuf.capacity());
+    mirrorOut.write(curPacketBuf.array(),
+        curPacketBuf.arrayOffset(),
+        curPacketBuf.remaining());
+  }
+
+  
+  private static void doReadFully(ReadableByteChannel ch, InputStream in,
+      ByteBuffer buf) throws IOException {
+    if (ch != null) {
+      readChannelFully(ch, buf);
+    } else {
+      Preconditions.checkState(!buf.isDirect(),
+          "Must not use direct buffers with InputStream API");
+      IOUtils.readFully(in, buf.array(),
+          buf.arrayOffset() + buf.position(),
+          buf.remaining());
+      buf.position(buf.position() + buf.remaining());
+    }
+  }
+
+  private void reslicePacket(
+      int headerLen, int checksumsLen, int dataLen) {
+    assert dataLen >= 0 : "invalid datalen: " + dataLen;
+    
+    assert curPacketBuf.position() == headerLen;
+    assert checksumsLen + dataLen == curPacketBuf.remaining() :
+      "headerLen= " + headerLen + " clen=" + checksumsLen + " dlen=" + dataLen +
+      " rem=" + curPacketBuf.remaining();
+    
+    curPacketBuf.position(headerLen);
+    curPacketBuf.limit(headerLen + checksumsLen);
+    curChecksumSlice = curPacketBuf.slice();
+
+    curPacketBuf.position(headerLen + checksumsLen);
+    curPacketBuf.limit(headerLen + checksumsLen + dataLen);
+    curDataSlice = curPacketBuf.slice();
+    
+    curPacketBuf.position(0);
+    curPacketBuf.limit(headerLen + checksumsLen + dataLen);
+  }
+
+  
+  private static void readChannelFully(ReadableByteChannel ch, ByteBuffer buf)
+      throws IOException {
+    while (buf.remaining() > 0) {
+      int n = ch.read(buf);
+      if (n < 0) {
+        throw new IOException("Premature EOF reading from " + ch);
+      }
+    }
+  }
+  
+  private void reallocPacketBuf(int atLeastCapacity) {
+    // Realloc the buffer if this packet is longer than the previous
+    // one.
+    if (curPacketBuf == null ||
+        curPacketBuf.capacity() < atLeastCapacity) {
+      returnPacketBufToPool();
+      if (useDirectBuffers) {
+        curPacketBuf = bufferPool.getBuffer(atLeastCapacity);
+      } else {
+        curPacketBuf = ByteBuffer.allocate(atLeastCapacity);
+      }
+    }
+  }
+  
+  private void returnPacketBufToPool() {
+    if (curPacketBuf != null && curPacketBuf.isDirect()) {
+      bufferPool.returnBuffer(curPacketBuf);
+      curPacketBuf = null;
+    }
+  }
+
+  @Override // Closeable
+  public void close() {
+    returnPacketBufToPool();
+  }
+  
+  @Override
+  protected void finalize() throws Throwable {
+    try {
+      // just in case it didn't get closed, we
+      // may as well still try to return the buffer
+      returnPacketBufToPool();
+    } finally {
+      super.finalize();
+    }
+  }
+}

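A hypothetical round trip through the new receiver over its InputStream path,
using only APIs introduced or modified in this commit (the field values are
made up, and this test is not part of the patch):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;

    import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
    import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver;

    public class PacketRoundTripSketch {
      public static void main(String[] args) throws Exception {
        byte[] data = new byte[512];
        byte[] checksums = new byte[4];  // one CRC32 for one 512-byte chunk
        int pktLen = 4 + checksums.length + data.length;  // PLEN includes itself

        PacketHeader hdr = new PacketHeader(pktLen, 0 /* offsetInBlock */,
            1 /* seqno */, false /* lastPacketInBlock */, data.length,
            false /* syncBlock */);

        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputStream dos = new DataOutputStream(baos);
        hdr.write(dos);         // PLEN, HLEN, protobuf header
        dos.write(checksums);   // CHECKSUMS
        dos.write(data);        // DATA

        PacketReceiver pr = new PacketReceiver(false);  // heap buffers for streams
        pr.receiveNextPacket(new ByteArrayInputStream(baos.toByteArray()));
        assert pr.getDataSlice().remaining() == data.length;
        assert pr.getChecksumSlice().remaining() == checksums.length;
        pr.close();
      }
    }
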
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=1371496&r1=1371495&r2=1371496&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Thu Aug  9 21:33:20 2012
@@ -23,7 +23,6 @@ import java.io.BufferedOutputStream;
 import java.io.Closeable;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
-import java.io.EOFException;
 import java.io.FileDescriptor;
 import java.io.FileOutputStream;
 import java.io.IOException;
@@ -34,12 +33,14 @@ import java.util.LinkedList;
 import java.util.zip.Checksum;
 
 import org.apache.commons.logging.Log;
+import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.fs.FSOutputSummer;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
+import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
@@ -77,9 +78,10 @@ class BlockReceiver implements Closeable
   private DataOutputStream checksumOut = null; // to crc file at local disk
   private int bytesPerChecksum;
   private int checksumSize;
-  private ByteBuffer buf; // contains one full packet.
-  private int bufRead; //amount of valid data in the buf
-  private int maxPacketReadLen;
+  
+  private PacketReceiver packetReceiver =
+      new PacketReceiver(false);
+  
   protected final String inAddr;
   protected final String myAddr;
   private String mirrorAddr;
@@ -248,6 +250,10 @@ class BlockReceiver implements Closeable
    */
   @Override
   public void close() throws IOException {
+    if (packetReceiver != null) {
+      packetReceiver.close();
+    }
+    
     IOException ioe = null;
     if (syncOnClose && (out != null || checksumOut != null)) {
       datanode.metrics.incrFsyncCount();      
@@ -365,33 +371,24 @@ class BlockReceiver implements Closeable
   /**
    * Verify multiple CRC chunks. 
    */
-  private void verifyChunks( byte[] dataBuf, int dataOff, int len, 
-                             byte[] checksumBuf, int checksumOff ) 
-                             throws IOException {
-    while (len > 0) {
-      int chunkLen = Math.min(len, bytesPerChecksum);
-      
-      clientChecksum.update(dataBuf, dataOff, chunkLen);
-
-      if (!clientChecksum.compare(checksumBuf, checksumOff)) {
-        if (srcDataNode != null) {
-          try {
-            LOG.info("report corrupt block " + block + " from datanode " +
-                      srcDataNode + " to namenode");
-            datanode.reportRemoteBadBlock(srcDataNode, block);
-          } catch (IOException e) {
-            LOG.warn("Failed to report bad block " + block + 
-                      " from datanode " + srcDataNode + " to namenode");
-          }
+  private void verifyChunks(ByteBuffer dataBuf, ByteBuffer checksumBuf)
+      throws IOException {
+    try {
+      clientChecksum.verifyChunkedSums(dataBuf, checksumBuf, clientname, 0);
+    } catch (ChecksumException ce) {
+      LOG.warn("Checksum error in block " + block + " from " + inAddr, ce);
+      if (srcDataNode != null) {
+        try {
+          LOG.info("report corrupt block " + block + " from datanode " +
+                    srcDataNode + " to namenode");
+          datanode.reportRemoteBadBlock(srcDataNode, block);
+        } catch (IOException e) {
+          LOG.warn("Failed to report bad block " + block + 
+                    " from datanode " + srcDataNode + " to namenode");
         }
-        throw new IOException("Unexpected checksum mismatch " + 
-                              "while writing " + block + " from " + inAddr);
       }
-
-      clientChecksum.reset();
-      dataOff += chunkLen;
-      checksumOff += checksumSize;
-      len -= chunkLen;
+      throw new IOException("Unexpected checksum mismatch " + 
+                            "while writing " + block + " from " + inAddr);
     }
   }
   
@@ -403,163 +400,24 @@ class BlockReceiver implements Closeable
    * This does not verify the original checksums, under the assumption
    * that they have already been validated.
    */
-  private void translateChunks( byte[] dataBuf, int dataOff, int len,
-      byte[] checksumBuf, int checksumOff ) {
-    if (len == 0) return;
-    
-    int numChunks = (len - 1)/bytesPerChecksum + 1;
-    
-    diskChecksum.calculateChunkedSums(
-        ByteBuffer.wrap(dataBuf, dataOff, len),
-        ByteBuffer.wrap(checksumBuf, checksumOff, numChunks * checksumSize));
+  private void translateChunks(ByteBuffer dataBuf, ByteBuffer checksumBuf) {
+    diskChecksum.calculateChunkedSums(dataBuf, checksumBuf);
   }
 
-  /**
-   * Makes sure buf.position() is zero without modifying buf.remaining().
-   * It moves the data if position needs to be changed.
-   */
-  private void shiftBufData() {
-    if (bufRead != buf.limit()) {
-      throw new IllegalStateException("bufRead should be same as " +
-                                      "buf.limit()");
-    }
-    
-    //shift the remaining data on buf to the front
-    if (buf.position() > 0) {
-      int dataLeft = buf.remaining();
-      if (dataLeft > 0) {
-        byte[] b = buf.array();
-        System.arraycopy(b, buf.position(), b, 0, dataLeft);
-      }
-      buf.position(0);
-      bufRead = dataLeft;
-      buf.limit(bufRead);
-    }
-  }
-  
-  /**
-   * reads upto toRead byte to buf at buf.limit() and increments the limit.
-   * throws an IOException if read does not succeed.
-   */
-  private int readToBuf(int toRead) throws IOException {
-    if (toRead < 0) {
-      toRead = (maxPacketReadLen > 0 ? maxPacketReadLen : buf.capacity())
-               - buf.limit();
-    }
-    
-    int nRead = in.read(buf.array(), buf.limit(), toRead);
-    
-    if (nRead < 0) {
-      throw new EOFException("while trying to read " + toRead + " bytes");
-    }
-    bufRead = buf.limit() + nRead;
-    buf.limit(bufRead);
-    return nRead;
-  }
-  
-  
-  /**
-   * Reads (at least) one packet and returns the packet length.
-   * buf.position() points to the start of the packet and 
-   * buf.limit() point to the end of the packet. There could 
-   * be more data from next packet in buf.<br><br>
-   * 
-   * It tries to read a full packet with single read call.
-   * Consecutive packets are usually of the same length.
-   */
-  private void readNextPacket() throws IOException {
-    /* This dances around buf a little bit, mainly to read 
-     * full packet with single read and to accept arbitrary size  
-     * for next packet at the same time.
-     */
-    if (buf == null) {
-      /* initialize buffer to the best guess size:
-       * 'chunksPerPacket' calculation here should match the same 
-       * calculation in DFSClient to make the guess accurate.
-       */
-      int chunkSize = bytesPerChecksum + checksumSize;
-      int chunksPerPacket = (datanode.getDnConf().writePacketSize - PacketHeader.PKT_HEADER_LEN
-                             + chunkSize - 1)/chunkSize;
-      buf = ByteBuffer.allocate(PacketHeader.PKT_HEADER_LEN +
-                                Math.max(chunksPerPacket, 1) * chunkSize);
-      buf.limit(0);
-    }
-    
-    // See if there is data left in the buffer :
-    if (bufRead > buf.limit()) {
-      buf.limit(bufRead);
-    }
-    
-    while (buf.remaining() < HdfsConstants.BYTES_IN_INTEGER) {
-      if (buf.position() > 0) {
-        shiftBufData();
-      }
-      readToBuf(-1);
-    }
-    
-    /* We mostly have the full packet or at least enough for an int
-     */
-    buf.mark();
-    int payloadLen = buf.getInt();
-    buf.reset();
-    
-    // check corrupt values for pktLen, 100MB upper limit should be ok?
-    if (payloadLen < 0 || payloadLen > (100*1024*1024)) {
-      throw new IOException("Incorrect value for packet payload : " +
-                            payloadLen);
-    }
-    
-    // Subtract BYTES_IN_INTEGER since that accounts for the payloadLen that
-    // we read above.
-    int pktSize = payloadLen + PacketHeader.PKT_HEADER_LEN
-        - HdfsConstants.BYTES_IN_INTEGER;
-    
-    if (buf.remaining() < pktSize) {
-      //we need to read more data
-      int toRead = pktSize - buf.remaining();
-      
-      // first make sure buf has enough space.        
-      int spaceLeft = buf.capacity() - buf.limit();
-      if (toRead > spaceLeft && buf.position() > 0) {
-        shiftBufData();
-        spaceLeft = buf.capacity() - buf.limit();
-      }
-      if (toRead > spaceLeft) {
-        byte oldBuf[] = buf.array();
-        int toCopy = buf.limit();
-        buf = ByteBuffer.allocate(toCopy + toRead);
-        System.arraycopy(oldBuf, 0, buf.array(), 0, toCopy);
-        buf.limit(toCopy);
-      }
-      
-      //now read:
-      while (toRead > 0) {
-        toRead -= readToBuf(toRead);
-      }
-    }
-    
-    if (buf.remaining() > pktSize) {
-      buf.limit(buf.position() + pktSize);
-    }
-    
-    if (pktSize > maxPacketReadLen) {
-      maxPacketReadLen = pktSize;
-    }
-  }
-  
+
   /** 
    * Receives and processes a packet. It can contain many chunks.
    * returns the number of data bytes that the packet has.
    */
   private int receivePacket() throws IOException {
     // read the next packet
-    readNextPacket();
+    packetReceiver.receiveNextPacket(in);
 
-    buf.mark();
-    PacketHeader header = new PacketHeader();
-    header.readFields(buf);
-    int endOfHeader = buf.position();
-    buf.reset();
+    PacketHeader header = packetReceiver.getHeader();
+    if (LOG.isDebugEnabled()){
+      LOG.debug("Receiving one packet for block " + block +
+                ": " + header);
+    }
 
     // Sanity check the header
     if (header.getOffsetInBlock() > replicaInfo.getNumBytes()) {
@@ -574,38 +432,12 @@ class BlockReceiver implements Closeable
                             header.getDataLen()); 
     }
 
-    return receivePacket(
-      header.getOffsetInBlock(),
-      header.getSeqno(),
-      header.isLastPacketInBlock(),
-      header.getDataLen(),
-      header.getSyncBlock(),
-      endOfHeader);
-  }
+    long offsetInBlock = header.getOffsetInBlock();
+    long seqno = header.getSeqno();
+    boolean lastPacketInBlock = header.isLastPacketInBlock();
+    int len = header.getDataLen();
+    boolean syncBlock = header.getSyncBlock();
 
-  /**
-   * Write the received packet to disk (data only)
-   */
-  private void writePacketToDisk(byte[] pktBuf, int startByteToDisk, 
-      int numBytesToDisk) throws IOException {
-    out.write(pktBuf, startByteToDisk, numBytesToDisk);
-  }
-  
-  /** 
-   * Receives and processes a packet. It can contain many chunks.
-   * returns the number of data bytes that the packet has.
-   */
-  private int receivePacket(long offsetInBlock, long seqno,
-      boolean lastPacketInBlock, int len, boolean syncBlock,
-      int endOfHeader) throws IOException {
-    if (LOG.isDebugEnabled()){
-      LOG.debug("Receiving one packet for block " + block +
-                " of length " + len +
-                " seqno " + seqno +
-                " offsetInBlock " + offsetInBlock +
-                " syncBlock " + syncBlock +
-                " lastPacketInBlock " + lastPacketInBlock);
-    }
     // make sure the block gets sync'ed upon close
     this.syncOnClose |= syncBlock && lastPacketInBlock;
 
@@ -625,14 +457,15 @@ class BlockReceiver implements Closeable
     //First write the packet to the mirror:
     if (mirrorOut != null && !mirrorError) {
       try {
-        mirrorOut.write(buf.array(), buf.position(), buf.remaining());
+        packetReceiver.mirrorPacketTo(mirrorOut);
         mirrorOut.flush();
       } catch (IOException e) {
         handleMirrorOutError(e);
       }
     }
     
-    buf.position(endOfHeader);        
+    ByteBuffer dataBuf = packetReceiver.getDataSlice();
+    ByteBuffer checksumBuf = packetReceiver.getChecksumSlice();
     
     if (lastPacketInBlock || len == 0) {
       if(LOG.isDebugEnabled()) {
@@ -646,18 +479,11 @@ class BlockReceiver implements Closeable
       int checksumLen = ((len + bytesPerChecksum - 1)/bytesPerChecksum)*
                                                             checksumSize;
 
-      if ( buf.remaining() != (checksumLen + len)) {
-        throw new IOException("Data remaining in packet does not match" +
-                              "sum of checksumLen and dataLen " +
-                              " size remaining: " + buf.remaining() +
-                              " data len: " + len +
-                              " checksum Len: " + checksumLen);
-      }
-      int checksumOff = buf.position();
-      int dataOff = checksumOff + checksumLen;
-      byte pktBuf[] = buf.array();
-
-      buf.position(buf.limit()); // move to the end of the data.
+      if ( checksumBuf.capacity() != checksumLen) {
+        throw new IOException("Length of checksums in packet " +
+            checksumBuf.capacity() + " does not match calculated checksum " +
+            "length " + checksumLen);
+     }
 
       /* skip verifying checksum iff this is not the last one in the 
        * pipeline and clientName is non-null. i.e. Checksum is verified
@@ -667,11 +493,11 @@ class BlockReceiver implements Closeable
        * checksum.
        */
       if (mirrorOut == null || isDatanode || needsChecksumTranslation) {
-        verifyChunks(pktBuf, dataOff, len, pktBuf, checksumOff);
+        verifyChunks(dataBuf, checksumBuf);
         if (needsChecksumTranslation) {
           // overwrite the checksums in the packet buffer with the
           // appropriate polynomial for the disk storage.
-          translateChunks(pktBuf, dataOff, len, pktBuf, checksumOff);
+          translateChunks(dataBuf, checksumBuf);
         }
       }
       
@@ -700,9 +526,13 @@ class BlockReceiver implements Closeable
             computePartialChunkCrc(onDiskLen, offsetInChecksum, bytesPerChecksum);
           }
 
-          int startByteToDisk = dataOff+(int)(onDiskLen-firstByteInBlock);
+          int startByteToDisk = (int)(onDiskLen-firstByteInBlock) 
+              + dataBuf.arrayOffset() + dataBuf.position();
+
           int numBytesToDisk = (int)(offsetInBlock-onDiskLen);
-          writePacketToDisk(pktBuf, startByteToDisk, numBytesToDisk);
+          
+          // Write data to disk.
+          out.write(dataBuf.array(), startByteToDisk, numBytesToDisk);
 
           // If this is a partial chunk, then verify that this is the only
           // chunk in the packet. Calculate new crc for this chunk.
@@ -714,7 +544,7 @@ class BlockReceiver implements Closeable
                                     " len = " + len + 
                                     " bytesPerChecksum " + bytesPerChecksum);
             }
-            partialCrc.update(pktBuf, startByteToDisk, numBytesToDisk);
+            partialCrc.update(dataBuf.array(), startByteToDisk, numBytesToDisk);
             byte[] buf = FSOutputSummer.convertToByteStream(partialCrc, checksumSize);
             lastChunkChecksum = Arrays.copyOfRange(
               buf, buf.length - checksumSize, buf.length
@@ -726,11 +556,12 @@ class BlockReceiver implements Closeable
             partialCrc = null;
           } else {
             lastChunkChecksum = Arrays.copyOfRange(
-              pktBuf, 
-              checksumOff + checksumLen - checksumSize, 
-              checksumOff + checksumLen
-            );
-            checksumOut.write(pktBuf, checksumOff, checksumLen);
+                checksumBuf.array(),
+                checksumBuf.arrayOffset() + checksumBuf.position() + checksumLen - checksumSize,
+                checksumBuf.arrayOffset() + checksumBuf.position() + checksumLen);
+            checksumOut.write(checksumBuf.array(),
+                checksumBuf.arrayOffset() + checksumBuf.position(),
+                checksumLen);
           }
           /// flush entire packet, sync unless close() will sync
           flushOrSync(syncBlock && !lastPacketInBlock);

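One subtlety in the rewritten write path above: the data and checksum slices
returned by PacketReceiver are views into a larger heap buffer, so code that
writes straight from the backing array (out.write, partialCrc.update,
checksumOut.write) must translate slice-relative positions into absolute
array indices via arrayOffset(). A small self-contained illustration (an
assumed example, not from the patch):

    import java.nio.ByteBuffer;

    public class SliceOffsetSketch {
      public static void main(String[] args) {
        ByteBuffer packet = ByteBuffer.allocate(64);
        packet.position(16);
        packet.limit(48);
        ByteBuffer dataBuf = packet.slice();  // slice position 0 == array index 16

        // Bytes [relOff, relOff + n) of the slice, addressed in the backing array:
        int relOff = 4, n = 8;
        int start = dataBuf.arrayOffset() + dataBuf.position() + relOff;
        System.out.println(start + " " + n);  // 20 8: args for out.write(dataBuf.array(), ...)
      }
    }
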
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java?rev=1371496&r1=1371495&r2=1371496&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java Thu Aug  9 21:33:20 2012
@@ -62,40 +62,29 @@ import org.apache.hadoop.util.DataChecks
  * </pre>   
  * An empty packet is sent to mark the end of block and read completion.
  * 
- *  PACKET Contains a packet header, checksum and data. Amount of data
- *  carried is set by BUFFER_SIZE.
- *  <pre>
- *    +-----------------------------------------------------+
- *    | 4 byte packet length (excluding packet header)      |
- *    +-----------------------------------------------------+
- *    | 8 byte offset in the block | 8 byte sequence number |
- *    +-----------------------------------------------------+
- *    | 1 byte isLastPacketInBlock                          |
- *    +-----------------------------------------------------+
- *    | 4 byte Length of actual data                        |
- *    +-----------------------------------------------------+
- *    | x byte checksum data. x is defined below            |
- *    +-----------------------------------------------------+
- *    | actual data ......                                  |
- *    +-----------------------------------------------------+
- *    
- *    Data is made of Chunks. Each chunk is of length <= BYTES_PER_CHECKSUM.
- *    A checksum is calculated for each chunk.
- *    
- *    x = (length of data + BYTE_PER_CHECKSUM - 1)/BYTES_PER_CHECKSUM *
- *        CHECKSUM_SIZE
- *        
- *    CHECKSUM_SIZE depends on CHECKSUM_TYPE (usually, 4 for CRC32) 
- *    </pre>
+ * PACKET Contains a packet header, checksum and data. Amount of data
+ * carried is set by BUFFER_SIZE.
+ * <pre>
+ *   +-----------------------------------------------------+
+ *   | Variable length header. See {@link PacketHeader}    |
+ *   +-----------------------------------------------------+
+ *   | x byte checksum data. x is defined below            |
+ *   +-----------------------------------------------------+
+ *   | actual data ......                                  |
+ *   +-----------------------------------------------------+
+ * 
+ *   Data is made of Chunks. Each chunk is of length <= BYTES_PER_CHECKSUM.
+ *   A checksum is calculated for each chunk.
+ *  
+ *   x = (length of data + BYTES_PER_CHECKSUM - 1)/BYTES_PER_CHECKSUM *
+ *       CHECKSUM_SIZE
+ *  
+ *   CHECKSUM_SIZE depends on CHECKSUM_TYPE (usually, 4 for CRC32) 
+ *  </pre>
  *  
  *  The client reads data until it receives a packet with 
  *  "LastPacketInBlock" set to true or with a zero length. If there is 
- *  no checksum error, it replies to DataNode with OP_STATUS_CHECKSUM_OK: 
- *  <pre>
- *    +------------------------------+
- *    | 2 byte OP_STATUS_CHECKSUM_OK |
- *    +------------------------------+
- *  </pre>
+ *  no checksum error, it replies to DataNode with OP_STATUS_CHECKSUM_OK.
  */
 class BlockSender implements java.io.Closeable {
   static final Log LOG = DataNode.LOG;
@@ -448,8 +437,22 @@ class BlockSender implements java.io.Clo
     int packetLen = dataLen + checksumDataLen + 4;
     boolean lastDataPacket = offset + dataLen == endOffset && dataLen > 0;
 
-    writePacketHeader(pkt, dataLen, packetLen);
-
+    // The packet buffer is organized as follows:
+    // _______HHHHCCCCD?D?D?D?
+    //        ^   ^
+    //        |   \ checksumOff
+    //        \ headerOff
+    // _ padding, since the header is variable-length
+    // H = header and length prefixes
+    // C = checksums
+    // D? = data, if transferTo is false.
+    
+    int headerLen = writePacketHeader(pkt, dataLen, packetLen);
+    
+    // Per above, the header doesn't start at the beginning of the
+    // buffer
+    int headerOff = pkt.position() - headerLen;
+    
     int checksumOff = pkt.position();
     byte[] buf = pkt.array();
     
@@ -479,7 +482,8 @@ class BlockSender implements java.io.Clo
     try {
       if (transferTo) {
         SocketOutputStream sockOut = (SocketOutputStream)out;
-        sockOut.write(buf, 0, dataOff); // First write checksum
+        // First write header and checksums
+        sockOut.write(buf, headerOff, dataOff - headerOff);
         
         // no need to flush since we know out is not a buffered stream
         FileChannel fileCh = ((FileInputStream)blockIn).getChannel();
@@ -492,7 +496,7 @@ class BlockSender implements java.io.Clo
         blockInPosition += dataLen;
       } else {
         // normal transfer
-        out.write(buf, 0, dataOff + dataLen);
+        out.write(buf, headerOff, dataOff + dataLen - headerOff);
       }
     } catch (IOException e) {
       if (e instanceof SocketTimeoutException) {
@@ -625,7 +629,7 @@ class BlockSender implements java.io.Clo
     final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
     try {
       int maxChunksPerPacket;
-      int pktSize = PacketHeader.PKT_HEADER_LEN;
+      int pktBufSize = PacketHeader.PKT_MAX_HEADER_LEN;
       boolean transferTo = transferToAllowed && !verifyChecksum
           && baseStream instanceof SocketOutputStream
           && blockIn instanceof FileInputStream;
@@ -636,15 +640,15 @@ class BlockSender implements java.io.Clo
         maxChunksPerPacket = numberOfChunks(TRANSFERTO_BUFFER_SIZE);
         
         // Smaller packet size to only hold checksum when doing transferTo
-        pktSize += checksumSize * maxChunksPerPacket;
+        pktBufSize += checksumSize * maxChunksPerPacket;
       } else {
         maxChunksPerPacket = Math.max(1,
             numberOfChunks(HdfsConstants.IO_FILE_BUFFER_SIZE));
         // Packet size includes both checksum and data
-        pktSize += (chunkSize + checksumSize) * maxChunksPerPacket;
+        pktBufSize += (chunkSize + checksumSize) * maxChunksPerPacket;
       }
 
-      ByteBuffer pktBuf = ByteBuffer.allocate(pktSize);
+      ByteBuffer pktBuf = ByteBuffer.allocate(pktBufSize);
 
       while (endOffset > offset) {
         manageOsCache();
@@ -714,14 +718,19 @@ class BlockSender implements java.io.Clo
   }
 
   /**
-   * Write packet header into {@code pkt}
+   * Write packet header into {@code pkt},
+   * return the length of the header written.
    */
-  private void writePacketHeader(ByteBuffer pkt, int dataLen, int packetLen) {
+  private int writePacketHeader(ByteBuffer pkt, int dataLen, int packetLen) {
     pkt.clear();
     // both syncBlock and syncPacket are false
     PacketHeader header = new PacketHeader(packetLen, offset, seqno,
         (dataLen == 0), dataLen, false);
+    
+    int size = header.getSerializedSize();
+    pkt.position(PacketHeader.PKT_MAX_HEADER_LEN - size);
     header.putInBuffer(pkt);
+    return size;
   }
   
   boolean didSendEntireByteRange() {
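
The sender-side counterpart of the variable-length header is
writePacketHeader() above: the buffer reserves PKT_MAX_HEADER_LEN bytes, the
header is written right-aligned so that it ends exactly where the checksums
begin, and the unused leading padding is never sent. When syncBlock is unset
the protobuf omits that field (a two-byte tag-and-value in this case), leaving
a little padding in front of headerOff. In sketch form (condensed from the
code above; pktBufSize as computed above):

    ByteBuffer pkt = ByteBuffer.allocate(pktBufSize);
    PacketHeader header = new PacketHeader(packetLen, offset, seqno,
        dataLen == 0, dataLen, false);
    int size = header.getSerializedSize();
    pkt.position(PacketHeader.PKT_MAX_HEADER_LEN - size);  // skip the padding
    header.putInBuffer(pkt);
    int headerOff = pkt.position() - size;  // first byte that goes on the wire
    // later: out.write(pkt.array(), headerOff, dataOff + dataLen - headerOff);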