You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ra...@apache.org on 2008/03/03 22:40:23 UTC

svn commit: r633285 - in /hadoop/core/trunk: ./ src/java/org/apache/hadoop/dfs/ src/test/org/apache/hadoop/dfs/

Author: rangadi
Date: Mon Mar  3 13:40:18 2008
New Revision: 633285

URL: http://svn.apache.org/viewvc?rev=633285&view=rev
Log:
HADOOP-2758. Reduce buffer copies in DataNode when data is read from
HDFS, without negatively affecting read throughput. (rangadi)

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/Balancer.java
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlockCrcUpgrade.java
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java
    hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestBlockReplacement.java
    hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestDataTransferProtocol.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=633285&r1=633284&r2=633285&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Mon Mar  3 13:40:18 2008
@@ -65,6 +65,9 @@
     repetitive calls to get the current time and late checking to see if
     we want speculation on at all. (omalley)
 
+    HADOOP-2758. Reduce buffer copies in DataNode when data is read from
+    HDFS, without negatively affecting read throughput. (rangadi)
+
   BUG FIXES
 
     HADOOP-2195. '-mkdir' behaviour is now closer to Linux shell in case of

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/Balancer.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/Balancer.java?rev=633285&r1=633284&r2=633285&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/Balancer.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/Balancer.java Mon Mar  3 13:40:18 2008
@@ -341,7 +341,7 @@
     
     /* Send a block copy request to the outputstream*/
     private void sendRequest(DataOutputStream out) throws IOException {
-      out.writeShort(FSConstants.DATA_TRANFER_VERSION);
+      out.writeShort(FSConstants.DATA_TRANSFER_VERSION);
       out.writeByte(FSConstants.OP_COPY_BLOCK);
       out.writeLong(block.getBlock().getBlockId());
       Text.writeString(out, source.getStorageID());

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlockCrcUpgrade.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlockCrcUpgrade.java?rev=633285&r1=633284&r2=633285&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlockCrcUpgrade.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlockCrcUpgrade.java Mon Mar  3 13:40:18 2008
@@ -853,7 +853,7 @@
         DataInputStream in = new DataInputStream(dnSock.getInputStream());
 
         // Write the header:
-        out.writeShort( DataNode.DATA_TRANFER_VERSION );
+        out.writeShort( DataNode.DATA_TRANSFER_VERSION );
         out.writeByte( DataNode.OP_READ_METADATA );
         out.writeLong( blockInfo.block.getBlockId() );
 

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java?rev=633285&r1=633284&r2=633285&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java Mon Mar  3 13:40:18 2008
@@ -638,6 +638,7 @@
     private DataChecksum checksum;
     private long lastChunkOffset = -1;
     private long lastChunkLen = -1;
+    private long lastSeqNo = -1;
 
     private long startOffset;
     private long firstChunkOffset;
@@ -646,6 +647,9 @@
     private boolean gotEOS = false;
     
     byte[] skipBuf = null;
+    ByteBuffer checksumBytes = null;
+    int dataLeft = 0;
+    boolean isLastPacket = false;
     
     /* FSInputChecker interface */
     
@@ -722,6 +726,22 @@
                                  "since seek is not required");
     }
     
+    /**
+     * Makes sure that checksumBytes has enough capacity 
+     * and limit is set to the number of checksum bytes needed 
+     * to be read.
+     */
+    private void adjustChecksumBytes(int dataLen) {
+      int requiredSize = 
+        ((dataLen + bytesPerChecksum - 1)/bytesPerChecksum)*checksumSize;
+      if (checksumBytes == null || requiredSize > checksumBytes.capacity()) {
+        checksumBytes =  ByteBuffer.wrap(new byte[requiredSize]);
+      } else {
+        checksumBytes.clear();
+      }
+      checksumBytes.limit(requiredSize);
+    }
+    
     @Override
     protected synchronized int readChunk(long pos, byte[] buf, int offset, 
                                          int len, byte[] checksumBuf) 
@@ -748,42 +768,60 @@
                               firstChunkOffset + " != " + chunkOffset);
       }
 
