You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by to...@apache.org on 2011/11/01 06:16:54 UTC

svn commit: r1195828 [3/3] - in /hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/ src/main/java/org/apache/hadoop/hdfs/protocol/prot...

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java?rev=1195828&r1=1195827&r2=1195828&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java Tue Nov  1 05:16:53 2011
@@ -403,8 +403,8 @@ class BlockPoolSliceScanner {
       try {
         adjustThrottler();
         
-        blockSender = new BlockSender(block, 0, -1, false, false, true,
-            datanode, null);
+        blockSender = new BlockSender(block, 0, -1, false, true, datanode,
+            null);
 
         DataOutputStream out = 
                 new DataOutputStream(new IOUtils.NullOutputStream());

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=1195828&r1=1195827&r2=1195828&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Tue Nov  1 05:16:53 2011
@@ -108,7 +108,8 @@ class BlockReceiver implements Closeable
       final BlockConstructionStage stage, 
       final long newGs, final long minBytesRcvd, final long maxBytesRcvd, 
       final String clientname, final DatanodeInfo srcDataNode,
-      final DataNode datanode) throws IOException {
+      final DataNode datanode, DataChecksum requestedChecksum)
+      throws IOException {
     try{
       this.block = block;
       this.in = in;
@@ -177,7 +178,7 @@ class BlockReceiver implements Closeable
         }
       }
       // read checksum meta information
-      this.checksum = DataChecksum.newDataChecksum(in);
+      this.checksum = requestedChecksum;
       this.bytesPerChecksum = checksum.getBytesPerChecksum();
       this.checksumSize = checksum.getChecksumSize();
       this.dropCacheBehindWrites = datanode.shouldDropCacheBehindWrites();
@@ -687,11 +688,6 @@ class BlockReceiver implements Closeable
     }
   }
 
-  void writeChecksumHeader(DataOutputStream mirrorOut) throws IOException {
-    checksum.writeHeader(mirrorOut);
-  }
- 
-
   void receiveBlock(
       DataOutputStream mirrOut, // output to next datanode
       DataInputStream mirrIn,   // input from next datanode

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java?rev=1195828&r1=1195827&r2=1195828&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java Tue Nov  1 05:16:53 2011
@@ -134,8 +134,6 @@ class BlockSender implements java.io.Clo
   private final int checksumSize;
   /** If true, failure to read checksum is ignored */
   private final boolean corruptChecksumOk;
-  /** true if chunk offset is needed to be sent in Checksum header */
-  private final boolean chunkOffsetOK;
   /** Sequence number of packet being sent */
   private long seqno;
   /** Set to true if transferTo is allowed for sending data to the client */
@@ -173,19 +171,17 @@ class BlockSender implements java.io.Clo
    * @param startOffset starting offset to read from
    * @param length length of data to read
    * @param corruptChecksumOk
-   * @param chunkOffsetOK need to send check offset in checksum header
    * @param verifyChecksum verify checksum while reading the data
    * @param datanode datanode from which the block is being read
    * @param clientTraceFmt format string used to print client trace logs
    * @throws IOException
    */
   BlockSender(ExtendedBlock block, long startOffset, long length,
-              boolean corruptChecksumOk, boolean chunkOffsetOK,
-              boolean verifyChecksum, DataNode datanode, String clientTraceFmt)
+              boolean corruptChecksumOk, boolean verifyChecksum,
+              DataNode datanode, String clientTraceFmt)
       throws IOException {
     try {
       this.block = block;
-      this.chunkOffsetOK = chunkOffsetOK;
       this.corruptChecksumOk = corruptChecksumOk;
       this.verifyChecksum = verifyChecksum;
       this.clientTraceFmt = clientTraceFmt;
@@ -600,8 +596,6 @@ class BlockSender implements java.io.Clo
 
     final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
     try {
-      writeChecksumHeader(out);
-      
       int maxChunksPerPacket;
       int pktSize = PacketHeader.PKT_HEADER_LEN;
       boolean transferTo = transferToAllowed && !verifyChecksum
@@ -691,22 +685,6 @@ class BlockSender implements java.io.Clo
     return (endOffset - offset) > LONG_READ_THRESHOLD_BYTES;
   }
 
-  
-  /**
-   * Write checksum header to the output stream
-   */
-  private void writeChecksumHeader(DataOutputStream out) throws IOException {
-    try {
-      checksum.writeHeader(out);
-      if (chunkOffsetOK) {
-        out.writeLong(offset);
-      }
-      out.flush();
-    } catch (IOException e) { //socket error
-      throw ioeToSocketException(e);
-    }
-  }
-    
   /**
    * Write packet header into {@code pkt}
    */
@@ -720,4 +698,19 @@ class BlockSender implements java.io.Clo
   boolean didSendEntireByteRange() {
     return sentEntireByteRange;
   }
+
+  /**
+   * @return the checksum (type and bytes per checksum) used for this transfer.
+   */
+  DataChecksum getChecksum() {
+    return checksum;
+  }
+
+  /**
+   * @return the offset into the block file where the sender is currently
+   * reading.
+   */
+  long getOffset() {
+    return offset;
+  }
 }

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1195828&r1=1195827&r2=1195828&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Tue Nov  1 05:16:53 2011
@@ -2053,7 +2053,7 @@ public class DataNode extends Configured
         out = new DataOutputStream(new BufferedOutputStream(baseStream,
             HdfsConstants.SMALL_BUFFER_SIZE));
         blockSender = new BlockSender(b, 0, b.getNumBytes(), 
-            false, false, false, DataNode.this, null);
+            false, false, DataNode.this, null);
         DatanodeInfo srcNode = new DatanodeInfo(bpReg);
 
         //
