Posted to hdfs-commits@hadoop.apache.org by jg...@apache.org on 2010/09/07 19:52:25 UTC

svn commit: r993448 - in /hadoop/hdfs/trunk: ./ src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/protocol/ src/java/org/apache/hadoop/hdfs/server/datanode/ src/test/hdfs/org/apache/hadoop/hdfs/

Author: jghoman
Date: Tue Sep  7 17:52:25 2010
New Revision: 993448

URL: http://svn.apache.org/viewvc?rev=993448&view=rev
Log:
HDFS-881. Refactor DataNode Packet header into DataTransferProtocol. Contributed by Todd Lipcon.
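
In short: the five packet header fields (packetLen, offsetInBlock, seqno,
lastPacketInBlock, dataLen) that BlockReader, BlockSender, BlockReceiver and
DFSOutputStream each encoded and decoded by hand now live in a single
DataTransferProtocol.PacketHeader Writable. A minimal before/after sketch of
the read path, condensed from the hunks below:

    // Before: every caller decoded the fields one by one.
    int packetLen = in.readInt();
    long offsetInBlock = in.readLong();
    long seqno = in.readLong();
    boolean lastPacketInBlock = in.readBoolean();
    int dataLen = in.readInt();

    // After: one Writable decodes the header and validates it.
    PacketHeader header = new PacketHeader();
    header.readFields(in);
    if (!header.sanityCheck(lastSeqNo)) {
      throw new IOException("BlockReader: error in packet header " + header);
    }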

Modified:
    hadoop/hdfs/trunk/CHANGES.txt
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/BlockReader.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java

Modified: hadoop/hdfs/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=993448&r1=993447&r2=993448&view=diff
==============================================================================
--- hadoop/hdfs/trunk/CHANGES.txt (original)
+++ hadoop/hdfs/trunk/CHANGES.txt Tue Sep  7 17:52:25 2010
@@ -108,6 +108,9 @@ Trunk (unreleased changes)
     HDFS-330.  Datanode Web UIs should provide robots.txt.
     (Allen Wittenauer via jghoman)
 
+    HDFS-881.  Refactor DataNode Packet header into DataTransferProtocol.
+    (Todd Lipcon via jghoman)
+
     HDFS-1036. docs for fetchdt
 
     HDFS-1318. Add JMX interface for read access to namenode and datanode
@@ -237,7 +240,7 @@ Trunk (unreleased changes)
     HDFS-1355. ant veryclean (clean-cache) doesn't clean enough. 
     (Luke Lu via jghoman)
 
-    HDFS-1353.  Remove most of getBlockLocation optimization. (jghoman)
+    HDFS-1353. Remove most of getBlockLocation optimization. (jghoman)
 
     HDFS-1369. Invalid javadoc reference in FSDatasetMBean.java (Eli Collins)
 

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/BlockReader.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/BlockReader.java?rev=993448&r1=993447&r2=993448&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/BlockReader.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/BlockReader.java Tue Sep  7 17:52:25 2010
@@ -34,6 +34,7 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.fs.FSInputChecker;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PacketHeader;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
@@ -211,35 +212,23 @@ public class BlockReader extends FSInput
     // Read next packet if the previous packet has been read completely.
     if (dataLeft <= 0) {
       //Read packet headers.
-      int packetLen = in.readInt();
-      long offsetInBlock = in.readLong();
-      long seqno = in.readLong();
-      boolean lastPacketInBlock = in.readBoolean();
-    
+      PacketHeader header = new PacketHeader();
+      header.readFields(in);
+
       if (LOG.isDebugEnabled()) {
-        LOG.debug("DFSClient readChunk got seqno " + seqno +
-                  " offsetInBlock " + offsetInBlock +
-                  " lastPacketInBlock " + lastPacketInBlock +
-                  " packetLen " + packetLen);
+        LOG.debug("DFSClient readChunk got header " + header);
       }
-      
-      int dataLen = in.readInt();
-    
+
       // Sanity check the lengths
-      if ( ( dataLen <= 0 && !lastPacketInBlock ) ||
-           ( dataLen != 0 && lastPacketInBlock) ||
-           (seqno != (lastSeqNo + 1)) ) {
-           throw new IOException("BlockReader: error in packet header" +
-                                 "(chunkOffset : " + chunkOffset + 
-                                 ", dataLen : " + dataLen +
-                                 ", seqno : " + seqno + 
-                                 " (last: " + lastSeqNo + "))");
+      if (!header.sanityCheck(lastSeqNo)) {
+           throw new IOException("BlockReader: error in packet header " +
+                                 header);
       }
-      
-      lastSeqNo = seqno;
-      dataLeft = dataLen;
-      adjustChecksumBytes(dataLen);
-      if (dataLen > 0) {
+
+      lastSeqNo = header.getSeqno();
+      dataLeft = header.getDataLen();
+      adjustChecksumBytes(header.getDataLen());
+      if (header.getDataLen() > 0) {
         IOUtils.readFully(in, checksumBytes.array(), 0,
                           checksumBytes.limit());
       }

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java?rev=993448&r1=993447&r2=993448&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java Tue Sep  7 17:52:25 2010
@@ -52,6 +52,7 @@ import org.apache.hadoop.hdfs.protocol.L
 import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PacketHeader;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PipelineAck;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
@@ -128,17 +129,30 @@ class DFSOutputStream extends FSOutputSu
   private short blockReplication; // replication factor of file
   
   private class Packet {
-    ByteBuffer buffer;           // only one of buf and buffer is non-null
-    byte[]  buf;  
     long    seqno;               // sequencenumber of buffer in block
     long    offsetInBlock;       // offset in block
     boolean lastPacketInBlock;   // is this the last packet in block?
     int     numChunks;           // number of chunks currently in packet
     int     maxChunks;           // max chunks in packet
-    int     dataStart;
-    int     dataPos;
-    int     checksumStart;
-    int     checksumPos;      
+
+    /** buffer for accumulating packet checksum and data */
+    ByteBuffer buffer; // wraps buf, only one of these two may be non-null
+    byte[]  buf;
+
+    /**
+     * buf is pointed into as follows:
+     *  (C is checksum data, D is payload data)
+     *
+     * [HHHHHCCCCC________________DDDDDDDDDDDDDDDD___]
+     *       ^    ^               ^               ^
+     *       |    checksumPos     dataStart       dataPos
+     *   checksumStart
+     */
+    int checksumStart;
+    int dataStart;
+    int dataPos;
+    int checksumPos;
+
     private static final long HEART_BEAT_SEQNO = -1L;
 
     /**
@@ -151,7 +165,7 @@ class DFSOutputStream extends FSOutputSu
       this.seqno = HEART_BEAT_SEQNO;
       
       buffer = null;
-      int packetSize = DataNode.PKT_HEADER_LEN + DFSClient.SIZE_OF_INTEGER;
+      int packetSize = PacketHeader.PKT_HEADER_LEN + DFSClient.SIZE_OF_INTEGER; // TODO(todd) strange
       buf = new byte[packetSize];
       
       checksumStart = dataStart = packetSize;
@@ -171,7 +185,7 @@ class DFSOutputStream extends FSOutputSu
       buffer = null;
       buf = new byte[pktSize];
       
-      checksumStart = DataNode.PKT_HEADER_LEN + DFSClient.SIZE_OF_INTEGER;
+      checksumStart = PacketHeader.PKT_HEADER_LEN;
       checksumPos = checksumStart;
       dataStart = checksumStart + chunksPerPkt * checksum.getChecksumSize();
       dataPos = dataStart;
@@ -222,20 +236,15 @@ class DFSOutputStream extends FSOutputSu
       int pktLen = DFSClient.SIZE_OF_INTEGER + dataLen + checksumLen;
       
       //normally dataStart == checksumPos, i.e., offset is zero.
-      buffer = ByteBuffer.wrap(buf, dataStart - checksumPos,
-                               DataNode.PKT_HEADER_LEN + pktLen);
+      buffer = ByteBuffer.wrap(
+        buf, dataStart - checksumPos,
+        PacketHeader.PKT_HEADER_LEN + pktLen - DFSClient.SIZE_OF_INTEGER);
       buf = null;
       buffer.mark();
-      
-      /* write the header and data length.
-       * The format is described in comment before DataNode.BlockSender
-       */
-      buffer.putInt(pktLen);  // pktSize
-      buffer.putLong(offsetInBlock); 
-      buffer.putLong(seqno);
-      buffer.put((byte) ((lastPacketInBlock) ? 1 : 0));
-      //end of pkt header
-      buffer.putInt(dataLen); // actual data length, excluding checksum.
+
+      PacketHeader header = new PacketHeader(
+        pktLen, offsetInBlock, seqno, lastPacketInBlock, dataLen);
+      header.putInBuffer(buffer);
       
       buffer.reset();
       return buffer;
@@ -1111,7 +1120,7 @@ class DFSOutputStream extends FSOutputSu
 
   private void computePacketChunkSize(int psize, int csize) {
     int chunkSize = csize + checksum.getChecksumSize();
-    int n = DataNode.PKT_HEADER_LEN + DFSClient.SIZE_OF_INTEGER;
+    int n = PacketHeader.PKT_HEADER_LEN;
     chunksPerPacket = Math.max((psize - n + chunkSize-1)/chunkSize, 1);
     packetSize = n + chunkSize*chunksPerPacket;
     if (DFSClient.LOG.isDebugEnabled()) {
@@ -1213,7 +1222,7 @@ class DFSOutputStream extends FSOutputSu
       // indicate the end of block and reset bytesCurBlock.
       //
       if (bytesCurBlock == blockSize) {
-        currentPacket = new Packet(DataNode.PKT_HEADER_LEN+4, 0, 
+        currentPacket = new Packet(PacketHeader.PKT_HEADER_LEN, 0, 
             bytesCurBlock);
         currentPacket.lastPacketInBlock = true;
         waitAndQueuePacket(currentPacket);
@@ -1406,7 +1415,7 @@ class DFSOutputStream extends FSOutputSu
 
       if (bytesCurBlock != 0) {
         // send an empty packet to mark the end of the block
-        currentPacket = new Packet(DataNode.PKT_HEADER_LEN+4, 0, 
+        currentPacket = new Packet(PacketHeader.PKT_HEADER_LEN, 0, 
             bytesCurBlock);
         currentPacket.lastPacketInBlock = true;
       }
@@ -1457,7 +1466,7 @@ class DFSOutputStream extends FSOutputSu
 
   synchronized void setChunksPerPacket(int value) {
     chunksPerPacket = Math.min(chunksPerPacket, value);
-    packetSize = DataNode.PKT_HEADER_LEN + DFSClient.SIZE_OF_INTEGER +
+    packetSize = PacketHeader.PKT_HEADER_LEN +
                  (checksum.getBytesPerChecksum() + 
                   checksum.getChecksumSize()) * chunksPerPacket;
   }

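For reference, the offsets in the new layout comment above can be worked
through with concrete numbers. A sketch with illustrative values (the chunk
count and checksum size here are assumptions, not taken from the patch):

    // [HHHHHCCCCC________________DDDDDDDDDDDDDDDD___]
    //  header, then checksums, then (possibly after a gap) the data chunks.
    int chunksPerPkt  = 4;                       // assumed for illustration
    int checksumSize  = 4;                       // CRC32 checksums are 4 bytes
    int checksumStart = PacketHeader.PKT_HEADER_LEN;                 // 25
    int dataStart     = checksumStart + chunksPerPkt * checksumSize; // 25 + 16 = 41
    // checksumPos and dataPos begin at their *Start values and advance as
    // chunks are appended; getBuffer() later closes any gap and wraps the
    // region from (dataStart - checksumPos) so the header, checksums and
    // data are sent contiguously.
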
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java?rev=993448&r1=993447&r2=993448&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java Tue Sep  7 17:52:25 2010
@@ -23,6 +23,7 @@ import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
+import java.nio.ByteBuffer;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
@@ -561,4 +562,141 @@ public interface DataTransferProtocol {
       return ack.toString();
     }
   }
+
+  /**
+   * Header data for each packet that goes through the read/write pipelines.
+   */
+  public static class PacketHeader implements Writable {
+    /** Header size for a packet */
+    public static final int PKT_HEADER_LEN = ( 4 + /* Packet payload length */
+                                               8 + /* offset in block */
+                                               8 + /* seqno */
+                                               1 + /* isLastPacketInBlock */
+                                               4   /* data length */ );
+
+    private int packetLen;
+    private long offsetInBlock;
+    private long seqno;
+    private boolean lastPacketInBlock;
+    private int dataLen;
+
+    public PacketHeader() {
+    }
+
+    public PacketHeader(int packetLen, long offsetInBlock, long seqno,
+                        boolean lastPacketInBlock, int dataLen) {
+      this.packetLen = packetLen;
+      this.offsetInBlock = offsetInBlock;
+      this.seqno = seqno;
+      this.lastPacketInBlock = lastPacketInBlock;
+      this.dataLen = dataLen;
+    }
+
+    public int getDataLen() {
+      return dataLen;
+    }
+
+    public boolean isLastPacketInBlock() {
+      return lastPacketInBlock;
+    }
+
+    public long getSeqno() {
+      return seqno;
+    }
+
+    public long getOffsetInBlock() {
+      return offsetInBlock;
+    }
+
+    public int getPacketLen() {
+      return packetLen;
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder();
+      sb.append("PacketHeader(")
+        .append("packetLen=").append(packetLen)
+        .append(" offsetInBlock=").append(offsetInBlock)
+        .append(" seqno=").append(seqno)
+        .append(" lastPacketInBlock=").append(lastPacketInBlock)
+        .append(" dataLen=").append(dataLen)
+        .append(")");
+      return sb.toString();
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      // Note that it's important for packetLen to come first and not
+      // change format -
+      // this is used by BlockReceiver to read entire packets with
+      // a single read call.
+      packetLen = in.readInt();
+      offsetInBlock = in.readLong();
+      seqno = in.readLong();
+      lastPacketInBlock = in.readBoolean();
+      dataLen = in.readInt();
+    }
+
+    public void readFields(ByteBuffer buf) throws IOException {
+      packetLen = buf.getInt();
+      offsetInBlock = buf.getLong();
+      seqno = buf.getLong();
+      lastPacketInBlock = (buf.get() != 0);
+      dataLen = buf.getInt();
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      out.writeInt(packetLen);
+      out.writeLong(offsetInBlock);
+      out.writeLong(seqno);
+      out.writeBoolean(lastPacketInBlock);
+      out.writeInt(dataLen);
+    }
+
+    /**
+     * Write the header into the buffer.
+     * This requires that PKT_HEADER_LEN bytes are available.
+     */
+    public void putInBuffer(ByteBuffer buf) {
+      buf.putInt(packetLen)
+        .putLong(offsetInBlock)
+        .putLong(seqno)
+        .put((byte)(lastPacketInBlock ? 1 : 0))
+        .putInt(dataLen);
+    }
+
+    /**
+     * Perform a sanity check on the packet, returning true if it is sane.
+     * @param lastSeqNo the previous sequence number received - we expect the current
+     * sequence number to be larger by 1.
+     */
+    public boolean sanityCheck(long lastSeqNo) {
+      // Only the last packet in a block may have a non-positive data length
+      if (dataLen <= 0 && !lastPacketInBlock) return false;
+      // The last packet should not contain data
+      if (lastPacketInBlock && dataLen != 0) return false;
+      // Seqnos should always increase by 1 with each packet received
+      if (seqno != lastSeqNo + 1) return false;
+      return true;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (!(o instanceof PacketHeader)) return false;
+      PacketHeader other = (PacketHeader)o;
+      return (other.packetLen == packetLen &&
+              other.offsetInBlock == offsetInBlock &&
+              other.seqno == seqno &&
+              other.lastPacketInBlock == lastPacketInBlock &&
+              other.dataLen == dataLen);
+    }
+
+    @Override
+    public int hashCode() {
+      return (int)seqno;
+    }
+  }
+
 }

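On the wire the header is a fixed 25 bytes: 4 (packetLen) + 8 (offsetInBlock)
+ 8 (seqno) + 1 (lastPacketInBlock) + 4 (dataLen). A round-trip sketch,
condensed from the testPacketHeader test added at the end of this commit:

    PacketHeader hdr = new PacketHeader(4, 1024, 100, false, 4096);

    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    hdr.write(new DataOutputStream(baos));
    assert baos.size() == PacketHeader.PKT_HEADER_LEN;  // exactly 25 bytes

    // Either overload decodes the same fixed-width layout.
    PacketHeader back = new PacketHeader();
    back.readFields(ByteBuffer.wrap(baos.toByteArray()));
    assert hdr.equals(back);

    assert back.sanityCheck(99);    // seqno 100 == 99 + 1: sane
    assert !back.sanityCheck(100);  // seqno must advance by exactly 1
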
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=993448&r1=993447&r2=993448&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Tue Sep  7 17:52:25 2010
@@ -41,6 +41,7 @@ import org.apache.hadoop.hdfs.protocol.F
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PipelineAck;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PacketHeader;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.Daemon;
@@ -334,9 +335,9 @@ class BlockReceiver implements java.io.C
        * calculation in DFSClient to make the guess accurate.
        */
       int chunkSize = bytesPerChecksum + checksumSize;
-      int chunksPerPacket = (datanode.writePacketSize - DataNode.PKT_HEADER_LEN - 
-                             SIZE_OF_INTEGER + chunkSize - 1)/chunkSize;
-      buf = ByteBuffer.allocate(DataNode.PKT_HEADER_LEN + SIZE_OF_INTEGER +
+      int chunksPerPacket = (datanode.writePacketSize - PacketHeader.PKT_HEADER_LEN
+                             + chunkSize - 1)/chunkSize;
+      buf = ByteBuffer.allocate(PacketHeader.PKT_HEADER_LEN +
                                 Math.max(chunksPerPacket, 1) * chunkSize);
       buf.limit(0);
     }
@@ -365,7 +366,9 @@ class BlockReceiver implements java.io.C
                             payloadLen);
     }
     
-    int pktSize = payloadLen + DataNode.PKT_HEADER_LEN;
+    // Subtract SIZE_OF_INTEGER since that accounts for the payloadLen that
+    // we read above.
+    int pktSize = payloadLen + PacketHeader.PKT_HEADER_LEN - SIZE_OF_INTEGER;
     
     if (buf.remaining() < pktSize) {
       //we need to read more data
@@ -407,30 +410,31 @@ class BlockReceiver implements java.io.C
   private int receivePacket() throws IOException {
     // read the next packet
     readNextPacket();
-    
+
     buf.mark();
-    //read the header
-    buf.getInt(); // packet length
-    long offsetInBlock = buf.getLong(); // get offset of packet in block
-    
-    if (offsetInBlock > replicaInfo.getNumBytes()) {
+    PacketHeader header = new PacketHeader();
+    header.readFields(buf);
+    int endOfHeader = buf.position();
+    buf.reset();
+
+    // Sanity check the header
+    if (header.getOffsetInBlock() > replicaInfo.getNumBytes()) {
       throw new IOException("Received an out-of-sequence packet for " + block + 
-          "from " + inAddr + " at offset " + offsetInBlock +
+          "from " + inAddr + " at offset " + header.getOffsetInBlock() +
           ". Expecting packet starting at " + replicaInfo.getNumBytes());
     }
-    long seqno = buf.getLong();    // get seqno
-    boolean lastPacketInBlock = (buf.get() != 0);
-    
-    int len = buf.getInt();
-    if (len < 0) {
+    if (header.getDataLen() < 0) {
       throw new IOException("Got wrong length during writeBlock(" + block + 
                             ") from " + inAddr + " at offset " + 
-                            offsetInBlock + ": " + len); 
-    } 
-    int endOfHeader = buf.position();
-    buf.reset();
-    
-    return receivePacket(offsetInBlock, seqno, lastPacketInBlock, len, endOfHeader);
+                            header.getOffsetInBlock() + ": " +
+                            header.getDataLen()); 
+    }
+
+    return receivePacket(
+      header.getOffsetInBlock(),
+      header.getSeqno(),
+      header.isLastPacketInBlock(),
+      header.getDataLen(), endOfHeader);
   }
 
   /**

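The readFields ordering noted above (packetLen first, format fixed) is what
lets BlockReceiver frame a whole packet from a single length prefix. A
simplified sketch of that framing; SIZE_OF_INTEGER (4) and 'in' are as in
BlockReceiver, and the one readFully below stands in for the buffered
readToBuf() loop the patch actually uses:

    // Wire layout: [packetLen][offsetInBlock][seqno][last][dataLen][checksums][data]
    // where packetLen counts itself plus the checksum and data payload.
    int packetLen = in.readInt();     // 4-byte prefix, always read first
    int remaining = packetLen + PacketHeader.PKT_HEADER_LEN
                    - 2 * SIZE_OF_INTEGER;  // whole packet minus the prefix just read
    byte[] pkt = new byte[remaining];
    in.readFully(pkt, 0, remaining);  // header remainder + payload in one read
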
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java?rev=993448&r1=993447&r2=993448&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java Tue Sep  7 17:52:25 2010
@@ -33,6 +33,7 @@ import org.apache.commons.logging.Log;
 import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PacketHeader;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.net.SocketOutputStream;
 import org.apache.hadoop.util.DataChecksum;
@@ -304,15 +305,12 @@ class BlockSender implements java.io.Clo
     int packetLen = len + numChunks*checksumSize + 4;
     boolean lastDataPacket = offset + len == endOffset && len > 0;
     pkt.clear();
-    
-    // write packet header
-    pkt.putInt(packetLen);
-    pkt.putLong(offset);
-    pkt.putLong(seqno);
-    pkt.put((byte)((len == 0) ? 1 : 0));
-               //why no ByteBuf.putBoolean()?
-    pkt.putInt(len);
-    
+
+
+    PacketHeader header = new PacketHeader(
+      packetLen, offset, seqno, (len == 0), len);
+    header.putInBuffer(pkt);
+
     int checksumOff = pkt.position();
     int checksumLen = numChunks * checksumSize;
     byte[] buf = pkt.array();
@@ -444,7 +442,7 @@ class BlockSender implements java.io.Clo
       }
       
       int maxChunksPerPacket;
-      int pktSize = DataNode.PKT_HEADER_LEN + SIZE_OF_INTEGER;
+      int pktSize = PacketHeader.PKT_HEADER_LEN;
       
       if (transferToAllowed && !verifyChecksum && 
           baseStream instanceof SocketOutputStream && 

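On the send side the same accounting runs in reverse: packetLen counts its
own 4 bytes plus checksums plus data, and lastPacketInBlock is simply
len == 0, since BlockSender terminates every block with an empty packet.
Condensed from the sendChunks() hunk above:

    int packetLen = len + numChunks * checksumSize + 4;  // +4: the length field itself
    PacketHeader header = new PacketHeader(
        packetLen,
        offset,     // where this packet's data starts within the block
        seqno,      // incremented once per packet
        len == 0,   // an empty packet marks the end of the block
        len);       // payload bytes, excluding checksums
    header.putInBuffer(pkt);  // writes all PKT_HEADER_LEN bytes at pkt.position()
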
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=993448&r1=993447&r2=993448&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Tue Sep  7 17:52:25 2010
@@ -927,6 +927,12 @@ public class DataNode extends Configured
           return;
         }
         LOG.warn(StringUtils.stringifyException(re));
+        try {
+          long sleepTime = Math.min(1000, heartBeatInterval);
+          Thread.sleep(sleepTime);
+        } catch (InterruptedException ie) {
+          Thread.currentThread().interrupt();
+        }
       } catch (IOException e) {
         LOG.warn(StringUtils.stringifyException(e));
       }
@@ -1281,14 +1287,6 @@ public class DataNode extends Configured
       Not all the fields might be used while reading.
     
    ************************************************************************ */
-  
-  /** Header size for a packet */
-  public static final int PKT_HEADER_LEN = ( 4 + /* Packet payload length */
-                                      8 + /* offset in block */
-                                      8 + /* seqno */
-                                      1   /* isLastPacketInBlock */);
-  
-
 
   /**
    * Used for transferring a block of data.  This class

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java?rev=993448&r1=993447&r2=993448&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java Tue Sep  7 17:52:25 2010
@@ -19,11 +19,13 @@ package org.apache.hadoop.hdfs;
 
 import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Op.READ_BLOCK;
 import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Op.WRITE_BLOCK;
+import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PacketHeader;
 import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PipelineAck;
 import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status;
 import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.ERROR;
 import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.SUCCESS;
 
+import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
@@ -32,6 +34,7 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.net.InetSocketAddress;
 import java.net.Socket;
+import java.nio.ByteBuffer;
 import java.util.Random;
 
 import junit.framework.TestCase;
@@ -148,14 +151,16 @@ public class TestDataTransferProtocol ex
   throws IOException {
     sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
     sendOut.writeInt(512);         // checksum size
-    sendOut.writeInt(8);           // size of packet
-    sendOut.writeLong(block.getNumBytes());          // OffsetInBlock
-    sendOut.writeLong(100);        // sequencenumber
-    sendOut.writeBoolean(true);    // lastPacketInBlock
 
-    sendOut.writeInt(0);           // chunk length
+    PacketHeader hdr = new PacketHeader(
+      8,                   // size of packet
+      block.getNumBytes(), // OffsetInBlock
+      100,                 // sequencenumber
+      true,                // lastPacketInBlock
+      0);                  // chunk length
+    hdr.write(sendOut);
     sendOut.writeInt(0);           // zero checksum
-        
+
     //ok finally write a block with 0 len
     SUCCESS.write(recvOut);
     Text.writeString(recvOut, "");
@@ -373,13 +378,15 @@ public class TestDataTransferProtocol ex
         new DatanodeInfo[1], BlockTokenSecretManager.DUMMY_TOKEN);
     sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
     sendOut.writeInt(512);
-    sendOut.writeInt(4);           // size of packet
-    sendOut.writeLong(0);          // OffsetInBlock
-    sendOut.writeLong(100);        // sequencenumber
-    sendOut.writeBoolean(false);   // lastPacketInBlock
-    
-    // bad data chunk length
-    sendOut.writeInt(-1-random.nextInt(oneMil));
+
+    PacketHeader hdr = new PacketHeader(
+      4,     // size of packet
+      0,     // offset in block,
+      100,   // seqno
+      false, // last packet
+      -1 - random.nextInt(oneMil)); // bad datalen
+    hdr.write(sendOut);
+
     SUCCESS.write(recvOut);
     Text.writeString(recvOut, "");
     new PipelineAck(100, new Status[]{ERROR}).write(recvOut);
@@ -395,12 +402,14 @@ public class TestDataTransferProtocol ex
         new DatanodeInfo[1], BlockTokenSecretManager.DUMMY_TOKEN);
     sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
     sendOut.writeInt(512);         // checksum size
-    sendOut.writeInt(8);           // size of packet
-    sendOut.writeLong(0);          // OffsetInBlock
-    sendOut.writeLong(100);        // sequencenumber
-    sendOut.writeBoolean(true);    // lastPacketInBlock
 
-    sendOut.writeInt(0);           // chunk length
+    hdr = new PacketHeader(
+      8,     // size of packet
+      0,     // OffsetInBlock
+      100,   // sequencenumber
+      true,  // lastPacketInBlock
+      0);    // chunk length
+    hdr.write(sendOut);
     sendOut.writeInt(0);           // zero checksum
     sendOut.flush();
     //ok finally write a block with 0 len
@@ -497,4 +506,40 @@
       cluster.shutdown();
     }
   }
+
+  @Test
+  public void testPacketHeader() throws IOException {
+    PacketHeader hdr = new PacketHeader(
+      4,                   // size of packet
+      1024,                // OffsetInBlock
+      100,                 // sequencenumber
+      false,               // lastPacketInBlock
+      4096);               // chunk length
+
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    hdr.write(new DataOutputStream(baos));
+
+    // Read back using DataInput
+    PacketHeader readBack = new PacketHeader();
+    ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
+    readBack.readFields(new DataInputStream(bais));
+    assertEquals(hdr, readBack);
+
+    // Read back using ByteBuffer
+    readBack = new PacketHeader();
+    readBack.readFields(ByteBuffer.wrap(baos.toByteArray()));
+    assertEquals(hdr, readBack);
+
+    // Test sanity check for a good trailing header and for bad seqnos
+    PacketHeader goodHeader = new PacketHeader(
+      4,                   // size of packet
+      0,                   // OffsetInBlock
+      100,                 // sequencenumber
+      true,                // lastPacketInBlock
+      0);                  // chunk length
+    assertTrue(goodHeader.sanityCheck(99));
+
+    assertTrue(hdr.sanityCheck(99));
+    assertFalse(hdr.sanityCheck(100));
+  }
 }