-      // The chunk is transmitted as one packet. Read packet headers.
-      int packetLen = in.readInt();
-      long offsetInBlock = in.readLong();
-      long seqno = in.readLong();
-      boolean lastPacketInBlock = in.readBoolean();
-      LOG.debug("DFSClient readChunk got seqno " + seqno +
-                " offsetInBlock " + offsetInBlock +
-                " lastPacketInBlock " + lastPacketInBlock +
-                " packetLen " + packetLen);
-
-      int chunkLen = in.readInt();
-      
-      // Sanity check the lengths
-      if ( chunkLen < 0 || chunkLen > bytesPerChecksum ||
-          ( lastChunkLen >= 0 && // prev packet exists
-              ( (chunkLen > 0 && lastChunkLen != bytesPerChecksum) ||
-                  chunkOffset != (lastChunkOffset + lastChunkLen) ) ) ) {
-        throw new IOException("BlockReader: error in chunk's offset " +
-                              "or length (" + chunkOffset + ":" +
-                              chunkLen + ")");
+      // Read next packet if the previous packet has been read completely.
+      if (dataLeft <= 0) {
+        //Read packet headers.
+        int packetLen = in.readInt();
+        long offsetInBlock = in.readLong();
+        long seqno = in.readLong();
+        boolean lastPacketInBlock = in.readBoolean();
+      
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("DFSClient readChunk got seqno " + seqno +
+                    " offsetInBlock " + offsetInBlock +
+                    " lastPacketInBlock " + lastPacketInBlock +
+                    " packetLen " + packetLen);
+        }
+        
+        int dataLen = in.readInt();
+      
+        // Sanity check the lengths
+        if ( dataLen < 0 || 
+             ( (dataLen % bytesPerChecksum) != 0 && !lastPacketInBlock ) ||
+             (seqno != (lastSeqNo + 1)) ) {
+             throw new IOException("BlockReader: error in packet header" +
+                                   "(chunkOffset : " + chunkOffset + 
+                                   ", dataLen : " + dataLen +
+                                   ", seqno : " + seqno + 
+                                   " (last: " + lastSeqNo + "))");
+        }
+        
+        lastSeqNo = seqno;
+        isLastPacket = lastPacketInBlock;
+        dataLeft = dataLen;
+        adjustChecksumBytes(dataLen);
+        if (dataLen > 0) {
+          IOUtils.readFully(in, checksumBytes.array(), 0,
+                            checksumBytes.limit());
+        }
       }
 
+      int chunkLen = Math.min(dataLeft, bytesPerChecksum);
+      
       if ( chunkLen > 0 ) {
         // len should be >= chunkLen
         IOUtils.readFully(in, buf, offset, chunkLen);
+        checksumBytes.get(checksumBuf, 0, checksumSize);
       }
       
-      if ( checksumSize > 0 ) {
-        IOUtils.readFully(in, checksumBuf, 0, checksumSize);
-      }
-
+      dataLeft -= chunkLen;
       lastChunkOffset = chunkOffset;
       lastChunkLen = chunkLen;
       
-      if ( chunkLen == 0 ) {
+      if ((dataLeft == 0 && isLastPacket) || chunkLen == 0) {
         gotEOS = true;
+      }
+      if ( chunkLen == 0 ) {
         return -1;
       }
       
@@ -827,7 +865,7 @@
                        new BufferedOutputStream(sock.getOutputStream()));
 
       //write the header.
-      out.writeShort( DATA_TRANFER_VERSION );
+      out.writeShort( DATA_TRANSFER_VERSION );
       out.write( OP_READ_BLOCK );
       out.writeLong( blockId );
       out.writeLong( startOffset );
@@ -2030,7 +2068,7 @@
         DataOutputStream out = new DataOutputStream(new BufferedOutputStream(s.getOutputStream(), buffersize));
         blockReplyStream = new DataInputStream(s.getInputStream());
 
-        out.writeShort( DATA_TRANFER_VERSION );
+        out.writeShort( DATA_TRANSFER_VERSION );
         out.write( OP_WRITE_BLOCK );
         out.writeLong( block.getBlockId() );
         out.writeInt( nodes.length );
@@ -2147,8 +2185,8 @@
         }
 
         currentPacket.writeInt(len);
-        currentPacket.write(b, offset, len);
         currentPacket.write(checksum, 0, cklen);
+        currentPacket.write(b, offset, len);
         currentPacket.numChunks++;
         bytesCurBlock += len;
 

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java?rev=633285&r1=633284&r2=633285&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java Mon Mar  3 13:40:18 2008
@@ -37,6 +37,7 @@
 
 import java.io.*;
 import java.net.*;
+import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.Semaphore;
 import java.security.NoSuchAlgorithmException;
@@ -450,16 +451,6 @@
     }
   }
 