@@ -2066,7 +2066,7 @@ public class DataNode extends Configured
         }
 
         new Sender(out).writeBlock(b, accessToken, clientname, targets, srcNode,
-            stage, 0, 0, 0, 0);
+            stage, 0, 0, 0, 0, blockSender.getChecksum());
 
         // send data & checksum
         blockSender.sendBlock(out, baseStream, null);

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=1195828&r1=1195827&r2=1195828&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Tue Nov  1 05:16:53 2011
@@ -44,6 +44,7 @@ import org.apache.hadoop.hdfs.protocol.E
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil;
 import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Receiver;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
@@ -52,6 +53,7 @@ import org.apache.hadoop.hdfs.protocol.p
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProtoOrBuilder;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
@@ -225,7 +227,7 @@ class DataXceiver extends Receiver imple
     try {
       try {
         blockSender = new BlockSender(block, blockOffset, length,
-            true, true, false, datanode, clientTraceFmt);
+            true, false, datanode, clientTraceFmt);
       } catch(IOException e) {
         String msg = "opReadBlock " + block + " received exception " + e; 
         LOG.info(msg);
@@ -234,7 +236,8 @@ class DataXceiver extends Receiver imple
       }
       
       // send op status
-      sendResponse(s, SUCCESS, null, datanode.socketWriteTimeout);
+      writeSuccessWithChecksumInfo(blockSender,
+          getStreamWithTimeout(s, datanode.socketWriteTimeout));
 
       long read = blockSender.sendBlock(out, baseStream, null); // send data
 
@@ -292,7 +295,8 @@ class DataXceiver extends Receiver imple
       final int pipelineSize,
       final long minBytesRcvd,
       final long maxBytesRcvd,
-      final long latestGenerationStamp) throws IOException {
+      final long latestGenerationStamp,
+      DataChecksum requestedChecksum) throws IOException {
     updateCurrentThreadName("Receiving block " + block + " client=" + clientname);
     final boolean isDatanode = clientname.length() == 0;
     final boolean isClient = !isDatanode;
@@ -351,7 +355,7 @@ class DataXceiver extends Receiver imple
             s.getRemoteSocketAddress().toString(),
             s.getLocalSocketAddress().toString(),
             stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
-            clientname, srcDataNode, datanode);
+            clientname, srcDataNode, datanode, requestedChecksum);
       } else {
         datanode.data.recoverClose(block, latestGenerationStamp, minBytesRcvd);
       }
