Posted to hdfs-commits@hadoop.apache.org by at...@apache.org on 2012/08/09 23:33:21 UTC
svn commit: r1371496 - in
/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs: ./
src/main/java/org/apache/hadoop/hdfs/
src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/
src/main/java/org/apache/hadoop/hdfs/server/datanode/
Author: atm
Date: Thu Aug 9 21:33:20 2012
New Revision: 1371496
URL: http://svn.apache.org/viewvc?rev=1371496&view=rev
Log:
HDFS-3721. hsync support broke wire compatibility. Contributed by Todd Lipcon and Aaron T. Myers.
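For context, the incompatibility stems from proto2 semantics: explicitly setting an optional field serializes it on the wire even at its default value, which changes the header's encoded length. A minimal sketch (not part of this commit) using the generated PacketHeaderProto class referenced in the diff below:

    PacketHeaderProto.Builder base = PacketHeaderProto.newBuilder()
        .setOffsetInBlock(0)
        .setSeqno(0)
        .setLastPacketInBlock(false)
        .setDataLen(0);

    int without = base.build().getSerializedSize();
    // Explicitly setting syncBlock(false) still emits the field
    // (typically one tag byte plus one value byte), so the header grows
    // and a receiver that assumed a fixed-size header fails to parse it.
    int with = base.setSyncBlock(false).build().getSerializedSize();
    System.out.println("header bytes: " + without + " vs " + with);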
Added:
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java
Modified:
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1371496&r1=1371495&r2=1371496&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Thu Aug 9 21:33:20 2012
@@ -422,6 +422,8 @@ Release 2.0.1-alpha - UNRELEASED
HDFS-3710. libhdfs misuses O_RDONLY/WRONLY/RDWR. (Andy Isaacson via atm)
+ HDFS-3721. hsync support broke wire compatibility. (todd and atm)
+
BREAKDOWN OF HDFS-3042 SUBTASKS
HDFS-2185. HDFS portion of ZK-based FailoverController (todd)
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java?rev=1371496&r1=1371495&r2=1371496&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java Thu Aug 9 21:33:20 2012
@@ -30,7 +30,6 @@ import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.BufferOverflowException;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
@@ -126,7 +125,7 @@ public class DFSOutputStream extends FSO
private long lastQueuedSeqno = -1;
private long lastAckedSeqno = -1;
private long bytesCurBlock = 0; // bytes written in current block
- private int packetSize = 0; // write packet size, including the header.
+ private int packetSize = 0; // write packet size, not including the header.
private int chunksPerPacket = 0;
private volatile IOException lastException = null;
private long artificialSlowdown = 0;
@@ -147,28 +146,31 @@ public class DFSOutputStream extends FSO
int numChunks; // number of chunks currently in packet
int maxChunks; // max chunks in packet
- /** buffer for accumulating packet checksum and data */
- ByteBuffer buffer; // wraps buf, only one of these two may be non-null
byte[] buf;
/**
* buf is pointed into like follows:
* (C is checksum data, D is payload data)
*
- * [HHHHHCCCCC________________DDDDDDDDDDDDDDDD___]
- *       ^    ^               ^               ^
- *       |    checksumPos     dataStart       dataPos
- *   checksumStart
+ * [_________CCCCCCCCC________________DDDDDDDDDDDDDDDD___]
+ *           ^        ^               ^               ^
+ *           |        checksumPos     dataStart       dataPos
+ *   checksumStart
+ *
+ * Right before sending, we move the checksum data to immediately precede
+ * the actual data, and then insert the header into the buffer immediately
+ * preceding the checksum data, so we make sure to keep enough space in
+ * front of the checksum data to support the largest conceivable header.
*/
int checksumStart;
+ int checksumPos;
int dataStart;
int dataPos;
- int checksumPos;
private static final long HEART_BEAT_SEQNO = -1L;
/**
- * create a heartbeat packet
+ * Create a heartbeat packet.
*/
Packet() {
this.lastPacketInBlock = false;
@@ -176,17 +178,19 @@ public class DFSOutputStream extends FSO
this.offsetInBlock = 0;
this.seqno = HEART_BEAT_SEQNO;
- buffer = null;
- int packetSize = PacketHeader.PKT_HEADER_LEN + HdfsConstants.BYTES_IN_INTEGER;
- buf = new byte[packetSize];
+ buf = new byte[PacketHeader.PKT_MAX_HEADER_LEN];
- checksumStart = dataStart = packetSize;
- checksumPos = checksumStart;
- dataPos = dataStart;
+ checksumStart = checksumPos = dataPos = dataStart = PacketHeader.PKT_MAX_HEADER_LEN;
maxChunks = 0;
}
- // create a new packet
+ /**
+ * Create a new packet.
+ *
+ * @param pktSize maximum size of the packet, including checksum data and actual data.
+ * @param chunksPerPkt maximum number of chunks per packet.
+ * @param offsetInBlock offset in bytes into the HDFS block.
+ */
Packet(int pktSize, int chunksPerPkt, long offsetInBlock) {
this.lastPacketInBlock = false;
this.numChunks = 0;
@@ -194,25 +198,24 @@ public class DFSOutputStream extends FSO
this.seqno = currentSeqno;
currentSeqno++;
- buffer = null;
- buf = new byte[pktSize];
+ buf = new byte[PacketHeader.PKT_MAX_HEADER_LEN + pktSize];
- checksumStart = PacketHeader.PKT_HEADER_LEN;
+ checksumStart = PacketHeader.PKT_MAX_HEADER_LEN;
checksumPos = checksumStart;
- dataStart = checksumStart + chunksPerPkt * checksum.getChecksumSize();
+ dataStart = checksumStart + (chunksPerPkt * checksum.getChecksumSize());
dataPos = dataStart;
maxChunks = chunksPerPkt;
}
void writeData(byte[] inarray, int off, int len) {
- if ( dataPos + len > buf.length) {
+ if (dataPos + len > buf.length) {
throw new BufferOverflowException();
}
System.arraycopy(inarray, off, buf, dataPos, len);
dataPos += len;
}
- void writeChecksum(byte[] inarray, int off, int len) {
+ void writeChecksum(byte[] inarray, int off, int len) {
if (checksumPos + len > dataStart) {
throw new BufferOverflowException();
}
@@ -221,45 +224,38 @@ public class DFSOutputStream extends FSO
}
/**
- * Returns ByteBuffer that contains one full packet, including header.
+ * Write the full packet, including the header, to the given output stream.
*/
- ByteBuffer getBuffer() {
- /* Once this is called, no more data can be added to the packet.
- * setting 'buf' to null ensures that.
- * This is called only when the packet is ready to be sent.
- */
- if (buffer != null) {
- return buffer;
- }
-
- //prepare the header and close any gap between checksum and data.
-
- int dataLen = dataPos - dataStart;
- int checksumLen = checksumPos - checksumStart;
+ void writeTo(DataOutputStream stm) throws IOException {
+ final int dataLen = dataPos - dataStart;
+ final int checksumLen = checksumPos - checksumStart;
+ final int pktLen = HdfsConstants.BYTES_IN_INTEGER + dataLen + checksumLen;
+
+ PacketHeader header = new PacketHeader(
+ pktLen, offsetInBlock, seqno, lastPacketInBlock, dataLen, syncBlock);
if (checksumPos != dataStart) {
- /* move the checksum to cover the gap.
- * This can happen for the last packet.
- */
+ // Move the checksum to cover the gap. This can happen for the last
+ // packet or during an hflush/hsync call.
System.arraycopy(buf, checksumStart, buf,
dataStart - checksumLen , checksumLen);
+ checksumPos = dataStart;
+ checksumStart = checksumPos - checksumLen;
}
- int pktLen = HdfsConstants.BYTES_IN_INTEGER + dataLen + checksumLen;
+ final int headerStart = checksumStart - header.getSerializedSize();
+ assert checksumStart + 1 >= header.getSerializedSize();
+ assert checksumPos == dataStart;
+ assert headerStart >= 0;
+ assert headerStart + header.getSerializedSize() == checksumStart;
- //normally dataStart == checksumPos, i.e., offset is zero.
- buffer = ByteBuffer.wrap(
- buf, dataStart - checksumPos,
- PacketHeader.PKT_HEADER_LEN + pktLen - HdfsConstants.BYTES_IN_INTEGER);
- buf = null;
- buffer.mark();
-
- PacketHeader header = new PacketHeader(
- pktLen, offsetInBlock, seqno, lastPacketInBlock, dataLen, syncBlock);
- header.putInBuffer(buffer);
+ // Copy the header data into the buffer immediately preceding the checksum
+ // data.
+ System.arraycopy(header.getBytes(), 0, buf, headerStart,
+ header.getSerializedSize());
- buffer.reset();
- return buffer;
+ // Write the now contiguous full packet to the output stream.
+ stm.write(buf, headerStart, header.getSerializedSize() + checksumLen + dataLen);
}
// get the packet's last byte's offset in the block
@@ -502,8 +498,6 @@ public class DFSOutputStream extends FSO
}
// send the packet
- ByteBuffer buf = one.getBuffer();
-
synchronized (dataQueue) {
// move packet from dataQueue to ackQueue
if (!one.isHeartbeatPacket()) {
@@ -519,8 +513,8 @@ public class DFSOutputStream extends FSO
}
// write out data to remote datanode
- try {
- blockStream.write(buf.array(), buf.position(), buf.remaining());
+ try {
+ one.writeTo(blockStream);
blockStream.flush();
} catch (IOException e) {
// HDFS-3398 treat primary DN is down since client is unable to
@@ -1358,9 +1352,8 @@ public class DFSOutputStream extends FSO
private void computePacketChunkSize(int psize, int csize) {
int chunkSize = csize + checksum.getChecksumSize();
- int n = PacketHeader.PKT_HEADER_LEN;
- chunksPerPacket = Math.max((psize - n + chunkSize-1)/chunkSize, 1);
- packetSize = n + chunkSize*chunksPerPacket;
+ chunksPerPacket = Math.max(psize/chunkSize, 1);
+ packetSize = chunkSize*chunksPerPacket;
if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("computePacketChunkSize: src=" + src +
", chunkSize=" + chunkSize +
@@ -1474,8 +1467,7 @@ public class DFSOutputStream extends FSO
// indicate the end of block and reset bytesCurBlock.
//
if (bytesCurBlock == blockSize) {
- currentPacket = new Packet(PacketHeader.PKT_HEADER_LEN, 0,
- bytesCurBlock);
+ currentPacket = new Packet(0, 0, bytesCurBlock);
currentPacket.lastPacketInBlock = true;
currentPacket.syncBlock = shouldSyncBlock;
waitAndQueueCurrentPacket();
@@ -1751,8 +1743,7 @@ public class DFSOutputStream extends FSO
if (bytesCurBlock != 0) {
// send an empty packet to mark the end of the block
- currentPacket = new Packet(PacketHeader.PKT_HEADER_LEN, 0,
- bytesCurBlock);
+ currentPacket = new Packet(0, 0, bytesCurBlock);
currentPacket.lastPacketInBlock = true;
currentPacket.syncBlock = shouldSyncBlock;
}
@@ -1805,8 +1796,7 @@ public class DFSOutputStream extends FSO
@VisibleForTesting
public synchronized void setChunksPerPacket(int value) {
chunksPerPacket = Math.min(chunksPerPacket, value);
- packetSize = PacketHeader.PKT_HEADER_LEN +
- (checksum.getBytesPerChecksum() +
+ packetSize = (checksum.getBytesPerChecksum() +
checksum.getChecksumSize()) * chunksPerPacket;
}
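To make the revised sizing concrete, a small worked sketch (the values are illustrative assumptions, not from the commit): packetSize now excludes the header entirely, and the Packet buffer reserves PKT_MAX_HEADER_LEN of headroom separately.

    // Assumed example values: 64KB write packet, 512-byte chunks, 4-byte CRC32.
    int psize = 64 * 1024;                    // requested packet payload size
    int csize = 512;                          // bytes per checksum chunk
    int checksumSize = 4;                     // CRC32

    int chunkSize = csize + checksumSize;                 // 516
    int chunksPerPacket = Math.max(psize / chunkSize, 1); // 127
    int packetSize = chunkSize * chunksPerPacket;         // 65532, no header

    // The Packet constructor then allocates the header room up front:
    //   buf = new byte[PacketHeader.PKT_MAX_HEADER_LEN + packetSize];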
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java?rev=1371496&r1=1371495&r2=1371496&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java Thu Aug 9 21:33:20 2012
@@ -33,12 +33,12 @@ import java.nio.channels.ReadableByteCha
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
+import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto;
@@ -48,14 +48,11 @@ import org.apache.hadoop.hdfs.security.t
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
-import org.apache.hadoop.hdfs.util.DirectBufferPool;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.SocketInputWrapper;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;
-import com.google.common.base.Preconditions;
-
/**
* This is a wrapper around connection to datanode
* and understands checksum, offset etc.
@@ -93,11 +90,9 @@ public class RemoteBlockReader2 impleme
private final ReadableByteChannel in;
private DataChecksum checksum;
- private PacketHeader curHeader;
- private ByteBuffer curPacketBuf = null;
+ private PacketReceiver packetReceiver = new PacketReceiver(true);
private ByteBuffer curDataSlice = null;
-
/** offset in block of the last chunk received */
private long lastSeqNo = -1;
@@ -105,10 +100,6 @@ public class RemoteBlockReader2 impleme
private long startOffset;
private final String filename;
- private static DirectBufferPool bufferPool = new DirectBufferPool();
- private final ByteBuffer headerBuf = ByteBuffer.allocate(
- PacketHeader.PKT_HEADER_LEN);
-
private final int bytesPerChecksum;
private final int checksumSize;
@@ -132,7 +123,7 @@ public class RemoteBlockReader2 impleme
public synchronized int read(byte[] buf, int off, int len)
throws IOException {
- if (curPacketBuf == null || curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) {
+ if (curDataSlice == null || curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) {
readNextPacket();
}
if (curDataSlice.remaining() == 0) {
@@ -149,7 +140,7 @@ public class RemoteBlockReader2 impleme
@Override
public int read(ByteBuffer buf) throws IOException {
- if (curPacketBuf == null || curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) {
+ if (curDataSlice == null || curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) {
readNextPacket();
}
if (curDataSlice.remaining() == 0) {
@@ -167,11 +158,13 @@ public class RemoteBlockReader2 impleme
}
private void readNextPacket() throws IOException {
- Preconditions.checkState(curHeader == null || !curHeader.isLastPacketInBlock());
-
//Read packet headers.
- readPacketHeader();
+ packetReceiver.receiveNextPacket(in);
+ PacketHeader curHeader = packetReceiver.getHeader();
+ curDataSlice = packetReceiver.getDataSlice();
+ assert curDataSlice.capacity() == curHeader.getDataLen();
+
if (LOG.isTraceEnabled()) {
LOG.trace("DFSClient readNextPacket got header " + curHeader);
}
@@ -185,17 +178,20 @@ public class RemoteBlockReader2 impleme
if (curHeader.getDataLen() > 0) {
int chunks = 1 + (curHeader.getDataLen() - 1) / bytesPerChecksum;
int checksumsLen = chunks * checksumSize;
- int bufsize = checksumsLen + curHeader.getDataLen();
+
+ assert packetReceiver.getChecksumSlice().capacity() == checksumsLen :
+ "checksum slice capacity=" + packetReceiver.getChecksumSlice().capacity() +
+ " checksumsLen=" + checksumsLen;
- resetPacketBuffer(checksumsLen, curHeader.getDataLen());
-
lastSeqNo = curHeader.getSeqno();
- if (bufsize > 0) {
- readChannelFully(in, curPacketBuf);
- curPacketBuf.flip();
- if (verifyChecksum) {
- verifyPacketChecksums();
- }
+ if (verifyChecksum && curDataSlice.remaining() > 0) {
+ // N.B.: the checksum error offset reported here is actually
+ // relative to the start of the block, not the start of the file.
+ // This is slightly misleading, but preserves the behavior from
+ // the older BlockReader.
+ checksum.verifyChunkedSums(curDataSlice,
+ packetReceiver.getChecksumSlice(),
+ filename, curHeader.getOffsetInBlock());
}
bytesNeededToFinish -= curHeader.getDataLen();
}
@@ -218,40 +214,7 @@ public class RemoteBlockReader2 impleme
}
}
}
-
- private void verifyPacketChecksums() throws ChecksumException {
- // N.B.: the checksum error offset reported here is actually
- // relative to the start of the block, not the start of the file.
- // This is slightly misleading, but preserves the behavior from
- // the older BlockReader.
- checksum.verifyChunkedSums(curDataSlice, curPacketBuf,
- filename, curHeader.getOffsetInBlock());
- }
-
- private static void readChannelFully(ReadableByteChannel ch, ByteBuffer buf)
- throws IOException {
- while (buf.remaining() > 0) {
- int n = ch.read(buf);
- if (n < 0) {
- throw new IOException("Premature EOF reading from " + ch);
- }
- }
- }
-
- private void resetPacketBuffer(int checksumsLen, int dataLen) {
- int packetLen = checksumsLen + dataLen;
- if (curPacketBuf == null ||
- curPacketBuf.capacity() < packetLen) {
- returnPacketBufToPool();
- curPacketBuf = bufferPool.getBuffer(packetLen);
- }
- curPacketBuf.position(checksumsLen);
- curDataSlice = curPacketBuf.slice();
- curDataSlice.limit(dataLen);
- curPacketBuf.clear();
- curPacketBuf.limit(checksumsLen + dataLen);
- }
-
+
@Override
public synchronized long skip(long n) throws IOException {
/* How can we make sure we don't throw a ChecksumException, at least
@@ -272,23 +235,14 @@ public class RemoteBlockReader2 impleme
return nSkipped;
}
- private void readPacketHeader() throws IOException {
- headerBuf.clear();
- readChannelFully(in, headerBuf);
- headerBuf.flip();
- if (curHeader == null) curHeader = new PacketHeader();
- curHeader.readFields(headerBuf);
- }
-
private void readTrailingEmptyPacket() throws IOException {
if (LOG.isTraceEnabled()) {
LOG.trace("Reading empty packet at end of read");
}
- headerBuf.clear();
- readChannelFully(in, headerBuf);
- headerBuf.flip();
- PacketHeader trailer = new PacketHeader();
- trailer.readFields(headerBuf);
+
+ packetReceiver.receiveNextPacket(in);
+
+ PacketHeader trailer = packetReceiver.getHeader();
if (!trailer.isLastPacketInBlock() ||
trailer.getDataLen() != 0) {
throw new IOException("Expected empty end-of-read packet! Header: " +
@@ -321,7 +275,7 @@ public class RemoteBlockReader2 impleme
@Override
public synchronized void close() throws IOException {
- returnPacketBufToPool();
+ packetReceiver.close();
startOffset = -1;
checksum = null;
@@ -332,24 +286,6 @@ public class RemoteBlockReader2 impleme
// in will be closed when its Socket is closed.
}
- @Override
- protected void finalize() throws Throwable {
- try {
- // just in case it didn't get closed, we
- // may as well still try to return the buffer
- returnPacketBufToPool();
- } finally {
- super.finalize();
- }
- }
-
- private void returnPacketBufToPool() {
- if (curPacketBuf != null) {
- bufferPool.returnBuffer(curPacketBuf);
- curPacketBuf = null;
- }
- }
-
/**
* Take the socket used to talk to the DN.
*/
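The per-packet checksum-length assertion shown above reduces to simple chunk arithmetic; a worked sketch with assumed values:

    // Assumed example: 4096 bytes of data, 512-byte chunks, 4-byte checksums.
    int dataLen = 4096;                                  // from the packet header
    int bytesPerChecksum = 512;
    int checksumSize = 4;

    int chunks = 1 + (dataLen - 1) / bytesPerChecksum;   // 8 chunks
    int checksumsLen = chunks * checksumSize;            // 32 bytes
    // The reader asserts getChecksumSlice().capacity() == checksumsLen.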
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java?rev=1371496&r1=1371495&r2=1371496&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java Thu Aug 9 21:33:20 2012
@@ -27,14 +27,31 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PacketHeaderProto;
import org.apache.hadoop.hdfs.util.ByteBufferOutputStream;
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Shorts;
+import com.google.common.primitives.Ints;
+import com.google.protobuf.InvalidProtocolBufferException;
+
/**
* Header data for each packet that goes through the read/write pipelines.
+ * Includes all of the information about the packet, excluding checksums and
+ * actual data.
+ *
+ * This data includes:
+ * - the offset in bytes into the HDFS block of the data in this packet
+ * - the sequence number of this packet in the pipeline
+ * - whether or not this is the last packet in the pipeline
+ * - the length of the data in this packet
+ * - whether or not this packet should be synced by the DNs.
+ *
+ * When serialized, this header is written out as a protocol buffer, preceded
+ * by a 4-byte integer representing the full packet length, and a 2-byte short
+ * representing the header length.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class PacketHeader {
- /** Header size for a packet */
- private static final int PROTO_SIZE =
+ private static final int MAX_PROTO_SIZE =
PacketHeaderProto.newBuilder()
.setOffsetInBlock(0)
.setSeqno(0)
@@ -42,8 +59,10 @@ public class PacketHeader {
.setDataLen(0)
.setSyncBlock(false)
.build().getSerializedSize();
- public static final int PKT_HEADER_LEN =
- 6 + PROTO_SIZE;
+ public static final int PKT_LENGTHS_LEN =
+ Ints.BYTES + Shorts.BYTES;
+ public static final int PKT_MAX_HEADER_LEN =
+ PKT_LENGTHS_LEN + MAX_PROTO_SIZE;
private int packetLen;
private PacketHeaderProto proto;
@@ -54,13 +73,25 @@ public class PacketHeader {
public PacketHeader(int packetLen, long offsetInBlock, long seqno,
boolean lastPacketInBlock, int dataLen, boolean syncBlock) {
this.packetLen = packetLen;
- proto = PacketHeaderProto.newBuilder()
+ Preconditions.checkArgument(packetLen >= Ints.BYTES,
+ "packet len %s should always be at least 4 bytes",
+ packetLen);
+
+ PacketHeaderProto.Builder builder = PacketHeaderProto.newBuilder()
.setOffsetInBlock(offsetInBlock)
.setSeqno(seqno)
.setLastPacketInBlock(lastPacketInBlock)
- .setDataLen(dataLen)
- .setSyncBlock(syncBlock)
- .build();
+ .setDataLen(dataLen);
+
+ if (syncBlock) {
+ // Only set syncBlock if it is specified.
+ // This is wire-incompatible with Hadoop 2.0.0-alpha due to HDFS-3721
+ // because it changes the length of the packet header, and BlockReceiver
+ // in that version did not support variable-length headers.
+ builder.setSyncBlock(syncBlock);
+ }
+
+ proto = builder.build();
}
public int getDataLen() {
@@ -90,10 +121,16 @@ public class PacketHeader {
@Override
public String toString() {
return "PacketHeader with packetLen=" + packetLen +
- "Header data: " +
+ " header data: " +
proto.toString();
}
+ public void setFieldsFromData(
+ int packetLen, byte[] headerData) throws InvalidProtocolBufferException {
+ this.packetLen = packetLen;
+ proto = PacketHeaderProto.parseFrom(headerData);
+ }
+
public void readFields(ByteBuffer buf) throws IOException {
packetLen = buf.getInt();
short protoLen = buf.getShort();
@@ -110,14 +147,21 @@ public class PacketHeader {
proto = PacketHeaderProto.parseFrom(data);
}
+ /**
+ * @return the number of bytes necessary to write out this header,
+ * including the length-prefixing of the payload and header
+ */
+ public int getSerializedSize() {
+ return PKT_LENGTHS_LEN + proto.getSerializedSize();
+ }
/**
* Write the header into the buffer.
* This requires that PKT_HEADER_LEN bytes are available.
*/
public void putInBuffer(final ByteBuffer buf) {
- assert proto.getSerializedSize() == PROTO_SIZE
- : "Expected " + (PROTO_SIZE) + " got: " + proto.getSerializedSize();
+ assert proto.getSerializedSize() <= MAX_PROTO_SIZE
+ : "Expected " + (MAX_PROTO_SIZE) + " got: " + proto.getSerializedSize();
try {
buf.putInt(packetLen);
buf.putShort((short) proto.getSerializedSize());
@@ -128,12 +172,18 @@ public class PacketHeader {
}
public void write(DataOutputStream out) throws IOException {
- assert proto.getSerializedSize() == PROTO_SIZE
- : "Expected " + (PROTO_SIZE) + " got: " + proto.getSerializedSize();
+ assert proto.getSerializedSize() <= MAX_PROTO_SIZE
+ : "Expected " + (MAX_PROTO_SIZE) + " got: " + proto.getSerializedSize();
out.writeInt(packetLen);
out.writeShort(proto.getSerializedSize());
proto.writeTo(out);
}
+
+ public byte[] getBytes() {
+ ByteBuffer buf = ByteBuffer.allocate(getSerializedSize());
+ putInBuffer(buf);
+ return buf.array();
+ }
/**
* Perform a sanity check on the packet, returning true if it is sane.
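Putting the serialization together: the header now goes on the wire as a 4-byte packet length, a 2-byte header length, and a variable-length protobuf. A condensed sketch mirroring write() above (the stream and proto setup are assumed):

    //  PLEN (4 bytes) | HLEN (2 bytes) | HEADER (HLEN bytes of protobuf)
    out.writeInt(packetLen);                    // PLEN counts its own 4 bytes
    out.writeShort(proto.getSerializedSize());  // HLEN: variable, <= MAX_PROTO_SIZE
    proto.writeTo(out);                         // the protobuf-encoded fields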
Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java?rev=1371496&view=auto
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java (added)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java Thu Aug 9 21:33:20 2012
@@ -0,0 +1,292 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol.datatransfer;
+
+import java.io.Closeable;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.util.DirectBufferPool;
+import org.apache.hadoop.io.IOUtils;
+
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Ints;
+
+/**
+ * Class to handle reading packets one-at-a-time from the wire.
+ * These packets are used both for reading and writing data to/from
+ * DataNodes.
+ */
+@InterfaceAudience.Private
+public class PacketReceiver implements Closeable {
+
+ /**
+ * The max size of any single packet. This prevents OOMEs when
+ * invalid data is sent.
+ */
+ private static final int MAX_PACKET_SIZE = 16 * 1024 * 1024;
+
+ static Log LOG = LogFactory.getLog(PacketReceiver.class);
+
+ private static final DirectBufferPool bufferPool = new DirectBufferPool();
+ private final boolean useDirectBuffers;
+
+ /**
+ * Internal buffer for reading the length prefixes at the start of
+ * the packet.
+ */
+ private final ByteBuffer lengthPrefixBuf = ByteBuffer.allocate(
+ PacketHeader.PKT_LENGTHS_LEN);
+
+ /**
+ * The entirety of the most recently read packet, excepting the
+ * length prefixes.
+ */
+ private ByteBuffer curPacketBuf = null;
+
+ /**
+ * A slice of {@link #curPacketBuf} which contains just the checksums.
+ */
+ private ByteBuffer curChecksumSlice = null;
+
+ /**
+ * A slice of {@link #curPacketBuf} which contains just the data.
+ */
+ private ByteBuffer curDataSlice = null;
+
+ /**
+ * The packet header of the most recently read packet.
+ */
+ private PacketHeader curHeader;
+
+ public PacketReceiver(boolean useDirectBuffers) {
+ this.useDirectBuffers = useDirectBuffers;
+ }
+
+ public PacketHeader getHeader() {
+ return curHeader;
+ }
+
+ public ByteBuffer getDataSlice() {
+ return curDataSlice;
+ }
+
+ public ByteBuffer getChecksumSlice() {
+ return curChecksumSlice;
+ }
+
+ /**
+ * Reads all of the data for the next packet into the appropriate buffers.
+ *
+ * The data slice and checksum slice members will be set to point to the
+ * user data and corresponding checksums. The header will be parsed and
+ * set.
+ */
+ public void receiveNextPacket(ReadableByteChannel in) throws IOException {
+ doRead(in, null);
+ }
+
+ /**
+ * @see #receiveNextPacket(ReadableByteChannel)
+ */
+ public void receiveNextPacket(InputStream in) throws IOException {
+ doRead(null, in);
+ }
+
+ private void doRead(ReadableByteChannel ch, InputStream in)
+ throws IOException {
+ // Each packet looks like:
+ // PLEN HLEN HEADER CHECKSUMS DATA
+ // 32-bit 16-bit <protobuf> <variable length>
+ //
+ // PLEN: Payload length
+ // = length(PLEN) + length(CHECKSUMS) + length(DATA)
+ // This length includes its own encoded length in
+ // the sum for historical reasons.
+ //
+ // HLEN: Header length
+ // = length(HEADER)
+ //
+ // HEADER: the actual packet header fields, encoded in protobuf
+ // CHECKSUMS: the crcs for the data chunk. May be missing if
+ // checksums were not requested
+ // DATA the actual block data
+ Preconditions.checkState(curHeader == null || !curHeader.isLastPacketInBlock());
+
+ lengthPrefixBuf.clear();
+ doReadFully(ch, in, lengthPrefixBuf);
+ lengthPrefixBuf.flip();
+ int payloadLen = lengthPrefixBuf.getInt();
+
+ if (payloadLen < Ints.BYTES) {
+ // The "payload length" includes its own length. Therefore it
+ // should never be less than 4 bytes
+ throw new IOException("Invalid payload length " +
+ payloadLen);
+ }
+ int dataPlusChecksumLen = payloadLen - Ints.BYTES;
+ int headerLen = lengthPrefixBuf.getShort();
+ if (headerLen < 0) {
+ throw new IOException("Invalid header length " + headerLen);
+ }
+
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("readNextPacket: dataPlusChecksumLen = " + dataPlusChecksumLen +
+ " headerLen = " + headerLen);
+ }
+
+ // Sanity check the buffer size so we don't allocate too much memory
+ // and OOME.
+ int totalLen = payloadLen + headerLen;
+ if (totalLen < 0 || totalLen > MAX_PACKET_SIZE) {
+ throw new IOException("Incorrect value for packet payload size: " +
+ payloadLen);
+ }
+
+ // Make sure we have space for the whole packet, and
+ // read it.
+ reallocPacketBuf(dataPlusChecksumLen + headerLen);
+ curPacketBuf.clear();
+ curPacketBuf.limit(dataPlusChecksumLen + headerLen);
+ doReadFully(ch, in, curPacketBuf);
+ curPacketBuf.flip();
+
+ // Extract the header from the front of the buffer.
+ byte[] headerBuf = new byte[headerLen];
+ curPacketBuf.get(headerBuf);
+ if (curHeader == null) {
+ curHeader = new PacketHeader();
+ }
+ curHeader.setFieldsFromData(dataPlusChecksumLen, headerBuf);
+
+ // Compute the sub-slices of the packet
+ int checksumLen = dataPlusChecksumLen - curHeader.getDataLen();
+ if (checksumLen < 0) {
+ throw new IOException("Invalid packet: data length in packet header " +
+ "exceeds data length received. dataPlusChecksumLen=" +
+ dataPlusChecksumLen + " header: " + curHeader);
+ }
+
+ reslicePacket(headerLen, checksumLen, curHeader.getDataLen());
+ }
+
+ /**
+ * Rewrite the last-read packet on the wire to the given output stream.
+ */
+ public void mirrorPacketTo(DataOutputStream mirrorOut) throws IOException {
+ Preconditions.checkState(!useDirectBuffers,
+ "Currently only supported for non-direct buffers");
+ assert lengthPrefixBuf.capacity() == PacketHeader.PKT_LENGTHS_LEN;
+ mirrorOut.write(lengthPrefixBuf.array(),
+ lengthPrefixBuf.arrayOffset(),
+ lengthPrefixBuf.capacity());
+ mirrorOut.write(curPacketBuf.array(),
+ curPacketBuf.arrayOffset(),
+ curPacketBuf.remaining());
+ }
+
+
+ private static void doReadFully(ReadableByteChannel ch, InputStream in,
+ ByteBuffer buf) throws IOException {
+ if (ch != null) {
+ readChannelFully(ch, buf);
+ } else {
+ Preconditions.checkState(!buf.isDirect(),
+ "Must not use direct buffers with InputStream API");
+ IOUtils.readFully(in, buf.array(),
+ buf.arrayOffset() + buf.position(),
+ buf.remaining());
+ buf.position(buf.position() + buf.remaining());
+ }
+ }
+
+ private void reslicePacket(
+ int headerLen, int checksumsLen, int dataLen) {
+ assert dataLen >= 0 : "invalid datalen: " + dataLen;
+
+ assert curPacketBuf.position() == headerLen;
+ assert checksumsLen + dataLen == curPacketBuf.remaining() :
+ "headerLen= " + headerLen + " clen=" + checksumsLen + " dlen=" + dataLen +
+ " rem=" + curPacketBuf.remaining();
+
+ curPacketBuf.position(headerLen);
+ curPacketBuf.limit(headerLen + checksumsLen);
+ curChecksumSlice = curPacketBuf.slice();
+
+ curPacketBuf.position(headerLen + checksumsLen);
+ curPacketBuf.limit(headerLen + checksumsLen + dataLen);
+ curDataSlice = curPacketBuf.slice();
+
+ curPacketBuf.position(0);
+ curPacketBuf.limit(headerLen + checksumsLen + dataLen);
+ }
+
+
+ private static void readChannelFully(ReadableByteChannel ch, ByteBuffer buf)
+ throws IOException {
+ while (buf.remaining() > 0) {
+ int n = ch.read(buf);
+ if (n < 0) {
+ throw new IOException("Premature EOF reading from " + ch);
+ }
+ }
+ }
+
+ private void reallocPacketBuf(int atLeastCapacity) {
+ // Realloc the buffer if this packet is longer than the previous
+ // one.
+ if (curPacketBuf == null ||
+ curPacketBuf.capacity() < atLeastCapacity) {
+ returnPacketBufToPool();
+ if (useDirectBuffers) {
+ curPacketBuf = bufferPool.getBuffer(atLeastCapacity);
+ } else {
+ curPacketBuf = ByteBuffer.allocate(atLeastCapacity);
+ }
+ }
+ }
+
+ private void returnPacketBufToPool() {
+ if (curPacketBuf != null && curPacketBuf.isDirect()) {
+ bufferPool.returnBuffer(curPacketBuf);
+ curPacketBuf = null;
+ }
+ }
+
+ @Override // Closeable
+ public void close() {
+ returnPacketBufToPool();
+ }
+
+ @Override
+ protected void finalize() throws Throwable {
+ try {
+ // just in case it didn't get closed, we
+ // may as well still try to return the buffer
+ returnPacketBufToPool();
+ } finally {
+ super.finalize();
+ }
+ }
+}
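A hypothetical usage sketch of the new class (names from this commit; the surrounding stream setup is assumed):

    PacketReceiver receiver = new PacketReceiver(false);  // heap buffers
    try {
      receiver.receiveNextPacket(in);        // 'in' is an InputStream from a DN
      PacketHeader header = receiver.getHeader();
      ByteBuffer data = receiver.getDataSlice();
      ByteBuffer checksums = receiver.getChecksumSlice();
      // ... verify checksums against data, then consume the data slice ...
    } finally {
      receiver.close();                      // returns any pooled direct buffer
    }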
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=1371496&r1=1371495&r2=1371496&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Thu Aug 9 21:33:20 2012
@@ -23,7 +23,6 @@ import java.io.BufferedOutputStream;
import java.io.Closeable;
import java.io.DataInputStream;
import java.io.DataOutputStream;
-import java.io.EOFException;
import java.io.FileDescriptor;
import java.io.FileOutputStream;
import java.io.IOException;
@@ -34,12 +33,14 @@ import java.util.LinkedList;
import java.util.zip.Checksum;
import org.apache.commons.logging.Log;
+import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.FSOutputSummer;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
+import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver;
import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
@@ -77,9 +78,10 @@ class BlockReceiver implements Closeable
private DataOutputStream checksumOut = null; // to crc file at local disk
private int bytesPerChecksum;
private int checksumSize;
- private ByteBuffer buf; // contains one full packet.
- private int bufRead; //amount of valid data in the buf
- private int maxPacketReadLen;
+
+ private PacketReceiver packetReceiver =
+ new PacketReceiver(false);
+
protected final String inAddr;
protected final String myAddr;
private String mirrorAddr;
@@ -248,6 +250,10 @@ class BlockReceiver implements Closeable
*/
@Override
public void close() throws IOException {
+ if (packetReceiver != null) {
+ packetReceiver.close();
+ }
+
IOException ioe = null;
if (syncOnClose && (out != null || checksumOut != null)) {
datanode.metrics.incrFsyncCount();
@@ -365,33 +371,24 @@ class BlockReceiver implements Closeable
/**
* Verify multiple CRC chunks.
*/
- private void verifyChunks( byte[] dataBuf, int dataOff, int len,
- byte[] checksumBuf, int checksumOff )
- throws IOException {
- while (len > 0) {
- int chunkLen = Math.min(len, bytesPerChecksum);
-
- clientChecksum.update(dataBuf, dataOff, chunkLen);
-
- if (!clientChecksum.compare(checksumBuf, checksumOff)) {
- if (srcDataNode != null) {
- try {
- LOG.info("report corrupt block " + block + " from datanode " +
- srcDataNode + " to namenode");
- datanode.reportRemoteBadBlock(srcDataNode, block);
- } catch (IOException e) {
- LOG.warn("Failed to report bad block " + block +
- " from datanode " + srcDataNode + " to namenode");
- }
+ private void verifyChunks(ByteBuffer dataBuf, ByteBuffer checksumBuf)
+ throws IOException {
+ try {
+ clientChecksum.verifyChunkedSums(dataBuf, checksumBuf, clientname, 0);
+ } catch (ChecksumException ce) {
+ LOG.warn("Checksum error in block " + block + " from " + inAddr, ce);
+ if (srcDataNode != null) {
+ try {
+ LOG.info("report corrupt block " + block + " from datanode " +
+ srcDataNode + " to namenode");
+ datanode.reportRemoteBadBlock(srcDataNode, block);
+ } catch (IOException e) {
+ LOG.warn("Failed to report bad block " + block +
+ " from datanode " + srcDataNode + " to namenode");
}
- throw new IOException("Unexpected checksum mismatch " +
- "while writing " + block + " from " + inAddr);
}
-
- clientChecksum.reset();
- dataOff += chunkLen;
- checksumOff += checksumSize;
- len -= chunkLen;
+ throw new IOException("Unexpected checksum mismatch " +
+ "while writing " + block + " from " + inAddr);
}
}
@@ -403,163 +400,24 @@ class BlockReceiver implements Closeable
* This does not verify the original checksums, under the assumption
* that they have already been validated.
*/
- private void translateChunks( byte[] dataBuf, int dataOff, int len,
- byte[] checksumBuf, int checksumOff ) {
- if (len == 0) return;
-
- int numChunks = (len - 1)/bytesPerChecksum + 1;
-
- diskChecksum.calculateChunkedSums(
- ByteBuffer.wrap(dataBuf, dataOff, len),
- ByteBuffer.wrap(checksumBuf, checksumOff, numChunks * checksumSize));
+ private void translateChunks(ByteBuffer dataBuf, ByteBuffer checksumBuf) {
+ diskChecksum.calculateChunkedSums(dataBuf, checksumBuf);
}
- /**
- * Makes sure buf.position() is zero without modifying buf.remaining().
- * It moves the data if position needs to be changed.
- */
- private void shiftBufData() {
- if (bufRead != buf.limit()) {
- throw new IllegalStateException("bufRead should be same as " +
- "buf.limit()");
- }
-
- //shift the remaining data on buf to the front
- if (buf.position() > 0) {
- int dataLeft = buf.remaining();
- if (dataLeft > 0) {
- byte[] b = buf.array();
- System.arraycopy(b, buf.position(), b, 0, dataLeft);
- }
- buf.position(0);
- bufRead = dataLeft;
- buf.limit(bufRead);
- }
- }
-
- /**
- * reads upto toRead byte to buf at buf.limit() and increments the limit.
- * throws an IOException if read does not succeed.
- */
- private int readToBuf(int toRead) throws IOException {
- if (toRead < 0) {
- toRead = (maxPacketReadLen > 0 ? maxPacketReadLen : buf.capacity())
- - buf.limit();
- }
-
- int nRead = in.read(buf.array(), buf.limit(), toRead);
-
- if (nRead < 0) {
- throw new EOFException("while trying to read " + toRead + " bytes");
- }
- bufRead = buf.limit() + nRead;
- buf.limit(bufRead);
- return nRead;
- }
-
-
- /**
- * Reads (at least) one packet and returns the packet length.
- * buf.position() points to the start of the packet and
- * buf.limit() point to the end of the packet. There could
- * be more data from next packet in buf.<br><br>
- *
- * It tries to read a full packet with single read call.
- * Consecutive packets are usually of the same length.
- */
- private void readNextPacket() throws IOException {
- /* This dances around buf a little bit, mainly to read
- * full packet with single read and to accept arbitrary size
- * for next packet at the same time.
- */
- if (buf == null) {
- /* initialize buffer to the best guess size:
- * 'chunksPerPacket' calculation here should match the same
- * calculation in DFSClient to make the guess accurate.
- */
- int chunkSize = bytesPerChecksum + checksumSize;
- int chunksPerPacket = (datanode.getDnConf().writePacketSize - PacketHeader.PKT_HEADER_LEN
- + chunkSize - 1)/chunkSize;
- buf = ByteBuffer.allocate(PacketHeader.PKT_HEADER_LEN +
- Math.max(chunksPerPacket, 1) * chunkSize);
- buf.limit(0);
- }
-
- // See if there is data left in the buffer :
- if (bufRead > buf.limit()) {
- buf.limit(bufRead);
- }
-
- while (buf.remaining() < HdfsConstants.BYTES_IN_INTEGER) {
- if (buf.position() > 0) {
- shiftBufData();
- }
- readToBuf(-1);
- }
-
- /* We mostly have the full packet or at least enough for an int
- */
- buf.mark();
- int payloadLen = buf.getInt();
- buf.reset();
-
- // check corrupt values for pktLen, 100MB upper limit should be ok?
- if (payloadLen < 0 || payloadLen > (100*1024*1024)) {
- throw new IOException("Incorrect value for packet payload : " +
- payloadLen);
- }
-
- // Subtract BYTES_IN_INTEGER since that accounts for the payloadLen that
- // we read above.
- int pktSize = payloadLen + PacketHeader.PKT_HEADER_LEN
- - HdfsConstants.BYTES_IN_INTEGER;
-
- if (buf.remaining() < pktSize) {
- //we need to read more data
- int toRead = pktSize - buf.remaining();
-
- // first make sure buf has enough space.
- int spaceLeft = buf.capacity() - buf.limit();
- if (toRead > spaceLeft && buf.position() > 0) {
- shiftBufData();
- spaceLeft = buf.capacity() - buf.limit();
- }
- if (toRead > spaceLeft) {
- byte oldBuf[] = buf.array();
- int toCopy = buf.limit();
- buf = ByteBuffer.allocate(toCopy + toRead);
- System.arraycopy(oldBuf, 0, buf.array(), 0, toCopy);
- buf.limit(toCopy);
- }
-
- //now read:
- while (toRead > 0) {
- toRead -= readToBuf(toRead);
- }
- }
-
- if (buf.remaining() > pktSize) {
- buf.limit(buf.position() + pktSize);
- }
-
- if (pktSize > maxPacketReadLen) {
- maxPacketReadLen = pktSize;
- }
- }
-
+
/**
* Receives and processes a packet. It can contain many chunks.
* returns the number of data bytes that the packet has.
*/
private int receivePacket() throws IOException {
// read the next packet
- readNextPacket();
+ packetReceiver.receiveNextPacket(in);
- buf.mark();
- PacketHeader header = new PacketHeader();
- header.readFields(buf);
- int endOfHeader = buf.position();
- buf.reset();
+ PacketHeader header = packetReceiver.getHeader();
+ if (LOG.isDebugEnabled()){
+ LOG.debug("Receiving one packet for block " + block +
+ ": " + header);
+ }
// Sanity check the header
if (header.getOffsetInBlock() > replicaInfo.getNumBytes()) {
@@ -574,38 +432,12 @@ class BlockReceiver implements Closeable
header.getDataLen());
}
- return receivePacket(
- header.getOffsetInBlock(),
- header.getSeqno(),
- header.isLastPacketInBlock(),
- header.getDataLen(),
- header.getSyncBlock(),
- endOfHeader);
- }
+ long offsetInBlock = header.getOffsetInBlock();
+ long seqno = header.getSeqno();
+ boolean lastPacketInBlock = header.isLastPacketInBlock();
+ int len = header.getDataLen();
+ boolean syncBlock = header.getSyncBlock();
- /**
- * Write the received packet to disk (data only)
- */
- private void writePacketToDisk(byte[] pktBuf, int startByteToDisk,
- int numBytesToDisk) throws IOException {
- out.write(pktBuf, startByteToDisk, numBytesToDisk);
- }
-
- /**
- * Receives and processes a packet. It can contain many chunks.
- * returns the number of data bytes that the packet has.
- */
- private int receivePacket(long offsetInBlock, long seqno,
- boolean lastPacketInBlock, int len, boolean syncBlock,
- int endOfHeader) throws IOException {
- if (LOG.isDebugEnabled()){
- LOG.debug("Receiving one packet for block " + block +
- " of length " + len +
- " seqno " + seqno +
- " offsetInBlock " + offsetInBlock +
- " syncBlock " + syncBlock +
- " lastPacketInBlock " + lastPacketInBlock);
- }
// make sure the block gets sync'ed upon close
this.syncOnClose |= syncBlock && lastPacketInBlock;
@@ -625,14 +457,15 @@ class BlockReceiver implements Closeable
//First write the packet to the mirror:
if (mirrorOut != null && !mirrorError) {
try {
- mirrorOut.write(buf.array(), buf.position(), buf.remaining());
+ packetReceiver.mirrorPacketTo(mirrorOut);
mirrorOut.flush();
} catch (IOException e) {
handleMirrorOutError(e);
}
}
- buf.position(endOfHeader);
+ ByteBuffer dataBuf = packetReceiver.getDataSlice();
+ ByteBuffer checksumBuf = packetReceiver.getChecksumSlice();
if (lastPacketInBlock || len == 0) {
if(LOG.isDebugEnabled()) {
@@ -646,18 +479,11 @@ class BlockReceiver implements Closeable
int checksumLen = ((len + bytesPerChecksum - 1)/bytesPerChecksum)*
checksumSize;
- if ( buf.remaining() != (checksumLen + len)) {
- throw new IOException("Data remaining in packet does not match" +
- "sum of checksumLen and dataLen " +
- " size remaining: " + buf.remaining() +
- " data len: " + len +
- " checksum Len: " + checksumLen);
- }
- int checksumOff = buf.position();
- int dataOff = checksumOff + checksumLen;
- byte pktBuf[] = buf.array();
-
- buf.position(buf.limit()); // move to the end of the data.
+ if ( checksumBuf.capacity() != checksumLen) {
+ throw new IOException("Length of checksums in packet " +
+ checksumBuf.capacity() + " does not match calculated checksum " +
+ "length " + checksumLen);
+ }
/* skip verifying checksum iff this is not the last one in the
* pipeline and clientName is non-null. i.e. Checksum is verified
@@ -667,11 +493,11 @@ class BlockReceiver implements Closeable
* checksum.
*/
if (mirrorOut == null || isDatanode || needsChecksumTranslation) {
- verifyChunks(pktBuf, dataOff, len, pktBuf, checksumOff);
+ verifyChunks(dataBuf, checksumBuf);
if (needsChecksumTranslation) {
// overwrite the checksums in the packet buffer with the
// appropriate polynomial for the disk storage.
- translateChunks(pktBuf, dataOff, len, pktBuf, checksumOff);
+ translateChunks(dataBuf, checksumBuf);
}
}
@@ -700,9 +526,13 @@ class BlockReceiver implements Closeable
computePartialChunkCrc(onDiskLen, offsetInChecksum, bytesPerChecksum);
}
- int startByteToDisk = dataOff+(int)(onDiskLen-firstByteInBlock);
+ int startByteToDisk = (int)(onDiskLen-firstByteInBlock)
+ + dataBuf.arrayOffset() + dataBuf.position();
+
int numBytesToDisk = (int)(offsetInBlock-onDiskLen);
- writePacketToDisk(pktBuf, startByteToDisk, numBytesToDisk);
+
+ // Write data to disk.
+ out.write(dataBuf.array(), startByteToDisk, numBytesToDisk);
// If this is a partial chunk, then verify that this is the only
// chunk in the packet. Calculate new crc for this chunk.
@@ -714,7 +544,7 @@ class BlockReceiver implements Closeable
" len = " + len +
" bytesPerChecksum " + bytesPerChecksum);
}
- partialCrc.update(pktBuf, startByteToDisk, numBytesToDisk);
+ partialCrc.update(dataBuf.array(), startByteToDisk, numBytesToDisk);
byte[] buf = FSOutputSummer.convertToByteStream(partialCrc, checksumSize);
lastChunkChecksum = Arrays.copyOfRange(
buf, buf.length - checksumSize, buf.length
@@ -726,11 +556,12 @@ class BlockReceiver implements Closeable
partialCrc = null;
} else {
lastChunkChecksum = Arrays.copyOfRange(
- pktBuf,
- checksumOff + checksumLen - checksumSize,
- checksumOff + checksumLen
- );
- checksumOut.write(pktBuf, checksumOff, checksumLen);
+ checksumBuf.array(),
+ checksumBuf.arrayOffset() + checksumBuf.position() + checksumLen - checksumSize,
+ checksumBuf.arrayOffset() + checksumBuf.position() + checksumLen);
+ checksumOut.write(checksumBuf.array(),
+ checksumBuf.arrayOffset() + checksumBuf.position(),
+ checksumLen);
}
/// flush entire packet, sync unless close() will sync
flushOrSync(syncBlock && !lastPacketInBlock);
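Since the data and checksum slices no longer begin at index 0 of the backing array, writes from them must account for arrayOffset() and position(); a brief sketch of the pattern used above:

    ByteBuffer dataBuf = packetReceiver.getDataSlice();
    // A slice's array() is the whole packet buffer, so index from the
    // slice's own offset rather than from 0:
    int start = dataBuf.arrayOffset() + dataBuf.position();
    out.write(dataBuf.array(), start, dataBuf.remaining());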
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java?rev=1371496&r1=1371495&r2=1371496&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java Thu Aug 9 21:33:20 2012
@@ -62,40 +62,29 @@ import org.apache.hadoop.util.DataChecks
* </pre>
* An empty packet is sent to mark the end of block and read completion.
*
- * PACKET Contains a packet header, checksum and data. Amount of data
- * carried is set by BUFFER_SIZE.
- * <pre>
- * +-----------------------------------------------------+
- * | 4 byte packet length (excluding packet header)      |
- * +-----------------------------------------------------+
- * | 8 byte offset in the block | 8 byte sequence number |
- * +-----------------------------------------------------+
- * | 1 byte isLastPacketInBlock                          |
- * +-----------------------------------------------------+
- * | 4 byte Length of actual data                        |
- * +-----------------------------------------------------+
- * | x byte checksum data. x is defined below            |
- * +-----------------------------------------------------+
- * | actual data ......                                  |
- * +-----------------------------------------------------+
- *
- * Data is made of Chunks. Each chunk is of length <= BYTES_PER_CHECKSUM.
- * A checksum is calculated for each chunk.
- *
- * x = (length of data + BYTE_PER_CHECKSUM - 1)/BYTES_PER_CHECKSUM *
- * CHECKSUM_SIZE
- *
- * CHECKSUM_SIZE depends on CHECKSUM_TYPE (usually, 4 for CRC32)
- * </pre>
+ * PACKET Contains a packet header, checksum and data. Amount of data
+ * carried is set by BUFFER_SIZE.
+ * <pre>
+ * +-----------------------------------------------------+
+ * | Variable length header. See {@link PacketHeader}    |
+ * +-----------------------------------------------------+
+ * | x byte checksum data. x is defined below            |
+ * +-----------------------------------------------------+
+ * | actual data ......                                  |
+ * +-----------------------------------------------------+
+ *
+ * Data is made of Chunks. Each chunk is of length <= BYTES_PER_CHECKSUM.
+ * A checksum is calculated for each chunk.
+ *
+ * x = (length of data + BYTE_PER_CHECKSUM - 1)/BYTES_PER_CHECKSUM *
+ * CHECKSUM_SIZE
+ *
+ * CHECKSUM_SIZE depends on CHECKSUM_TYPE (usually, 4 for CRC32)
+ * </pre>
*
* The client reads data until it receives a packet with
* "LastPacketInBlock" set to true or with a zero length. If there is
- * no checksum error, it replies to DataNode with OP_STATUS_CHECKSUM_OK:
- * <pre>
- * +------------------------------+
- * | 2 byte OP_STATUS_CHECKSUM_OK |
- * +------------------------------+
- * </pre>
+ * no checksum error, it replies to DataNode with OP_STATUS_CHECKSUM_OK.
*/
class BlockSender implements java.io.Closeable {
static final Log LOG = DataNode.LOG;
@@ -448,8 +437,22 @@ class BlockSender implements java.io.Clo
int packetLen = dataLen + checksumDataLen + 4;
boolean lastDataPacket = offset + dataLen == endOffset && dataLen > 0;
- writePacketHeader(pkt, dataLen, packetLen);
-
+ // The packet buffer is organized as follows:
+ // _______HHHHCCCCD?D?D?D?
+ //        ^   ^
+ //        |   \ checksumOff
+ //        \ headerOff
+ // _ padding, since the header is variable-length
+ // H = header and length prefixes
+ // C = checksums
+ // D? = data, if transferTo is false.
+
+ int headerLen = writePacketHeader(pkt, dataLen, packetLen);
+
+ // Per above, the header doesn't start at the beginning of the
+ // buffer
+ int headerOff = pkt.position() - headerLen;
+
int checksumOff = pkt.position();
byte[] buf = pkt.array();
@@ -479,7 +482,8 @@ class BlockSender implements java.io.Clo
try {
if (transferTo) {
SocketOutputStream sockOut = (SocketOutputStream)out;
- sockOut.write(buf, 0, dataOff); // First write checksum
+ // First write header and checksums
+ sockOut.write(buf, headerOff, dataOff - headerOff);
// no need to flush since we know out is not a buffered stream
FileChannel fileCh = ((FileInputStream)blockIn).getChannel();
@@ -492,7 +496,7 @@ class BlockSender implements java.io.Clo
blockInPosition += dataLen;
} else {
// normal transfer
- out.write(buf, 0, dataOff + dataLen);
+ out.write(buf, headerOff, dataOff + dataLen - headerOff);
}
} catch (IOException e) {
if (e instanceof SocketTimeoutException) {
@@ -625,7 +629,7 @@ class BlockSender implements java.io.Clo
final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
try {
int maxChunksPerPacket;
- int pktSize = PacketHeader.PKT_HEADER_LEN;
+ int pktBufSize = PacketHeader.PKT_MAX_HEADER_LEN;
boolean transferTo = transferToAllowed && !verifyChecksum
&& baseStream instanceof SocketOutputStream
&& blockIn instanceof FileInputStream;
@@ -636,15 +640,15 @@ class BlockSender implements java.io.Clo
maxChunksPerPacket = numberOfChunks(TRANSFERTO_BUFFER_SIZE);
// Smaller packet size to only hold checksum when doing transferTo
- pktSize += checksumSize * maxChunksPerPacket;
+ pktBufSize += checksumSize * maxChunksPerPacket;
} else {
maxChunksPerPacket = Math.max(1,
numberOfChunks(HdfsConstants.IO_FILE_BUFFER_SIZE));
// Packet size includes both checksum and data
- pktSize += (chunkSize + checksumSize) * maxChunksPerPacket;
+ pktBufSize += (chunkSize + checksumSize) * maxChunksPerPacket;
}
- ByteBuffer pktBuf = ByteBuffer.allocate(pktSize);
+ ByteBuffer pktBuf = ByteBuffer.allocate(pktBufSize);
while (endOffset > offset) {
manageOsCache();
@@ -714,14 +718,19 @@ class BlockSender implements java.io.Clo
}
/**
- * Write packet header into {@code pkt}
+ * Write packet header into {@code pkt},
+ * return the length of the header written.
*/
- private void writePacketHeader(ByteBuffer pkt, int dataLen, int packetLen) {
+ private int writePacketHeader(ByteBuffer pkt, int dataLen, int packetLen) {
pkt.clear();
// both syncBlock and syncPacket are false
PacketHeader header = new PacketHeader(packetLen, offset, seqno,
(dataLen == 0), dataLen, false);
+
+ int size = header.getSerializedSize();
+ pkt.position(PacketHeader.PKT_MAX_HEADER_LEN - size);
header.putInBuffer(pkt);
+ return size;
}
boolean didSendEntireByteRange() {
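The net effect of writePacketHeader() above is a right-aligned header: it is written so that it ends exactly where the checksums begin, with the variable-length padding left at the front of the buffer. A condensed sketch (names from the diff; 'pkt' is allocated with PKT_MAX_HEADER_LEN of headroom):

    pkt.clear();
    PacketHeader header = new PacketHeader(packetLen, offset, seqno,
        (dataLen == 0), dataLen, false);          // syncBlock not set here
    int size = header.getSerializedSize();
    pkt.position(PacketHeader.PKT_MAX_HEADER_LEN - size); // skip the padding
    header.putInBuffer(pkt);                      // header now abuts checksums
    int headerOff = pkt.position() - size;        // where the send will start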