You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by jg...@apache.org on 2010/09/07 19:52:25 UTC
svn commit: r993448 - in /hadoop/hdfs/trunk: ./
src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/protocol/
src/java/org/apache/hadoop/hdfs/server/datanode/
src/test/hdfs/org/apache/hadoop/hdfs/
Author: jghoman
Date: Tue Sep 7 17:52:25 2010
New Revision: 993448
URL: http://svn.apache.org/viewvc?rev=993448&view=rev
Log:
HDFS-881. Refactor DataNode Packet header into DataTransferProtocol. Contributed by Todd Lipcon.
Modified:
hadoop/hdfs/trunk/CHANGES.txt
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/BlockReader.java
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
Modified: hadoop/hdfs/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=993448&r1=993447&r2=993448&view=diff
==============================================================================
--- hadoop/hdfs/trunk/CHANGES.txt (original)
+++ hadoop/hdfs/trunk/CHANGES.txt Tue Sep 7 17:52:25 2010
@@ -108,6 +108,9 @@ Trunk (unreleased changes)
HDFS-330. Datanode Web UIs should provide robots.txt.
(Allen Wittenauer via jghoman)
+ HDFS-881. Refactor DataNode Packet header into DataTransferProtocol.
+ (Todd Lipcon via jghoman)
+
HDFS-1036. docs for fetchdt
HDFS-1318. Add JMX interface for read access to namenode and datanode
@@ -237,7 +240,7 @@ Trunk (unreleased changes)
HDFS-1355. ant veryclean (clean-cache) doesn't clean enough.
(Luke Lu via jghoman)
- HDFS-1353. Remove most of getBlockLocation optimization. (jghoman)
+ HDFS-1353. Remove most of getBlockLocation optimization. (jghoman)
HDFS-1369. Invalid javadoc reference in FSDatasetMBean.java (Eli Collins)
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/BlockReader.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/BlockReader.java?rev=993448&r1=993447&r2=993448&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/BlockReader.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/BlockReader.java Tue Sep 7 17:52:25 2010
@@ -34,6 +34,7 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.fs.FSInputChecker;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PacketHeader;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.server.common.HdfsConstants;
@@ -211,35 +212,23 @@ public class BlockReader extends FSInput
// Read next packet if the previous packet has been read completely.
if (dataLeft <= 0) {
//Read packet headers.
- int packetLen = in.readInt();
- long offsetInBlock = in.readLong();
- long seqno = in.readLong();
- boolean lastPacketInBlock = in.readBoolean();
-
+ PacketHeader header = new PacketHeader();
+ header.readFields(in);
+
if (LOG.isDebugEnabled()) {
- LOG.debug("DFSClient readChunk got seqno " + seqno +
- " offsetInBlock " + offsetInBlock +
- " lastPacketInBlock " + lastPacketInBlock +
- " packetLen " + packetLen);
+ LOG.debug("DFSClient readChunk got header " + header);
}
-
- int dataLen = in.readInt();
-
+
// Sanity check the lengths
- if ( ( dataLen <= 0 && !lastPacketInBlock ) ||
- ( dataLen != 0 && lastPacketInBlock) ||
- (seqno != (lastSeqNo + 1)) ) {
- throw new IOException("BlockReader: error in packet header" +
- "(chunkOffset : " + chunkOffset +
- ", dataLen : " + dataLen +
- ", seqno : " + seqno +
- " (last: " + lastSeqNo + "))");
+ if (!header.sanityCheck(lastSeqNo)) {
+ throw new IOException("BlockReader: error in packet header " +
+ header);
}
-
- lastSeqNo = seqno;
- dataLeft = dataLen;
- adjustChecksumBytes(dataLen);
- if (dataLen > 0) {
+
+ lastSeqNo = header.getSeqno();
+ dataLeft = header.getDataLen();
+ adjustChecksumBytes(header.getDataLen());
+ if (header.getDataLen() > 0) {
IOUtils.readFully(in, checksumBytes.array(), 0,
checksumBytes.limit());
}
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java?rev=993448&r1=993447&r2=993448&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java Tue Sep 7 17:52:25 2010
@@ -52,6 +52,7 @@ import org.apache.hadoop.hdfs.protocol.L
import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PacketHeader;
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PipelineAck;
import org.apache.hadoop.hdfs.server.common.HdfsConstants;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
@@ -128,17 +129,30 @@ class DFSOutputStream extends FSOutputSu
private short blockReplication; // replication factor of file
private class Packet {
- ByteBuffer buffer; // only one of buf and buffer is non-null
- byte[] buf;
long seqno; // sequencenumber of buffer in block
long offsetInBlock; // offset in block
boolean lastPacketInBlock; // is this the last packet in block?
int numChunks; // number of chunks currently in packet
int maxChunks; // max chunks in packet
- int dataStart;
- int dataPos;
- int checksumStart;
- int checksumPos;
+
+ /** buffer for accumulating packet checksum and data */
+ ByteBuffer buffer; // wraps buf, only one of these two may be non-null
+ byte[] buf;
+
+ /**
+ * The offsets into buf are laid out as follows
+ * (H is the packet header, C is checksum data, D is payload data):
+ *
+ * [HHHHHCCCCC________________DDDDDDDDDDDDDDDD___]
+ *       ^    ^               ^               ^
+ *       |    checksumPos     dataStart       dataPos
+ *       checksumStart
+ */
+ int checksumStart;
+ int dataStart;
+ int dataPos;
+ int checksumPos;
+
private static final long HEART_BEAT_SEQNO = -1L;
/**
@@ -151,7 +165,7 @@ class DFSOutputStream extends FSOutputSu
this.seqno = HEART_BEAT_SEQNO;
buffer = null;
- int packetSize = DataNode.PKT_HEADER_LEN + DFSClient.SIZE_OF_INTEGER;
+ int packetSize = PacketHeader.PKT_HEADER_LEN + DFSClient.SIZE_OF_INTEGER; // TODO(todd) strange
buf = new byte[packetSize];
checksumStart = dataStart = packetSize;
@@ -171,7 +185,7 @@ class DFSOutputStream extends FSOutputSu
buffer = null;
buf = new byte[pktSize];
- checksumStart = DataNode.PKT_HEADER_LEN + DFSClient.SIZE_OF_INTEGER;
+ checksumStart = PacketHeader.PKT_HEADER_LEN;
checksumPos = checksumStart;
dataStart = checksumStart + chunksPerPkt * checksum.getChecksumSize();
dataPos = dataStart;
@@ -222,20 +236,15 @@ class DFSOutputStream extends FSOutputSu
int pktLen = DFSClient.SIZE_OF_INTEGER + dataLen + checksumLen;
//normally dataStart == checksumPos, i.e., offset is zero.
- buffer = ByteBuffer.wrap(buf, dataStart - checksumPos,
- DataNode.PKT_HEADER_LEN + pktLen);
+ buffer = ByteBuffer.wrap(
+ buf, dataStart - checksumPos,
+ PacketHeader.PKT_HEADER_LEN + pktLen - DFSClient.SIZE_OF_INTEGER);
buf = null;
buffer.mark();
-
- /* write the header and data length.
- * The format is described in comment before DataNode.BlockSender
- */
- buffer.putInt(pktLen); // pktSize
- buffer.putLong(offsetInBlock);
- buffer.putLong(seqno);
- buffer.put((byte) ((lastPacketInBlock) ? 1 : 0));
- //end of pkt header
- buffer.putInt(dataLen); // actual data length, excluding checksum.
+
+ PacketHeader header = new PacketHeader(
+ pktLen, offsetInBlock, seqno, lastPacketInBlock, dataLen);
+ header.putInBuffer(buffer);
buffer.reset();
return buffer;
@@ -1111,7 +1120,7 @@ class DFSOutputStream extends FSOutputSu
private void computePacketChunkSize(int psize, int csize) {
int chunkSize = csize + checksum.getChecksumSize();
- int n = DataNode.PKT_HEADER_LEN + DFSClient.SIZE_OF_INTEGER;
+ int n = PacketHeader.PKT_HEADER_LEN;
chunksPerPacket = Math.max((psize - n + chunkSize-1)/chunkSize, 1);
packetSize = n + chunkSize*chunksPerPacket;
if (DFSClient.LOG.isDebugEnabled()) {
@@ -1213,7 +1222,7 @@ class DFSOutputStream extends FSOutputSu
// indicate the end of block and reset bytesCurBlock.
//
if (bytesCurBlock == blockSize) {
- currentPacket = new Packet(DataNode.PKT_HEADER_LEN+4, 0,
+ currentPacket = new Packet(PacketHeader.PKT_HEADER_LEN, 0,
bytesCurBlock);
currentPacket.lastPacketInBlock = true;
waitAndQueuePacket(currentPacket);
@@ -1406,7 +1415,7 @@ class DFSOutputStream extends FSOutputSu
if (bytesCurBlock != 0) {
// send an empty packet to mark the end of the block
- currentPacket = new Packet(DataNode.PKT_HEADER_LEN+4, 0,
+ currentPacket = new Packet(PacketHeader.PKT_HEADER_LEN, 0,
bytesCurBlock);
currentPacket.lastPacketInBlock = true;
}
@@ -1457,7 +1466,7 @@ class DFSOutputStream extends FSOutputSu
synchronized void setChunksPerPacket(int value) {
chunksPerPacket = Math.min(chunksPerPacket, value);
- packetSize = DataNode.PKT_HEADER_LEN + DFSClient.SIZE_OF_INTEGER +
+ packetSize = PacketHeader.PKT_HEADER_LEN +
(checksum.getBytesPerChecksum() +
checksum.getChecksumSize()) * chunksPerPacket;
}
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java?rev=993448&r1=993447&r2=993448&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java Tue Sep 7 17:52:25 2010
@@ -23,6 +23,7 @@ import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
+import java.nio.ByteBuffer;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@@ -561,4 +562,141 @@ public interface DataTransferProtocol {
return ack.toString();
}
}
+
+ /**
+ * Header data for each packet that goes through the read/write pipelines.
+ */
+ public static class PacketHeader implements Writable {
+ /** Header size for a packet */
+ public static final int PKT_HEADER_LEN = ( 4 + /* Packet payload length */
+ 8 + /* offset in block */
+ 8 + /* seqno */
+ 1 + /* isLastPacketInBlock */
+ 4 /* data length */ );
+
+ private int packetLen;
+ private long offsetInBlock;
+ private long seqno;
+ private boolean lastPacketInBlock;
+ private int dataLen;
+
+ public PacketHeader() {
+ }
+
+ public PacketHeader(int packetLen, long offsetInBlock, long seqno,
+ boolean lastPacketInBlock, int dataLen) {
+ this.packetLen = packetLen;
+ this.offsetInBlock = offsetInBlock;
+ this.seqno = seqno;
+ this.lastPacketInBlock = lastPacketInBlock;
+ this.dataLen = dataLen;
+ }
+
+ public int getDataLen() {
+ return dataLen;
+ }
+
+ public boolean isLastPacketInBlock() {
+ return lastPacketInBlock;
+ }
+
+ public long getSeqno() {
+ return seqno;
+ }
+
+ public long getOffsetInBlock() {
+ return offsetInBlock;
+ }
+
+ public int getPacketLen() {
+ return packetLen;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("PacketHeader(")
+ .append("packetLen=").append(packetLen)
+ .append(" offsetInBlock=").append(offsetInBlock)
+ .append(" seqno=").append(seqno)
+ .append(" lastPacketInBlock=").append(lastPacketInBlock)
+ .append(" dataLen=").append(dataLen)
+ .append(")");
+ return sb.toString();
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ // Note that it's important for packetLen to come first and not
+ // change format -
+ // this is used by BlockReceiver to read entire packets with
+ // a single read call.
+ packetLen = in.readInt();
+ offsetInBlock = in.readLong();
+ seqno = in.readLong();
+ lastPacketInBlock = in.readBoolean();
+ dataLen = in.readInt();
+ }
+
+ public void readFields(ByteBuffer buf) throws IOException {
+ packetLen = buf.getInt();
+ offsetInBlock = buf.getLong();
+ seqno = buf.getLong();
+ lastPacketInBlock = (buf.get() != 0);
+ dataLen = buf.getInt();
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeInt(packetLen);
+ out.writeLong(offsetInBlock);
+ out.writeLong(seqno);
+ out.writeBoolean(lastPacketInBlock);
+ out.writeInt(dataLen);
+ }
+
+ /**
+ * Write the header into the buffer.
+ * This requires that PKT_HEADER_LEN bytes are available.
+ */
+ public void putInBuffer(ByteBuffer buf) {
+ buf.putInt(packetLen)
+ .putLong(offsetInBlock)
+ .putLong(seqno)
+ .put((byte)(lastPacketInBlock ? 1 : 0))
+ .putInt(dataLen);
+ }
+
+ /**
+ * Perform a sanity check on the packet, returning true if it is sane.
+ * @param lastSeqNo the previous sequence number received - we expect the current
+ * sequence number to be larger by 1.
+ */
+ public boolean sanityCheck(long lastSeqNo) {
+ // We should only have a non-positive data length for the last packet
+ if (dataLen <= 0 && lastPacketInBlock) return false;
+ // The last packet should not contain data
+ if (lastPacketInBlock && dataLen != 0) return false;
+ // Seqnos should always increase by 1 with each packet received
+ if (seqno != lastSeqNo + 1) return false;
+ return true;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (!(o instanceof PacketHeader)) return false;
+ PacketHeader other = (PacketHeader)o;
+ return (other.packetLen == packetLen &&
+ other.offsetInBlock == offsetInBlock &&
+ other.seqno == seqno &&
+ other.lastPacketInBlock == lastPacketInBlock &&
+ other.dataLen == dataLen);
+ }
+
+ @Override
+ public int hashCode() {
+ return (int)seqno;
+ }
+ }
+
}
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=993448&r1=993447&r2=993448&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Tue Sep 7 17:52:25 2010
@@ -41,6 +41,7 @@ import org.apache.hadoop.hdfs.protocol.F
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PipelineAck;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PacketHeader;
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Daemon;
@@ -334,9 +335,9 @@ class BlockReceiver implements java.io.C
* calculation in DFSClient to make the guess accurate.
*/
int chunkSize = bytesPerChecksum + checksumSize;
- int chunksPerPacket = (datanode.writePacketSize - DataNode.PKT_HEADER_LEN -
- SIZE_OF_INTEGER + chunkSize - 1)/chunkSize;
- buf = ByteBuffer.allocate(DataNode.PKT_HEADER_LEN + SIZE_OF_INTEGER +
+ int chunksPerPacket = (datanode.writePacketSize - PacketHeader.PKT_HEADER_LEN
+ + chunkSize - 1)/chunkSize;
+ buf = ByteBuffer.allocate(PacketHeader.PKT_HEADER_LEN +
Math.max(chunksPerPacket, 1) * chunkSize);
buf.limit(0);
}
@@ -365,7 +366,9 @@ class BlockReceiver implements java.io.C
payloadLen);
}
- int pktSize = payloadLen + DataNode.PKT_HEADER_LEN;
+ // Subtract SIZE_OF_INTEGER since that accounts for the payloadLen that
+ // we read above.
+ int pktSize = payloadLen + PacketHeader.PKT_HEADER_LEN - SIZE_OF_INTEGER;
if (buf.remaining() < pktSize) {
//we need to read more data
@@ -407,30 +410,31 @@ class BlockReceiver implements java.io.C
private int receivePacket() throws IOException {
// read the next packet
readNextPacket();
-
+
buf.mark();
- //read the header
- buf.getInt(); // packet length
- long offsetInBlock = buf.getLong(); // get offset of packet in block
-
- if (offsetInBlock > replicaInfo.getNumBytes()) {
+ PacketHeader header = new PacketHeader();
+ header.readFields(buf);
+ int endOfHeader = buf.position();
+ buf.reset();
+
+ // Sanity check the header
+ if (header.getOffsetInBlock() > replicaInfo.getNumBytes()) {
throw new IOException("Received an out-of-sequence packet for " + block +
- "from " + inAddr + " at offset " + offsetInBlock +
+ "from " + inAddr + " at offset " + header.getOffsetInBlock() +
". Expecting packet starting at " + replicaInfo.getNumBytes());
}
- long seqno = buf.getLong(); // get seqno
- boolean lastPacketInBlock = (buf.get() != 0);
-
- int len = buf.getInt();
- if (len < 0) {
+ if (header.getDataLen() < 0) {
throw new IOException("Got wrong length during writeBlock(" + block +
") from " + inAddr + " at offset " +
- offsetInBlock + ": " + len);
- }
- int endOfHeader = buf.position();
- buf.reset();
-
- return receivePacket(offsetInBlock, seqno, lastPacketInBlock, len, endOfHeader);
+ header.getOffsetInBlock() + ": " +
+ header.getDataLen());
+ }
+
+ return receivePacket(
+ header.getOffsetInBlock(),
+ header.getSeqno(),
+ header.isLastPacketInBlock(),
+ header.getDataLen(), endOfHeader);
}
/**
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java?rev=993448&r1=993447&r2=993448&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java Tue Sep 7 17:52:25 2010
@@ -33,6 +33,7 @@ import org.apache.commons.logging.Log;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PacketHeader;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.SocketOutputStream;
import org.apache.hadoop.util.DataChecksum;
@@ -304,15 +305,12 @@ class BlockSender implements java.io.Clo
int packetLen = len + numChunks*checksumSize + 4;
boolean lastDataPacket = offset + len == endOffset && len > 0;
pkt.clear();
-
- // write packet header
- pkt.putInt(packetLen);
- pkt.putLong(offset);
- pkt.putLong(seqno);
- pkt.put((byte)((len == 0) ? 1 : 0));
- //why no ByteBuf.putBoolean()?
- pkt.putInt(len);
-
+
+
+ PacketHeader header = new PacketHeader(
+ packetLen, offset, seqno, (len == 0), len);
+ header.putInBuffer(pkt);
+
int checksumOff = pkt.position();
int checksumLen = numChunks * checksumSize;
byte[] buf = pkt.array();
@@ -444,7 +442,7 @@ class BlockSender implements java.io.Clo
}
int maxChunksPerPacket;
- int pktSize = DataNode.PKT_HEADER_LEN + SIZE_OF_INTEGER;
+ int pktSize = PacketHeader.PKT_HEADER_LEN;
if (transferToAllowed && !verifyChecksum &&
baseStream instanceof SocketOutputStream &&
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=993448&r1=993447&r2=993448&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Tue Sep 7 17:52:25 2010
@@ -927,6 +927,12 @@ public class DataNode extends Configured
return;
}
LOG.warn(StringUtils.stringifyException(re));
+ try {
+ long sleepTime = Math.min(1000, heartBeatInterval);
+ Thread.sleep(sleepTime);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ }
} catch (IOException e) {
LOG.warn(StringUtils.stringifyException(e));
}
@@ -1281,14 +1287,6 @@ public class DataNode extends Configured
Not all the fields might be used while reading.
************************************************************************ */
-
- /** Header size for a packet */
- public static final int PKT_HEADER_LEN = ( 4 + /* Packet payload length */
- 8 + /* offset in block */
- 8 + /* seqno */
- 1 /* isLastPacketInBlock */);
-
-
/**
* Used for transferring a block of data. This class
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java?rev=993448&r1=993447&r2=993448&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java Tue Sep 7 17:52:25 2010
@@ -19,11 +19,13 @@ package org.apache.hadoop.hdfs;
import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Op.READ_BLOCK;
import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Op.WRITE_BLOCK;
+import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PacketHeader;
import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PipelineAck;
import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status;
import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.ERROR;
import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.SUCCESS;
+import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
@@ -32,6 +34,7 @@ import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
+import java.nio.ByteBuffer;
import java.util.Random;
import junit.framework.TestCase;
@@ -148,14 +151,16 @@ public class TestDataTransferProtocol ex
throws IOException {
sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
sendOut.writeInt(512); // checksum size
- sendOut.writeInt(8); // size of packet
- sendOut.writeLong(block.getNumBytes()); // OffsetInBlock
- sendOut.writeLong(100); // sequencenumber
- sendOut.writeBoolean(true); // lastPacketInBlock
- sendOut.writeInt(0); // chunk length
+ PacketHeader hdr = new PacketHeader(
+ 8, // size of packet
+ block.getNumBytes(), // OffsetInBlock
+ 100, // sequencenumber
+ true, // lastPacketInBlock
+ 0); // chunk length
+ hdr.write(sendOut);
sendOut.writeInt(0); // zero checksum
-
+
//ok finally write a block with 0 len
SUCCESS.write(recvOut);
Text.writeString(recvOut, "");
@@ -373,13 +378,15 @@ public class TestDataTransferProtocol ex
new DatanodeInfo[1], BlockTokenSecretManager.DUMMY_TOKEN);
sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
sendOut.writeInt(512);
- sendOut.writeInt(4); // size of packet
- sendOut.writeLong(0); // OffsetInBlock
- sendOut.writeLong(100); // sequencenumber
- sendOut.writeBoolean(false); // lastPacketInBlock
-
- // bad data chunk length
- sendOut.writeInt(-1-random.nextInt(oneMil));
+
+ PacketHeader hdr = new PacketHeader(
+ 4, // size of packet
+ 0, // offset in block,
+ 100, // seqno
+ false, // last packet
+ -1 - random.nextInt(oneMil)); // bad datalen
+ hdr.write(sendOut);
+
SUCCESS.write(recvOut);
Text.writeString(recvOut, "");
new PipelineAck(100, new Status[]{ERROR}).write(recvOut);
@@ -395,12 +402,14 @@ public class TestDataTransferProtocol ex
new DatanodeInfo[1], BlockTokenSecretManager.DUMMY_TOKEN);
sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
sendOut.writeInt(512); // checksum size
- sendOut.writeInt(8); // size of packet
- sendOut.writeLong(0); // OffsetInBlock
- sendOut.writeLong(100); // sequencenumber
- sendOut.writeBoolean(true); // lastPacketInBlock
- sendOut.writeInt(0); // chunk length
+ hdr = new PacketHeader(
+ 8, // size of packet
+ 0, // OffsetInBlock
+ 100, // sequencenumber
+ true, // lastPacketInBlock
+ 0); // chunk length
+ hdr.write(sendOut);
sendOut.writeInt(0); // zero checksum
sendOut.flush();
//ok finally write a block with 0 len
@@ -497,4 +506,39 @@ public class TestDataTransferProtocol ex
cluster.shutdown();
}
}
+
+ // Round-trips a PacketHeader through write(DataOutput) and both readFields
+ // variants, then exercises sanityCheck with a matching and a non-matching
+ // previous sequence number.
+ @Test
+ public void testPacketHeader() throws IOException {
+ PacketHeader hdr = new PacketHeader(
+ 4, // size of packet
+ 1024, // OffsetInBlock
+ 100, // sequencenumber
+ false, // lastPacketInBlock
+ 4096); // chunk length
+
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ hdr.write(new DataOutputStream(baos));
+
+ // Read back using DataInput
+ PacketHeader readBack = new PacketHeader();
+ ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
+ readBack.readFields(new DataInputStream(bais));
+ assertEquals(hdr, readBack);
+
+ // Read back using ByteBuffer
+ readBack = new PacketHeader();
+ readBack.readFields(ByteBuffer.wrap(baos.toByteArray()));
+ assertEquals(hdr, readBack);
+
+ // Test sanity check for good header
+ // NOTE(review): goodHeader (an empty last-packet header) is constructed
+ // but never asserted on; both assertions below exercise hdr, the
+ // non-final data packet with seqno 100. The last-packet path of
+ // sanityCheck is therefore not covered by this test.
+ PacketHeader goodHeader = new PacketHeader(
+ 4, // size of packet
+ 0, // OffsetInBlock
+ 100, // sequencenumber
+ true, // lastPacketInBlock
+ 0); // chunk length
+
+ // previous seqno 99 -> hdr's seqno 100 is the expected next value
+ assertTrue(hdr.sanityCheck(99));
+ // previous seqno 100 -> hdr's seqno 100 is not previous + 1
+ assertFalse(hdr.sanityCheck(100));
+ }
}