@@ -381,11 +385,8 @@ class DataXceiver extends Receiver imple
 
           new Sender(mirrorOut).writeBlock(originalBlock, blockToken,
               clientname, targets, srcDataNode, stage, pipelineSize,
-              minBytesRcvd, maxBytesRcvd, latestGenerationStamp);
+              minBytesRcvd, maxBytesRcvd, latestGenerationStamp, requestedChecksum);
 
-          if (blockReceiver != null) { // send checksum header
-            blockReceiver.writeChecksumHeader(mirrorOut);
-          }
           mirrorOut.flush();
 
           // read connect ack (only for clients, not for replication req)
@@ -600,8 +601,8 @@ class DataXceiver extends Receiver imple
 
     try {
       // check if the block exists or not
-      blockSender = new BlockSender(block, 0, -1, false, false, false, 
-          datanode, null);
+      blockSender = new BlockSender(block, 0, -1, false, false, datanode, 
+          null);
 
       // set up response stream
       OutputStream baseStream = NetUtils.getOutputStream(
@@ -610,7 +611,7 @@ class DataXceiver extends Receiver imple
           baseStream, HdfsConstants.SMALL_BUFFER_SIZE));
 
       // send status first
-      writeResponse(SUCCESS, null, reply);
+      writeSuccessWithChecksumInfo(blockSender, reply);
       // send block content to the target
       long read = blockSender.sendBlock(reply, baseStream, 
                                         dataXceiverServer.balanceThrottler);
@@ -709,11 +710,16 @@ class DataXceiver extends Receiver imple
         throw new IOException("Copy block " + block + " from "
             + proxySock.getRemoteSocketAddress() + " failed");
       }