-  private void enumerateThreadGroup(ThreadGroup tg) {
-    int count = tg.activeCount();
-    Thread[] info = new Thread[count];
-    int num = tg.enumerate(info);
-    for (int i = 0; i < num; i++) {
-      System.out.print(info[i].getName() + " ");
-    }
-    System.out.println("");
-  }
-
   /**
    * Shut down this instance of the datanode.
    * Returns only after shutdown is complete.
@@ -937,7 +928,7 @@
         in = new DataInputStream(
             new BufferedInputStream(s.getInputStream(), BUFFER_SIZE));
         short version = in.readShort();
-        if ( version != DATA_TRANFER_VERSION ) {
+        if ( version != DATA_TRANSFER_VERSION ) {
           throw new IOException( "Version Mismatch" );
         }
         boolean local = s.getInetAddress().equals(s.getLocalAddress());
@@ -1003,7 +994,7 @@
 
       // send the block
       DataOutputStream out = new DataOutputStream(
-          new BufferedOutputStream(s.getOutputStream(), BUFFER_SIZE));
+            new BufferedOutputStream(s.getOutputStream(), SMALL_BUFFER_SIZE));
       BlockSender blockSender = null;
       try {
         try {
@@ -1116,7 +1107,7 @@
             mirrorIn = new DataInputStream(mirrorSock.getInputStream());
 
             // Write header: Copied from DFSClient.java!
-            mirrorOut.writeShort( DATA_TRANFER_VERSION );
+            mirrorOut.writeShort( DATA_TRANSFER_VERSION );
             mirrorOut.write( OP_WRITE_BLOCK );
             mirrorOut.writeLong( block.getBlockId() );
             mirrorOut.writeInt( pipelineSize );
@@ -1269,11 +1260,11 @@
         targetSock.setSoTimeout(socketTimeout);
 
         targetOut = new DataOutputStream(new BufferedOutputStream(
-            targetSock.getOutputStream(), BUFFER_SIZE));
+            targetSock.getOutputStream(), SMALL_BUFFER_SIZE));
 
         /* send request to the target */
         // fist write header info
-        targetOut.writeShort(DATA_TRANFER_VERSION); // transfer version
+        targetOut.writeShort(DATA_TRANSFER_VERSION); // transfer version
         targetOut.writeByte(OP_REPLACE_BLOCK); // op code
         targetOut.writeLong(block.getBlockId()); // block id
         Text.writeString( targetOut, source); // del hint
@@ -1445,15 +1436,94 @@
     }
   }
 
