You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to hdfs-commits@hadoop.apache.org by to...@apache.org on 2011/11/01 06:17:09 UTC
svn commit: r1195829 [3/3] - in
/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./
src/main/java/org/apache/hadoop/hdfs/
src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/
src/main/java/org/apache/hadoop/hdfs/protocol/proto/ src/main/jav...
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java?rev=1195829&r1=1195828&r2=1195829&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java Tue Nov 1 05:17:08 2011
@@ -403,8 +403,8 @@ class BlockPoolSliceScanner {
try {
adjustThrottler();
- blockSender = new BlockSender(block, 0, -1, false, false, true,
- datanode, null);
+ blockSender = new BlockSender(block, 0, -1, false, true, datanode,
+ null);
DataOutputStream out =
new DataOutputStream(new IOUtils.NullOutputStream());
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=1195829&r1=1195828&r2=1195829&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Tue Nov 1 05:17:08 2011
@@ -108,7 +108,8 @@ class BlockReceiver implements Closeable
final BlockConstructionStage stage,
final long newGs, final long minBytesRcvd, final long maxBytesRcvd,
final String clientname, final DatanodeInfo srcDataNode,
- final DataNode datanode) throws IOException {
+ final DataNode datanode, DataChecksum requestedChecksum)
+ throws IOException {
try{
this.block = block;
this.in = in;
@@ -177,7 +178,7 @@ class BlockReceiver implements Closeable
}
}
// read checksum meta information
- this.checksum = DataChecksum.newDataChecksum(in);
+ this.checksum = requestedChecksum;
this.bytesPerChecksum = checksum.getBytesPerChecksum();
this.checksumSize = checksum.getChecksumSize();
this.dropCacheBehindWrites = datanode.shouldDropCacheBehindWrites();
@@ -687,11 +688,6 @@ class BlockReceiver implements Closeable
}
}
- void writeChecksumHeader(DataOutputStream mirrorOut) throws IOException {
- checksum.writeHeader(mirrorOut);
- }
-
-
void receiveBlock(
DataOutputStream mirrOut, // output to next datanode
DataInputStream mirrIn, // input from next datanode
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java?rev=1195829&r1=1195828&r2=1195829&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java Tue Nov 1 05:17:08 2011
@@ -134,8 +134,6 @@ class BlockSender implements java.io.Clo
private final int checksumSize;
/** If true, failure to read checksum is ignored */
private final boolean corruptChecksumOk;
- /** true if chunk offset is needed to be sent in Checksum header */
- private final boolean chunkOffsetOK;
/** Sequence number of packet being sent */
private long seqno;
/** Set to true if transferTo is allowed for sending data to the client */
@@ -173,19 +171,17 @@ class BlockSender implements java.io.Clo
* @param startOffset starting offset to read from
* @param length length of data to read
* @param corruptChecksumOk
- * @param chunkOffsetOK need to send check offset in checksum header
* @param verifyChecksum verify checksum while reading the data
* @param datanode datanode from which the block is being read
* @param clientTraceFmt format string used to print client trace logs
* @throws IOException
*/
BlockSender(ExtendedBlock block, long startOffset, long length,
- boolean corruptChecksumOk, boolean chunkOffsetOK,
- boolean verifyChecksum, DataNode datanode, String clientTraceFmt)
+ boolean corruptChecksumOk, boolean verifyChecksum,
+ DataNode datanode, String clientTraceFmt)
throws IOException {
try {
this.block = block;
- this.chunkOffsetOK = chunkOffsetOK;
this.corruptChecksumOk = corruptChecksumOk;
this.verifyChecksum = verifyChecksum;
this.clientTraceFmt = clientTraceFmt;
@@ -600,8 +596,6 @@ class BlockSender implements java.io.Clo
final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
try {
- writeChecksumHeader(out);
-
int maxChunksPerPacket;
int pktSize = PacketHeader.PKT_HEADER_LEN;
boolean transferTo = transferToAllowed && !verifyChecksum
@@ -691,22 +685,6 @@ class BlockSender implements java.io.Clo
return (endOffset - offset) > LONG_READ_THRESHOLD_BYTES;
}
-
- /**
- * Write checksum header to the output stream
- */
- private void writeChecksumHeader(DataOutputStream out) throws IOException {
- try {
- checksum.writeHeader(out);
- if (chunkOffsetOK) {
- out.writeLong(offset);
- }
- out.flush();
- } catch (IOException e) { //socket error
- throw ioeToSocketException(e);
- }
- }
-
/**
* Write packet header into {@code pkt}
*/
@@ -720,4 +698,19 @@ class BlockSender implements java.io.Clo
boolean didSendEntireByteRange() {
return sentEntireByteRange;
}
+
+ /**
+ * @return the checksum type that will be used with this block transfer.
+ */
+ DataChecksum getChecksum() {
+ return checksum;
+ }
+
+ /**
+ * @return the offset into the block file where the sender is currently
+ * reading.
+ */
+ long getOffset() {
+ return offset;
+ }
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1195829&r1=1195828&r2=1195829&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Tue Nov 1 05:17:08 2011
@@ -2097,7 +2097,7 @@ public class DataNode extends Configured
out = new DataOutputStream(new BufferedOutputStream(baseStream,
HdfsConstants.SMALL_BUFFER_SIZE));
blockSender = new BlockSender(b, 0, b.getNumBytes(),
- false, false, false, DataNode.this, null);
+ false, false, DataNode.this, null);
DatanodeInfo srcNode = new DatanodeInfo(bpReg);
//
@@ -2110,7 +2110,7 @@ public class DataNode extends Configured
}
new Sender(out).writeBlock(b, accessToken, clientname, targets, srcNode,
- stage, 0, 0, 0, 0);
+ stage, 0, 0, 0, 0, blockSender.getChecksum());
// send data & checksum
blockSender.sendBlock(out, baseStream, null);
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=1195829&r1=1195828&r2=1195829&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Tue Nov 1 05:17:08 2011
@@ -44,6 +44,7 @@ import org.apache.hadoop.hdfs.protocol.E
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
import org.apache.hadoop.hdfs.protocol.datatransfer.Receiver;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
@@ -52,6 +53,7 @@ import org.apache.hadoop.hdfs.protocol.p
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProtoOrBuilder;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
@@ -225,7 +227,7 @@ class DataXceiver extends Receiver imple
try {
try {
blockSender = new BlockSender(block, blockOffset, length,
- true, true, false, datanode, clientTraceFmt);
+ true, false, datanode, clientTraceFmt);
} catch(IOException e) {
String msg = "opReadBlock " + block + " received exception " + e;
LOG.info(msg);
@@ -234,7 +236,8 @@ class DataXceiver extends Receiver imple
}
// send op status
- sendResponse(s, SUCCESS, null, datanode.socketWriteTimeout);
+ writeSuccessWithChecksumInfo(blockSender,
+ getStreamWithTimeout(s, datanode.socketWriteTimeout));
long read = blockSender.sendBlock(out, baseStream, null); // send data
@@ -292,7 +295,8 @@ class DataXceiver extends Receiver imple
final int pipelineSize,
final long minBytesRcvd,
final long maxBytesRcvd,
- final long latestGenerationStamp) throws IOException {
+ final long latestGenerationStamp,
+ DataChecksum requestedChecksum) throws IOException {
updateCurrentThreadName("Receiving block " + block + " client=" + clientname);
final boolean isDatanode = clientname.length() == 0;
final boolean isClient = !isDatanode;
@@ -351,7 +355,7 @@ class DataXceiver extends Receiver imple
s.getRemoteSocketAddress().toString(),
s.getLocalSocketAddress().toString(),
stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
- clientname, srcDataNode, datanode);
+ clientname, srcDataNode, datanode, requestedChecksum);
} else {
datanode.data.recoverClose(block, latestGenerationStamp, minBytesRcvd);
}
@@ -381,11 +385,8 @@ class DataXceiver extends Receiver imple
new Sender(mirrorOut).writeBlock(originalBlock, blockToken,
clientname, targets, srcDataNode, stage, pipelineSize,
- minBytesRcvd, maxBytesRcvd, latestGenerationStamp);
+ minBytesRcvd, maxBytesRcvd, latestGenerationStamp, requestedChecksum);
- if (blockReceiver != null) { // send checksum header
- blockReceiver.writeChecksumHeader(mirrorOut);
- }
mirrorOut.flush();
// read connect ack (only for clients, not for replication req)
@@ -600,8 +601,8 @@ class DataXceiver extends Receiver imple
try {
// check if the block exists or not
- blockSender = new BlockSender(block, 0, -1, false, false, false,
- datanode, null);
+ blockSender = new BlockSender(block, 0, -1, false, false, datanode,
+ null);
// set up response stream
OutputStream baseStream = NetUtils.getOutputStream(
@@ -610,7 +611,7 @@ class DataXceiver extends Receiver imple
baseStream, HdfsConstants.SMALL_BUFFER_SIZE));
// send status first
- writeResponse(SUCCESS, null, reply);
+ writeSuccessWithChecksumInfo(blockSender, reply);
// send block content to the target
long read = blockSender.sendBlock(reply, baseStream,
dataXceiverServer.balanceThrottler);
@@ -709,11 +710,16 @@ class DataXceiver extends Receiver imple
throw new IOException("Copy block " + block + " from "
+ proxySock.getRemoteSocketAddress() + " failed");
}
+
+ // get checksum info about the block we're copying
+ ReadOpChecksumInfoProto checksumInfo = copyResponse.getReadOpChecksumInfo();
+ DataChecksum remoteChecksum = DataTransferProtoUtil.fromProto(
+ checksumInfo.getChecksum());
// open a block receiver and check if the block does not exist
blockReceiver = new BlockReceiver(
block, proxyReply, proxySock.getRemoteSocketAddress().toString(),
proxySock.getLocalSocketAddress().toString(),
- null, 0, 0, 0, "", null, datanode);
+ null, 0, 0, 0, "", null, datanode, remoteChecksum);
// receive a block
blockReceiver.receiveBlock(null, null, null, null,
@@ -767,15 +773,19 @@ class DataXceiver extends Receiver imple
* @param opStatus status message to write
* @param timeout send timeout
**/
- private void sendResponse(Socket s, Status status, String message,
+ private static void sendResponse(Socket s, Status status, String message,
long timeout) throws IOException {
- DataOutputStream reply =
- new DataOutputStream(NetUtils.getOutputStream(s, timeout));
+ DataOutputStream reply = getStreamWithTimeout(s, timeout);
writeResponse(status, message, reply);
}
- private void writeResponse(Status status, String message, OutputStream out)
+ private static DataOutputStream getStreamWithTimeout(Socket s, long timeout)
+ throws IOException {
+ return new DataOutputStream(NetUtils.getOutputStream(s, timeout));
+ }
+
+ private static void writeResponse(Status status, String message, OutputStream out)
throws IOException {
BlockOpResponseProto.Builder response = BlockOpResponseProto.newBuilder()
.setStatus(status);
@@ -786,6 +796,22 @@ class DataXceiver extends Receiver imple
out.flush();
}
+ private void writeSuccessWithChecksumInfo(BlockSender blockSender,
+ DataOutputStream out) throws IOException {
+
+ ReadOpChecksumInfoProto ckInfo = ReadOpChecksumInfoProto.newBuilder()
+ .setChecksum(DataTransferProtoUtil.toProto(blockSender.getChecksum()))
+ .setChunkOffset(blockSender.getOffset())
+ .build();
+
+ BlockOpResponseProto response = BlockOpResponseProto.newBuilder()
+ .setStatus(SUCCESS)
+ .setReadOpChecksumInfo(ckInfo)
+ .build();
+ response.writeDelimitedTo(out);
+ out.flush();
+ }
+
private void checkAccess(DataOutputStream out, final boolean reply,
final ExtendedBlock blk,
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/proto/datatransfer.proto
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/proto/datatransfer.proto?rev=1195829&r1=1195828&r2=1195829&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/proto/datatransfer.proto (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/proto/datatransfer.proto Tue Nov 1 05:17:08 2011
@@ -40,6 +40,17 @@ message OpReadBlockProto {
required uint64 offset = 2;
required uint64 len = 3;
}
+
+
+message ChecksumProto {
+ enum ChecksumType {
+ NULL = 0;
+ CRC32 = 1;
+ CRC32C = 2;
+ }
+ required ChecksumType type = 1;
+ required uint32 bytesPerChecksum = 2;
+}
message OpWriteBlockProto {
required ClientOperationHeaderProto header = 1;
@@ -69,6 +80,11 @@ message OpWriteBlockProto {
required uint64 minBytesRcvd = 6;
required uint64 maxBytesRcvd = 7;
required uint64 latestGenerationStamp = 8;
+
+ /**
+ * The requested checksum mechanism for this block write.
+ */
+ required ChecksumProto requestedChecksum = 9;
}
message OpTransferBlockProto {
@@ -114,14 +130,30 @@ message PipelineAckProto {
repeated Status status = 2;
}
+/**
+ * Sent as part of the BlockOpResponseProto
+ * for READ_BLOCK and COPY_BLOCK operations.
+ */
+message ReadOpChecksumInfoProto {
+ required ChecksumProto checksum = 1;
+
+ /**
+ * The offset into the block at which the first packet
+ * will start. This is necessary since reads will align
+ * backwards to a checksum chunk boundary.
+ */
+ required uint64 chunkOffset = 2;
+}
+
message BlockOpResponseProto {
required Status status = 1;
optional string firstBadLink = 2;
optional OpBlockChecksumResponseProto checksumResponse = 3;
+ optional ReadOpChecksumInfoProto readOpChecksumInfo = 4;
/** explanatory text which may be useful to log on the client side */
- optional string message = 4;
+ optional string message = 5;
}
/**
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java?rev=1195829&r1=1195828&r2=1195829&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java Tue Nov 1 05:17:08 2011
@@ -31,6 +31,7 @@ import java.util.Random;
import junit.framework.TestCase;
+import org.apache.commons.digester.SetRootRule;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -43,6 +44,7 @@ import org.apache.hadoop.hdfs.protocol.D
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
@@ -50,6 +52,7 @@ import org.apache.hadoop.hdfs.protocol.d
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto.Builder;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
@@ -59,6 +62,7 @@ import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.StringUtils;
import org.junit.Test;
+import org.mockito.Mockito;
/**
* This tests data transfer protocol handling in the Datanode. It sends
@@ -68,6 +72,9 @@ public class TestDataTransferProtocol ex
private static final Log LOG = LogFactory.getLog(
"org.apache.hadoop.hdfs.TestDataTransferProtocol");
+
+ private static final DataChecksum DEFAULT_CHECKSUM =
+ DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_CRC32, 512);
DatanodeID datanode;
InetSocketAddress dnAddr;
@@ -149,9 +156,6 @@ public class TestDataTransferProtocol ex
private void writeZeroLengthPacket(ExtendedBlock block, String description)
throws IOException {
- sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
- sendOut.writeInt(512); // checksum size
-
PacketHeader hdr = new PacketHeader(
8, // size of packet
block.getNumBytes(), // OffsetInBlock
@@ -188,7 +192,8 @@ public class TestDataTransferProtocol ex
recvBuf.reset();
sender.writeBlock(block, BlockTokenSecretManager.DUMMY_TOKEN, "cl",
new DatanodeInfo[1], null, stage,
- 0, block.getNumBytes(), block.getNumBytes(), newGS);
+ 0, block.getNumBytes(), block.getNumBytes(), newGS,
+ DEFAULT_CHECKSUM);
if (eofExcepted) {
sendResponse(Status.ERROR, null, null, recvOut);
sendRecvData(description, true);
@@ -373,15 +378,16 @@ public class TestDataTransferProtocol ex
/* Test OP_WRITE_BLOCK */
sendBuf.reset();
+
+ DataChecksum badChecksum = Mockito.spy(DEFAULT_CHECKSUM);
+ Mockito.doReturn(-1).when(badChecksum).getBytesPerChecksum();
+
sender.writeBlock(new ExtendedBlock(poolId, newBlockId),
BlockTokenSecretManager.DUMMY_TOKEN, "cl",
new DatanodeInfo[1], null,
BlockConstructionStage.PIPELINE_SETUP_CREATE,
- 0, 0L, 0L, 0L);
- sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
-
- // bad bytes per checksum
- sendOut.writeInt(-1-random.nextInt(oneMil));
+ 0, 0L, 0L, 0L,
+ badChecksum);
recvBuf.reset();
sendResponse(Status.ERROR, null, null, recvOut);
sendRecvData("wrong bytesPerChecksum while writing", true);
@@ -391,9 +397,8 @@ public class TestDataTransferProtocol ex
sender.writeBlock(new ExtendedBlock(poolId, ++newBlockId),
BlockTokenSecretManager.DUMMY_TOKEN, "cl",
new DatanodeInfo[1], null,
- BlockConstructionStage.PIPELINE_SETUP_CREATE, 0, 0L, 0L, 0L);
- sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
- sendOut.writeInt(512);
+ BlockConstructionStage.PIPELINE_SETUP_CREATE, 0, 0L, 0L, 0L,
+ DEFAULT_CHECKSUM);
PacketHeader hdr = new PacketHeader(
4, // size of packet
@@ -414,9 +419,8 @@ public class TestDataTransferProtocol ex
sender.writeBlock(new ExtendedBlock(poolId, ++newBlockId),
BlockTokenSecretManager.DUMMY_TOKEN, "cl",
new DatanodeInfo[1], null,
- BlockConstructionStage.PIPELINE_SETUP_CREATE, 0, 0L, 0L, 0L);
- sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
- sendOut.writeInt(512); // checksum size
+ BlockConstructionStage.PIPELINE_SETUP_CREATE, 0, 0L, 0L, 0L,
+ DEFAULT_CHECKSUM);
hdr = new PacketHeader(
8, // size of packet
@@ -462,7 +466,15 @@ public class TestDataTransferProtocol ex
// negative length is ok. Datanode assumes we want to read the whole block.
recvBuf.reset();
- sendResponse(Status.SUCCESS, null, null, recvOut);
+
+ BlockOpResponseProto.newBuilder()
+ .setStatus(Status.SUCCESS)
+ .setReadOpChecksumInfo(ReadOpChecksumInfoProto.newBuilder()
+ .setChecksum(DataTransferProtoUtil.toProto(DEFAULT_CHECKSUM))
+ .setChunkOffset(0L))
+ .build()
+ .writeDelimitedTo(recvOut);
+
sendBuf.reset();
sender.readBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN, "cl",
0L, -1L-random.nextInt(oneMil));
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java?rev=1195829&r1=1195828&r2=1195829&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java Tue Nov 1 05:17:08 2011
@@ -40,6 +40,7 @@ import org.apache.hadoop.hdfs.protocol.d
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
+import org.apache.hadoop.util.DataChecksum;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -140,14 +141,13 @@ public class TestDiskError {
// write the header.
DataOutputStream out = new DataOutputStream(s.getOutputStream());
+ DataChecksum checksum = DataChecksum.newDataChecksum(
+ DataChecksum.CHECKSUM_CRC32, 512);
new Sender(out).writeBlock(block.getBlock(),
BlockTokenSecretManager.DUMMY_TOKEN, "",
new DatanodeInfo[0], null,
- BlockConstructionStage.PIPELINE_SETUP_CREATE, 1, 0L, 0L, 0L);
-
- // write check header
- out.writeByte( 1 );
- out.writeInt( 512 );
+ BlockConstructionStage.PIPELINE_SETUP_CREATE, 1, 0L, 0L, 0L,
+ checksum);
out.flush();
// close the connection before sending the content of the block