You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2015/03/05 19:59:06 UTC
hadoop git commit: HDFS-7855. Separate class Packet from
DFSOutputStream. Contributed by Li Bo.
Repository: hadoop
Updated Branches:
refs/heads/trunk 138c9cade -> 952640fa4
HDFS-7855. Separate class Packet from DFSOutputStream. Contributed by Li Bo.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/952640fa
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/952640fa
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/952640fa
Branch: refs/heads/trunk
Commit: 952640fa4cbdc23fe8781e5627c2e8eab565c535
Parents: 138c9ca
Author: Jing Zhao <ji...@apache.org>
Authored: Thu Mar 5 10:57:48 2015 -0800
Committer: Jing Zhao <ji...@apache.org>
Committed: Thu Mar 5 10:58:53 2015 -0800
----------------------------------------------------------------------
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 2 +
.../org/apache/hadoop/hdfs/DFSOutputStream.java | 238 +++-------------
.../java/org/apache/hadoop/hdfs/DFSPacket.java | 270 +++++++++++++++++++
.../org/apache/hadoop/hdfs/TestDFSPacket.java | 68 +++++
4 files changed, 381 insertions(+), 197 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/952640fa/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 59f69fb..763d327 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -715,6 +715,8 @@ Release 2.7.0 - UNRELEASED
HADOOP-11648. Set DomainSocketWatcher thread name explicitly.
(Liang Xie via ozawa)
+ HDFS-7855. Separate class Packet from DFSOutputStream. (Li Bo bia jing9)
+
OPTIMIZATIONS
HDFS-7454. Reduce memory footprint for AclEntries in NameNode.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/952640fa/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
old mode 100644
new mode 100755
index dc2f674..130bb6e
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -30,7 +30,6 @@ import java.io.OutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
-import java.nio.BufferOverflowException;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -79,7 +78,6 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseP
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
@@ -160,9 +158,9 @@ public class DFSOutputStream extends FSOutputSummer
private final int bytesPerChecksum;
// both dataQueue and ackQueue are protected by dataQueue lock
- private final LinkedList<Packet> dataQueue = new LinkedList<Packet>();
- private final LinkedList<Packet> ackQueue = new LinkedList<Packet>();
- private Packet currentPacket = null;
+ private final LinkedList<DFSPacket> dataQueue = new LinkedList<DFSPacket>();
+ private final LinkedList<DFSPacket> ackQueue = new LinkedList<DFSPacket>();
+ private DFSPacket currentPacket = null;
private DataStreamer streamer;
private long currentSeqno = 0;
private long lastQueuedSeqno = -1;
@@ -187,8 +185,8 @@ public class DFSOutputStream extends FSOutputSummer
BlockStoragePolicySuite.createDefaultSuite();
/** Use {@link ByteArrayManager} to create buffer for non-heartbeat packets.*/
- private Packet createPacket(int packetSize, int chunksPerPkt, long offsetInBlock,
- long seqno) throws InterruptedIOException {
+ private DFSPacket createPacket(int packetSize, int chunksPerPkt, long offsetInBlock,
+ long seqno, boolean lastPacketInBlock) throws InterruptedIOException {
final byte[] buf;
final int bufferSize = PacketHeader.PKT_MAX_HEADER_LEN + packetSize;
@@ -201,171 +199,20 @@ public class DFSOutputStream extends FSOutputSummer
throw iioe;
}
- return new Packet(buf, chunksPerPkt, offsetInBlock, seqno, getChecksumSize());
+ return new DFSPacket(buf, chunksPerPkt, offsetInBlock, seqno,
+ getChecksumSize(), lastPacketInBlock);
}
/**
* For heartbeat packets, create buffer directly by new byte[]
* since heartbeats should not be blocked.
*/
- private Packet createHeartbeatPacket() throws InterruptedIOException {
+ private DFSPacket createHeartbeatPacket() throws InterruptedIOException {
final byte[] buf = new byte[PacketHeader.PKT_MAX_HEADER_LEN];
- return new Packet(buf, 0, 0, Packet.HEART_BEAT_SEQNO, getChecksumSize());
+ return new DFSPacket(buf, 0, 0, DFSPacket.HEART_BEAT_SEQNO,
+ getChecksumSize(), false);
}
- private static class Packet {
- private static final long HEART_BEAT_SEQNO = -1L;
- final long seqno; // sequencenumber of buffer in block
- final long offsetInBlock; // offset in block
- boolean syncBlock; // this packet forces the current block to disk
- int numChunks; // number of chunks currently in packet
- final int maxChunks; // max chunks in packet
- private byte[] buf;
- private boolean lastPacketInBlock; // is this the last packet in block?
-
- /**
- * buf is pointed into like follows:
- * (C is checksum data, D is payload data)
- *
- * [_________CCCCCCCCC________________DDDDDDDDDDDDDDDD___]
- * ^ ^ ^ ^
- * | checksumPos dataStart dataPos
- * checksumStart
- *
- * Right before sending, we move the checksum data to immediately precede
- * the actual data, and then insert the header into the buffer immediately
- * preceding the checksum data, so we make sure to keep enough space in
- * front of the checksum data to support the largest conceivable header.
- */
- int checksumStart;
- int checksumPos;
- final int dataStart;
- int dataPos;
-
- /**
- * Create a new packet.
- *
- * @param chunksPerPkt maximum number of chunks per packet.
- * @param offsetInBlock offset in bytes into the HDFS block.
- */
- private Packet(byte[] buf, int chunksPerPkt, long offsetInBlock, long seqno,
- int checksumSize) {
- this.lastPacketInBlock = false;
- this.numChunks = 0;
- this.offsetInBlock = offsetInBlock;
- this.seqno = seqno;
-
- this.buf = buf;
-
- checksumStart = PacketHeader.PKT_MAX_HEADER_LEN;
- checksumPos = checksumStart;
- dataStart = checksumStart + (chunksPerPkt * checksumSize);
- dataPos = dataStart;
- maxChunks = chunksPerPkt;
- }
-
- synchronized void writeData(byte[] inarray, int off, int len)
- throws ClosedChannelException {
- checkBuffer();
- if (dataPos + len > buf.length) {
- throw new BufferOverflowException();
- }
- System.arraycopy(inarray, off, buf, dataPos, len);
- dataPos += len;
- }
-
- synchronized void writeChecksum(byte[] inarray, int off, int len)
- throws ClosedChannelException {
- checkBuffer();
- if (len == 0) {
- return;
- }
- if (checksumPos + len > dataStart) {
- throw new BufferOverflowException();
- }
- System.arraycopy(inarray, off, buf, checksumPos, len);
- checksumPos += len;
- }
-
- /**
- * Write the full packet, including the header, to the given output stream.
- */
- synchronized void writeTo(DataOutputStream stm) throws IOException {
- checkBuffer();
-
- final int dataLen = dataPos - dataStart;
- final int checksumLen = checksumPos - checksumStart;
- final int pktLen = HdfsConstants.BYTES_IN_INTEGER + dataLen + checksumLen;
-
- PacketHeader header = new PacketHeader(
- pktLen, offsetInBlock, seqno, lastPacketInBlock, dataLen, syncBlock);
-
- if (checksumPos != dataStart) {
- // Move the checksum to cover the gap. This can happen for the last
- // packet or during an hflush/hsync call.
- System.arraycopy(buf, checksumStart, buf,
- dataStart - checksumLen , checksumLen);
- checksumPos = dataStart;
- checksumStart = checksumPos - checksumLen;
- }
-
- final int headerStart = checksumStart - header.getSerializedSize();
- assert checksumStart + 1 >= header.getSerializedSize();
- assert checksumPos == dataStart;
- assert headerStart >= 0;
- assert headerStart + header.getSerializedSize() == checksumStart;
-
- // Copy the header data into the buffer immediately preceding the checksum
- // data.
- System.arraycopy(header.getBytes(), 0, buf, headerStart,
- header.getSerializedSize());
-
- // corrupt the data for testing.
- if (DFSClientFaultInjector.get().corruptPacket()) {
- buf[headerStart+header.getSerializedSize() + checksumLen + dataLen-1] ^= 0xff;
- }
-
- // Write the now contiguous full packet to the output stream.
- stm.write(buf, headerStart, header.getSerializedSize() + checksumLen + dataLen);
-
- // undo corruption.
- if (DFSClientFaultInjector.get().uncorruptPacket()) {
- buf[headerStart+header.getSerializedSize() + checksumLen + dataLen-1] ^= 0xff;
- }
- }
-
- private synchronized void checkBuffer() throws ClosedChannelException {
- if (buf == null) {
- throw new ClosedChannelException();
- }
- }
-
- private synchronized void releaseBuffer(ByteArrayManager bam) {
- bam.release(buf);
- buf = null;
- }
-
- // get the packet's last byte's offset in the block
- synchronized long getLastByteOffsetBlock() {
- return offsetInBlock + dataPos - dataStart;
- }
-
- /**
- * Check if this packet is a heart beat packet
- * @return true if the sequence number is HEART_BEAT_SEQNO
- */
- private boolean isHeartbeatPacket() {
- return seqno == HEART_BEAT_SEQNO;
- }
-
- @Override
- public String toString() {
- return "packet seqno: " + this.seqno +
- " offsetInBlock: " + this.offsetInBlock +
- " lastPacketInBlock: " + this.lastPacketInBlock +
- " lastByteOffsetInBlock: " + this.getLastByteOffsetBlock();
- }
- }
//
// The DataStreamer class is responsible for sending data packets to the
@@ -556,7 +403,7 @@ public class DFSOutputStream extends FSOutputSummer
}
}
- Packet one;
+ DFSPacket one;
try {
// process datanode IO errors if any
boolean doSleep = false;
@@ -620,7 +467,7 @@ public class DFSOutputStream extends FSOutputSummer
" Aborting file " + src);
}
- if (one.lastPacketInBlock) {
+ if (one.isLastPacketInBlock()) {
// wait for all data packets have been successfully acked
synchronized (dataQueue) {
while (!streamerClosed && !hasError &&
@@ -681,7 +528,7 @@ public class DFSOutputStream extends FSOutputSummer
}
// Is this block full?
- if (one.lastPacketInBlock) {
+ if (one.isLastPacketInBlock()) {
// wait for the close packet has been acked
synchronized (dataQueue) {
while (!streamerClosed && !hasError &&
@@ -883,7 +730,7 @@ public class DFSOutputStream extends FSOutputSummer
ack.readFields(blockReplyStream);
long duration = Time.monotonicNow() - begin;
if (duration > dfsclientSlowLogThresholdMs
- && ack.getSeqno() != Packet.HEART_BEAT_SEQNO) {
+ && ack.getSeqno() != DFSPacket.HEART_BEAT_SEQNO) {
DFSClient.LOG
.warn("Slow ReadProcessor read fields took " + duration
+ "ms (threshold=" + dfsclientSlowLogThresholdMs + "ms); ack: "
@@ -920,21 +767,21 @@ public class DFSOutputStream extends FSOutputSummer
assert seqno != PipelineAck.UNKOWN_SEQNO :
"Ack for unknown seqno should be a failed ack: " + ack;
- if (seqno == Packet.HEART_BEAT_SEQNO) { // a heartbeat ack
+ if (seqno == DFSPacket.HEART_BEAT_SEQNO) { // a heartbeat ack
continue;
}
// a success ack for a data packet
- Packet one;
+ DFSPacket one;
synchronized (dataQueue) {
one = ackQueue.getFirst();
}
- if (one.seqno != seqno) {
+ if (one.getSeqno() != seqno) {
throw new IOException("ResponseProcessor: Expecting seqno " +
" for block " + block +
- one.seqno + " but received " + seqno);
+ one.getSeqno() + " but received " + seqno);
}
- isLastPacketInBlock = one.lastPacketInBlock;
+ isLastPacketInBlock = one.isLastPacketInBlock();
// Fail the packet write for testing in order to force a
// pipeline recovery.
@@ -1032,10 +879,10 @@ public class DFSOutputStream extends FSOutputSummer
// We also need to set lastAckedSeqno to the end-of-block Packet's seqno, so that
// a client waiting on close() will be aware that the flush finished.
synchronized (dataQueue) {
- Packet endOfBlockPacket = dataQueue.remove(); // remove the end of block packet
- assert endOfBlockPacket.lastPacketInBlock;
- assert lastAckedSeqno == endOfBlockPacket.seqno - 1;
- lastAckedSeqno = endOfBlockPacket.seqno;
+ DFSPacket endOfBlockPacket = dataQueue.remove(); // remove the end of block packet
+ assert endOfBlockPacket.isLastPacketInBlock();
+ assert lastAckedSeqno == endOfBlockPacket.getSeqno() - 1;
+ lastAckedSeqno = endOfBlockPacket.getSeqno();
dataQueue.notifyAll();
}
endBlock();
@@ -1862,9 +1709,9 @@ public class DFSOutputStream extends FSOutputSummer
synchronized (dataQueue) {
if (currentPacket == null) return;
dataQueue.addLast(currentPacket);
- lastQueuedSeqno = currentPacket.seqno;
+ lastQueuedSeqno = currentPacket.getSeqno();
if (DFSClient.LOG.isDebugEnabled()) {
- DFSClient.LOG.debug("Queued packet " + currentPacket.seqno);
+ DFSClient.LOG.debug("Queued packet " + currentPacket.getSeqno());
}
currentPacket = null;
dataQueue.notifyAll();
@@ -1916,10 +1763,10 @@ public class DFSOutputStream extends FSOutputSummer
if (currentPacket == null) {
currentPacket = createPacket(packetSize, chunksPerPacket,
- bytesCurBlock, currentSeqno++);
+ bytesCurBlock, currentSeqno++, false);
if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("DFSClient writeChunk allocating new packet seqno=" +
- currentPacket.seqno +
+ currentPacket.getSeqno() +
", src=" + src +
", packetSize=" + packetSize +
", chunksPerPacket=" + chunksPerPacket +
@@ -1929,16 +1776,16 @@ public class DFSOutputStream extends FSOutputSummer
currentPacket.writeChecksum(checksum, ckoff, cklen);
currentPacket.writeData(b, offset, len);
- currentPacket.numChunks++;
+ currentPacket.incNumChunks();
bytesCurBlock += len;
// If packet is full, enqueue it for transmission
//
- if (currentPacket.numChunks == currentPacket.maxChunks ||
+ if (currentPacket.getNumChunks() == currentPacket.getMaxChunks() ||
bytesCurBlock == blockSize) {
if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("DFSClient writeChunk packet full seqno=" +
- currentPacket.seqno +
+ currentPacket.getSeqno() +
", src=" + src +
", bytesCurBlock=" + bytesCurBlock +
", blockSize=" + blockSize +
@@ -1963,9 +1810,8 @@ public class DFSOutputStream extends FSOutputSummer
// indicate the end of block and reset bytesCurBlock.
//
if (bytesCurBlock == blockSize) {
- currentPacket = createPacket(0, 0, bytesCurBlock, currentSeqno++);
- currentPacket.lastPacketInBlock = true;
- currentPacket.syncBlock = shouldSyncBlock;
+ currentPacket = createPacket(0, 0, bytesCurBlock, currentSeqno++, true);
+ currentPacket.setSyncBlock(shouldSyncBlock);
waitAndQueueCurrentPacket();
bytesCurBlock = 0;
lastFlushOffset = 0;
@@ -2053,7 +1899,7 @@ public class DFSOutputStream extends FSOutputSummer
// but sync was requested.
// Send an empty packet if we do not end the block right now
currentPacket = createPacket(packetSize, chunksPerPacket,
- bytesCurBlock, currentSeqno++);
+ bytesCurBlock, currentSeqno++, false);
}
} else {
if (isSync && bytesCurBlock > 0 && !endBlock) {
@@ -2062,7 +1908,7 @@ public class DFSOutputStream extends FSOutputSummer
// and sync was requested.
// So send an empty sync packet if we do not end the block right now
currentPacket = createPacket(packetSize, chunksPerPacket,
- bytesCurBlock, currentSeqno++);
+ bytesCurBlock, currentSeqno++, false);
} else if (currentPacket != null) {
// just discard the current packet since it is already been sent.
currentPacket.releaseBuffer(byteArrayManager);
@@ -2070,15 +1916,14 @@ public class DFSOutputStream extends FSOutputSummer
}
}
if (currentPacket != null) {
- currentPacket.syncBlock = isSync;
+ currentPacket.setSyncBlock(isSync);
waitAndQueueCurrentPacket();
}
if (endBlock && bytesCurBlock > 0) {
// Need to end the current block, thus send an empty packet to
// indicate this is the end of the block and reset bytesCurBlock
- currentPacket = createPacket(0, 0, bytesCurBlock, currentSeqno++);
- currentPacket.lastPacketInBlock = true;
- currentPacket.syncBlock = shouldSyncBlock || isSync;
+ currentPacket = createPacket(0, 0, bytesCurBlock, currentSeqno++, true);
+ currentPacket.setSyncBlock(shouldSyncBlock || isSync);
waitAndQueueCurrentPacket();
bytesCurBlock = 0;
lastFlushOffset = 0;
@@ -2249,8 +2094,8 @@ public class DFSOutputStream extends FSOutputSummer
}
}
- private static void releaseBuffer(List<Packet> packets, ByteArrayManager bam) {
- for(Packet p : packets) {
+ private static void releaseBuffer(List<DFSPacket> packets, ByteArrayManager bam) {
+ for (DFSPacket p : packets) {
p.releaseBuffer(bam);
}
packets.clear();
@@ -2297,9 +2142,8 @@ public class DFSOutputStream extends FSOutputSummer
if (bytesCurBlock != 0) {
// send an empty packet to mark the end of the block
- currentPacket = createPacket(0, 0, bytesCurBlock, currentSeqno++);
- currentPacket.lastPacketInBlock = true;
- currentPacket.syncBlock = shouldSyncBlock;
+ currentPacket = createPacket(0, 0, bytesCurBlock, currentSeqno++, true);
+ currentPacket.setSyncBlock(shouldSyncBlock);
}
flushInternal(); // flush all data to Datanodes
http://git-wip-us.apache.org/repos/asf/hadoop/blob/952640fa/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java
new file mode 100755
index 0000000..9b3ea51
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java
@@ -0,0 +1,270 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.BufferOverflowException;
+import java.nio.channels.ClosedChannelException;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
+import org.apache.hadoop.hdfs.util.ByteArrayManager;
+
+/****************************************************************
+ * DFSPacket is used by DataStreamer and DFSOutputStream.
+ * DFSOutputStream generates packets and then ask DatStreamer
+ * to send them to datanodes.
+ ****************************************************************/
+
+class DFSPacket {
+ public static final long HEART_BEAT_SEQNO = -1L;
+ private final long seqno; // sequence number of buffer in block
+ private final long offsetInBlock; // offset in block
+ private boolean syncBlock; // this packet forces the current block to disk
+ private int numChunks; // number of chunks currently in packet
+ private final int maxChunks; // max chunks in packet
+ private byte[] buf;
+ private final boolean lastPacketInBlock; // is this the last packet in block?
+
+ /**
+ * buf is pointed into like follows:
+ * (C is checksum data, D is payload data)
+ *
+ * [_________CCCCCCCCC________________DDDDDDDDDDDDDDDD___]
+ * ^ ^ ^ ^
+ * | checksumPos dataStart dataPos
+ * checksumStart
+ *
+ * Right before sending, we move the checksum data to immediately precede
+ * the actual data, and then insert the header into the buffer immediately
+ * preceding the checksum data, so we make sure to keep enough space in
+ * front of the checksum data to support the largest conceivable header.
+ */
+ private int checksumStart;
+ private int checksumPos;
+ private final int dataStart;
+ private int dataPos;
+
+ /**
+ * Create a new packet.
+ *
+ * @param buf the buffer storing data and checksums
+ * @param chunksPerPkt maximum number of chunks per packet.
+ * @param offsetInBlock offset in bytes into the HDFS block.
+ * @param seqno the sequence number of this packet
+ * @param checksumSize the size of checksum
+ * @param lastPacketInBlock if this is the last packet
+ */
+ DFSPacket(byte[] buf, int chunksPerPkt, long offsetInBlock, long seqno,
+ int checksumSize, boolean lastPacketInBlock) {
+ this.lastPacketInBlock = lastPacketInBlock;
+ this.numChunks = 0;
+ this.offsetInBlock = offsetInBlock;
+ this.seqno = seqno;
+
+ this.buf = buf;
+
+ checksumStart = PacketHeader.PKT_MAX_HEADER_LEN;
+ checksumPos = checksumStart;
+ dataStart = checksumStart + (chunksPerPkt * checksumSize);
+ dataPos = dataStart;
+ maxChunks = chunksPerPkt;
+ }
+
+ /**
+ * Write data to this packet.
+ *
+ * @param inarray input array of data
+ * @param off the offset of data to write
+ * @param len the length of data to write
+ * @throws ClosedChannelException
+ */
+ synchronized void writeData(byte[] inarray, int off, int len)
+ throws ClosedChannelException {
+ checkBuffer();
+ if (dataPos + len > buf.length) {
+ throw new BufferOverflowException();
+ }
+ System.arraycopy(inarray, off, buf, dataPos, len);
+ dataPos += len;
+ }
+
+ /**
+ * Write checksums to this packet
+ *
+ * @param inarray input array of checksums
+ * @param off the offset of checksums to write
+ * @param len the length of checksums to write
+ * @throws ClosedChannelException
+ */
+ synchronized void writeChecksum(byte[] inarray, int off, int len)
+ throws ClosedChannelException {
+ checkBuffer();
+ if (len == 0) {
+ return;
+ }
+ if (checksumPos + len > dataStart) {
+ throw new BufferOverflowException();
+ }
+ System.arraycopy(inarray, off, buf, checksumPos, len);
+ checksumPos += len;
+ }
+
+ /**
+ * Write the full packet, including the header, to the given output stream.
+ *
+ * @param stm
+ * @throws IOException
+ */
+ synchronized void writeTo(DataOutputStream stm) throws IOException {
+ checkBuffer();
+
+ final int dataLen = dataPos - dataStart;
+ final int checksumLen = checksumPos - checksumStart;
+ final int pktLen = HdfsConstants.BYTES_IN_INTEGER + dataLen + checksumLen;
+
+ PacketHeader header = new PacketHeader(
+ pktLen, offsetInBlock, seqno, lastPacketInBlock, dataLen, syncBlock);
+
+ if (checksumPos != dataStart) {
+ // Move the checksum to cover the gap. This can happen for the last
+ // packet or during an hflush/hsync call.
+ System.arraycopy(buf, checksumStart, buf,
+ dataStart - checksumLen , checksumLen);
+ checksumPos = dataStart;
+ checksumStart = checksumPos - checksumLen;
+ }
+
+ final int headerStart = checksumStart - header.getSerializedSize();
+ assert checksumStart + 1 >= header.getSerializedSize();
+ assert headerStart >= 0;
+ assert headerStart + header.getSerializedSize() == checksumStart;
+
+ // Copy the header data into the buffer immediately preceding the checksum
+ // data.
+ System.arraycopy(header.getBytes(), 0, buf, headerStart,
+ header.getSerializedSize());
+
+ // corrupt the data for testing.
+ if (DFSClientFaultInjector.get().corruptPacket()) {
+ buf[headerStart+header.getSerializedSize() + checksumLen + dataLen-1] ^= 0xff;
+ }
+
+ // Write the now contiguous full packet to the output stream.
+ stm.write(buf, headerStart, header.getSerializedSize() + checksumLen + dataLen);
+
+ // undo corruption.
+ if (DFSClientFaultInjector.get().uncorruptPacket()) {
+ buf[headerStart+header.getSerializedSize() + checksumLen + dataLen-1] ^= 0xff;
+ }
+ }
+
+ private synchronized void checkBuffer() throws ClosedChannelException {
+ if (buf == null) {
+ throw new ClosedChannelException();
+ }
+ }
+
+ /**
+ * Release the buffer in this packet to ByteArrayManager.
+ *
+ * @param bam
+ */
+ synchronized void releaseBuffer(ByteArrayManager bam) {
+ bam.release(buf);
+ buf = null;
+ }
+
+ /**
+ * get the packet's last byte's offset in the block
+ *
+ * @return the packet's last byte's offset in the block
+ */
+ synchronized long getLastByteOffsetBlock() {
+ return offsetInBlock + dataPos - dataStart;
+ }
+
+ /**
+ * Check if this packet is a heart beat packet
+ *
+ * @return true if the sequence number is HEART_BEAT_SEQNO
+ */
+ boolean isHeartbeatPacket() {
+ return seqno == HEART_BEAT_SEQNO;
+ }
+
+ /**
+ * check if this packet is the last packet in block
+ *
+ * @return true if the packet is the last packet
+ */
+ boolean isLastPacketInBlock(){
+ return lastPacketInBlock;
+ }
+
+ /**
+ * get sequence number of this packet
+ *
+ * @return the sequence number of this packet
+ */
+ long getSeqno(){
+ return seqno;
+ }
+
+ /**
+ * get the number of chunks this packet contains
+ *
+ * @return the number of chunks in this packet
+ */
+ synchronized int getNumChunks(){
+ return numChunks;
+ }
+
+ /**
+ * increase the number of chunks by one
+ */
+ synchronized void incNumChunks(){
+ numChunks++;
+ }
+
+ /**
+ * get the maximum number of packets
+ *
+ * @return the maximum number of packets
+ */
+ int getMaxChunks(){
+ return maxChunks;
+ }
+
+ /**
+ * set if to sync block
+ *
+ * @param syncBlock if to sync block
+ */
+ synchronized void setSyncBlock(boolean syncBlock){
+ this.syncBlock = syncBlock;
+ }
+
+ @Override
+ public String toString() {
+ return "packet seqno: " + this.seqno +
+ " offsetInBlock: " + this.offsetInBlock +
+ " lastPacketInBlock: " + this.lastPacketInBlock +
+ " lastByteOffsetInBlock: " + this.getLastByteOffsetBlock();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/952640fa/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSPacket.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSPacket.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSPacket.java
new file mode 100755
index 0000000..8bf6097
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSPacket.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.util.Random;
+import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestDFSPacket {
+ private static final int chunkSize = 512;
+ private static final int checksumSize = 4;
+ private static final int maxChunksPerPacket = 4;
+
+ @Test
+ public void testPacket() throws Exception {
+ Random r = new Random(12345L);
+ byte[] data = new byte[chunkSize];
+ r.nextBytes(data);
+ byte[] checksum = new byte[checksumSize];
+ r.nextBytes(checksum);
+
+ DataOutputBuffer os = new DataOutputBuffer(data.length * 2);
+
+ byte[] packetBuf = new byte[data.length * 2];
+ DFSPacket p = new DFSPacket(packetBuf, maxChunksPerPacket,
+ 0, 0, checksumSize, false);
+ p.setSyncBlock(true);
+ p.writeData(data, 0, data.length);
+ p.writeChecksum(checksum, 0, checksum.length);
+ p.writeTo(os);
+
+ //we have set syncBlock to true, so the header has the maximum length
+ int headerLen = PacketHeader.PKT_MAX_HEADER_LEN;
+ byte[] readBuf = os.getData();
+
+ assertArrayRegionsEqual(readBuf, headerLen, checksum, 0, checksum.length);
+ assertArrayRegionsEqual(readBuf, headerLen + checksum.length, data, 0, data.length);
+
+ }
+
+ public static void assertArrayRegionsEqual(byte []buf1, int off1, byte []buf2,
+ int off2, int len) {
+ for (int i = 0; i < len; i++) {
+ if (buf1[off1 + i] != buf2[off2 + i]) {
+ Assert.fail("arrays differ at byte " + i + ". " +
+ "The first array has " + (int) buf1[off1 + i] +
+ ", but the second array has " + (int) buf2[off2 + i]);
+ }
+ }
+ }
+}