+  /* ********************************************************************
+  Protocol when a client reads data from Datanode (Cur Ver: 9):
+  
+  Client's Request :
+  =================
+   
+     Processed in DataXceiver:
+     +----------------------------------------------+
+     | Common Header   | 1 byte OP == OP_READ_BLOCK |
+     +----------------------------------------------+
+     
+     Processed in readBlock() :
+     +-------------------------------------------------------+
+     | 8 byte Block ID | 8 byte start offset | 8 byte length |
+     +-------------------------------------------------------+
+     
+     Client sends optional response only at the end of receiving data.
+       
+  DataNode Response :
+  ===================
+   
+    In readBlock() :
+    If there is an error while initializing BlockSender :
+       +---------------------------+
+       | 2 byte OP_STATUS_ERROR    | and connection will be closed.
+       +---------------------------+
+    Otherwise
+       +---------------------------+
+       | 2 byte OP_STATUS_SUCCESS  |
+       +---------------------------+
+       
+    Actual data, sent by BlockSender.sendBlock() :
+    
+      ChecksumHeader :
+      +--------------------------------------------------+
+      | 1 byte CHECKSUM_TYPE | 4 byte BYTES_PER_CHECKSUM |
+      +--------------------------------------------------+
+      Followed by actual data in the form of PACKETS: 
+      +------------------------------------+
+      | Sequence of data PACKETs ....      |
+      +------------------------------------+
+    
+    A "PACKET" is defined further below.
+    
+    The client reads data until it receives a packet with 
+    "LastPacketInBlock" set to true or with a zero length. If there is 
+    no checksum error, it replies to DataNode with OP_STATUS_CHECKSUM_OK:
+    
+    Client optional response at the end of data transmission :
+      +------------------------------+
+      | 2 byte OP_STATUS_CHECKSUM_OK |
+      +------------------------------+
+    
+    PACKET : Contains a packet header, checksum and data. Amount of data
+    ======== carried is set by BUFFER_SIZE.
+    
+      +-----------------------------------------------------+
+      | 4 byte packet length (excluding packet header)      |
+      +-----------------------------------------------------+
+      | 8 byte offset in the block | 8 byte sequence number |
+      +-----------------------------------------------------+
+      | 1 byte isLastPacketInBlock                          |
+      +-----------------------------------------------------+
+      | 4 byte Length of actual data                        |
+      +-----------------------------------------------------+
+      | x byte checksum data. x is defined below            |
+      +-----------------------------------------------------+
+      | actual data ......                                  |
+      +-----------------------------------------------------+
+      
+      x = (length of data + BYTE_PER_CHECKSUM - 1)/BYTES_PER_CHECKSUM *
+          CHECKSUM_SIZE
+          
+      CHECKSUM_SIZE depends on CHECKSUM_TYPE (usually, 4 for CRC32)
+      
+      The above packet format is used while writing data to DFS also.
+      Not all the fields might be used while reading.
+    
+   ************************************************************************ */
+  
   class BlockSender implements java.io.Closeable {
     private Block block; // the block to read from
-    private DataInputStream blockIn; // data strean
+    private InputStream blockIn; // data stream
     private DataInputStream checksumIn; // checksum datastream
     private DataChecksum checksum; // checksum stream
     private long offset; // starting position to read
     private long endOffset; // ending position
     private long blockLength;
-    private byte buf[]; // buffer to store data read from the block file & crc
     private int bytesPerChecksum; // chunk size
     private int checksumSize; // checksum size
     private boolean corruptChecksumOk; // if need to verify checksum
@@ -1463,8 +1533,14 @@
     private boolean blockReadFully; //set when the whole block is read
     private boolean verifyChecksum; //if true, check is verified while reading
     private Throttler throttler;
-    private DataOutputStream out;
-
+    private OutputStream out;
+    
+    static final int PKT_HEADER_LEN = ( 4 + /* PacketLen */
+                                        8 + /* offset in block */
+                                        8 + /* seqno */
+                                        1 + /* isLastPacketInBlock */
+                                        4   /* data len */ );
+       
     BlockSender(Block block, long startOffset, long length,
                 boolean corruptChecksumOk, boolean chunkOffsetOK,
                 boolean verifyChecksum) throws IOException {
@@ -1511,7 +1587,7 @@
           throw new IOException(msg);
         }
 
-        buf = new byte[bytesPerChecksum + checksumSize];
+        
         offset = (startOffset - (startOffset % bytesPerChecksum));
         if (length >= 0) {
           // Make sure endOffset points to end of a checksumed chunk.
@@ -1535,8 +1611,7 @@
         }
         seqno = 0;
 
-        InputStream blockInStream = data.getBlockInputStream(block, offset); // seek to offset
-        blockIn = new DataInputStream(new BufferedInputStream(blockInStream, BUFFER_SIZE));
+        blockIn = data.getBlockInputStream(block, offset); // seek to offset
       } catch (IOException ioe) {
         IOUtils.closeStream(this);
         IOUtils.closeStream(blockIn);
@@ -1571,26 +1646,37 @@
       }
     }
 
