Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2015/03/05 19:59:06 UTC

hadoop git commit: HDFS-7855. Separate class Packet from DFSOutputStream. Contributed by Li Bo.

Repository: hadoop
Updated Branches:
  refs/heads/trunk 138c9cade -> 952640fa4


HDFS-7855. Separate class Packet from DFSOutputStream. Contributed by Li Bo.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/952640fa
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/952640fa
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/952640fa

Branch: refs/heads/trunk
Commit: 952640fa4cbdc23fe8781e5627c2e8eab565c535
Parents: 138c9ca
Author: Jing Zhao <ji...@apache.org>
Authored: Thu Mar 5 10:57:48 2015 -0800
Committer: Jing Zhao <ji...@apache.org>
Committed: Thu Mar 5 10:58:53 2015 -0800

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   2 +
 .../org/apache/hadoop/hdfs/DFSOutputStream.java | 238 +++-------------
 .../java/org/apache/hadoop/hdfs/DFSPacket.java  | 270 +++++++++++++++++++
 .../org/apache/hadoop/hdfs/TestDFSPacket.java   |  68 +++++
 4 files changed, 381 insertions(+), 197 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/952640fa/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 59f69fb..763d327 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -715,6 +715,8 @@ Release 2.7.0 - UNRELEASED
     HADOOP-11648. Set DomainSocketWatcher thread name explicitly.
     (Liang Xie via ozawa)
 
+    HDFS-7855. Separate class Packet from DFSOutputStream. (Li Bo via jing9)
+
   OPTIMIZATIONS
 
     HDFS-7454. Reduce memory footprint for AclEntries in NameNode.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/952640fa/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
old mode 100644
new mode 100755
index dc2f674..130bb6e
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -30,7 +30,6 @@ import java.io.OutputStream;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.Socket;
-import java.nio.BufferOverflowException;
 import java.nio.channels.ClosedChannelException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -79,7 +78,6 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseP
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.hdfs.protocolPB.PBHelper;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
 import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
@@ -160,9 +158,9 @@ public class DFSOutputStream extends FSOutputSummer
   private final int bytesPerChecksum; 
 
   // both dataQueue and ackQueue are protected by dataQueue lock
-  private final LinkedList<Packet> dataQueue = new LinkedList<Packet>();
-  private final LinkedList<Packet> ackQueue = new LinkedList<Packet>();
-  private Packet currentPacket = null;
+  private final LinkedList<DFSPacket> dataQueue = new LinkedList<DFSPacket>();
+  private final LinkedList<DFSPacket> ackQueue = new LinkedList<DFSPacket>();
+  private DFSPacket currentPacket = null;
   private DataStreamer streamer;
   private long currentSeqno = 0;
   private long lastQueuedSeqno = -1;
@@ -187,8 +185,8 @@ public class DFSOutputStream extends FSOutputSummer
       BlockStoragePolicySuite.createDefaultSuite();
 
   /** Use {@link ByteArrayManager} to create buffer for non-heartbeat packets.*/
-  private Packet createPacket(int packetSize, int chunksPerPkt, long offsetInBlock,
-      long seqno) throws InterruptedIOException {
+  private DFSPacket createPacket(int packetSize, int chunksPerPkt, long offsetInBlock,
+      long seqno, boolean lastPacketInBlock) throws InterruptedIOException {
     final byte[] buf;
     final int bufferSize = PacketHeader.PKT_MAX_HEADER_LEN + packetSize;
 
@@ -201,171 +199,20 @@ public class DFSOutputStream extends FSOutputSummer
       throw iioe;
     }
 
-    return new Packet(buf, chunksPerPkt, offsetInBlock, seqno, getChecksumSize());
+    return new DFSPacket(buf, chunksPerPkt, offsetInBlock, seqno,
+                         getChecksumSize(), lastPacketInBlock);
   }
 
   /**
    * For heartbeat packets, create buffer directly by new byte[]
    * since heartbeats should not be blocked.
    */
-  private Packet createHeartbeatPacket() throws InterruptedIOException {
+  private DFSPacket createHeartbeatPacket() throws InterruptedIOException {
     final byte[] buf = new byte[PacketHeader.PKT_MAX_HEADER_LEN];
-    return new Packet(buf, 0, 0, Packet.HEART_BEAT_SEQNO, getChecksumSize());
+    return new DFSPacket(buf, 0, 0, DFSPacket.HEART_BEAT_SEQNO,
+                         getChecksumSize(), false);
   }
 
-  private static class Packet {
-    private static final long HEART_BEAT_SEQNO = -1L;
-    final long seqno; // sequencenumber of buffer in block
-    final long offsetInBlock; // offset in block
-    boolean syncBlock; // this packet forces the current block to disk
-    int numChunks; // number of chunks currently in packet
-    final int maxChunks; // max chunks in packet
-    private byte[] buf;
-    private boolean lastPacketInBlock; // is this the last packet in block?
-
-    /**
-     * buf is pointed into like follows:
-     *  (C is checksum data, D is payload data)
-     *
-     * [_________CCCCCCCCC________________DDDDDDDDDDDDDDDD___]
-     *           ^        ^               ^               ^
-     *           |        checksumPos     dataStart       dataPos
-     *           checksumStart
-     * 
-     * Right before sending, we move the checksum data to immediately precede
-     * the actual data, and then insert the header into the buffer immediately
-     * preceding the checksum data, so we make sure to keep enough space in
-     * front of the checksum data to support the largest conceivable header. 
-     */
-    int checksumStart;
-    int checksumPos;
-    final int dataStart;
-    int dataPos;
-
-    /**
-     * Create a new packet.
-     * 
-     * @param chunksPerPkt maximum number of chunks per packet.
-     * @param offsetInBlock offset in bytes into the HDFS block.
-     */
-    private Packet(byte[] buf, int chunksPerPkt, long offsetInBlock, long seqno,
-        int checksumSize) {
-      this.lastPacketInBlock = false;
-      this.numChunks = 0;
-      this.offsetInBlock = offsetInBlock;
-      this.seqno = seqno;
-
-      this.buf = buf;
-
-      checksumStart = PacketHeader.PKT_MAX_HEADER_LEN;
-      checksumPos = checksumStart;
-      dataStart = checksumStart + (chunksPerPkt * checksumSize);
-      dataPos = dataStart;
-      maxChunks = chunksPerPkt;
-    }
-
-    synchronized void writeData(byte[] inarray, int off, int len)
-        throws ClosedChannelException {
-      checkBuffer();
-      if (dataPos + len > buf.length) {
-        throw new BufferOverflowException();
-      }
-      System.arraycopy(inarray, off, buf, dataPos, len);
-      dataPos += len;
-    }
-
-    synchronized void writeChecksum(byte[] inarray, int off, int len)
-        throws ClosedChannelException {
-      checkBuffer();
-      if (len == 0) {
-        return;
-      }
-      if (checksumPos + len > dataStart) {
-        throw new BufferOverflowException();
-      }
-      System.arraycopy(inarray, off, buf, checksumPos, len);
-      checksumPos += len;
-    }
-    
-    /**
-     * Write the full packet, including the header, to the given output stream.
-     */
-    synchronized void writeTo(DataOutputStream stm) throws IOException {
-      checkBuffer();
-
-      final int dataLen = dataPos - dataStart;
-      final int checksumLen = checksumPos - checksumStart;
-      final int pktLen = HdfsConstants.BYTES_IN_INTEGER + dataLen + checksumLen;
-
-      PacketHeader header = new PacketHeader(
-        pktLen, offsetInBlock, seqno, lastPacketInBlock, dataLen, syncBlock);
-      
-      if (checksumPos != dataStart) {
-        // Move the checksum to cover the gap. This can happen for the last
-        // packet or during an hflush/hsync call.
-        System.arraycopy(buf, checksumStart, buf, 
-                         dataStart - checksumLen , checksumLen); 
-        checksumPos = dataStart;
-        checksumStart = checksumPos - checksumLen;
-      }
-      
-      final int headerStart = checksumStart - header.getSerializedSize();
-      assert checksumStart + 1 >= header.getSerializedSize();
-      assert checksumPos == dataStart;
-      assert headerStart >= 0;
-      assert headerStart + header.getSerializedSize() == checksumStart;
-      
-      // Copy the header data into the buffer immediately preceding the checksum
-      // data.
-      System.arraycopy(header.getBytes(), 0, buf, headerStart,
-          header.getSerializedSize());
-      
-      // corrupt the data for testing.
-      if (DFSClientFaultInjector.get().corruptPacket()) {
-        buf[headerStart+header.getSerializedSize() + checksumLen + dataLen-1] ^= 0xff;
-      }
-
-      // Write the now contiguous full packet to the output stream.
-      stm.write(buf, headerStart, header.getSerializedSize() + checksumLen + dataLen);
-
-      // undo corruption.
-      if (DFSClientFaultInjector.get().uncorruptPacket()) {
-        buf[headerStart+header.getSerializedSize() + checksumLen + dataLen-1] ^= 0xff;
-      }
-    }
-
-    private synchronized void checkBuffer() throws ClosedChannelException {
-      if (buf == null) {
-        throw new ClosedChannelException();
-      }
-    }
-
-    private synchronized void releaseBuffer(ByteArrayManager bam) {
-      bam.release(buf);
-      buf = null;
-    }
-
-    // get the packet's last byte's offset in the block
-    synchronized long getLastByteOffsetBlock() {
-      return offsetInBlock + dataPos - dataStart;
-    }
-
-    /**
-     * Check if this packet is a heart beat packet
-     * @return true if the sequence number is HEART_BEAT_SEQNO
-     */
-    private boolean isHeartbeatPacket() {
-      return seqno == HEART_BEAT_SEQNO;
-    }
-    
-    @Override
-    public String toString() {
-      return "packet seqno: " + this.seqno +
-      " offsetInBlock: " + this.offsetInBlock +
-      " lastPacketInBlock: " + this.lastPacketInBlock +
-      " lastByteOffsetInBlock: " + this.getLastByteOffsetBlock();
-    }
-  }
 
   //
   // The DataStreamer class is responsible for sending data packets to the
@@ -556,7 +403,7 @@ public class DFSOutputStream extends FSOutputSummer
           }
         }
 
-        Packet one;
+        DFSPacket one;
         try {
           // process datanode IO errors if any
           boolean doSleep = false;
@@ -620,7 +467,7 @@ public class DFSOutputStream extends FSOutputSummer
                 " Aborting file " + src);
           }
 
-          if (one.lastPacketInBlock) {
+          if (one.isLastPacketInBlock()) {
             // wait for all data packets have been successfully acked
             synchronized (dataQueue) {
               while (!streamerClosed && !hasError && 
@@ -681,7 +528,7 @@ public class DFSOutputStream extends FSOutputSummer
           }
 
           // Is this block full?
-          if (one.lastPacketInBlock) {
+          if (one.isLastPacketInBlock()) {
             // wait for the close packet has been acked
             synchronized (dataQueue) {
               while (!streamerClosed && !hasError && 
@@ -883,7 +730,7 @@ public class DFSOutputStream extends FSOutputSummer
             ack.readFields(blockReplyStream);
             long duration = Time.monotonicNow() - begin;
             if (duration > dfsclientSlowLogThresholdMs
-                && ack.getSeqno() != Packet.HEART_BEAT_SEQNO) {
+                && ack.getSeqno() != DFSPacket.HEART_BEAT_SEQNO) {
               DFSClient.LOG
                   .warn("Slow ReadProcessor read fields took " + duration
                       + "ms (threshold=" + dfsclientSlowLogThresholdMs + "ms); ack: "
@@ -920,21 +767,21 @@ public class DFSOutputStream extends FSOutputSummer
             
             assert seqno != PipelineAck.UNKOWN_SEQNO : 
               "Ack for unknown seqno should be a failed ack: " + ack;
-            if (seqno == Packet.HEART_BEAT_SEQNO) {  // a heartbeat ack
+            if (seqno == DFSPacket.HEART_BEAT_SEQNO) {  // a heartbeat ack
               continue;
             }
 
             // a success ack for a data packet
-            Packet one;
+            DFSPacket one;
             synchronized (dataQueue) {
               one = ackQueue.getFirst();
             }
-            if (one.seqno != seqno) {
+            if (one.getSeqno() != seqno) {
               throw new IOException("ResponseProcessor: Expecting seqno " +
                                     " for block " + block +
-                                    one.seqno + " but received " + seqno);
+                                    one.getSeqno() + " but received " + seqno);
             }
-            isLastPacketInBlock = one.lastPacketInBlock;
+            isLastPacketInBlock = one.isLastPacketInBlock();
 
             // Fail the packet write for testing in order to force a
             // pipeline recovery.
@@ -1032,10 +879,10 @@ public class DFSOutputStream extends FSOutputSummer
           // We also need to set lastAckedSeqno to the end-of-block Packet's seqno, so that
           // a client waiting on close() will be aware that the flush finished.
           synchronized (dataQueue) {
-            Packet endOfBlockPacket = dataQueue.remove();  // remove the end of block packet
-            assert endOfBlockPacket.lastPacketInBlock;
-            assert lastAckedSeqno == endOfBlockPacket.seqno - 1;
-            lastAckedSeqno = endOfBlockPacket.seqno;
+            DFSPacket endOfBlockPacket = dataQueue.remove();  // remove the end of block packet
+            assert endOfBlockPacket.isLastPacketInBlock();
+            assert lastAckedSeqno == endOfBlockPacket.getSeqno() - 1;
+            lastAckedSeqno = endOfBlockPacket.getSeqno();
             dataQueue.notifyAll();
           }
           endBlock();
@@ -1862,9 +1709,9 @@ public class DFSOutputStream extends FSOutputSummer
     synchronized (dataQueue) {
       if (currentPacket == null) return;
       dataQueue.addLast(currentPacket);
-      lastQueuedSeqno = currentPacket.seqno;
+      lastQueuedSeqno = currentPacket.getSeqno();
       if (DFSClient.LOG.isDebugEnabled()) {
-        DFSClient.LOG.debug("Queued packet " + currentPacket.seqno);
+        DFSClient.LOG.debug("Queued packet " + currentPacket.getSeqno());
       }
       currentPacket = null;
       dataQueue.notifyAll();
@@ -1916,10 +1763,10 @@ public class DFSOutputStream extends FSOutputSummer
 
     if (currentPacket == null) {
       currentPacket = createPacket(packetSize, chunksPerPacket, 
-          bytesCurBlock, currentSeqno++);
+          bytesCurBlock, currentSeqno++, false);
       if (DFSClient.LOG.isDebugEnabled()) {
         DFSClient.LOG.debug("DFSClient writeChunk allocating new packet seqno=" + 
-            currentPacket.seqno +
+            currentPacket.getSeqno() +
             ", src=" + src +
             ", packetSize=" + packetSize +
             ", chunksPerPacket=" + chunksPerPacket +
@@ -1929,16 +1776,16 @@ public class DFSOutputStream extends FSOutputSummer
 
     currentPacket.writeChecksum(checksum, ckoff, cklen);
     currentPacket.writeData(b, offset, len);
-    currentPacket.numChunks++;
+    currentPacket.incNumChunks();
     bytesCurBlock += len;
 
     // If packet is full, enqueue it for transmission
     //
-    if (currentPacket.numChunks == currentPacket.maxChunks ||
+    if (currentPacket.getNumChunks() == currentPacket.getMaxChunks() ||
         bytesCurBlock == blockSize) {
       if (DFSClient.LOG.isDebugEnabled()) {
         DFSClient.LOG.debug("DFSClient writeChunk packet full seqno=" +
-            currentPacket.seqno +
+            currentPacket.getSeqno() +
             ", src=" + src +
             ", bytesCurBlock=" + bytesCurBlock +
             ", blockSize=" + blockSize +
@@ -1963,9 +1810,8 @@ public class DFSOutputStream extends FSOutputSummer
       // indicate the end of block and reset bytesCurBlock.
       //
       if (bytesCurBlock == blockSize) {
-        currentPacket = createPacket(0, 0, bytesCurBlock, currentSeqno++);
-        currentPacket.lastPacketInBlock = true;
-        currentPacket.syncBlock = shouldSyncBlock;
+        currentPacket = createPacket(0, 0, bytesCurBlock, currentSeqno++, true);
+        currentPacket.setSyncBlock(shouldSyncBlock);
         waitAndQueueCurrentPacket();
         bytesCurBlock = 0;
         lastFlushOffset = 0;
@@ -2053,7 +1899,7 @@ public class DFSOutputStream extends FSOutputSummer
             // but sync was requested.
             // Send an empty packet if we do not end the block right now
             currentPacket = createPacket(packetSize, chunksPerPacket,
-                bytesCurBlock, currentSeqno++);
+                bytesCurBlock, currentSeqno++, false);
           }
         } else {
           if (isSync && bytesCurBlock > 0 && !endBlock) {
@@ -2062,7 +1908,7 @@ public class DFSOutputStream extends FSOutputSummer
             // and sync was requested.
             // So send an empty sync packet if we do not end the block right now
             currentPacket = createPacket(packetSize, chunksPerPacket,
-                bytesCurBlock, currentSeqno++);
+                bytesCurBlock, currentSeqno++, false);
           } else if (currentPacket != null) {
             // just discard the current packet since it is already been sent.
             currentPacket.releaseBuffer(byteArrayManager);
@@ -2070,15 +1916,14 @@ public class DFSOutputStream extends FSOutputSummer
           }
         }
         if (currentPacket != null) {
-          currentPacket.syncBlock = isSync;
+          currentPacket.setSyncBlock(isSync);
           waitAndQueueCurrentPacket();          
         }
         if (endBlock && bytesCurBlock > 0) {
           // Need to end the current block, thus send an empty packet to
           // indicate this is the end of the block and reset bytesCurBlock
-          currentPacket = createPacket(0, 0, bytesCurBlock, currentSeqno++);
-          currentPacket.lastPacketInBlock = true;
-          currentPacket.syncBlock = shouldSyncBlock || isSync;
+          currentPacket = createPacket(0, 0, bytesCurBlock, currentSeqno++, true);
+          currentPacket.setSyncBlock(shouldSyncBlock || isSync);
           waitAndQueueCurrentPacket();
           bytesCurBlock = 0;
           lastFlushOffset = 0;
@@ -2249,8 +2094,8 @@ public class DFSOutputStream extends FSOutputSummer
     }
   }
   
-  private static void releaseBuffer(List<Packet> packets, ByteArrayManager bam) {
-    for(Packet p : packets) {
+  private static void releaseBuffer(List<DFSPacket> packets, ByteArrayManager bam) {
+    for (DFSPacket p : packets) {
       p.releaseBuffer(bam);
     }
     packets.clear();
@@ -2297,9 +2142,8 @@ public class DFSOutputStream extends FSOutputSummer
 
       if (bytesCurBlock != 0) {
         // send an empty packet to mark the end of the block
-        currentPacket = createPacket(0, 0, bytesCurBlock, currentSeqno++);
-        currentPacket.lastPacketInBlock = true;
-        currentPacket.syncBlock = shouldSyncBlock;
+        currentPacket = createPacket(0, 0, bytesCurBlock, currentSeqno++, true);
+        currentPacket.setSyncBlock(shouldSyncBlock);
       }
 
       flushInternal();             // flush all data to Datanodes

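The javadoc in the hunk above explains the split between the two creation paths: data packets draw their buffers from ByteArrayManager, while heartbeat packets allocate a bare header-sized array so they are never blocked behind the buffer pool. As a minimal, self-contained sketch of that split (the pool below is a hypothetical stand-in for ByteArrayManager, and MAX_HEADER_LEN is a placeholder value for PacketHeader.PKT_MAX_HEADER_LEN, not the real constant):

    import java.util.concurrent.ArrayBlockingQueue;

    public class PacketBufferSketch {
      // placeholder value; the real constant is PacketHeader.PKT_MAX_HEADER_LEN
      static final int MAX_HEADER_LEN = 33;

      // hypothetical stand-in for ByteArrayManager: take() blocks when empty
      static final ArrayBlockingQueue<byte[]> pool = new ArrayBlockingQueue<>(2);

      // data-packet path: the buffer comes from the (possibly blocking) pool
      static byte[] createPacketBuffer(int packetSize) throws InterruptedException {
        byte[] buf = pool.take();
        return buf.length >= MAX_HEADER_LEN + packetSize
            ? buf : new byte[MAX_HEADER_LEN + packetSize];
      }

      // heartbeat path: allocate directly so a heartbeat is never blocked
      static byte[] createHeartbeatBuffer() {
        return new byte[MAX_HEADER_LEN];
      }

      public static void main(String[] args) throws InterruptedException {
        pool.put(new byte[MAX_HEADER_LEN + 64 * 1024]);
        System.out.println("data buffer: " + createPacketBuffer(64 * 1024).length);
        System.out.println("heartbeat buffer: " + createHeartbeatBuffer().length);
      }
    }
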
http://git-wip-us.apache.org/repos/asf/hadoop/blob/952640fa/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java
new file mode 100755
index 0000000..9b3ea51
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java
@@ -0,0 +1,270 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.BufferOverflowException;
+import java.nio.channels.ClosedChannelException;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
+import org.apache.hadoop.hdfs.util.ByteArrayManager;
+
+/****************************************************************
+ * DFSPacket is used by DataStreamer and DFSOutputStream.
+ * DFSOutputStream generates packets and then asks DataStreamer
+ * to send them to datanodes.
+ ****************************************************************/
+
+class DFSPacket {
+  public static final long HEART_BEAT_SEQNO = -1L;
+  private final long seqno; // sequence number of buffer in block
+  private final long offsetInBlock; // offset in block
+  private boolean syncBlock; // this packet forces the current block to disk
+  private int numChunks; // number of chunks currently in packet
+  private final int maxChunks; // max chunks in packet
+  private byte[] buf;
+  private final boolean lastPacketInBlock; // is this the last packet in block?
+
+  /**
+   * buf is pointed into like follows:
+   *  (C is checksum data, D is payload data)
+   *
+   * [_________CCCCCCCCC________________DDDDDDDDDDDDDDDD___]
+   *           ^        ^               ^               ^
+   *           |        checksumPos     dataStart       dataPos
+   *           checksumStart
+   *
+   * Right before sending, we move the checksum data to immediately precede
+   * the actual data, and then insert the header into the buffer immediately
+   * preceding the checksum data, so we make sure to keep enough space in
+   * front of the checksum data to support the largest conceivable header.
+   */
+  private int checksumStart;
+  private int checksumPos;
+  private final int dataStart;
+  private int dataPos;
+
+  /**
+   * Create a new packet.
+   *
+   * @param buf the buffer storing data and checksums
+   * @param chunksPerPkt maximum number of chunks per packet.
+   * @param offsetInBlock offset in bytes into the HDFS block.
+   * @param seqno the sequence number of this packet
+   * @param checksumSize the size of checksum
+   * @param lastPacketInBlock if this is the last packet
+   */
+  DFSPacket(byte[] buf, int chunksPerPkt, long offsetInBlock, long seqno,
+                   int checksumSize, boolean lastPacketInBlock) {
+    this.lastPacketInBlock = lastPacketInBlock;
+    this.numChunks = 0;
+    this.offsetInBlock = offsetInBlock;
+    this.seqno = seqno;
+
+    this.buf = buf;
+
+    checksumStart = PacketHeader.PKT_MAX_HEADER_LEN;
+    checksumPos = checksumStart;
+    dataStart = checksumStart + (chunksPerPkt * checksumSize);
+    dataPos = dataStart;
+    maxChunks = chunksPerPkt;
+  }
+
+  /**
+   * Write data to this packet.
+   *
+   * @param inarray input array of data
+   * @param off the offset of data to write
+   * @param len the length of data to write
+   * @throws ClosedChannelException if the packet buffer has been released
+   */
+  synchronized void writeData(byte[] inarray, int off, int len)
+      throws ClosedChannelException {
+    checkBuffer();
+    if (dataPos + len > buf.length) {
+      throw new BufferOverflowException();
+    }
+    System.arraycopy(inarray, off, buf, dataPos, len);
+    dataPos += len;
+  }
+
+  /**
+   * Write checksums to this packet
+   *
+   * @param inarray input array of checksums
+   * @param off the offset of checksums to write
+   * @param len the length of checksums to write
+   * @throws ClosedChannelException if the packet buffer has been released
+   */
+  synchronized void writeChecksum(byte[] inarray, int off, int len)
+      throws ClosedChannelException {
+    checkBuffer();
+    if (len == 0) {
+      return;
+    }
+    if (checksumPos + len > dataStart) {
+      throw new BufferOverflowException();
+    }
+    System.arraycopy(inarray, off, buf, checksumPos, len);
+    checksumPos += len;
+  }
+
+  /**
+   * Write the full packet, including the header, to the given output stream.
+   *
+   * @param stm the output stream to write this packet to
+   * @throws IOException if the write fails
+   */
+  synchronized void writeTo(DataOutputStream stm) throws IOException {
+    checkBuffer();
+
+    final int dataLen = dataPos - dataStart;
+    final int checksumLen = checksumPos - checksumStart;
+    final int pktLen = HdfsConstants.BYTES_IN_INTEGER + dataLen + checksumLen;
+
+    PacketHeader header = new PacketHeader(
+        pktLen, offsetInBlock, seqno, lastPacketInBlock, dataLen, syncBlock);
+
+    if (checksumPos != dataStart) {
+      // Move the checksum to cover the gap. This can happen for the last
+      // packet or during an hflush/hsync call.
+      System.arraycopy(buf, checksumStart, buf,
+          dataStart - checksumLen , checksumLen);
+      checksumPos = dataStart;
+      checksumStart = checksumPos - checksumLen;
+    }
+
+    final int headerStart = checksumStart - header.getSerializedSize();
+    assert checksumStart + 1 >= header.getSerializedSize();
+    assert headerStart >= 0;
+    assert headerStart + header.getSerializedSize() == checksumStart;
+
+    // Copy the header data into the buffer immediately preceding the checksum
+    // data.
+    System.arraycopy(header.getBytes(), 0, buf, headerStart,
+        header.getSerializedSize());
+
+    // corrupt the data for testing.
+    if (DFSClientFaultInjector.get().corruptPacket()) {
+      buf[headerStart+header.getSerializedSize() + checksumLen + dataLen-1] ^= 0xff;
+    }
+
+    // Write the now contiguous full packet to the output stream.
+    stm.write(buf, headerStart, header.getSerializedSize() + checksumLen + dataLen);
+
+    // undo corruption.
+    if (DFSClientFaultInjector.get().uncorruptPacket()) {
+      buf[headerStart+header.getSerializedSize() + checksumLen + dataLen-1] ^= 0xff;
+    }
+  }
+
+  private synchronized void checkBuffer() throws ClosedChannelException {
+    if (buf == null) {
+      throw new ClosedChannelException();
+    }
+  }
+
+  /**
+   * Release the buffer in this packet to ByteArrayManager.
+   *
+   * @param bam the ByteArrayManager to return the buffer to
+   */
+  synchronized void releaseBuffer(ByteArrayManager bam) {
+    bam.release(buf);
+    buf = null;
+  }
+
+  /**
+   * get the packet's last byte's offset in the block
+   *
+   * @return the packet's last byte's offset in the block
+   */
+  synchronized long getLastByteOffsetBlock() {
+    return offsetInBlock + dataPos - dataStart;
+  }
+
+  /**
+   * Check if this packet is a heartbeat packet
+   *
+   * @return true if the sequence number is HEART_BEAT_SEQNO
+   */
+  boolean isHeartbeatPacket() {
+    return seqno == HEART_BEAT_SEQNO;
+  }
+
+  /**
+   * check if this packet is the last packet in block
+   *
+   * @return true if the packet is the last packet
+   */
+  boolean isLastPacketInBlock(){
+    return lastPacketInBlock;
+  }
+
+  /**
+   * get sequence number of this packet
+   *
+   * @return the sequence number of this packet
+   */
+  long getSeqno(){
+    return seqno;
+  }
+
+  /**
+   * get the number of chunks this packet contains
+   *
+   * @return the number of chunks in this packet
+   */
+  synchronized int getNumChunks(){
+    return numChunks;
+  }
+
+  /**
+   * increase the number of chunks by one
+   */
+  synchronized void incNumChunks(){
+    numChunks++;
+  }
+
+  /**
+   * get the maximum number of chunks this packet can hold
+   *
+   * @return the maximum number of chunks
+   */
+  int getMaxChunks(){
+    return maxChunks;
+  }
+
+  /**
+   * set whether this packet should force the current block to disk
+   *
+   * @param syncBlock whether to sync the block
+   */
+  synchronized void setSyncBlock(boolean syncBlock){
+    this.syncBlock = syncBlock;
+  }
+
+  @Override
+  public String toString() {
+    return "packet seqno: " + this.seqno +
+        " offsetInBlock: " + this.offsetInBlock +
+        " lastPacketInBlock: " + this.lastPacketInBlock +
+        " lastByteOffsetInBlock: " + this.getLastByteOffsetBlock();
+  }
+}

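The buffer layout documented in the new class can be made concrete with a small standalone model. The sketch below (MAX_HEADER_LEN is a placeholder for PacketHeader.PKT_MAX_HEADER_LEN; all sizes are illustrative) walks through the offset arithmetic, including the compaction step writeTo performs to close the gap between the checksums and the data before sending:

    public class PacketLayoutSketch {
      public static void main(String[] args) {
        final int MAX_HEADER_LEN = 33; // placeholder for PKT_MAX_HEADER_LEN
        final int chunksPerPkt = 4, checksumSize = 4, chunkSize = 512;

        // layout: [header room][checksums][data], as in the DFSPacket constructor
        int checksumStart = MAX_HEADER_LEN;
        int checksumPos = checksumStart;
        final int dataStart = checksumStart + chunksPerPkt * checksumSize;
        int dataPos = dataStart;
        byte[] buf = new byte[dataStart + chunksPerPkt * chunkSize];

        // write a single chunk: writeChecksum and writeData just advance cursors
        checksumPos += checksumSize;
        dataPos += chunkSize;

        // writeTo: if the packet is not full, a gap remains between the last
        // checksum byte and dataStart; slide the checksums up against the data
        int checksumLen = checksumPos - checksumStart;
        if (checksumPos != dataStart) {
          System.arraycopy(buf, checksumStart, buf,
              dataStart - checksumLen, checksumLen);
          checksumPos = dataStart;
          checksumStart = checksumPos - checksumLen;
        }

        // the header is then serialized immediately before the checksums;
        // the real code uses header.getSerializedSize() rather than the maximum
        int headerStart = checksumStart - MAX_HEADER_LEN;
        System.out.println("headerStart=" + headerStart
            + " checksumStart=" + checksumStart + " dataStart=" + dataStart);
      }
    }
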
http://git-wip-us.apache.org/repos/asf/hadoop/blob/952640fa/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSPacket.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSPacket.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSPacket.java
new file mode 100755
index 0000000..8bf6097
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSPacket.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.util.Random;
+import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestDFSPacket {
+  private static final int chunkSize = 512;
+  private static final int checksumSize = 4;
+  private static final int maxChunksPerPacket = 4;
+
+  @Test
+  public void testPacket() throws Exception {
+    Random r = new Random(12345L);
+    byte[] data =  new byte[chunkSize];
+    r.nextBytes(data);
+    byte[] checksum = new byte[checksumSize];
+    r.nextBytes(checksum);
+
+    DataOutputBuffer os =  new DataOutputBuffer(data.length * 2);
+
+    byte[] packetBuf = new byte[data.length * 2];
+    DFSPacket p = new DFSPacket(packetBuf, maxChunksPerPacket,
+                                0, 0, checksumSize, false);
+    p.setSyncBlock(true);
+    p.writeData(data, 0, data.length);
+    p.writeChecksum(checksum, 0, checksum.length);
+    p.writeTo(os);
+
+    // we have set syncBlock to true, so the header has the maximum length
+    int headerLen = PacketHeader.PKT_MAX_HEADER_LEN;
+    byte[] readBuf = os.getData();
+
+    assertArrayRegionsEqual(readBuf, headerLen, checksum, 0, checksum.length);
+    assertArrayRegionsEqual(readBuf, headerLen + checksum.length, data, 0, data.length);
+
+  }
+
+  public static void assertArrayRegionsEqual(byte []buf1, int off1, byte []buf2,
+                                             int off2, int len) {
+    for (int i = 0; i < len; i++) {
+      if (buf1[off1 + i] != buf2[off2 + i]) {
+        Assert.fail("arrays differ at byte " + i + ". " +
+            "The first array has " + (int) buf1[off1 + i] +
+            ", but the second array has " + (int) buf2[off2 + i]);
+      }
+    }
+  }
+}
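
A natural companion check, sketched here as an illustration rather than as part of the commit: heartbeat identity is determined purely by the sequence number, so a packet constructed with DFSPacket.HEART_BEAT_SEQNO should report itself as a heartbeat. This hypothetical method would slot into the TestDFSPacket class above, reusing its checksumSize and maxChunksPerPacket fields:

      @Test
      public void testHeartbeatPacket() throws Exception {
        // a heartbeat packet carries no chunks, only header room
        byte[] buf = new byte[PacketHeader.PKT_MAX_HEADER_LEN];
        DFSPacket heartbeat = new DFSPacket(buf, 0, 0, DFSPacket.HEART_BEAT_SEQNO,
                                            checksumSize, false);
        Assert.assertTrue(heartbeat.isHeartbeatPacket());

        // an ordinary data packet with seqno 0 is not a heartbeat
        DFSPacket data = new DFSPacket(new byte[buf.length], maxChunksPerPacket,
                                       0, 0, checksumSize, false);
        Assert.assertFalse(data.isHeartbeatPacket());
      }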