You are viewing a plain text version of this content. The canonical link for it was a hyperlink in the original archive page and is not preserved in this copy.
Posted to common-commits@hadoop.apache.org by ra...@apache.org on 2008/03/03 22:40:23 UTC
svn commit: r633285 - in /hadoop/core/trunk: ./
src/java/org/apache/hadoop/dfs/ src/test/org/apache/hadoop/dfs/
Author: rangadi
Date: Mon Mar 3 13:40:18 2008
New Revision: 633285
URL: http://svn.apache.org/viewvc?rev=633285&view=rev
Log:
HADOOP-2758. Reduce buffer copies in DataNode when data is read from
HDFS, without negatively affecting read throughput. (rangadi)
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/Balancer.java
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlockCrcUpgrade.java
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java
hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestBlockReplacement.java
hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestDataTransferProtocol.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=633285&r1=633284&r2=633285&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Mon Mar 3 13:40:18 2008
@@ -65,6 +65,9 @@
repetitive calls to get the current time and late checking to see if
we want speculation on at all. (omalley)
+ HADOOP-2758. Reduce buffer copies in DataNode when data is read from
+ HDFS, without negatively affecting read throughput. (rangadi)
+
BUG FIXES
HADOOP-2195. '-mkdir' behaviour is now closer to Linux shell in case of
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/Balancer.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/Balancer.java?rev=633285&r1=633284&r2=633285&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/Balancer.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/Balancer.java Mon Mar 3 13:40:18 2008
@@ -341,7 +341,7 @@
/* Send a block copy request to the outputstream*/
private void sendRequest(DataOutputStream out) throws IOException {
- out.writeShort(FSConstants.DATA_TRANFER_VERSION);
+ out.writeShort(FSConstants.DATA_TRANSFER_VERSION);
out.writeByte(FSConstants.OP_COPY_BLOCK);
out.writeLong(block.getBlock().getBlockId());
Text.writeString(out, source.getStorageID());
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlockCrcUpgrade.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlockCrcUpgrade.java?rev=633285&r1=633284&r2=633285&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlockCrcUpgrade.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlockCrcUpgrade.java Mon Mar 3 13:40:18 2008
@@ -853,7 +853,7 @@
DataInputStream in = new DataInputStream(dnSock.getInputStream());
// Write the header:
- out.writeShort( DataNode.DATA_TRANFER_VERSION );
+ out.writeShort( DataNode.DATA_TRANSFER_VERSION );
out.writeByte( DataNode.OP_READ_METADATA );
out.writeLong( blockInfo.block.getBlockId() );
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java?rev=633285&r1=633284&r2=633285&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java Mon Mar 3 13:40:18 2008
@@ -638,6 +638,7 @@
private DataChecksum checksum;
private long lastChunkOffset = -1;
private long lastChunkLen = -1;
+ private long lastSeqNo = -1;
private long startOffset;
private long firstChunkOffset;
@@ -646,6 +647,9 @@
private boolean gotEOS = false;
byte[] skipBuf = null;
+ ByteBuffer checksumBytes = null;
+ int dataLeft = 0;
+ boolean isLastPacket = false;
/* FSInputChecker interface */
@@ -722,6 +726,22 @@
"since seek is not required");
}
+ /**
+ * Makes sure that checksumBytes has enough capacity
+ * and limit is set to the number of checksum bytes needed
+ * to be read.
+ */
+ private void adjustChecksumBytes(int dataLen) {
+ int requiredSize =
+ ((dataLen + bytesPerChecksum - 1)/bytesPerChecksum)*checksumSize;
+ if (checksumBytes == null || requiredSize > checksumBytes.capacity()) {
+ checksumBytes = ByteBuffer.wrap(new byte[requiredSize]);
+ } else {
+ checksumBytes.clear();
+ }
+ checksumBytes.limit(requiredSize);
+ }
+
@Override
protected synchronized int readChunk(long pos, byte[] buf, int offset,
int len, byte[] checksumBuf)
@@ -748,42 +768,60 @@
firstChunkOffset + " != " + chunkOffset);
}
- // The chunk is transmitted as one packet. Read packet headers.
- int packetLen = in.readInt();
- long offsetInBlock = in.readLong();
- long seqno = in.readLong();
- boolean lastPacketInBlock = in.readBoolean();
- LOG.debug("DFSClient readChunk got seqno " + seqno +
- " offsetInBlock " + offsetInBlock +
- " lastPacketInBlock " + lastPacketInBlock +
- " packetLen " + packetLen);
-
- int chunkLen = in.readInt();
-
- // Sanity check the lengths
- if ( chunkLen < 0 || chunkLen > bytesPerChecksum ||
- ( lastChunkLen >= 0 && // prev packet exists
- ( (chunkLen > 0 && lastChunkLen != bytesPerChecksum) ||
- chunkOffset != (lastChunkOffset + lastChunkLen) ) ) ) {
- throw new IOException("BlockReader: error in chunk's offset " +
- "or length (" + chunkOffset + ":" +
- chunkLen + ")");
+ // Read next packet if the previous packet has been read completely.
+ if (dataLeft <= 0) {
+ //Read packet headers.
+ int packetLen = in.readInt();
+ long offsetInBlock = in.readLong();
+ long seqno = in.readLong();
+ boolean lastPacketInBlock = in.readBoolean();
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("DFSClient readChunk got seqno " + seqno +
+ " offsetInBlock " + offsetInBlock +
+ " lastPacketInBlock " + lastPacketInBlock +
+ " packetLen " + packetLen);
+ }
+
+ int dataLen = in.readInt();
+
+ // Sanity check the lengths
+ if ( dataLen < 0 ||
+ ( (dataLen % bytesPerChecksum) != 0 && !lastPacketInBlock ) ||
+ (seqno != (lastSeqNo + 1)) ) {
+ throw new IOException("BlockReader: error in packet header" +
+ "(chunkOffset : " + chunkOffset +
+ ", dataLen : " + dataLen +
+ ", seqno : " + seqno +
+ " (last: " + lastSeqNo + "))");
+ }
+
+ lastSeqNo = seqno;
+ isLastPacket = lastPacketInBlock;
+ dataLeft = dataLen;
+ adjustChecksumBytes(dataLen);
+ if (dataLen > 0) {
+ IOUtils.readFully(in, checksumBytes.array(), 0,
+ checksumBytes.limit());
+ }
}
+ int chunkLen = Math.min(dataLeft, bytesPerChecksum);
+
if ( chunkLen > 0 ) {
// len should be >= chunkLen
IOUtils.readFully(in, buf, offset, chunkLen);
+ checksumBytes.get(checksumBuf, 0, checksumSize);
}
- if ( checksumSize > 0 ) {
- IOUtils.readFully(in, checksumBuf, 0, checksumSize);
- }
-
+ dataLeft -= chunkLen;
lastChunkOffset = chunkOffset;
lastChunkLen = chunkLen;
- if ( chunkLen == 0 ) {
+ if ((dataLeft == 0 && isLastPacket) || chunkLen == 0) {
gotEOS = true;
+ }
+ if ( chunkLen == 0 ) {
return -1;
}
@@ -827,7 +865,7 @@
new BufferedOutputStream(sock.getOutputStream()));
//write the header.
- out.writeShort( DATA_TRANFER_VERSION );
+ out.writeShort( DATA_TRANSFER_VERSION );
out.write( OP_READ_BLOCK );
out.writeLong( blockId );
out.writeLong( startOffset );
@@ -2030,7 +2068,7 @@
DataOutputStream out = new DataOutputStream(new BufferedOutputStream(s.getOutputStream(), buffersize));
blockReplyStream = new DataInputStream(s.getInputStream());
- out.writeShort( DATA_TRANFER_VERSION );
+ out.writeShort( DATA_TRANSFER_VERSION );
out.write( OP_WRITE_BLOCK );
out.writeLong( block.getBlockId() );
out.writeInt( nodes.length );
@@ -2147,8 +2185,8 @@
}
currentPacket.writeInt(len);
- currentPacket.write(b, offset, len);
currentPacket.write(checksum, 0, cklen);
+ currentPacket.write(b, offset, len);
currentPacket.numChunks++;
bytesCurBlock += len;
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java?rev=633285&r1=633284&r2=633285&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java Mon Mar 3 13:40:18 2008
@@ -37,6 +37,7 @@
import java.io.*;
import java.net.*;
+import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.Semaphore;
import java.security.NoSuchAlgorithmException;
@@ -450,16 +451,6 @@
}
}
- private void enumerateThreadGroup(ThreadGroup tg) {
- int count = tg.activeCount();
- Thread[] info = new Thread[count];
- int num = tg.enumerate(info);
- for (int i = 0; i < num; i++) {
- System.out.print(info[i].getName() + " ");
- }
- System.out.println("");
- }
-
/**
* Shut down this instance of the datanode.
* Returns only after shutdown is complete.
@@ -937,7 +928,7 @@
in = new DataInputStream(
new BufferedInputStream(s.getInputStream(), BUFFER_SIZE));
short version = in.readShort();
- if ( version != DATA_TRANFER_VERSION ) {
+ if ( version != DATA_TRANSFER_VERSION ) {
throw new IOException( "Version Mismatch" );
}
boolean local = s.getInetAddress().equals(s.getLocalAddress());
@@ -1003,7 +994,7 @@
// send the block
DataOutputStream out = new DataOutputStream(
- new BufferedOutputStream(s.getOutputStream(), BUFFER_SIZE));
+ new BufferedOutputStream(s.getOutputStream(), SMALL_BUFFER_SIZE));
BlockSender blockSender = null;
try {
try {
@@ -1116,7 +1107,7 @@
mirrorIn = new DataInputStream(mirrorSock.getInputStream());
// Write header: Copied from DFSClient.java!
- mirrorOut.writeShort( DATA_TRANFER_VERSION );
+ mirrorOut.writeShort( DATA_TRANSFER_VERSION );
mirrorOut.write( OP_WRITE_BLOCK );
mirrorOut.writeLong( block.getBlockId() );
mirrorOut.writeInt( pipelineSize );
@@ -1269,11 +1260,11 @@
targetSock.setSoTimeout(socketTimeout);
targetOut = new DataOutputStream(new BufferedOutputStream(
- targetSock.getOutputStream(), BUFFER_SIZE));
+ targetSock.getOutputStream(), SMALL_BUFFER_SIZE));
/* send request to the target */
// fist write header info
- targetOut.writeShort(DATA_TRANFER_VERSION); // transfer version
+ targetOut.writeShort(DATA_TRANSFER_VERSION); // transfer version
targetOut.writeByte(OP_REPLACE_BLOCK); // op code
targetOut.writeLong(block.getBlockId()); // block id
Text.writeString( targetOut, source); // del hint
@@ -1445,15 +1436,94 @@
}
}
+ /* ********************************************************************
+ Protocol when a client reads data from Datanode (Cur Ver: 9):
+
+ Client's Request :
+ =================
+
+ Processed in DataXceiver:
+ +----------------------------------------------+
+ | Common Header | 1 byte OP == OP_READ_BLOCK |
+ +----------------------------------------------+
+
+ Processed in readBlock() :
+ +-------------------------------------------------------+
+ | 8 byte Block ID | 8 byte start offset | 8 byte length |
+ +-------------------------------------------------------+
+
+ Client sends optional response only at the end of receiving data.
+
+ DataNode Response :
+ ===================
+
+ In readBlock() :
+ If there is an error while initializing BlockSender :
+ +---------------------------+
+ | 2 byte OP_STATUS_ERROR | and connection will be closed.
+ +---------------------------+
+ Otherwise
+ +---------------------------+
+ | 2 byte OP_STATUS_SUCCESS |
+ +---------------------------+
+
+ Actual data, sent by BlockSender.sendBlock() :
+
+ ChecksumHeader :
+ +--------------------------------------------------+
+ | 1 byte CHECKSUM_TYPE | 4 byte BYTES_PER_CHECKSUM |
+ +--------------------------------------------------+
+ Followed by actual data in the form of PACKETS:
+ +------------------------------------+
+ | Sequence of data PACKETs .... |
+ +------------------------------------+
+
+ A "PACKET" is defined further below.
+
+ The client reads data until it receives a packet with
+ "LastPacketInBlock" set to true or with a zero length. If there is
+ no checksum error, it replies to DataNode with OP_STATUS_CHECKSUM_OK:
+
+ Client optional response at the end of data transmission :
+ +------------------------------+
+ | 2 byte OP_STATUS_CHECKSUM_OK |
+ +------------------------------+
+
+ PACKET : Contains a packet header, checksum and data. Amount of data
+ ======== carried is set by BUFFER_SIZE.
+
+ +-----------------------------------------------------+
+ | 4 byte packet length (excluding packet header) |
+ +-----------------------------------------------------+
+ | 8 byte offset in the block | 8 byte sequence number |
+ +-----------------------------------------------------+
+ | 1 byte isLastPacketInBlock |
+ +-----------------------------------------------------+
+ | 4 byte Length of actual data |
+ +-----------------------------------------------------+
+ | x byte checksum data. x is defined below |
+ +-----------------------------------------------------+
+ | actual data ...... |
+ +-----------------------------------------------------+
+
+ x = (length of data + BYTES_PER_CHECKSUM - 1)/BYTES_PER_CHECKSUM *
+ CHECKSUM_SIZE
+
+ CHECKSUM_SIZE depends on CHECKSUM_TYPE (usually, 4 for CRC32)
+
+ The above packet format is used while writing data to DFS also.
+ Not all the fields might be used while reading.
+
+ ************************************************************************ */
+
class BlockSender implements java.io.Closeable {
private Block block; // the block to read from
- private DataInputStream blockIn; // data strean
+ private InputStream blockIn; // data stream
private DataInputStream checksumIn; // checksum datastream
private DataChecksum checksum; // checksum stream
private long offset; // starting position to read
private long endOffset; // ending position
private long blockLength;
- private byte buf[]; // buffer to store data read from the block file & crc
private int bytesPerChecksum; // chunk size
private int checksumSize; // checksum size
private boolean corruptChecksumOk; // if need to verify checksum
@@ -1463,8 +1533,14 @@
private boolean blockReadFully; //set when the whole block is read
private boolean verifyChecksum; //if true, check is verified while reading
private Throttler throttler;
- private DataOutputStream out;
-
+ private OutputStream out;
+
+ static final int PKT_HEADER_LEN = ( 4 + /* PacketLen */
+ 8 + /* offset in block */
+ 8 + /* seqno */
+ 1 + /* isLastPacketInBlock */
+ 4 /* data len */ );
+
BlockSender(Block block, long startOffset, long length,
boolean corruptChecksumOk, boolean chunkOffsetOK,
boolean verifyChecksum) throws IOException {
@@ -1511,7 +1587,7 @@
throw new IOException(msg);
}
- buf = new byte[bytesPerChecksum + checksumSize];
+
offset = (startOffset - (startOffset % bytesPerChecksum));
if (length >= 0) {
// Make sure endOffset points to end of a checksumed chunk.
@@ -1535,8 +1611,7 @@
}
seqno = 0;
- InputStream blockInStream = data.getBlockInputStream(block, offset); // seek to offset
- blockIn = new DataInputStream(new BufferedInputStream(blockInStream, BUFFER_SIZE));
+ blockIn = data.getBlockInputStream(block, offset); // seek to offset
} catch (IOException ioe) {
IOUtils.closeStream(this);
IOUtils.closeStream(blockIn);
@@ -1571,26 +1646,37 @@
}
}
-
- private int sendChunk()
- throws IOException {
- int len = (int) Math.min(endOffset - offset, bytesPerChecksum);
+ /**
+ * Sends up to maxChunks chunks of data.
+ */
+ private int sendChunks(ByteBuffer pkt, int maxChunks) throws IOException {
+ // Sends multiple chunks in one packet with a single write().
+
+ int len = Math.min((int) (endOffset - offset),
+ bytesPerChecksum*maxChunks);
if (len == 0) {
return 0;
}
- blockIn.readFully(buf, 0, len);
+ int numChunks = (len + bytesPerChecksum - 1)/bytesPerChecksum;
+ int packetLen = len + numChunks*checksumSize + 4;
+ pkt.clear();
+
+ // write packet header
+ pkt.putInt(packetLen);
+ pkt.putLong(offset);
+ pkt.putLong(seqno);
+ pkt.put((byte)((offset + len >= endOffset) ? 1 : 0));
+ //why no ByteBuf.putBoolean()?
+ pkt.putInt(len);
+
+ int checksumOff = pkt.position();
+ int checksumLen = numChunks * checksumSize;
+ byte[] buf = pkt.array();
+
if (checksumSize > 0 && checksumIn != null) {
try {
- checksumIn.readFully(buf, len, checksumSize);
-
- if (verifyChecksum) {
- checksum.reset();
- checksum.update(buf, 0, len);
- if (!checksum.compare(buf, len)) {
- throw new ChecksumException("Checksum failed at " + offset, len);
- }
- }
+ checksumIn.readFully(buf, checksumOff, checksumLen);
} catch (IOException e) {
LOG.warn(" Could not read or failed to veirfy checksum for data" +
" at offset " + offset + " for block " + block + " got : "
@@ -1599,28 +1685,39 @@
checksumIn = null;
if (corruptChecksumOk) {
// Just fill the array with zeros.
- Arrays.fill(buf, len, len + checksumSize, (byte) 0);
+ Arrays.fill(buf, checksumOff, checksumLen, (byte) 0);
} else {
throw e;
}
}
}
- boolean lastPacketInBlock = false;
- if (offset + len >= endOffset) {
- lastPacketInBlock = true;
+
+ int dataOff = checksumOff + checksumLen;
+ IOUtils.readFully(blockIn, buf, dataOff, len);
+
+ if (verifyChecksum) {
+ int dOff = dataOff;
+ int cOff = checksumOff;
+ int dLeft = len;
+
+ for (int i=0; i<numChunks; i++) {
+ checksum.reset();
+ int dLen = Math.min(dLeft, bytesPerChecksum);
+ checksum.update(buf, dOff, dLen);
+ if (!checksum.compare(buf, cOff)) {
+ throw new ChecksumException("Checksum failed at " +
+ (offset + len - dLeft), len);
+ }
+ dLeft -= dLen;
+ dOff += dLen;
+ cOff += checksumSize;
+ }
}
- // write packet header
- out.writeInt(len + checksumSize + 4);
- out.writeLong(offset);
- out.writeLong(seqno);
- out.writeBoolean(lastPacketInBlock);
-
- out.writeInt(len);
- out.write(buf, 0, len + checksumSize);
+ out.write(buf, 0, dataOff + len);
if (throttler != null) { // rebalancing so throttle
- throttler.throttle(len + checksumSize + 4);
+ throttler.throttle(packetLen);
}
return len;
@@ -1648,15 +1745,21 @@
if ( chunkOffsetOK ) {
out.writeLong( offset );
}
+ //set up sendBuf:
+ int maxChunksPerPacket = Math.max(1,
+ (BUFFER_SIZE + bytesPerChecksum - 1)/bytesPerChecksum);
+ ByteBuffer pktBuf = ByteBuffer.allocate(PKT_HEADER_LEN +
+ (bytesPerChecksum + checksumSize) * maxChunksPerPacket);
+
while (endOffset > offset) {
- // Write one data chunk per loop.
- long len = sendChunk();
+ long len = sendChunks(pktBuf, maxChunksPerPacket);
offset += len;
- totalRead += len + checksumSize;
+ totalRead += len + ((len + bytesPerChecksum - 1)/bytesPerChecksum*
+ checksumSize);
seqno++;
}
- out.writeInt(0); // mark the end of block
+ out.writeInt(0); // mark the end of block
out.flush();
} finally {
close();
@@ -1965,6 +2068,7 @@
private int bytesPerChecksum;
private int checksumSize;
private byte buf[];
+ private byte checksumBuf[];
private long offsetInBlock;
final private String inAddr;
private String mirrorAddr;
@@ -1995,6 +2099,7 @@
this.bytesPerChecksum = checksum.getBytesPerChecksum();
this.checksumSize = checksum.getChecksumSize();
this.buf = new byte[bytesPerChecksum + checksumSize];
+ this.checksumBuf = new byte[checksumSize];
//
// Open local disk out
//
@@ -2055,7 +2160,8 @@
}
/* receive a chunk: write it to disk & mirror it to another stream */
- private void receiveChunk( int len ) throws IOException {
+ private void receiveChunk( int len, byte[] checksumBuf, int checksumOff )
+ throws IOException {
if (len <= 0 || len > bytesPerChecksum) {
throw new IOException("Got wrong length during writeBlock(" + block
+ ") from " + inAddr + " at offset " + offsetInBlock + ": " + len
@@ -2071,7 +2177,7 @@
lastLen = curLen;
curLen = len;
- in.readFully(buf, 0, len + checksumSize);
+ in.readFully(buf, 0, len);
/*
* Verification is not included in the initial design. For now, it at
@@ -2080,7 +2186,7 @@
*/
checksum.update(buf, 0, len);
- if (!checksum.compare(buf, len)) {
+ if (!checksum.compare(checksumBuf, checksumOff)) {
throw new IOException("Unexpected checksum mismatch "
+ "while writing " + block + " from " + inAddr);
}
@@ -2097,7 +2203,8 @@
if (mirrorOut != null) {
try {
mirrorOut.writeInt(len);
- mirrorOut.write(buf, 0, len + checksumSize);
+ mirrorOut.write(checksumBuf, checksumOff, checksumSize);
+ mirrorOut.write(buf, 0, len);
} catch (IOException ioe) {
LOG.info(dnRegistration + ":Exception writing block " +
block + " to mirror " + mirrorAddr + "\n" +
@@ -2123,7 +2230,7 @@
if (!finalized) {
out.write(buf, 0, len);
// Write checksum
- checksumOut.write(buf, len, checksumSize);
+ checksumOut.write(checksumBuf, checksumOff, checksumSize);
myMetrics.bytesWritten.inc(len);
}
} catch (IOException iex) {
@@ -2145,7 +2252,15 @@
* Receive and process a packet. It contains many chunks.
*/
private void receivePacket(int packetSize) throws IOException {
-
+ /* TEMP: Currently this handles both interleaved
+ * and non-interleaved DATA_CHUNKs in side the packet.
+ * non-interleaved is required for HADOOP-2758 and in future.
+ * iterleaved will be removed once extra buffer copies are removed
+ * in write path (HADOOP-1702).
+ *
+ * Format of Non-interleaved data packets is described in the
+ * comment before BlockSender.
+ */
offsetInBlock = in.readLong(); // get offset of packet in block
long seqno = in.readLong(); // get seqno
boolean lastPacketInBlock = in.readBoolean();
@@ -2157,9 +2272,6 @@
" lastPacketInBlock " + lastPacketInBlock);
setBlockPosition(offsetInBlock);
- int len = in.readInt();
- curPacketSize += 4; // read an integer in previous line
-
// send packet header to next datanode in pipeline
if (mirrorOut != null) {
try {
@@ -2189,6 +2301,9 @@
}
}
+ int len = in.readInt();
+ curPacketSize += 4; // read an integer in previous line
+
if (len == 0) {
LOG.info("Receiving empty packet for block " + block);
if (mirrorOut != null) {
@@ -2198,15 +2313,37 @@
}
while (len != 0) {
- LOG.debug("Receiving one chunk for block " + block +
- " of size " + len);
- receiveChunk( len );
- curPacketSize += (len + checksumSize);
- if (curPacketSize > packetSize) {
- throw new IOException("Packet size for block " + block +
- " too long " + curPacketSize +
- " was expecting " + packetSize);
- }
+ int checksumOff = 0;
+ if (len > 0) {
+ int checksumLen = (len + bytesPerChecksum - 1)/bytesPerChecksum*
+ checksumSize;
+ if (checksumBuf.length < checksumLen) {
+ checksumBuf = new byte[checksumLen];
+ }
+ // read the checksum
+ in.readFully(checksumBuf, 0, checksumLen);
+ }
+
+ while (len != 0) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Receiving one chunk for block " + block +
+ " of size " + len);
+ }
+
+ int toRecv = Math.min(len, bytesPerChecksum);
+
+ receiveChunk(toRecv, checksumBuf, checksumOff);
+
+ len -= toRecv;
+ checksumOff += checksumSize;
+ curPacketSize += (toRecv + checksumSize);
+ if (curPacketSize > packetSize) {
+ throw new IOException("Packet size for block " + block +
+ " too long " + curPacketSize +
+ " was expecting " + packetSize);
+ }
+ }
+
if (curPacketSize == packetSize) {
if (mirrorOut != null) {
mirrorOut.flush();
@@ -2388,13 +2525,14 @@
sock.setSoTimeout(targets.length * socketTimeout);
out = new DataOutputStream(new BufferedOutputStream(
- sock.getOutputStream(), BUFFER_SIZE));
+ sock.getOutputStream(), SMALL_BUFFER_SIZE));
+
blockSender = new BlockSender(b, 0, -1, false, false, false);
//
// Header info
//
- out.writeShort(DATA_TRANFER_VERSION);
+ out.writeShort(DATA_TRANSFER_VERSION);
out.writeByte(OP_WRITE_BLOCK);
out.writeLong(b.getBlockId());
out.writeInt(0); // no pipelining
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java?rev=633285&r1=633284&r2=633285&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java Mon Mar 3 13:40:18 2008
@@ -100,21 +100,13 @@
* This should change when serialization of DatanodeInfo, not just
* when protocol changes. It is not very obvious.
*/
- /* Version 7:
- * Add two operations to data node
- * OP_COPY_BLOCK:
- * The command is for sending to a proxy source for the balancing purpose
- * The datanode then sends OP_REPLACE_BLOCK request to the destination
- * OP_COPY_BLOCK BlockID(long) SourceID (UTF8) Destination (DatanodeInfo)
- * return OP_STATUS_ERROR if any error occurs; OP_STATUS_SUCCESS otherwise
- * OP_REPLACE_BLOCK:
- * the command is for sending to a destination for the balancing purpose
- * The datanode then writes the block to disk and notifies namenode of this
- * received block together with a deletion hint: sourceID
- * OP_REPLACE_BLOCK BlockID(long) SourceID(UTF8) Block_Data_With_Crc
- * return OP_STATUS_ERROR if any error occurs; OP_STATUS_SUCCESS otherwise
+ /*
+ * Version 9:
+ * While reading data from Datanode, each PACKET can consist
+ * of non-interleaved data (checksums for a larger amount of data,
+ * followed by the data itself).
*/
- public static final int DATA_TRANFER_VERSION = 8;
+ public static final int DATA_TRANSFER_VERSION = 9;
// Return codes for file create
public static final int OPERATION_FAILED = 0;
@@ -140,6 +132,8 @@
public static int MAX_PATH_DEPTH = 1000;
public static final int BUFFER_SIZE = new Configuration().getInt("io.file.buffer.size", 4096);
+ //Used for writing header etc.
+ static final int SMALL_BUFFER_SIZE = Math.min(BUFFER_SIZE/2, 512);
//TODO mb@media-style.com: should be conf injected?
public static final long DEFAULT_BLOCK_SIZE = 64 * 1024 * 1024;
public static final int DEFAULT_DATA_SOCKET_SIZE = 128 * 1024;
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestBlockReplacement.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestBlockReplacement.java?rev=633285&r1=633284&r2=633285&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestBlockReplacement.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestBlockReplacement.java Mon Mar 3 13:40:18 2008
@@ -215,7 +215,7 @@
sock.setSoTimeout(FSConstants.READ_TIMEOUT);
// sendRequest
DataOutputStream out = new DataOutputStream(sock.getOutputStream());
- out.writeShort(FSConstants.DATA_TRANFER_VERSION);
+ out.writeShort(FSConstants.DATA_TRANSFER_VERSION);
out.writeByte(FSConstants.OP_COPY_BLOCK);
out.writeLong(block.getBlockId());
Text.writeString(out, source.getStorageID());
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestDataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestDataTransferProtocol.java?rev=633285&r1=633284&r2=633285&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestDataTransferProtocol.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestDataTransferProtocol.java Mon Mar 3 13:40:18 2008
@@ -152,19 +152,19 @@
sendBuf.reset();
// bad version
- recvOut.writeShort((short)(FSConstants.DATA_TRANFER_VERSION-1));
- sendOut.writeShort((short)(FSConstants.DATA_TRANFER_VERSION-1));
+ recvOut.writeShort((short)(FSConstants.DATA_TRANSFER_VERSION-1));
+ sendOut.writeShort((short)(FSConstants.DATA_TRANSFER_VERSION-1));
sendRecvData("Wrong Version", true);
// bad ops
sendBuf.reset();
- sendOut.writeShort((short)FSConstants.DATA_TRANFER_VERSION);
+ sendOut.writeShort((short)FSConstants.DATA_TRANSFER_VERSION);
sendOut.writeByte((byte)(FSConstants.OP_WRITE_BLOCK-1));
sendRecvData("Wrong Op Code", true);
/* Test OP_WRITE_BLOCK */
sendBuf.reset();
- sendOut.writeShort((short)FSConstants.DATA_TRANFER_VERSION);
+ sendOut.writeShort((short)FSConstants.DATA_TRANSFER_VERSION);
sendOut.writeByte((byte)FSConstants.OP_WRITE_BLOCK);
sendOut.writeLong(newBlockId); // block id
sendOut.writeInt(0); // targets in pipeline
@@ -181,7 +181,7 @@
sendBuf.reset();
recvBuf.reset();
- sendOut.writeShort((short)FSConstants.DATA_TRANFER_VERSION);
+ sendOut.writeShort((short)FSConstants.DATA_TRANSFER_VERSION);
sendOut.writeByte((byte)FSConstants.OP_WRITE_BLOCK);
sendOut.writeLong(newBlockId);
sendOut.writeInt(0); // targets in pipeline
@@ -195,7 +195,7 @@
sendBuf.reset();
recvBuf.reset();
- sendOut.writeShort((short)FSConstants.DATA_TRANFER_VERSION);
+ sendOut.writeShort((short)FSConstants.DATA_TRANSFER_VERSION);
sendOut.writeByte((byte)FSConstants.OP_WRITE_BLOCK);
sendOut.writeLong(++newBlockId);
sendOut.writeInt(0); // targets in pipeline
@@ -220,7 +220,7 @@
// test for writing a valid zero size block
sendBuf.reset();
recvBuf.reset();
- sendOut.writeShort((short)FSConstants.DATA_TRANFER_VERSION);
+ sendOut.writeShort((short)FSConstants.DATA_TRANSFER_VERSION);
sendOut.writeByte((byte)FSConstants.OP_WRITE_BLOCK);
sendOut.writeLong(++newBlockId);
sendOut.writeInt(0); // targets in pipeline
@@ -247,7 +247,7 @@
// bad block id
sendBuf.reset();
recvBuf.reset();
- sendOut.writeShort((short)FSConstants.DATA_TRANFER_VERSION);
+ sendOut.writeShort((short)FSConstants.DATA_TRANSFER_VERSION);
sendOut.writeByte((byte)FSConstants.OP_READ_BLOCK);
newBlockId = firstBlock.getBlockId()-1;
sendOut.writeLong(newBlockId);
@@ -258,7 +258,7 @@
// negative block start offset
sendBuf.reset();
- sendOut.writeShort((short)FSConstants.DATA_TRANFER_VERSION);
+ sendOut.writeShort((short)FSConstants.DATA_TRANSFER_VERSION);
sendOut.writeByte((byte)FSConstants.OP_READ_BLOCK);
sendOut.writeLong(firstBlock.getBlockId());
sendOut.writeLong(-1L);
@@ -268,7 +268,7 @@
// bad block start offset
sendBuf.reset();
- sendOut.writeShort((short)FSConstants.DATA_TRANFER_VERSION);
+ sendOut.writeShort((short)FSConstants.DATA_TRANSFER_VERSION);
sendOut.writeByte((byte)FSConstants.OP_READ_BLOCK);
sendOut.writeLong(firstBlock.getBlockId());
sendOut.writeLong(fileLen);
@@ -280,7 +280,7 @@
recvBuf.reset();
recvOut.writeShort((short)FSConstants.OP_STATUS_SUCCESS);
sendBuf.reset();
- sendOut.writeShort((short)FSConstants.DATA_TRANFER_VERSION);
+ sendOut.writeShort((short)FSConstants.DATA_TRANSFER_VERSION);
sendOut.writeByte((byte)FSConstants.OP_READ_BLOCK);
sendOut.writeLong(firstBlock.getBlockId());
sendOut.writeLong(0);
@@ -292,7 +292,7 @@
recvBuf.reset();
recvOut.writeShort((short)FSConstants.OP_STATUS_ERROR);
sendBuf.reset();
- sendOut.writeShort((short)FSConstants.DATA_TRANFER_VERSION);
+ sendOut.writeShort((short)FSConstants.DATA_TRANSFER_VERSION);
sendOut.writeByte((byte)FSConstants.OP_READ_BLOCK);
sendOut.writeLong(firstBlock.getBlockId());
sendOut.writeLong(0);
@@ -302,7 +302,7 @@
//At the end of all this, read the file to make sure that succeeds finally.
sendBuf.reset();
- sendOut.writeShort((short)FSConstants.DATA_TRANFER_VERSION);
+ sendOut.writeShort((short)FSConstants.DATA_TRANSFER_VERSION);
sendOut.writeByte((byte)FSConstants.OP_READ_BLOCK);
sendOut.writeLong(firstBlock.getBlockId());
sendOut.writeLong(0);