-    
-    private int sendChunk()
-        throws IOException {
-      int len = (int) Math.min(endOffset - offset, bytesPerChecksum);
+    /**
+     * Sends upto maxChunks chunks of data.
+     */
+    private int sendChunks(ByteBuffer pkt, int maxChunks) throws IOException {
+      // Sends multiple chunks in one packet with a single write().
+
+      int len = Math.min((int) (endOffset - offset),
+                         bytesPerChecksum*maxChunks);
       if (len == 0) {
         return 0;
       }
-      blockIn.readFully(buf, 0, len);
 
+      int numChunks = (len + bytesPerChecksum - 1)/bytesPerChecksum;
+      int packetLen = len + numChunks*checksumSize + 4;
+      pkt.clear();
+      
+      // write packet header
+      pkt.putInt(packetLen);
+      pkt.putLong(offset);
+      pkt.putLong(seqno);
+      pkt.put((byte)((offset + len >= endOffset) ? 1 : 0));
+                 //why no ByteBuf.putBoolean()?
+      pkt.putInt(len);
+      
+      int checksumOff = pkt.position();
+      int checksumLen = numChunks * checksumSize;
+      byte[] buf = pkt.array();
+      
       if (checksumSize > 0 && checksumIn != null) {
         try {
-          checksumIn.readFully(buf, len, checksumSize);
-          
-          if (verifyChecksum) {
-            checksum.reset();
-            checksum.update(buf, 0, len);
-            if (!checksum.compare(buf, len)) {
-              throw new ChecksumException("Checksum failed at " + offset, len);
-            }
-          }
+          checksumIn.readFully(buf, checksumOff, checksumLen);
         } catch (IOException e) {
           LOG.warn(" Could not read or failed to veirfy checksum for data" +
                    " at offset " + offset + " for block " + block + " got : "
@@ -1599,28 +1685,39 @@
           checksumIn = null;
           if (corruptChecksumOk) {
             // Just fill the array with zeros.
-            Arrays.fill(buf, len, len + checksumSize, (byte) 0);
+            Arrays.fill(buf, checksumOff, checksumLen, (byte) 0);
           } else {
             throw e;
           }
         }
       }
-      boolean lastPacketInBlock = false;
-      if (offset + len >= endOffset) {
-        lastPacketInBlock = true;
+      
+      int dataOff = checksumOff + checksumLen;
+      IOUtils.readFully(blockIn, buf, dataOff, len);
+      
+      if (verifyChecksum) {
+        int dOff = dataOff;
+        int cOff = checksumOff;
+        int dLeft = len;
+        
+        for (int i=0; i<numChunks; i++) {
+          checksum.reset();
+          int dLen = Math.min(dLeft, bytesPerChecksum);
+          checksum.update(buf, dOff, dLen);
+          if (!checksum.compare(buf, cOff)) {
+            throw new ChecksumException("Checksum failed at " + 
+                                        (offset + len - dLeft), len);
+          }
+          dLeft -= dLen;
+          dOff += dLen;
+          cOff += checksumSize;
+        }
       }
 
-      // write packet header
-      out.writeInt(len + checksumSize + 4);
-      out.writeLong(offset);
-      out.writeLong(seqno);
-      out.writeBoolean(lastPacketInBlock);
-      
-      out.writeInt(len);
-      out.write(buf, 0, len + checksumSize);
+      out.write(buf, 0, dataOff + len);
 
       if (throttler != null) { // rebalancing so throttle
-        throttler.throttle(len + checksumSize + 4);
+        throttler.throttle(packetLen);
       }
 
       return len;
@@ -1648,15 +1745,21 @@
         if ( chunkOffsetOK ) {
           out.writeLong( offset );
         }
+        //set up sendBuf:
+        int maxChunksPerPacket = Math.max(1,
+                      (BUFFER_SIZE + bytesPerChecksum - 1)/bytesPerChecksum);
+        ByteBuffer pktBuf = ByteBuffer.allocate(PKT_HEADER_LEN + 
+                      (bytesPerChecksum + checksumSize) * maxChunksPerPacket);
+
 
         while (endOffset > offset) {
-          // Write one data chunk per loop.
-          long len = sendChunk();
+          long len = sendChunks(pktBuf, maxChunksPerPacket);
           offset += len;
-          totalRead += len + checksumSize;
+          totalRead += len + ((len + bytesPerChecksum - 1)/bytesPerChecksum*
+                              checksumSize);
           seqno++;
         }
-        out.writeInt(0); // mark the end of block
+        out.writeInt(0); // mark the end of block        
         out.flush();
       } finally {
         close();
@@ -1965,6 +2068,7 @@
     private int bytesPerChecksum;
     private int checksumSize;
     private byte buf[];
+    private byte checksumBuf[];
     private long offsetInBlock;
     final private String inAddr;
     private String mirrorAddr;
@@ -1995,6 +2099,7 @@
         this.bytesPerChecksum = checksum.getBytesPerChecksum();
         this.checksumSize = checksum.getChecksumSize();
         this.buf = new byte[bytesPerChecksum + checksumSize];
+        this.checksumBuf = new byte[checksumSize];
         //
         // Open local disk out
         //
@@ -2055,7 +2160,8 @@
     }
 
     /* receive a chunk: write it to disk & mirror it to another stream */
-    private void receiveChunk( int len ) throws IOException {
+    private void receiveChunk( int len, byte[] checksumBuf, int checksumOff ) 
+                              throws IOException {
       if (len <= 0 || len > bytesPerChecksum) {
         throw new IOException("Got wrong length during writeBlock(" + block
             + ") from " + inAddr + " at offset " + offsetInBlock + ": " + len
@@ -2071,7 +2177,7 @@
       lastLen = curLen;
       curLen = len;
 
-      in.readFully(buf, 0, len + checksumSize);
+      in.readFully(buf, 0, len);
 
       /*
        * Verification is not included in the initial design. For now, it at
@@ -2080,7 +2186,7 @@
        */
       checksum.update(buf, 0, len);
 
-      if (!checksum.compare(buf, len)) {
+      if (!checksum.compare(checksumBuf, checksumOff)) {
         throw new IOException("Unexpected checksum mismatch "
             + "while writing " + block + " from " + inAddr);
       }
@@ -2097,7 +2203,8 @@
       if (mirrorOut != null) {
         try {
           mirrorOut.writeInt(len);
-          mirrorOut.write(buf, 0, len + checksumSize);
+          mirrorOut.write(checksumBuf, checksumOff, checksumSize);
+          mirrorOut.write(buf, 0, len);
         } catch (IOException ioe) {
           LOG.info(dnRegistration + ":Exception writing block " +
                    block + " to mirror " + mirrorAddr + "\n" +
@@ -2123,7 +2230,7 @@
         if (!finalized) {
           out.write(buf, 0, len);
           // Write checksum
-          checksumOut.write(buf, len, checksumSize);
+          checksumOut.write(checksumBuf, checksumOff, checksumSize);
           myMetrics.bytesWritten.inc(len);
         }
       } catch (IOException iex) {
@@ -2145,7 +2252,15 @@
      * Receive and process a packet. It contains many chunks.
      */
     private void receivePacket(int packetSize) throws IOException {
-
+      /* TEMP: Currently this handles both interleaved 
+       * and non-interleaved DATA_CHUNKs in side the packet.
+       * non-interleaved is required for HADOOP-2758 and in future.
+       * iterleaved will be removed once extra buffer copies are removed
+       * in write path (HADOOP-1702).
+       * 
+       * Format of Non-interleaved data packets is described in the 
+       * comment before BlockSender.
+       */
       offsetInBlock = in.readLong(); // get offset of packet in block
       long seqno = in.readLong();    // get seqno
       boolean lastPacketInBlock = in.readBoolean();
@@ -2157,9 +2272,6 @@
                 " lastPacketInBlock " + lastPacketInBlock);
       setBlockPosition(offsetInBlock);
 
-      int len = in.readInt();
-      curPacketSize += 4;            // read an integer in previous line
-
       // send packet header to next datanode in pipeline
       if (mirrorOut != null) {
         try {
@@ -2189,6 +2301,9 @@
         }
       }
 
+      int len = in.readInt();
+      curPacketSize += 4;            // read an integer in previous line
+
       if (len == 0) {
         LOG.info("Receiving empty packet for block " + block);
         if (mirrorOut != null) {
@@ -2198,15 +2313,37 @@
       }
 
       while (len != 0) {
-        LOG.debug("Receiving one chunk for block " + block +
-                  " of size " + len);
-        receiveChunk( len );
-        curPacketSize += (len + checksumSize);
-        if (curPacketSize > packetSize) {
-          throw new IOException("Packet size for block " + block +
-                                " too long " + curPacketSize +
-                                " was expecting " + packetSize);
-        } 
+        int checksumOff = 0;    
+        if (len > 0) {
+          int checksumLen = (len + bytesPerChecksum - 1)/bytesPerChecksum*
+                            checksumSize;
+          if (checksumBuf.length < checksumLen) {
+            checksumBuf = new byte[checksumLen];
+          }
+          // read the checksum
+          in.readFully(checksumBuf, 0, checksumLen);
+        }
+        
+        while (len != 0) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Receiving one chunk for block " + block +
+                      " of size " + len);
+          }
+          
+          int toRecv = Math.min(len, bytesPerChecksum);
+          
+          receiveChunk(toRecv, checksumBuf, checksumOff);
+          
+          len -= toRecv;
+          checksumOff += checksumSize;       
+          curPacketSize += (toRecv + checksumSize);
+          if (curPacketSize > packetSize) {
+            throw new IOException("Packet size for block " + block +
+                                  " too long " + curPacketSize +
+                                  " was expecting " + packetSize);
+          } 
+        }
+        
         if (curPacketSize == packetSize) {
           if (mirrorOut != null) {
             mirrorOut.flush();
@@ -2388,13 +2525,14 @@
         sock.setSoTimeout(targets.length * socketTimeout);
 
         out = new DataOutputStream(new BufferedOutputStream(
-            sock.getOutputStream(), BUFFER_SIZE));
+                       sock.getOutputStream(), SMALL_BUFFER_SIZE));
+
         blockSender = new BlockSender(b, 0, -1, false, false, false);
 
         //
         // Header info
         //
-        out.writeShort(DATA_TRANFER_VERSION);
+        out.writeShort(DATA_TRANSFER_VERSION);
         out.writeByte(OP_WRITE_BLOCK);
         out.writeLong(b.getBlockId());
         out.writeInt(0);           // no pipelining

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java?rev=633285&r1=633284&r2=633285&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java Mon Mar  3 13:40:18 2008
@@ -100,21 +100,13 @@
    * This should change when serialization of DatanodeInfo, not just
    * when protocol changes. It is not very obvious. 
    */
-  /* Version 7: 
-   * Add two operations to data node
-   * OP_COPY_BLOCK: 
-   *   The command is for sending to a proxy source for the balancing purpose
-   *   The datanode then sends OP_REPLACE_BLOCK request to the destination
-   *   OP_COPY_BLOCK BlockID(long) SourceID (UTF8) Destination (DatanodeInfo)
-   *   return OP_STATUS_ERROR if any error occurs; OP_STATUS_SUCCESS otherwise
-   * OP_REPLACE_BLOCK: 
-   *   the command is for sending to a destination for the balancing purpose
-   *   The datanode then writes the block to disk and notifies namenode of this
-   *   received block together with a deletion hint: sourceID
-   *   OP_REPLACE_BLOCK BlockID(long) SourceID(UTF8) Block_Data_With_Crc
-   *   return OP_STATUS_ERROR if any error occurs; OP_STATUS_SUCCESS otherwise
+  /*
+   * Version 9:
+   *   While reading data from Datanode, each PACKET can consist
+   *   of non-interleaved data (check for for larger amount of data,
+   *   followed by data).
    */
-  public static final int DATA_TRANFER_VERSION = 8;
+  public static final int DATA_TRANSFER_VERSION = 9;
 
   // Return codes for file create
   public static final int OPERATION_FAILED = 0;
@@ -140,6 +132,8 @@
   public static int MAX_PATH_DEPTH = 1000;
     
   public static final int BUFFER_SIZE = new Configuration().getInt("io.file.buffer.size", 4096);
+  //Used for writing header etc.
+  static final int SMALL_BUFFER_SIZE = Math.min(BUFFER_SIZE/2, 512);
   //TODO mb@media-style.com: should be conf injected?
   public static final long DEFAULT_BLOCK_SIZE = 64 * 1024 * 1024;
   public static final int DEFAULT_DATA_SOCKET_SIZE = 128 * 1024;

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestBlockReplacement.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestBlockReplacement.java?rev=633285&r1=633284&r2=633285&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestBlockReplacement.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestBlockReplacement.java Mon Mar  3 13:40:18 2008
@@ -215,7 +215,7 @@
     sock.setSoTimeout(FSConstants.READ_TIMEOUT);
     // sendRequest
     DataOutputStream out = new DataOutputStream(sock.getOutputStream());
-    out.writeShort(FSConstants.DATA_TRANFER_VERSION);
+    out.writeShort(FSConstants.DATA_TRANSFER_VERSION);
     out.writeByte(FSConstants.OP_COPY_BLOCK);
     out.writeLong(block.getBlockId());
     Text.writeString(out, source.getStorageID());

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestDataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestDataTransferProtocol.java?rev=633285&r1=633284&r2=633285&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestDataTransferProtocol.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestDataTransferProtocol.java Mon Mar  3 13:40:18 2008
@@ -152,19 +152,19 @@
     sendBuf.reset();
     
     // bad version
-    recvOut.writeShort((short)(FSConstants.DATA_TRANFER_VERSION-1));
-    sendOut.writeShort((short)(FSConstants.DATA_TRANFER_VERSION-1));
+    recvOut.writeShort((short)(FSConstants.DATA_TRANSFER_VERSION-1));
+    sendOut.writeShort((short)(FSConstants.DATA_TRANSFER_VERSION-1));
     sendRecvData("Wrong Version", true);
 
     // bad ops
     sendBuf.reset();
-    sendOut.writeShort((short)FSConstants.DATA_TRANFER_VERSION);
+    sendOut.writeShort((short)FSConstants.DATA_TRANSFER_VERSION);
     sendOut.writeByte((byte)(FSConstants.OP_WRITE_BLOCK-1));
     sendRecvData("Wrong Op Code", true);
     
     /* Test OP_WRITE_BLOCK */
     sendBuf.reset();
-    sendOut.writeShort((short)FSConstants.DATA_TRANFER_VERSION);
+    sendOut.writeShort((short)FSConstants.DATA_TRANSFER_VERSION);
     sendOut.writeByte((byte)FSConstants.OP_WRITE_BLOCK);
     sendOut.writeLong(newBlockId); // block id
     sendOut.writeInt(0);           // targets in pipeline 
@@ -181,7 +181,7 @@
 
     sendBuf.reset();
     recvBuf.reset();
-    sendOut.writeShort((short)FSConstants.DATA_TRANFER_VERSION);
+    sendOut.writeShort((short)FSConstants.DATA_TRANSFER_VERSION);
     sendOut.writeByte((byte)FSConstants.OP_WRITE_BLOCK);
     sendOut.writeLong(newBlockId);
     sendOut.writeInt(0);           // targets in pipeline 
@@ -195,7 +195,7 @@
 
     sendBuf.reset();
     recvBuf.reset();
-    sendOut.writeShort((short)FSConstants.DATA_TRANFER_VERSION);
+    sendOut.writeShort((short)FSConstants.DATA_TRANSFER_VERSION);
     sendOut.writeByte((byte)FSConstants.OP_WRITE_BLOCK);
     sendOut.writeLong(++newBlockId);
     sendOut.writeInt(0);           // targets in pipeline 
@@ -220,7 +220,7 @@
     // test for writing a valid zero size block
     sendBuf.reset();
     recvBuf.reset();
-    sendOut.writeShort((short)FSConstants.DATA_TRANFER_VERSION);
+    sendOut.writeShort((short)FSConstants.DATA_TRANSFER_VERSION);
     sendOut.writeByte((byte)FSConstants.OP_WRITE_BLOCK);
     sendOut.writeLong(++newBlockId);
     sendOut.writeInt(0);           // targets in pipeline 
@@ -247,7 +247,7 @@
     // bad block id
     sendBuf.reset();
     recvBuf.reset();
-    sendOut.writeShort((short)FSConstants.DATA_TRANFER_VERSION);
+    sendOut.writeShort((short)FSConstants.DATA_TRANSFER_VERSION);
     sendOut.writeByte((byte)FSConstants.OP_READ_BLOCK);
     newBlockId = firstBlock.getBlockId()-1;
     sendOut.writeLong(newBlockId);
@@ -258,7 +258,7 @@
 
     // negative block start offset
     sendBuf.reset();
-    sendOut.writeShort((short)FSConstants.DATA_TRANFER_VERSION);
+    sendOut.writeShort((short)FSConstants.DATA_TRANSFER_VERSION);
     sendOut.writeByte((byte)FSConstants.OP_READ_BLOCK);
     sendOut.writeLong(firstBlock.getBlockId());
     sendOut.writeLong(-1L);
@@ -268,7 +268,7 @@
 
     // bad block start offset
     sendBuf.reset();
-    sendOut.writeShort((short)FSConstants.DATA_TRANFER_VERSION);
+    sendOut.writeShort((short)FSConstants.DATA_TRANSFER_VERSION);
     sendOut.writeByte((byte)FSConstants.OP_READ_BLOCK);
     sendOut.writeLong(firstBlock.getBlockId());
     sendOut.writeLong(fileLen);
@@ -280,7 +280,7 @@
     recvBuf.reset();
     recvOut.writeShort((short)FSConstants.OP_STATUS_SUCCESS);    
     sendBuf.reset();
-    sendOut.writeShort((short)FSConstants.DATA_TRANFER_VERSION);
+    sendOut.writeShort((short)FSConstants.DATA_TRANSFER_VERSION);
     sendOut.writeByte((byte)FSConstants.OP_READ_BLOCK);
     sendOut.writeLong(firstBlock.getBlockId());
     sendOut.writeLong(0);
@@ -292,7 +292,7 @@
     recvBuf.reset();
     recvOut.writeShort((short)FSConstants.OP_STATUS_ERROR);    
     sendBuf.reset();
-    sendOut.writeShort((short)FSConstants.DATA_TRANFER_VERSION);
+    sendOut.writeShort((short)FSConstants.DATA_TRANSFER_VERSION);
     sendOut.writeByte((byte)FSConstants.OP_READ_BLOCK);
     sendOut.writeLong(firstBlock.getBlockId());
     sendOut.writeLong(0);
@@ -302,7 +302,7 @@
     
     //At the end of all this, read the file to make sure that succeeds finally.
     sendBuf.reset();
-    sendOut.writeShort((short)FSConstants.DATA_TRANFER_VERSION);
+    sendOut.writeShort((short)FSConstants.DATA_TRANSFER_VERSION);
     sendOut.writeByte((byte)FSConstants.OP_READ_BLOCK);
     sendOut.writeLong(firstBlock.getBlockId());
     sendOut.writeLong(0);