Posted to hdfs-commits@hadoop.apache.org by sz...@apache.org on 2011/06/24 01:57:19 UTC

svn commit: r1139124 - in /hadoop/common/trunk/hdfs: ./ src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/protocol/datatransfer/ src/java/org/apache/hadoop/hdfs/server/balancer/ src/java/org/apache/hadoop/hdfs/server/datanode/ src/test/a...

Author: szetszwo
Date: Thu Jun 23 23:57:18 2011
New Revision: 1139124

URL: http://svn.apache.org/viewvc?rev=1139124&view=rev
Log:
HDFS-2087. Declare methods in DataTransferProtocol interface, and change Sender and Receiver to implement the interface.
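
For context, the refactoring replaces the old static per-op helpers (e.g.
Sender.opReadBlock(out, block, ...)) with instance methods on a Sender that is
bound to an output stream once, as declared by the new DataTransferProtocol
interface. A minimal sketch of the new calling convention follows; it is not
part of this commit, and the class name, method name, and timeout value are
illustrative only:

import java.io.BufferedOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.Socket;

import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.token.Token;

public class SenderUsageSketch {
  /** Issue an OP_READ_BLOCK request over an already-connected socket. */
  static void requestRead(Socket sock, ExtendedBlock block,
      Token<BlockTokenIdentifier> blockToken, String clientName,
      long startOffset, long len) throws IOException {
    // Bind the stream once, then call the DataTransferProtocol method;
    // the 8000 ms write timeout here is an arbitrary illustrative value.
    final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
        NetUtils.getOutputStream(sock, 8000)));
    new Sender(out).readBlock(block, blockToken, clientName, startOffset, len);
  }
}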

Modified:
    hadoop/common/trunk/hdfs/CHANGES.txt
    hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/BlockReader.java
    hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java
    hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
    hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
    hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
    hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
    hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
    hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
    hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
    hadoop/common/trunk/hdfs/src/test/aop/org/apache/hadoop/hdfs/server/datanode/DataTransferProtocolAspects.aj
    hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java
    hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
    hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
    hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java

Modified: hadoop/common/trunk/hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/CHANGES.txt?rev=1139124&r1=1139123&r2=1139124&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hdfs/CHANGES.txt Thu Jun 23 23:57:18 2011
@@ -526,6 +526,9 @@ Trunk (unreleased changes)
     HDFS-2092. Remove some object references to Configuration in DFSClient.
     (Bharath Mundlapudi via szetszwo)
 
+    HDFS-2087. Declare methods in DataTransferProtocol interface, and change
+    Sender and Receiver to implement the interface.  (szetszwo)
+
   OPTIMIZATIONS
 
     HDFS-1458. Improve checkpoint performance by avoiding unnecessary image

Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/BlockReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/BlockReader.java?rev=1139124&r1=1139123&r2=1139124&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/BlockReader.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/BlockReader.java Thu Jun 23 23:57:18 2011
@@ -404,10 +404,9 @@ public class BlockReader extends FSInput
                                      String clientName)
                                      throws IOException {
     // in and out will be closed when sock is closed (by the caller)
-    Sender.opReadBlock(
-        new DataOutputStream(new BufferedOutputStream(
-            NetUtils.getOutputStream(sock,HdfsConstants.WRITE_TIMEOUT))),
-        block, startOffset, len, clientName, blockToken);
+    final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
+          NetUtils.getOutputStream(sock, HdfsConstants.WRITE_TIMEOUT)));
+    new Sender(out).readBlock(block, blockToken, clientName, startOffset, len);
     
     //
     // Get bytes in block, set streams

Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1139124&r1=1139123&r2=1139124&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/DFSClient.java Thu Jun 23 23:57:18 2011
@@ -1164,7 +1164,7 @@ public class DFSClient implements FSCons
                 + Op.BLOCK_CHECKSUM + ", block=" + block);
           }
           // get block MD5
-          Sender.opBlockChecksum(out, block, lb.getBlockToken());
+          new Sender(out).blockChecksum(block, lb.getBlockToken());
 
           final BlockOpResponseProto reply =
             BlockOpResponseProto.parseFrom(HdfsProtoUtil.vintPrefixed(in));

Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java?rev=1139124&r1=1139123&r2=1139124&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java Thu Jun 23 23:57:18 2011
@@ -846,8 +846,8 @@ class DFSOutputStream extends FSOutputSu
             DataNode.SMALL_BUFFER_SIZE));
 
         //send the TRANSFER_BLOCK request
-        Sender.opTransferBlock(out, block,
-            dfsClient.clientName, targets, blockToken);
+        new Sender(out).transferBlock(block, blockToken, dfsClient.clientName,
+            targets);
 
         //ack
         in = new DataInputStream(NetUtils.getInputStream(sock));
@@ -1019,10 +1019,9 @@ class DFSOutputStream extends FSOutputSu
         blockReplyStream = new DataInputStream(NetUtils.getInputStream(s));
 
         // send the request
-        Sender.opWriteBlock(out, block,
-            nodes.length, recoveryFlag ? stage.getRecoveryStage() : stage, newGS, 
-            block.getNumBytes(), bytesSent, dfsClient.clientName, null, nodes,
-            accessToken);
+        new Sender(out).writeBlock(block, accessToken, dfsClient.clientName,
+            nodes, null, recoveryFlag? stage.getRecoveryStage() : stage, 
+            nodes.length, block.getNumBytes(), bytesSent, newGS);
         checksum.writeHeader(out);
         out.flush();
 

Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java?rev=1139124&r1=1139123&r2=1139124&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java Thu Jun 23 23:57:18 2011
@@ -17,10 +17,16 @@
  */
 package org.apache.hadoop.hdfs.protocol.datatransfer;
 
+import java.io.IOException;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.security.token.Token;
 
 /**
  * Transfer data to/from datanode using a streaming protocol.
@@ -35,8 +41,101 @@ public interface DataTransferProtocol {
    * when protocol changes. It is not very obvious. 
    */
   /*
-   * Version 27:
-   *    Move DataTransferProtocol and the inner classes to a package.
+   * Version 28:
+   *    Declare methods in DataTransferProtocol interface.
+   */
+  public static final int DATA_TRANSFER_VERSION = 28;
+
+  /** 
+   * Read a block.
+   * 
+   * @param blk the block being read.
+   * @param blockToken security token for accessing the block.
+   * @param clientName client's name.
+   * @param blockOffset starting offset within the block.
+   * @param length maximum number of bytes for this read.
+   */
+  public void readBlock(final ExtendedBlock blk,
+      final Token<BlockTokenIdentifier> blockToken,
+      final String clientName,
+      final long blockOffset,
+      final long length) throws IOException;
+
+  /**
+   * Write a block to a datanode pipeline.
+   * 
+   * @param blk the block being written.
+   * @param blockToken security token for accessing the block.
+   * @param clientName client's name.
+   * @param targets target datanodes in the pipeline.
+   * @param source source datanode.
+   * @param stage pipeline stage.
+   * @param pipelineSize the size of the pipeline.
+   * @param minBytesRcvd minimum number of bytes received.
+   * @param maxBytesRcvd maximum number of bytes received.
+   * @param latestGenerationStamp the latest generation stamp of the block.
+   */
+  public void writeBlock(final ExtendedBlock blk,
+      final Token<BlockTokenIdentifier> blockToken,
+      final String clientName,
+      final DatanodeInfo[] targets,
+      final DatanodeInfo source,
+      final BlockConstructionStage stage,
+      final int pipelineSize,
+      final long minBytesRcvd,
+      final long maxBytesRcvd,
+      final long latestGenerationStamp) throws IOException;
+
+  /**
+   * Transfer a block to another datanode.
+   * The block stage must be
+   * either {@link BlockConstructionStage#TRANSFER_RBW}
+   * or {@link BlockConstructionStage#TRANSFER_FINALIZED}.
+   * 
+   * @param blk the block being transferred.
+   * @param blockToken security token for accessing the block.
+   * @param clientName client's name.
+   * @param targets target datanodes.
+   */
+  public void transferBlock(final ExtendedBlock blk,
+      final Token<BlockTokenIdentifier> blockToken,
+      final String clientName,
+      final DatanodeInfo[] targets) throws IOException;
+
+  /**
+   * Receive a block from a source datanode
+   * and then notify the namenode
+   * to remove the copy from the original datanode.
+   * Note that the source datanode and the original datanode can be different.
+   * It is used for balancing purposes.
+   * 
+   * @param blk the block being replaced.
+   * @param blockToken security token for accessing the block.
+   * @param delHint the hint for deleting the block in the original datanode.
+   * @param source the source datanode from which the block is received.
+   */
+  public void replaceBlock(final ExtendedBlock blk,
+      final Token<BlockTokenIdentifier> blockToken,
+      final String delHint,
+      final DatanodeInfo source) throws IOException;
+
+  /**
+   * Copy a block. 
+   * It is used for balancing purposes.
+   * 
+   * @param blk the block being copied.
+   * @param blockToken security token for accessing the block.
+   */
+  public void copyBlock(final ExtendedBlock blk,
+      final Token<BlockTokenIdentifier> blockToken) throws IOException;
+
+  /**
+   * Get block checksum (MD5 of CRC32).
+   * 
+   * @param blk a block.
+   * @param blockToken security token for accessing the block.
+   * @throws IOException
    */
-  public static final int DATA_TRANSFER_VERSION = 27;
+  public void blockChecksum(final ExtendedBlock blk,
+      final Token<BlockTokenIdentifier> blockToken) throws IOException;
 }

Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java?rev=1139124&r1=1139123&r2=1139124&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java Thu Jun 23 23:57:18 2011
@@ -27,23 +27,26 @@ import java.io.IOException;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReadBlockProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReplaceBlockProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpTransferBlockProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.security.token.Token;
 
 /** Receiver */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
-public abstract class Receiver {
+public abstract class Receiver implements DataTransferProtocol {
+  protected final DataInputStream in;
+
+  /** Create a receiver for DataTransferProtocol with an input stream. */
+  protected Receiver(final DataInputStream in) {
+    this.in = in;
+  }
+
   /** Read an Op.  It also checks protocol version. */
-  protected final Op readOp(DataInputStream in) throws IOException {
+  protected final Op readOp() throws IOException {
     final short version = in.readShort();
     if (version != DataTransferProtocol.DATA_TRANSFER_VERSION) {
       throw new IOException( "Version Mismatch (Expected: " +
@@ -54,11 +57,10 @@ public abstract class Receiver {
   }
 
   /** Process op by the corresponding method. */
-  protected final void processOp(Op op, DataInputStream in
-      ) throws IOException {
+  protected final void processOp(Op op) throws IOException {
     switch(op) {
     case READ_BLOCK:
-      opReadBlock(in);
+      opReadBlock();
       break;
     case WRITE_BLOCK:
       opWriteBlock(in);
@@ -81,121 +83,60 @@ public abstract class Receiver {
   }
 
   /** Receive OP_READ_BLOCK */
-  private void opReadBlock(DataInputStream in) throws IOException {
+  private void opReadBlock() throws IOException {
     OpReadBlockProto proto = OpReadBlockProto.parseFrom(vintPrefixed(in));
-    
-    ExtendedBlock b = fromProto(
-        proto.getHeader().getBaseHeader().getBlock());
-    Token<BlockTokenIdentifier> token = fromProto(
-        proto.getHeader().getBaseHeader().getToken());
-
-    opReadBlock(in, b, proto.getOffset(), proto.getLen(),
-        proto.getHeader().getClientName(), token);
-  }
-  /**
-   * Abstract OP_READ_BLOCK method. Read a block.
-   */
-  protected abstract void opReadBlock(DataInputStream in, ExtendedBlock blk,
-      long offset, long length, String client,
-      Token<BlockTokenIdentifier> blockToken) throws IOException;
+    readBlock(fromProto(proto.getHeader().getBaseHeader().getBlock()),
+        fromProto(proto.getHeader().getBaseHeader().getToken()),
+        proto.getHeader().getClientName(),
+        proto.getOffset(),
+        proto.getLen());
+  }
   
   /** Receive OP_WRITE_BLOCK */
   private void opWriteBlock(DataInputStream in) throws IOException {
     final OpWriteBlockProto proto = OpWriteBlockProto.parseFrom(vintPrefixed(in));
-    opWriteBlock(in,
-        fromProto(proto.getHeader().getBaseHeader().getBlock()),
-        proto.getPipelineSize(),
-        fromProto(proto.getStage()),
-        proto.getLatestGenerationStamp(),
-        proto.getMinBytesRcvd(), proto.getMaxBytesRcvd(),
+    writeBlock(fromProto(proto.getHeader().getBaseHeader().getBlock()),
+        fromProto(proto.getHeader().getBaseHeader().getToken()),
         proto.getHeader().getClientName(),
-        fromProto(proto.getSource()),
         fromProtos(proto.getTargetsList()),
-        fromProto(proto.getHeader().getBaseHeader().getToken()));
+        fromProto(proto.getSource()),
+        fromProto(proto.getStage()),
+        proto.getPipelineSize(),
+        proto.getMinBytesRcvd(), proto.getMaxBytesRcvd(),
+        proto.getLatestGenerationStamp());
   }
 
-  /**
-   * Abstract OP_WRITE_BLOCK method. 
-   * Write a block.
-   */
-  protected abstract void opWriteBlock(DataInputStream in, ExtendedBlock blk,
-      int pipelineSize, BlockConstructionStage stage, long newGs,
-      long minBytesRcvd, long maxBytesRcvd, String client, DatanodeInfo src,
-      DatanodeInfo[] targets, Token<BlockTokenIdentifier> blockToken)
-      throws IOException;
-
   /** Receive {@link Op#TRANSFER_BLOCK} */
   private void opTransferBlock(DataInputStream in) throws IOException {
     final OpTransferBlockProto proto =
       OpTransferBlockProto.parseFrom(vintPrefixed(in));
-
-    opTransferBlock(in,
-        fromProto(proto.getHeader().getBaseHeader().getBlock()),
+    transferBlock(fromProto(proto.getHeader().getBaseHeader().getBlock()),
+        fromProto(proto.getHeader().getBaseHeader().getToken()),
         proto.getHeader().getClientName(),
-        fromProtos(proto.getTargetsList()),
-        fromProto(proto.getHeader().getBaseHeader().getToken()));
+        fromProtos(proto.getTargetsList()));
   }
 
-  /**
-   * Abstract {@link Op#TRANSFER_BLOCK} method.
-   * For {@link BlockConstructionStage#TRANSFER_RBW}
-   * or {@link BlockConstructionStage#TRANSFER_FINALIZED}.
-   */
-  protected abstract void opTransferBlock(DataInputStream in, ExtendedBlock blk,
-      String client, DatanodeInfo[] targets,
-      Token<BlockTokenIdentifier> blockToken)
-      throws IOException;
-
   /** Receive OP_REPLACE_BLOCK */
   private void opReplaceBlock(DataInputStream in) throws IOException {
     OpReplaceBlockProto proto = OpReplaceBlockProto.parseFrom(vintPrefixed(in));
-
-    opReplaceBlock(in,
-        fromProto(proto.getHeader().getBlock()),
+    replaceBlock(fromProto(proto.getHeader().getBlock()),
+        fromProto(proto.getHeader().getToken()),
         proto.getDelHint(),
-        fromProto(proto.getSource()),
-        fromProto(proto.getHeader().getToken()));
+        fromProto(proto.getSource()));
   }
 
-  /**
-   * Abstract OP_REPLACE_BLOCK method.
-   * It is used for balancing purpose; send to a destination
-   */
-  protected abstract void opReplaceBlock(DataInputStream in,
-      ExtendedBlock blk, String delHint, DatanodeInfo src,
-      Token<BlockTokenIdentifier> blockToken) throws IOException;
-
   /** Receive OP_COPY_BLOCK */
   private void opCopyBlock(DataInputStream in) throws IOException {
     OpCopyBlockProto proto = OpCopyBlockProto.parseFrom(vintPrefixed(in));
-    
-    opCopyBlock(in,
-        fromProto(proto.getHeader().getBlock()),
+    copyBlock(fromProto(proto.getHeader().getBlock()),
         fromProto(proto.getHeader().getToken()));
   }
 
-  /**
-   * Abstract OP_COPY_BLOCK method. It is used for balancing purpose; send to
-   * a proxy source.
-   */
-  protected abstract void opCopyBlock(DataInputStream in, ExtendedBlock blk,
-      Token<BlockTokenIdentifier> blockToken)
-      throws IOException;
-
   /** Receive OP_BLOCK_CHECKSUM */
   private void opBlockChecksum(DataInputStream in) throws IOException {
     OpBlockChecksumProto proto = OpBlockChecksumProto.parseFrom(vintPrefixed(in));
     
-    opBlockChecksum(in,
-        fromProto(proto.getHeader().getBlock()),
+    blockChecksum(fromProto(proto.getHeader().getBlock()),
         fromProto(proto.getHeader().getToken()));
   }
-
-  /**
-   * Abstract OP_BLOCK_CHECKSUM method.
-   * Get the checksum of a block 
-   */
-  protected abstract void opBlockChecksum(DataInputStream in,
-      ExtendedBlock blk, Token<BlockTokenIdentifier> blockToken)
-      throws IOException;
 }

Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java?rev=1139124&r1=1139123&r2=1139124&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java Thu Jun 23 23:57:18 2011
@@ -44,7 +44,14 @@ import com.google.protobuf.Message;
 /** Sender */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
-public class Sender {
+public class Sender implements DataTransferProtocol {
+  private final DataOutputStream out;
+
+  /** Create a sender for DataTransferProtocol with an output stream. */
+  public Sender(final DataOutputStream out) {
+    this.out = out;    
+  }
+
   /** Initialize an operation. */
   private static void op(final DataOutput out, final Op op
       ) throws IOException {
@@ -59,79 +66,85 @@ public class Sender {
     out.flush();
   }
 
-  /** Send OP_READ_BLOCK */
-  public static void opReadBlock(DataOutputStream out, ExtendedBlock blk,
-      long blockOffset, long blockLen, String clientName,
-      Token<BlockTokenIdentifier> blockToken)
-      throws IOException {
+  @Override
+  public void readBlock(final ExtendedBlock blk,
+      final Token<BlockTokenIdentifier> blockToken,
+      final String clientName,
+      final long blockOffset,
+      final long length) throws IOException {
 
     OpReadBlockProto proto = OpReadBlockProto.newBuilder()
       .setHeader(DataTransferProtoUtil.buildClientHeader(blk, clientName, blockToken))
       .setOffset(blockOffset)
-      .setLen(blockLen)
+      .setLen(length)
       .build();
 
     send(out, Op.READ_BLOCK, proto);
   }
   
 
-  /** Send OP_WRITE_BLOCK */
-  public static void opWriteBlock(DataOutputStream out, ExtendedBlock blk,
-      int pipelineSize, BlockConstructionStage stage, long newGs,
-      long minBytesRcvd, long maxBytesRcvd, String client, DatanodeInfo src,
-      DatanodeInfo[] targets, Token<BlockTokenIdentifier> blockToken)
-      throws IOException {
-    ClientOperationHeaderProto header = DataTransferProtoUtil.buildClientHeader(blk, client,
-        blockToken);
+  @Override
+  public void writeBlock(final ExtendedBlock blk,
+      final Token<BlockTokenIdentifier> blockToken,
+      final String clientName,
+      final DatanodeInfo[] targets,
+      final DatanodeInfo source,
+      final BlockConstructionStage stage,
+      final int pipelineSize,
+      final long minBytesRcvd,
+      final long maxBytesRcvd,
+      final long latestGenerationStamp) throws IOException {
+    ClientOperationHeaderProto header = DataTransferProtoUtil.buildClientHeader(
+        blk, clientName, blockToken);
     
     OpWriteBlockProto.Builder proto = OpWriteBlockProto.newBuilder()
       .setHeader(header)
-      .addAllTargets(
-          toProtos(targets, 1))
+      .addAllTargets(toProtos(targets, 1))
       .setStage(toProto(stage))
       .setPipelineSize(pipelineSize)
       .setMinBytesRcvd(minBytesRcvd)
       .setMaxBytesRcvd(maxBytesRcvd)
-      .setLatestGenerationStamp(newGs);
+      .setLatestGenerationStamp(latestGenerationStamp);
     
-    if (src != null) {
-      proto.setSource(toProto(src));
+    if (source != null) {
+      proto.setSource(toProto(source));
     }
 
     send(out, Op.WRITE_BLOCK, proto.build());
   }
 
-  /** Send {@link Op#TRANSFER_BLOCK} */
-  public static void opTransferBlock(DataOutputStream out, ExtendedBlock blk,
-      String client, DatanodeInfo[] targets,
-      Token<BlockTokenIdentifier> blockToken) throws IOException {
+  @Override
+  public void transferBlock(final ExtendedBlock blk,
+      final Token<BlockTokenIdentifier> blockToken,
+      final String clientName,
+      final DatanodeInfo[] targets) throws IOException {
     
     OpTransferBlockProto proto = OpTransferBlockProto.newBuilder()
       .setHeader(DataTransferProtoUtil.buildClientHeader(
-          blk, client, blockToken))
+          blk, clientName, blockToken))
       .addAllTargets(toProtos(targets, 0))
       .build();
 
     send(out, Op.TRANSFER_BLOCK, proto);
   }
 
-  /** Send OP_REPLACE_BLOCK */
-  public static void opReplaceBlock(DataOutputStream out,
-      ExtendedBlock blk, String delHint, DatanodeInfo src,
-      Token<BlockTokenIdentifier> blockToken) throws IOException {
+  @Override
+  public void replaceBlock(final ExtendedBlock blk,
+      final Token<BlockTokenIdentifier> blockToken,
+      final String delHint,
+      final DatanodeInfo source) throws IOException {
     OpReplaceBlockProto proto = OpReplaceBlockProto.newBuilder()
       .setHeader(DataTransferProtoUtil.buildBaseHeader(blk, blockToken))
       .setDelHint(delHint)
-      .setSource(toProto(src))
+      .setSource(toProto(source))
       .build();
     
     send(out, Op.REPLACE_BLOCK, proto);
   }
 
-  /** Send OP_COPY_BLOCK */
-  public static void opCopyBlock(DataOutputStream out, ExtendedBlock blk,
-      Token<BlockTokenIdentifier> blockToken)
-      throws IOException {
+  @Override
+  public void copyBlock(final ExtendedBlock blk,
+      final Token<BlockTokenIdentifier> blockToken) throws IOException {
     OpCopyBlockProto proto = OpCopyBlockProto.newBuilder()
       .setHeader(DataTransferProtoUtil.buildBaseHeader(blk, blockToken))
       .build();
@@ -139,10 +152,9 @@ public class Sender {
     send(out, Op.COPY_BLOCK, proto);
   }
 
-  /** Send OP_BLOCK_CHECKSUM */
-  public static void opBlockChecksum(DataOutputStream out, ExtendedBlock blk,
-      Token<BlockTokenIdentifier> blockToken)
-      throws IOException {
+  @Override
+  public void blockChecksum(final ExtendedBlock blk,
+      final Token<BlockTokenIdentifier> blockToken) throws IOException {
     OpBlockChecksumProto proto = OpBlockChecksumProto.newBuilder()
       .setHeader(DataTransferProtoUtil.buildBaseHeader(blk, blockToken))
       .build();

Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java?rev=1139124&r1=1139123&r2=1139124&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java Thu Jun 23 23:57:18 2011
@@ -348,8 +348,8 @@ public class Balancer {
     private void sendRequest(DataOutputStream out) throws IOException {
       final ExtendedBlock eb = new ExtendedBlock(nnc.blockpoolID, block.getBlock());
       final Token<BlockTokenIdentifier> accessToken = nnc.getAccessToken(eb);
-      Sender.opReplaceBlock(out, eb, source.getStorageID(), 
-          proxySource.getDatanode(), accessToken);
+      new Sender(out).replaceBlock(eb, accessToken,
+          source.getStorageID(), proxySource.getDatanode());
     }
     
     /* Receive a block copy response from the input stream */ 

Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1139124&r1=1139123&r2=1139124&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Thu Jun 23 23:57:18 2011
@@ -1977,8 +1977,8 @@ public class DataNode extends Configured
               EnumSet.of(BlockTokenSecretManager.AccessMode.WRITE));
         }
 
-        Sender.opWriteBlock(out,
-            b, 0, stage, 0, 0, 0, clientname, srcNode, targets, accessToken);
+        new Sender(out).writeBlock(b, accessToken, clientname, targets, srcNode,
+            stage, 0, 0, 0, 0);
 
         // send data & checksum
         blockSender.sendBlock(out, baseStream, null);

Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=1139124&r1=1139123&r2=1139124&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Thu Jun 23 23:57:18 2011
@@ -85,7 +85,10 @@ class DataXceiver extends Receiver imple
   private long opStartTime; //the start time of receiving an Op
   
   public DataXceiver(Socket s, DataNode datanode, 
-      DataXceiverServer dataXceiverServer) {
+      DataXceiverServer dataXceiverServer) throws IOException {
+    super(new DataInputStream(new BufferedInputStream(
+        NetUtils.getInputStream(s), FSConstants.SMALL_BUFFER_SIZE)));
+
     this.s = s;
     this.isLocal = s.getInetAddress().equals(s.getLocalAddress());
     this.datanode = datanode;
@@ -127,13 +130,9 @@ class DataXceiver extends Receiver imple
   public void run() {
     updateCurrentThreadName("Waiting for operation");
 
-    DataInputStream in=null; 
     int opsProcessed = 0;
     Op op = null;
     try {
-      in = new DataInputStream(
-          new BufferedInputStream(NetUtils.getInputStream(s), 
-                                  SMALL_BUFFER_SIZE));
       int stdTimeout = s.getSoTimeout();
 
       // We process requests in a loop, and stay around for a short timeout.
@@ -145,7 +144,7 @@ class DataXceiver extends Receiver imple
             assert socketKeepaliveTimeout > 0;
             s.setSoTimeout(socketKeepaliveTimeout);
           }
-          op = readOp(in);
+          op = readOp();
         } catch (InterruptedIOException ignored) {
           // Time out while we wait for client rpc
           break;
@@ -176,7 +175,7 @@ class DataXceiver extends Receiver imple
         }
 
         opStartTime = now();
-        processOp(op, in);
+        processOp(op);
         ++opsProcessed;
       } while (!s.isClosed() && socketKeepaliveTimeout > 0);
     } catch (Throwable t) {
@@ -196,13 +195,12 @@ class DataXceiver extends Receiver imple
     }
   }
 
-  /**
-   * Read a block from the disk.
-   */
   @Override
-  protected void opReadBlock(DataInputStream in, ExtendedBlock block,
-      long startOffset, long length, String clientName,
-      Token<BlockTokenIdentifier> blockToken) throws IOException {
+  public void readBlock(final ExtendedBlock block,
+      final Token<BlockTokenIdentifier> blockToken,
+      final String clientName,
+      final long blockOffset,
+      final long length) throws IOException {
     OutputStream baseStream = NetUtils.getOutputStream(s, 
         datanode.socketWriteTimeout);
     DataOutputStream out = new DataOutputStream(
@@ -225,7 +223,7 @@ class DataXceiver extends Receiver imple
     updateCurrentThreadName("Sending block " + block);
     try {
       try {
-        blockSender = new BlockSender(block, startOffset, length,
+        blockSender = new BlockSender(block, blockOffset, length,
             true, true, false, datanode, clientTraceFmt);
       } catch(IOException e) {
         LOG.info("opReadBlock " + block + " received exception " + e);
@@ -284,16 +282,17 @@ class DataXceiver extends Receiver imple
     datanode.metrics.incrReadsFromClient(isLocal);
   }
 
-  /**
-   * Write a block to disk.
-   */
   @Override
-  protected void opWriteBlock(final DataInputStream in, final ExtendedBlock block, 
-      final int pipelineSize, final BlockConstructionStage stage,
-      final long newGs, final long minBytesRcvd, final long maxBytesRcvd,
-      final String clientname, final DatanodeInfo srcDataNode,
-      final DatanodeInfo[] targets, final Token<BlockTokenIdentifier> blockToken
-      ) throws IOException {
+  public void writeBlock(final ExtendedBlock block,
+      final Token<BlockTokenIdentifier> blockToken,
+      final String clientname,
+      final DatanodeInfo[] targets,
+      final DatanodeInfo srcDataNode,
+      final BlockConstructionStage stage,
+      final int pipelineSize,
+      final long minBytesRcvd,
+      final long maxBytesRcvd,
+      final long latestGenerationStamp) throws IOException {
     updateCurrentThreadName("Receiving block " + block + " client=" + clientname);
     final boolean isDatanode = clientname.length() == 0;
     final boolean isClient = !isDatanode;
@@ -308,7 +307,7 @@ class DataXceiver extends Receiver imple
 
     if (LOG.isDebugEnabled()) {
       LOG.debug("opWriteBlock: stage=" + stage + ", clientname=" + clientname 
-      		+ "\n  block  =" + block + ", newGs=" + newGs
+      		+ "\n  block  =" + block + ", newGs=" + latestGenerationStamp
       		+ ", bytesRcvd=[" + minBytesRcvd + ", " + maxBytesRcvd + "]"
           + "\n  targets=" + Arrays.asList(targets)
           + "; pipelineSize=" + pipelineSize + ", srcDataNode=" + srcDataNode
@@ -351,10 +350,10 @@ class DataXceiver extends Receiver imple
         blockReceiver = new BlockReceiver(block, in, 
             s.getRemoteSocketAddress().toString(),
             s.getLocalSocketAddress().toString(),
-            stage, newGs, minBytesRcvd, maxBytesRcvd,
+            stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
             clientname, srcDataNode, datanode);
       } else {
-        datanode.data.recoverClose(block, newGs, minBytesRcvd);
+        datanode.data.recoverClose(block, latestGenerationStamp, minBytesRcvd);
       }
 
       //
@@ -380,9 +379,9 @@ class DataXceiver extends Receiver imple
                          SMALL_BUFFER_SIZE));
           mirrorIn = new DataInputStream(NetUtils.getInputStream(mirrorSock));
 
-          Sender.opWriteBlock(mirrorOut, originalBlock,
-              pipelineSize, stage, newGs, minBytesRcvd, maxBytesRcvd, clientname,
-              srcDataNode, targets, blockToken);
+          new Sender(mirrorOut).writeBlock(originalBlock, blockToken,
+              clientname, targets, srcDataNode, stage, pipelineSize,
+              minBytesRcvd, maxBytesRcvd, latestGenerationStamp);
 
           if (blockReceiver != null) { // send checksum header
             blockReceiver.writeChecksumHeader(mirrorOut);
@@ -464,7 +463,7 @@ class DataXceiver extends Receiver imple
       // update its generation stamp
       if (isClient && 
           stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
-        block.setGenerationStamp(newGs);
+        block.setGenerationStamp(latestGenerationStamp);
         block.setNumBytes(minBytesRcvd);
       }
       
@@ -499,10 +498,10 @@ class DataXceiver extends Receiver imple
   }
 
   @Override
-  protected void opTransferBlock(final DataInputStream in,
-      final ExtendedBlock blk, final String client,
-      final DatanodeInfo[] targets,
-      final Token<BlockTokenIdentifier> blockToken) throws IOException {
+  public void transferBlock(final ExtendedBlock blk,
+      final Token<BlockTokenIdentifier> blockToken,
+      final String clientName,
+      final DatanodeInfo[] targets) throws IOException {
     checkAccess(null, true, blk, blockToken,
         Op.TRANSFER_BLOCK, BlockTokenSecretManager.AccessMode.COPY);
 
@@ -511,19 +510,16 @@ class DataXceiver extends Receiver imple
     final DataOutputStream out = new DataOutputStream(
         NetUtils.getOutputStream(s, datanode.socketWriteTimeout));
     try {
-      datanode.transferReplicaForPipelineRecovery(blk, targets, client);
+      datanode.transferReplicaForPipelineRecovery(blk, targets, clientName);
       writeResponse(Status.SUCCESS, out);
     } finally {
       IOUtils.closeStream(out);
     }
   }
   
-  /**
-   * Get block checksum (MD5 of CRC32).
-   */
   @Override
-  protected void opBlockChecksum(DataInputStream in, ExtendedBlock block,
-      Token<BlockTokenIdentifier> blockToken) throws IOException {
+  public void blockChecksum(final ExtendedBlock block,
+      final Token<BlockTokenIdentifier> blockToken) throws IOException {
     final DataOutputStream out = new DataOutputStream(
         NetUtils.getOutputStream(s, datanode.socketWriteTimeout));
     checkAccess(out, true, block, blockToken,
@@ -572,12 +568,9 @@ class DataXceiver extends Receiver imple
     datanode.metrics.addBlockChecksumOp(elapsed());
   }
 
-  /**
-   * Read a block from the disk and then sends it to a destination.
-   */
   @Override
-  protected void opCopyBlock(DataInputStream in, ExtendedBlock block,
-      Token<BlockTokenIdentifier> blockToken) throws IOException {
+  public void copyBlock(final ExtendedBlock block,
+      final Token<BlockTokenIdentifier> blockToken) throws IOException {
     updateCurrentThreadName("Copying block " + block);
     // Read in the header
     if (datanode.isBlockTokenEnabled) {
@@ -647,15 +640,12 @@ class DataXceiver extends Receiver imple
     datanode.metrics.addCopyBlockOp(elapsed());
   }
 
-  /**
-   * Receive a block and write it to disk, it then notifies the namenode to
-   * remove the copy from the source.
-   */
   @Override
-  protected void opReplaceBlock(DataInputStream in,
-      ExtendedBlock block, String sourceID, DatanodeInfo proxySource,
-      Token<BlockTokenIdentifier> blockToken) throws IOException {
-    updateCurrentThreadName("Replacing block " + block + " from " + sourceID);
+  public void replaceBlock(final ExtendedBlock block,
+      final Token<BlockTokenIdentifier> blockToken,
+      final String delHint,
+      final DatanodeInfo proxySource) throws IOException {
+    updateCurrentThreadName("Replacing block " + block + " from " + delHint);
 
     /* read header */
     block.setNumBytes(dataXceiverServer.estimateBlockSize);
@@ -699,7 +689,7 @@ class DataXceiver extends Receiver imple
                      new BufferedOutputStream(baseStream, SMALL_BUFFER_SIZE));
 
       /* send request to the proxy */
-      Sender.opCopyBlock(proxyOut, block, blockToken);
+      new Sender(proxyOut).copyBlock(block, blockToken);
 
       // receive the response from the proxy
       proxyReply = new DataInputStream(new BufferedInputStream(
@@ -727,7 +717,7 @@ class DataXceiver extends Receiver imple
           dataXceiverServer.balanceThrottler, null);
                     
       // notify name node
-      datanode.notifyNamenodeReceivedBlock(block, sourceID);
+      datanode.notifyNamenodeReceivedBlock(block, delHint);
 
       LOG.info("Moved block " + block + 
           " from " + s.getRemoteSocketAddress());

Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java?rev=1139124&r1=1139123&r2=1139124&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java Thu Jun 23 23:57:18 2011
@@ -28,12 +28,13 @@ import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.server.balancer.Balancer;
 import org.apache.hadoop.hdfs.util.DataTransferThrottler;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
 
 
 /**
@@ -128,15 +129,20 @@ class DataXceiverServer implements Runna
                    DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_DEFAULT));
   }
 
-  /**
-   */
+  @Override
   public void run() {
     while (datanode.shouldRun) {
       try {
         Socket s = ss.accept();
         s.setTcpNoDelay(true);
-        new Daemon(datanode.threadGroup, 
-            new DataXceiver(s, datanode, this)).start();
+        final DataXceiver xceiver;
+        try {
+          xceiver = new DataXceiver(s, datanode, this);
+        } catch(IOException e) {
+          IOUtils.closeSocket(s);
+          throw e;
+        }
+        new Daemon(datanode.threadGroup, xceiver).start();
       } catch (SocketTimeoutException ignored) {
         // wake up to see if should continue to run
       } catch (IOException ie) {

Modified: hadoop/common/trunk/hdfs/src/test/aop/org/apache/hadoop/hdfs/server/datanode/DataTransferProtocolAspects.aj
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/test/aop/org/apache/hadoop/hdfs/server/datanode/DataTransferProtocolAspects.aj?rev=1139124&r1=1139123&r2=1139124&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/test/aop/org/apache/hadoop/hdfs/server/datanode/DataTransferProtocolAspects.aj (original)
+++ hadoop/common/trunk/hdfs/src/test/aop/org/apache/hadoop/hdfs/server/datanode/DataTransferProtocolAspects.aj Thu Jun 23 23:57:18 2011
@@ -46,7 +46,7 @@ public aspect DataTransferProtocolAspect
   */
 
   pointcut receiverOp(DataXceiver dataxceiver):
-    call(Op Receiver.readOp(DataInputStream)) && target(dataxceiver);
+    call(Op Receiver.readOp()) && target(dataxceiver);
 
   after(DataXceiver dataxceiver) returning(Op op): receiverOp(dataxceiver) {
     LOG.info("FI: receiverOp " + op + ", datanode="

Modified: hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java?rev=1139124&r1=1139123&r2=1139124&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java (original)
+++ hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java Thu Jun 23 23:57:18 2011
@@ -683,8 +683,8 @@ public class DFSTestUtil {
     final DataInputStream in = new DataInputStream(NetUtils.getInputStream(s));
 
     // send the request
-    Sender.opTransferBlock(out, b, dfsClient.clientName,
-        new DatanodeInfo[]{datanodes[1]}, new Token<BlockTokenIdentifier>());
+    new Sender(out).transferBlock(b, new Token<BlockTokenIdentifier>(),
+        dfsClient.clientName, new DatanodeInfo[]{datanodes[1]});
     out.flush();
 
     return BlockOpResponseProto.parseDelimitedFrom(in);

Modified: hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java?rev=1139124&r1=1139123&r2=1139124&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java (original)
+++ hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java Thu Jun 23 23:57:18 2011
@@ -72,7 +72,8 @@ public class TestDataTransferProtocol ex
   DatanodeID datanode;
   InetSocketAddress dnAddr;
   ByteArrayOutputStream sendBuf = new ByteArrayOutputStream(128);
-  DataOutputStream sendOut = new DataOutputStream(sendBuf);
+  final DataOutputStream sendOut = new DataOutputStream(sendBuf);
+  final Sender sender = new Sender(sendOut);
   ByteArrayOutputStream recvBuf = new ByteArrayOutputStream(128);
   DataOutputStream recvOut = new DataOutputStream(recvBuf);
 
@@ -185,9 +186,9 @@ public class TestDataTransferProtocol ex
       String description, Boolean eofExcepted) throws IOException {
     sendBuf.reset();
     recvBuf.reset();
-    Sender.opWriteBlock(sendOut, block, 0,
-        stage, newGS, block.getNumBytes(), block.getNumBytes(), "cl", null,
-        new DatanodeInfo[1], BlockTokenSecretManager.DUMMY_TOKEN);
+    sender.writeBlock(block, BlockTokenSecretManager.DUMMY_TOKEN, "cl",
+        new DatanodeInfo[1], null, stage,
+        0, block.getNumBytes(), block.getNumBytes(), newGS);
     if (eofExcepted) {
       sendResponse(Status.ERROR, null, recvOut);
       sendRecvData(description, true);
@@ -372,10 +373,11 @@ public class TestDataTransferProtocol ex
     
     /* Test OP_WRITE_BLOCK */
     sendBuf.reset();
-    Sender.opWriteBlock(sendOut, 
-        new ExtendedBlock(poolId, newBlockId), 0,
-        BlockConstructionStage.PIPELINE_SETUP_CREATE, 0L, 0L, 0L, "cl", null,
-        new DatanodeInfo[1], BlockTokenSecretManager.DUMMY_TOKEN);
+    sender.writeBlock(new ExtendedBlock(poolId, newBlockId),
+        BlockTokenSecretManager.DUMMY_TOKEN, "cl",
+        new DatanodeInfo[1], null,
+        BlockConstructionStage.PIPELINE_SETUP_CREATE,
+        0, 0L, 0L, 0L);
     sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
     
     // bad bytes per checksum
@@ -386,10 +388,10 @@ public class TestDataTransferProtocol ex
 
     sendBuf.reset();
     recvBuf.reset();
-    Sender.opWriteBlock(sendOut,
-        new ExtendedBlock(poolId, ++newBlockId), 0,
-        BlockConstructionStage.PIPELINE_SETUP_CREATE, 0L, 0L, 0L, "cl", null,
-        new DatanodeInfo[1], BlockTokenSecretManager.DUMMY_TOKEN);
+    sender.writeBlock(new ExtendedBlock(poolId, ++newBlockId),
+        BlockTokenSecretManager.DUMMY_TOKEN, "cl",
+        new DatanodeInfo[1], null,
+        BlockConstructionStage.PIPELINE_SETUP_CREATE, 0, 0L, 0L, 0L);
     sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
     sendOut.writeInt(512);
 
@@ -409,10 +411,10 @@ public class TestDataTransferProtocol ex
     // test for writing a valid zero size block
     sendBuf.reset();
     recvBuf.reset();
-    Sender.opWriteBlock(sendOut, 
-        new ExtendedBlock(poolId, ++newBlockId), 0,
-        BlockConstructionStage.PIPELINE_SETUP_CREATE, 0L, 0L, 0L, "cl", null,
-        new DatanodeInfo[1], BlockTokenSecretManager.DUMMY_TOKEN);
+    sender.writeBlock(new ExtendedBlock(poolId, ++newBlockId),
+        BlockTokenSecretManager.DUMMY_TOKEN, "cl",
+        new DatanodeInfo[1], null,
+        BlockConstructionStage.PIPELINE_SETUP_CREATE, 0, 0L, 0L, 0L);
     sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
     sendOut.writeInt(512);         // checksum size
 
@@ -439,22 +441,22 @@ public class TestDataTransferProtocol ex
     sendBuf.reset();
     recvBuf.reset();
     blk.setBlockId(blkid-1);
-    Sender.opReadBlock(sendOut, blk, 0L, fileLen, "cl",
-          BlockTokenSecretManager.DUMMY_TOKEN);
+    sender.readBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN, "cl",
+        0L, fileLen);
     sendRecvData("Wrong block ID " + newBlockId + " for read", false); 
 
     // negative block start offset -1L
     sendBuf.reset();
     blk.setBlockId(blkid);
-    Sender.opReadBlock(sendOut, blk, -1L, fileLen, "cl",
-          BlockTokenSecretManager.DUMMY_TOKEN);
+    sender.readBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN, "cl",
+        -1L, fileLen);
     sendRecvData("Negative start-offset for read for block " + 
                  firstBlock.getBlockId(), false);
 
     // bad block start offset
     sendBuf.reset();
-    Sender.opReadBlock(sendOut, blk, fileLen, fileLen, "cl",
-          BlockTokenSecretManager.DUMMY_TOKEN);
+    sender.readBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN, "cl",
+        fileLen, fileLen);
     sendRecvData("Wrong start-offset for reading block " +
                  firstBlock.getBlockId(), false);
     
@@ -462,8 +464,8 @@ public class TestDataTransferProtocol ex
     recvBuf.reset();
     sendResponse(Status.SUCCESS, null, recvOut);
     sendBuf.reset();
-    Sender.opReadBlock(sendOut, blk, 0L, 
-        -1 - random.nextInt(oneMil), "cl", BlockTokenSecretManager.DUMMY_TOKEN);
+    sender.readBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN, "cl",
+        0L, -1L-random.nextInt(oneMil));
     sendRecvData("Negative length for reading block " +
                  firstBlock.getBlockId(), false);
     
@@ -471,15 +473,15 @@ public class TestDataTransferProtocol ex
     recvBuf.reset();
     sendResponse(Status.ERROR, null, recvOut);
     sendBuf.reset();
-    Sender.opReadBlock(sendOut, blk, 0L, 
-        fileLen + 1, "cl", BlockTokenSecretManager.DUMMY_TOKEN);
+    sender.readBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN, "cl",
+        0L, fileLen+1);
     sendRecvData("Wrong length for reading block " +
                  firstBlock.getBlockId(), false);
     
     //At the end of all this, read the file to make sure that succeeds finally.
     sendBuf.reset();
-    Sender.opReadBlock(sendOut, blk, 0L, 
-        fileLen, "cl", BlockTokenSecretManager.DUMMY_TOKEN);
+    sender.readBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN, "cl",
+        0L, fileLen);
     readFile(fileSys, file, fileLen);
     } finally {
       cluster.shutdown();

Modified: hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java?rev=1139124&r1=1139123&r2=1139124&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java (original)
+++ hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java Thu Jun 23 23:57:18 2011
@@ -258,8 +258,8 @@ public class TestBlockReplacement extend
     sock.setKeepAlive(true);
     // sendRequest
     DataOutputStream out = new DataOutputStream(sock.getOutputStream());
-    Sender.opReplaceBlock(out, block, source
-        .getStorageID(), sourceProxy, BlockTokenSecretManager.DUMMY_TOKEN);
+    new Sender(out).replaceBlock(block, BlockTokenSecretManager.DUMMY_TOKEN,
+        source.getStorageID(), sourceProxy);
     out.flush();
     // receiveResponse
     DataInputStream reply = new DataInputStream(sock.getInputStream());

Modified: hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java?rev=1139124&r1=1139123&r2=1139124&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java (original)
+++ hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java Thu Jun 23 23:57:18 2011
@@ -140,10 +140,10 @@ public class TestDiskError {
     // write the header.
     DataOutputStream out = new DataOutputStream(s.getOutputStream());
 
-    Sender.opWriteBlock(out, block.getBlock(), 1,
-        BlockConstructionStage.PIPELINE_SETUP_CREATE,
-        0L, 0L, 0L, "", null, new DatanodeInfo[0],
-        BlockTokenSecretManager.DUMMY_TOKEN);
+    new Sender(out).writeBlock(block.getBlock(),
+        BlockTokenSecretManager.DUMMY_TOKEN, "",
+        new DatanodeInfo[0], null,
+        BlockConstructionStage.PIPELINE_SETUP_CREATE, 1, 0L, 0L, 0L);
 
     // write check header
     out.writeByte( 1 );