+      
+      // get checksum info about the block we're copying
+      ReadOpChecksumInfoProto checksumInfo = copyResponse.getReadOpChecksumInfo();
+      DataChecksum remoteChecksum = DataTransferProtoUtil.fromProto(
+          checksumInfo.getChecksum());
       // open a block receiver and check if the block does not exist
       blockReceiver = new BlockReceiver(
           block, proxyReply, proxySock.getRemoteSocketAddress().toString(),
           proxySock.getLocalSocketAddress().toString(),
-          null, 0, 0, 0, "", null, datanode);
+          null, 0, 0, 0, "", null, datanode, remoteChecksum);
 
       // receive a block
       blockReceiver.receiveBlock(null, null, null, null, 
@@ -767,15 +773,19 @@ class DataXceiver extends Receiver imple
    * @param opStatus status message to write
    * @param timeout send timeout
    **/
-  private void sendResponse(Socket s, Status status, String message,
+  private static void sendResponse(Socket s, Status status, String message,
       long timeout) throws IOException {
-    DataOutputStream reply = 
-      new DataOutputStream(NetUtils.getOutputStream(s, timeout));
+    DataOutputStream reply = getStreamWithTimeout(s, timeout);
     
     writeResponse(status, message, reply);
   }
   
-  private void writeResponse(Status status, String message, OutputStream out)
+  private static DataOutputStream getStreamWithTimeout(Socket s, long timeout)
+      throws IOException {
+    return new DataOutputStream(NetUtils.getOutputStream(s, timeout));
+  }
+
+  private static void writeResponse(Status status, String message, OutputStream out)
   throws IOException {
     BlockOpResponseProto.Builder response = BlockOpResponseProto.newBuilder()
       .setStatus(status);
@@ -786,6 +796,22 @@ class DataXceiver extends Receiver imple
     out.flush();
   }
   
+  private void writeSuccessWithChecksumInfo(BlockSender blockSender,
+      DataOutputStream out) throws IOException {
+
+    ReadOpChecksumInfoProto ckInfo = ReadOpChecksumInfoProto.newBuilder()
+      .setChecksum(DataTransferProtoUtil.toProto(blockSender.getChecksum()))
+      .setChunkOffset(blockSender.getOffset())
+      .build();
+      
+    BlockOpResponseProto response = BlockOpResponseProto.newBuilder()
+      .setStatus(SUCCESS)
+      .setReadOpChecksumInfo(ckInfo)
+      .build();
+    response.writeDelimitedTo(out);
+    out.flush();
+  }
+  
 
   private void checkAccess(DataOutputStream out, final boolean reply, 
       final ExtendedBlock blk,

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/proto/datatransfer.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/proto/datatransfer.proto?rev=1195828&r1=1195827&r2=1195828&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/proto/datatransfer.proto (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/proto/datatransfer.proto Tue Nov  1 05:16:53 2011
@@ -40,6 +40,17 @@ message OpReadBlockProto {
   required uint64 offset = 2;
   required uint64 len = 3;
 }
+
+
+message ChecksumProto {
+  enum ChecksumType {
+    NULL = 0;
+    CRC32 = 1;
+    CRC32C = 2;
+  }
+  required ChecksumType type = 1;
+  required uint32 bytesPerChecksum = 2;
+}
   
 message OpWriteBlockProto {
   required ClientOperationHeaderProto header = 1;
@@ -69,6 +80,11 @@ message OpWriteBlockProto {
   required uint64 minBytesRcvd = 6;
   required uint64 maxBytesRcvd = 7;
   required uint64 latestGenerationStamp = 8;
+
+  /**
+   * The requested checksum mechanism for this block write.
+   */
+  required ChecksumProto requestedChecksum = 9;
 }
   
 message OpTransferBlockProto {
@@ -114,14 +130,30 @@ message PipelineAckProto {
   repeated Status status = 2;
 }
 
+/**
+ * Sent as part of the BlockOpResponseProto
+ * for READ_BLOCK and COPY_BLOCK operations.
+ */
+message ReadOpChecksumInfoProto {
+  required ChecksumProto checksum = 1;
+
+  /**
+   * The offset into the block at which the first packet
+   * will start. This is necessary since reads will align
+   * backwards to a checksum chunk boundary.
+   */
+  required uint64 chunkOffset = 2;
+}
+
 message BlockOpResponseProto {
   required Status status = 1;
 
   optional string firstBadLink = 2;
   optional OpBlockChecksumResponseProto checksumResponse = 3;
+  optional ReadOpChecksumInfoProto readOpChecksumInfo = 4;
 
   /** explanatory text which may be useful to log on the client side */
-  optional string message = 4;
+  optional string message = 5;
 }
 
 /**

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java?rev=1195828&r1=1195827&r2=1195828&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java Tue Nov  1 05:16:53 2011
@@ -31,6 +31,7 @@ import java.util.Random;
 
 import junit.framework.TestCase;
 
+import org.apache.commons.digester.SetRootRule;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -43,6 +44,7 @@ import org.apache.hadoop.hdfs.protocol.D
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
@@ -50,6 +52,7 @@ import org.apache.hadoop.hdfs.protocol.d
 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto.Builder;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
@@ -59,6 +62,7 @@ import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.StringUtils;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 /**
  * This tests data transfer protocol handling in the Datanode. It sends
@@ -68,6 +72,9 @@ public class TestDataTransferProtocol ex
   
   private static final Log LOG = LogFactory.getLog(
                     "org.apache.hadoop.hdfs.TestDataTransferProtocol");
+
+  private static final DataChecksum DEFAULT_CHECKSUM =
+    DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_CRC32, 512);
   
   DatanodeID datanode;
   InetSocketAddress dnAddr;
@@ -149,9 +156,6 @@ public class TestDataTransferProtocol ex
   
   private void writeZeroLengthPacket(ExtendedBlock block, String description)
   throws IOException {
-    sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
-    sendOut.writeInt(512);         // checksum size
-
     PacketHeader hdr = new PacketHeader(
       8,                   // size of packet
       block.getNumBytes(), // OffsetInBlock
@@ -188,7 +192,8 @@ public class TestDataTransferProtocol ex
     recvBuf.reset();
     sender.writeBlock(block, BlockTokenSecretManager.DUMMY_TOKEN, "cl",
         new DatanodeInfo[1], null, stage,
-        0, block.getNumBytes(), block.getNumBytes(), newGS);
+        0, block.getNumBytes(), block.getNumBytes(), newGS,
+        DEFAULT_CHECKSUM);
     if (eofExcepted) {
       sendResponse(Status.ERROR, null, null, recvOut);
       sendRecvData(description, true);
@@ -373,15 +378,16 @@ public class TestDataTransferProtocol ex
     
     /* Test OP_WRITE_BLOCK */
     sendBuf.reset();
+    
+    DataChecksum badChecksum = Mockito.spy(DEFAULT_CHECKSUM);
+    Mockito.doReturn(-1).when(badChecksum).getBytesPerChecksum();
+
     sender.writeBlock(new ExtendedBlock(poolId, newBlockId),
         BlockTokenSecretManager.DUMMY_TOKEN, "cl",
         new DatanodeInfo[1], null,
         BlockConstructionStage.PIPELINE_SETUP_CREATE,
-        0, 0L, 0L, 0L);
-    sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
-    
-    // bad bytes per checksum
-    sendOut.writeInt(-1-random.nextInt(oneMil));
+        0, 0L, 0L, 0L,
+        badChecksum);
     recvBuf.reset();
     sendResponse(Status.ERROR, null, null, recvOut);
     sendRecvData("wrong bytesPerChecksum while writing", true);
@@ -391,9 +397,8 @@ public class TestDataTransferProtocol ex
     sender.writeBlock(new ExtendedBlock(poolId, ++newBlockId),
         BlockTokenSecretManager.DUMMY_TOKEN, "cl",
         new DatanodeInfo[1], null,
-        BlockConstructionStage.PIPELINE_SETUP_CREATE, 0, 0L, 0L, 0L);
-    sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
-    sendOut.writeInt(512);
+        BlockConstructionStage.PIPELINE_SETUP_CREATE, 0, 0L, 0L, 0L,
+        DEFAULT_CHECKSUM);
 
     PacketHeader hdr = new PacketHeader(
       4,     // size of packet
@@ -414,9 +419,8 @@ public class TestDataTransferProtocol ex
     sender.writeBlock(new ExtendedBlock(poolId, ++newBlockId),
         BlockTokenSecretManager.DUMMY_TOKEN, "cl",
         new DatanodeInfo[1], null,
-        BlockConstructionStage.PIPELINE_SETUP_CREATE, 0, 0L, 0L, 0L);
-    sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
-    sendOut.writeInt(512);         // checksum size
+        BlockConstructionStage.PIPELINE_SETUP_CREATE, 0, 0L, 0L, 0L,
+        DEFAULT_CHECKSUM);
 
     hdr = new PacketHeader(
       8,     // size of packet
@@ -462,7 +466,15 @@ public class TestDataTransferProtocol ex
     
     // negative length is ok. Datanode assumes we want to read the whole block.
     recvBuf.reset();
-    sendResponse(Status.SUCCESS, null, null, recvOut);
+    
+    BlockOpResponseProto.newBuilder()
+      .setStatus(Status.SUCCESS)
+      .setReadOpChecksumInfo(ReadOpChecksumInfoProto.newBuilder()
+          .setChecksum(DataTransferProtoUtil.toProto(DEFAULT_CHECKSUM))
+          .setChunkOffset(0L))
+      .build()
+      .writeDelimitedTo(recvOut);
+    
     sendBuf.reset();
     sender.readBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN, "cl",
         0L, -1L-random.nextInt(oneMil));

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java?rev=1195828&r1=1195827&r2=1195828&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java Tue Nov  1 05:16:53 2011
@@ -40,6 +40,7 @@ import org.apache.hadoop.hdfs.protocol.d
 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
+import org.apache.hadoop.util.DataChecksum;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -140,14 +141,13 @@ public class TestDiskError {
     // write the header.
     DataOutputStream out = new DataOutputStream(s.getOutputStream());
 
+    DataChecksum checksum = DataChecksum.newDataChecksum(
+        DataChecksum.CHECKSUM_CRC32, 512);
     new Sender(out).writeBlock(block.getBlock(),
         BlockTokenSecretManager.DUMMY_TOKEN, "",
         new DatanodeInfo[0], null,
-        BlockConstructionStage.PIPELINE_SETUP_CREATE, 1, 0L, 0L, 0L);
-
-    // write check header
-    out.writeByte( 1 );
-    out.writeInt( 512 );
+        BlockConstructionStage.PIPELINE_SETUP_CREATE, 1, 0L, 0L, 0L,
+        checksum);
     out.flush();
 
     // close the connection before sending the content of the block