You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by sz...@apache.org on 2011/06/24 01:57:19 UTC
svn commit: r1139124 - in /hadoop/common/trunk/hdfs: ./
src/java/org/apache/hadoop/hdfs/
src/java/org/apache/hadoop/hdfs/protocol/datatransfer/
src/java/org/apache/hadoop/hdfs/server/balancer/
src/java/org/apache/hadoop/hdfs/server/datanode/ src/test/a...
Author: szetszwo
Date: Thu Jun 23 23:57:18 2011
New Revision: 1139124
URL: http://svn.apache.org/viewvc?rev=1139124&view=rev
Log:
HDFS-2087. Declare methods in DataTransferProtocol interface, and change Sender and Receiver to implement the interface.
Modified:
hadoop/common/trunk/hdfs/CHANGES.txt
hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/BlockReader.java
hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/DFSClient.java
hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java
hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
hadoop/common/trunk/hdfs/src/test/aop/org/apache/hadoop/hdfs/server/datanode/DataTransferProtocolAspects.aj
hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java
hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
Modified: hadoop/common/trunk/hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/CHANGES.txt?rev=1139124&r1=1139123&r2=1139124&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hdfs/CHANGES.txt Thu Jun 23 23:57:18 2011
@@ -526,6 +526,9 @@ Trunk (unreleased changes)
HDFS-2092. Remove some object references to Configuration in DFSClient.
(Bharath Mundlapudi via szetszwo)
+ HDFS-2087. Declare methods in DataTransferProtocol interface, and change
+ Sender and Receiver to implement the interface. (szetszwo)
+
OPTIMIZATIONS
HDFS-1458. Improve checkpoint performance by avoiding unnecessary image
Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/BlockReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/BlockReader.java?rev=1139124&r1=1139123&r2=1139124&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/BlockReader.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/BlockReader.java Thu Jun 23 23:57:18 2011
@@ -404,10 +404,9 @@ public class BlockReader extends FSInput
String clientName)
throws IOException {
// in and out will be closed when sock is closed (by the caller)
- Sender.opReadBlock(
- new DataOutputStream(new BufferedOutputStream(
- NetUtils.getOutputStream(sock,HdfsConstants.WRITE_TIMEOUT))),
- block, startOffset, len, clientName, blockToken);
+ final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
+ NetUtils.getOutputStream(sock, HdfsConstants.WRITE_TIMEOUT)));
+ new Sender(out).readBlock(block, blockToken, clientName, startOffset, len);
//
// Get bytes in block, set streams
Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1139124&r1=1139123&r2=1139124&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/DFSClient.java Thu Jun 23 23:57:18 2011
@@ -1164,7 +1164,7 @@ public class DFSClient implements FSCons
+ Op.BLOCK_CHECKSUM + ", block=" + block);
}
// get block MD5
- Sender.opBlockChecksum(out, block, lb.getBlockToken());
+ new Sender(out).blockChecksum(block, lb.getBlockToken());
final BlockOpResponseProto reply =
BlockOpResponseProto.parseFrom(HdfsProtoUtil.vintPrefixed(in));
Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java?rev=1139124&r1=1139123&r2=1139124&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java Thu Jun 23 23:57:18 2011
@@ -846,8 +846,8 @@ class DFSOutputStream extends FSOutputSu
DataNode.SMALL_BUFFER_SIZE));
//send the TRANSFER_BLOCK request
- Sender.opTransferBlock(out, block,
- dfsClient.clientName, targets, blockToken);
+ new Sender(out).transferBlock(block, blockToken, dfsClient.clientName,
+ targets);
//ack
in = new DataInputStream(NetUtils.getInputStream(sock));
@@ -1019,10 +1019,9 @@ class DFSOutputStream extends FSOutputSu
blockReplyStream = new DataInputStream(NetUtils.getInputStream(s));
// send the request
- Sender.opWriteBlock(out, block,
- nodes.length, recoveryFlag ? stage.getRecoveryStage() : stage, newGS,
- block.getNumBytes(), bytesSent, dfsClient.clientName, null, nodes,
- accessToken);
+ new Sender(out).writeBlock(block, accessToken, dfsClient.clientName,
+ nodes, null, recoveryFlag? stage.getRecoveryStage() : stage,
+ nodes.length, block.getNumBytes(), bytesSent, newGS);
checksum.writeHeader(out);
out.flush();
Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java?rev=1139124&r1=1139123&r2=1139124&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java Thu Jun 23 23:57:18 2011
@@ -17,10 +17,16 @@
*/
package org.apache.hadoop.hdfs.protocol.datatransfer;
+import java.io.IOException;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.security.token.Token;
/**
* Transfer data to/from datanode using a streaming protocol.
@@ -35,8 +41,101 @@ public interface DataTransferProtocol {
* when protocol changes. It is not very obvious.
*/
/*
- * Version 27:
- * Move DataTransferProtocol and the inner classes to a package.
+ * Version 28:
+ * Declare methods in DataTransferProtocol interface.
+ */
+ public static final int DATA_TRANSFER_VERSION = 28;
+
+ /**
+ * Read a block.
+ *
+ * @param blk the block being read.
+ * @param blockToken security token for accessing the block.
+ * @param clientName client's name.
+ * @param blockOffset offset of the block.
+ * @param length maximum number of bytes for this read.
+ */
+ public void readBlock(final ExtendedBlock blk,
+ final Token<BlockTokenIdentifier> blockToken,
+ final String clientName,
+ final long blockOffset,
+ final long length) throws IOException;
+
+ /**
+ * Write a block to a datanode pipeline.
+ *
+ * @param blk the block being written.
+ * @param blockToken security token for accessing the block.
+ * @param clientName client's name.
+ * @param targets target datanodes in the pipeline.
+ * @param source source datanode.
+ * @param stage pipeline stage.
+ * @param pipelineSize the size of the pipeline.
+ * @param minBytesRcvd minimum number of bytes received.
+ * @param maxBytesRcvd maximum number of bytes received.
+ * @param latestGenerationStamp the latest generation stamp of the block.
+ */
+ public void writeBlock(final ExtendedBlock blk,
+ final Token<BlockTokenIdentifier> blockToken,
+ final String clientName,
+ final DatanodeInfo[] targets,
+ final DatanodeInfo source,
+ final BlockConstructionStage stage,
+ final int pipelineSize,
+ final long minBytesRcvd,
+ final long maxBytesRcvd,
+ final long latestGenerationStamp) throws IOException;
+
+ /**
+ * Transfer a block to another datanode.
+ * The block stage must be
+ * either {@link BlockConstructionStage#TRANSFER_RBW}
+ * or {@link BlockConstructionStage#TRANSFER_FINALIZED}.
+ *
+ * @param blk the block being transferred.
+ * @param blockToken security token for accessing the block.
+ * @param clientName client's name.
+ * @param targets target datanodes.
+ */
+ public void transferBlock(final ExtendedBlock blk,
+ final Token<BlockTokenIdentifier> blockToken,
+ final String clientName,
+ final DatanodeInfo[] targets) throws IOException;
+
+ /**
+ * Receive a block from a source datanode
+ * and then notifies the namenode
+ * to remove the copy from the original datanode.
+ * Note that the source datanode and the original datanode can be different.
+ * It is used for balancing purpose.
+ *
+ * @param blk the block being replaced.
+ * @param blockToken security token for accessing the block.
+ * @param delHint the hint for deleting the block in the original datanode.
+ * @param source the source datanode for receiving the block.
+ */
+ public void replaceBlock(final ExtendedBlock blk,
+ final Token<BlockTokenIdentifier> blockToken,
+ final String delHint,
+ final DatanodeInfo source) throws IOException;
+
+ /**
+ * Copy a block.
+ * It is used for balancing purpose.
+ *
+ * @param blk the block being copied.
+ * @param blockToken security token for accessing the block.
+ */
+ public void copyBlock(final ExtendedBlock blk,
+ final Token<BlockTokenIdentifier> blockToken) throws IOException;
+
+ /**
+ * Get block checksum (MD5 of CRC32).
+ *
+ * @param blk a block.
+ * @param blockToken security token for accessing the block.
+ * @throws IOException
*/
- public static final int DATA_TRANSFER_VERSION = 27;
+ public void blockChecksum(final ExtendedBlock blk,
+ final Token<BlockTokenIdentifier> blockToken) throws IOException;
}
Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java?rev=1139124&r1=1139123&r2=1139124&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java Thu Jun 23 23:57:18 2011
@@ -27,23 +27,26 @@ import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReadBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReplaceBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpTransferBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.security.token.Token;
/** Receiver */
@InterfaceAudience.Private
@InterfaceStability.Evolving
-public abstract class Receiver {
+public abstract class Receiver implements DataTransferProtocol {
+ protected final DataInputStream in;
+
+ /** Create a receiver for DataTransferProtocol with a socket. */
+ protected Receiver(final DataInputStream in) {
+ this.in = in;
+ }
+
/** Read an Op. It also checks protocol version. */
- protected final Op readOp(DataInputStream in) throws IOException {
+ protected final Op readOp() throws IOException {
final short version = in.readShort();
if (version != DataTransferProtocol.DATA_TRANSFER_VERSION) {
throw new IOException( "Version Mismatch (Expected: " +
@@ -54,11 +57,10 @@ public abstract class Receiver {
}
/** Process op by the corresponding method. */
- protected final void processOp(Op op, DataInputStream in
- ) throws IOException {
+ protected final void processOp(Op op) throws IOException {
switch(op) {
case READ_BLOCK:
- opReadBlock(in);
+ opReadBlock();
break;
case WRITE_BLOCK:
opWriteBlock(in);
@@ -81,121 +83,60 @@ public abstract class Receiver {
}
/** Receive OP_READ_BLOCK */
- private void opReadBlock(DataInputStream in) throws IOException {
+ private void opReadBlock() throws IOException {
OpReadBlockProto proto = OpReadBlockProto.parseFrom(vintPrefixed(in));
-
- ExtendedBlock b = fromProto(
- proto.getHeader().getBaseHeader().getBlock());
- Token<BlockTokenIdentifier> token = fromProto(
- proto.getHeader().getBaseHeader().getToken());
-
- opReadBlock(in, b, proto.getOffset(), proto.getLen(),
- proto.getHeader().getClientName(), token);
- }
- /**
- * Abstract OP_READ_BLOCK method. Read a block.
- */
- protected abstract void opReadBlock(DataInputStream in, ExtendedBlock blk,
- long offset, long length, String client,
- Token<BlockTokenIdentifier> blockToken) throws IOException;
+ readBlock(fromProto(proto.getHeader().getBaseHeader().getBlock()),
+ fromProto(proto.getHeader().getBaseHeader().getToken()),
+ proto.getHeader().getClientName(),
+ proto.getOffset(),
+ proto.getLen());
+ }
/** Receive OP_WRITE_BLOCK */
private void opWriteBlock(DataInputStream in) throws IOException {
final OpWriteBlockProto proto = OpWriteBlockProto.parseFrom(vintPrefixed(in));
- opWriteBlock(in,
- fromProto(proto.getHeader().getBaseHeader().getBlock()),
- proto.getPipelineSize(),
- fromProto(proto.getStage()),
- proto.getLatestGenerationStamp(),
- proto.getMinBytesRcvd(), proto.getMaxBytesRcvd(),
+ writeBlock(fromProto(proto.getHeader().getBaseHeader().getBlock()),
+ fromProto(proto.getHeader().getBaseHeader().getToken()),
proto.getHeader().getClientName(),
- fromProto(proto.getSource()),
fromProtos(proto.getTargetsList()),
- fromProto(proto.getHeader().getBaseHeader().getToken()));
+ fromProto(proto.getSource()),
+ fromProto(proto.getStage()),
+ proto.getPipelineSize(),
+ proto.getMinBytesRcvd(), proto.getMaxBytesRcvd(),
+ proto.getLatestGenerationStamp());
}
- /**
- * Abstract OP_WRITE_BLOCK method.
- * Write a block.
- */
- protected abstract void opWriteBlock(DataInputStream in, ExtendedBlock blk,
- int pipelineSize, BlockConstructionStage stage, long newGs,
- long minBytesRcvd, long maxBytesRcvd, String client, DatanodeInfo src,
- DatanodeInfo[] targets, Token<BlockTokenIdentifier> blockToken)
- throws IOException;
-
/** Receive {@link Op#TRANSFER_BLOCK} */
private void opTransferBlock(DataInputStream in) throws IOException {
final OpTransferBlockProto proto =
OpTransferBlockProto.parseFrom(vintPrefixed(in));
-
- opTransferBlock(in,
- fromProto(proto.getHeader().getBaseHeader().getBlock()),
+ transferBlock(fromProto(proto.getHeader().getBaseHeader().getBlock()),
+ fromProto(proto.getHeader().getBaseHeader().getToken()),
proto.getHeader().getClientName(),
- fromProtos(proto.getTargetsList()),
- fromProto(proto.getHeader().getBaseHeader().getToken()));
+ fromProtos(proto.getTargetsList()));
}
- /**
- * Abstract {@link Op#TRANSFER_BLOCK} method.
- * For {@link BlockConstructionStage#TRANSFER_RBW}
- * or {@link BlockConstructionStage#TRANSFER_FINALIZED}.
- */
- protected abstract void opTransferBlock(DataInputStream in, ExtendedBlock blk,
- String client, DatanodeInfo[] targets,
- Token<BlockTokenIdentifier> blockToken)
- throws IOException;
-
/** Receive OP_REPLACE_BLOCK */
private void opReplaceBlock(DataInputStream in) throws IOException {
OpReplaceBlockProto proto = OpReplaceBlockProto.parseFrom(vintPrefixed(in));
-
- opReplaceBlock(in,
- fromProto(proto.getHeader().getBlock()),
+ replaceBlock(fromProto(proto.getHeader().getBlock()),
+ fromProto(proto.getHeader().getToken()),
proto.getDelHint(),
- fromProto(proto.getSource()),
- fromProto(proto.getHeader().getToken()));
+ fromProto(proto.getSource()));
}
- /**
- * Abstract OP_REPLACE_BLOCK method.
- * It is used for balancing purpose; send to a destination
- */
- protected abstract void opReplaceBlock(DataInputStream in,
- ExtendedBlock blk, String delHint, DatanodeInfo src,
- Token<BlockTokenIdentifier> blockToken) throws IOException;
-
/** Receive OP_COPY_BLOCK */
private void opCopyBlock(DataInputStream in) throws IOException {
OpCopyBlockProto proto = OpCopyBlockProto.parseFrom(vintPrefixed(in));
-
- opCopyBlock(in,
- fromProto(proto.getHeader().getBlock()),
+ copyBlock(fromProto(proto.getHeader().getBlock()),
fromProto(proto.getHeader().getToken()));
}
- /**
- * Abstract OP_COPY_BLOCK method. It is used for balancing purpose; send to
- * a proxy source.
- */
- protected abstract void opCopyBlock(DataInputStream in, ExtendedBlock blk,
- Token<BlockTokenIdentifier> blockToken)
- throws IOException;
-
/** Receive OP_BLOCK_CHECKSUM */
private void opBlockChecksum(DataInputStream in) throws IOException {
OpBlockChecksumProto proto = OpBlockChecksumProto.parseFrom(vintPrefixed(in));
- opBlockChecksum(in,
- fromProto(proto.getHeader().getBlock()),
+ blockChecksum(fromProto(proto.getHeader().getBlock()),
fromProto(proto.getHeader().getToken()));
}
-
- /**
- * Abstract OP_BLOCK_CHECKSUM method.
- * Get the checksum of a block
- */
- protected abstract void opBlockChecksum(DataInputStream in,
- ExtendedBlock blk, Token<BlockTokenIdentifier> blockToken)
- throws IOException;
}
Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java?rev=1139124&r1=1139123&r2=1139124&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java Thu Jun 23 23:57:18 2011
@@ -44,7 +44,14 @@ import com.google.protobuf.Message;
/** Sender */
@InterfaceAudience.Private
@InterfaceStability.Evolving
-public class Sender {
+public class Sender implements DataTransferProtocol {
+ private final DataOutputStream out;
+
+ /** Create a sender for DataTransferProtocol with a output stream. */
+ public Sender(final DataOutputStream out) {
+ this.out = out;
+ }
+
/** Initialize a operation. */
private static void op(final DataOutput out, final Op op
) throws IOException {
@@ -59,79 +66,85 @@ public class Sender {
out.flush();
}
- /** Send OP_READ_BLOCK */
- public static void opReadBlock(DataOutputStream out, ExtendedBlock blk,
- long blockOffset, long blockLen, String clientName,
- Token<BlockTokenIdentifier> blockToken)
- throws IOException {
+ @Override
+ public void readBlock(final ExtendedBlock blk,
+ final Token<BlockTokenIdentifier> blockToken,
+ final String clientName,
+ final long blockOffset,
+ final long length) throws IOException {
OpReadBlockProto proto = OpReadBlockProto.newBuilder()
.setHeader(DataTransferProtoUtil.buildClientHeader(blk, clientName, blockToken))
.setOffset(blockOffset)
- .setLen(blockLen)
+ .setLen(length)
.build();
send(out, Op.READ_BLOCK, proto);
}
- /** Send OP_WRITE_BLOCK */
- public static void opWriteBlock(DataOutputStream out, ExtendedBlock blk,
- int pipelineSize, BlockConstructionStage stage, long newGs,
- long minBytesRcvd, long maxBytesRcvd, String client, DatanodeInfo src,
- DatanodeInfo[] targets, Token<BlockTokenIdentifier> blockToken)
- throws IOException {
- ClientOperationHeaderProto header = DataTransferProtoUtil.buildClientHeader(blk, client,
- blockToken);
+ @Override
+ public void writeBlock(final ExtendedBlock blk,
+ final Token<BlockTokenIdentifier> blockToken,
+ final String clientName,
+ final DatanodeInfo[] targets,
+ final DatanodeInfo source,
+ final BlockConstructionStage stage,
+ final int pipelineSize,
+ final long minBytesRcvd,
+ final long maxBytesRcvd,
+ final long latestGenerationStamp) throws IOException {
+ ClientOperationHeaderProto header = DataTransferProtoUtil.buildClientHeader(
+ blk, clientName, blockToken);
OpWriteBlockProto.Builder proto = OpWriteBlockProto.newBuilder()
.setHeader(header)
- .addAllTargets(
- toProtos(targets, 1))
+ .addAllTargets(toProtos(targets, 1))
.setStage(toProto(stage))
.setPipelineSize(pipelineSize)
.setMinBytesRcvd(minBytesRcvd)
.setMaxBytesRcvd(maxBytesRcvd)
- .setLatestGenerationStamp(newGs);
+ .setLatestGenerationStamp(latestGenerationStamp);
- if (src != null) {
- proto.setSource(toProto(src));
+ if (source != null) {
+ proto.setSource(toProto(source));
}
send(out, Op.WRITE_BLOCK, proto.build());
}
- /** Send {@link Op#TRANSFER_BLOCK} */
- public static void opTransferBlock(DataOutputStream out, ExtendedBlock blk,
- String client, DatanodeInfo[] targets,
- Token<BlockTokenIdentifier> blockToken) throws IOException {
+ @Override
+ public void transferBlock(final ExtendedBlock blk,
+ final Token<BlockTokenIdentifier> blockToken,
+ final String clientName,
+ final DatanodeInfo[] targets) throws IOException {
OpTransferBlockProto proto = OpTransferBlockProto.newBuilder()
.setHeader(DataTransferProtoUtil.buildClientHeader(
- blk, client, blockToken))
+ blk, clientName, blockToken))
.addAllTargets(toProtos(targets, 0))
.build();
send(out, Op.TRANSFER_BLOCK, proto);
}
- /** Send OP_REPLACE_BLOCK */
- public static void opReplaceBlock(DataOutputStream out,
- ExtendedBlock blk, String delHint, DatanodeInfo src,
- Token<BlockTokenIdentifier> blockToken) throws IOException {
+ @Override
+ public void replaceBlock(final ExtendedBlock blk,
+ final Token<BlockTokenIdentifier> blockToken,
+ final String delHint,
+ final DatanodeInfo source) throws IOException {
OpReplaceBlockProto proto = OpReplaceBlockProto.newBuilder()
.setHeader(DataTransferProtoUtil.buildBaseHeader(blk, blockToken))
.setDelHint(delHint)
- .setSource(toProto(src))
+ .setSource(toProto(source))
.build();
send(out, Op.REPLACE_BLOCK, proto);
}
- /** Send OP_COPY_BLOCK */
- public static void opCopyBlock(DataOutputStream out, ExtendedBlock blk,
- Token<BlockTokenIdentifier> blockToken)
- throws IOException {
+ @Override
+ public void copyBlock(final ExtendedBlock blk,
+ final Token<BlockTokenIdentifier> blockToken) throws IOException {
OpCopyBlockProto proto = OpCopyBlockProto.newBuilder()
.setHeader(DataTransferProtoUtil.buildBaseHeader(blk, blockToken))
.build();
@@ -139,10 +152,9 @@ public class Sender {
send(out, Op.COPY_BLOCK, proto);
}
- /** Send OP_BLOCK_CHECKSUM */
- public static void opBlockChecksum(DataOutputStream out, ExtendedBlock blk,
- Token<BlockTokenIdentifier> blockToken)
- throws IOException {
+ @Override
+ public void blockChecksum(final ExtendedBlock blk,
+ final Token<BlockTokenIdentifier> blockToken) throws IOException {
OpBlockChecksumProto proto = OpBlockChecksumProto.newBuilder()
.setHeader(DataTransferProtoUtil.buildBaseHeader(blk, blockToken))
.build();
Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java?rev=1139124&r1=1139123&r2=1139124&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java Thu Jun 23 23:57:18 2011
@@ -348,8 +348,8 @@ public class Balancer {
private void sendRequest(DataOutputStream out) throws IOException {
final ExtendedBlock eb = new ExtendedBlock(nnc.blockpoolID, block.getBlock());
final Token<BlockTokenIdentifier> accessToken = nnc.getAccessToken(eb);
- Sender.opReplaceBlock(out, eb, source.getStorageID(),
- proxySource.getDatanode(), accessToken);
+ new Sender(out).replaceBlock(eb, accessToken,
+ source.getStorageID(), proxySource.getDatanode());
}
/* Receive a block copy response from the input stream */
Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1139124&r1=1139123&r2=1139124&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Thu Jun 23 23:57:18 2011
@@ -1977,8 +1977,8 @@ public class DataNode extends Configured
EnumSet.of(BlockTokenSecretManager.AccessMode.WRITE));
}
- Sender.opWriteBlock(out,
- b, 0, stage, 0, 0, 0, clientname, srcNode, targets, accessToken);
+ new Sender(out).writeBlock(b, accessToken, clientname, targets, srcNode,
+ stage, 0, 0, 0, 0);
// send data & checksum
blockSender.sendBlock(out, baseStream, null);
Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=1139124&r1=1139123&r2=1139124&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Thu Jun 23 23:57:18 2011
@@ -85,7 +85,10 @@ class DataXceiver extends Receiver imple
private long opStartTime; //the start time of receiving an Op
public DataXceiver(Socket s, DataNode datanode,
- DataXceiverServer dataXceiverServer) {
+ DataXceiverServer dataXceiverServer) throws IOException {
+ super(new DataInputStream(new BufferedInputStream(
+ NetUtils.getInputStream(s), FSConstants.SMALL_BUFFER_SIZE)));
+
this.s = s;
this.isLocal = s.getInetAddress().equals(s.getLocalAddress());
this.datanode = datanode;
@@ -127,13 +130,9 @@ class DataXceiver extends Receiver imple
public void run() {
updateCurrentThreadName("Waiting for operation");
- DataInputStream in=null;
int opsProcessed = 0;
Op op = null;
try {
- in = new DataInputStream(
- new BufferedInputStream(NetUtils.getInputStream(s),
- SMALL_BUFFER_SIZE));
int stdTimeout = s.getSoTimeout();
// We process requests in a loop, and stay around for a short timeout.
@@ -145,7 +144,7 @@ class DataXceiver extends Receiver imple
assert socketKeepaliveTimeout > 0;
s.setSoTimeout(socketKeepaliveTimeout);
}
- op = readOp(in);
+ op = readOp();
} catch (InterruptedIOException ignored) {
// Time out while we wait for client rpc
break;
@@ -176,7 +175,7 @@ class DataXceiver extends Receiver imple
}
opStartTime = now();
- processOp(op, in);
+ processOp(op);
++opsProcessed;
} while (!s.isClosed() && socketKeepaliveTimeout > 0);
} catch (Throwable t) {
@@ -196,13 +195,12 @@ class DataXceiver extends Receiver imple
}
}
- /**
- * Read a block from the disk.
- */
@Override
- protected void opReadBlock(DataInputStream in, ExtendedBlock block,
- long startOffset, long length, String clientName,
- Token<BlockTokenIdentifier> blockToken) throws IOException {
+ public void readBlock(final ExtendedBlock block,
+ final Token<BlockTokenIdentifier> blockToken,
+ final String clientName,
+ final long blockOffset,
+ final long length) throws IOException {
OutputStream baseStream = NetUtils.getOutputStream(s,
datanode.socketWriteTimeout);
DataOutputStream out = new DataOutputStream(
@@ -225,7 +223,7 @@ class DataXceiver extends Receiver imple
updateCurrentThreadName("Sending block " + block);
try {
try {
- blockSender = new BlockSender(block, startOffset, length,
+ blockSender = new BlockSender(block, blockOffset, length,
true, true, false, datanode, clientTraceFmt);
} catch(IOException e) {
LOG.info("opReadBlock " + block + " received exception " + e);
@@ -284,16 +282,17 @@ class DataXceiver extends Receiver imple
datanode.metrics.incrReadsFromClient(isLocal);
}
- /**
- * Write a block to disk.
- */
@Override
- protected void opWriteBlock(final DataInputStream in, final ExtendedBlock block,
- final int pipelineSize, final BlockConstructionStage stage,
- final long newGs, final long minBytesRcvd, final long maxBytesRcvd,
- final String clientname, final DatanodeInfo srcDataNode,
- final DatanodeInfo[] targets, final Token<BlockTokenIdentifier> blockToken
- ) throws IOException {
+ public void writeBlock(final ExtendedBlock block,
+ final Token<BlockTokenIdentifier> blockToken,
+ final String clientname,
+ final DatanodeInfo[] targets,
+ final DatanodeInfo srcDataNode,
+ final BlockConstructionStage stage,
+ final int pipelineSize,
+ final long minBytesRcvd,
+ final long maxBytesRcvd,
+ final long latestGenerationStamp) throws IOException {
updateCurrentThreadName("Receiving block " + block + " client=" + clientname);
final boolean isDatanode = clientname.length() == 0;
final boolean isClient = !isDatanode;
@@ -308,7 +307,7 @@ class DataXceiver extends Receiver imple
if (LOG.isDebugEnabled()) {
LOG.debug("opWriteBlock: stage=" + stage + ", clientname=" + clientname
- + "\n block =" + block + ", newGs=" + newGs
+ + "\n block =" + block + ", newGs=" + latestGenerationStamp
+ ", bytesRcvd=[" + minBytesRcvd + ", " + maxBytesRcvd + "]"
+ "\n targets=" + Arrays.asList(targets)
+ "; pipelineSize=" + pipelineSize + ", srcDataNode=" + srcDataNode
@@ -351,10 +350,10 @@ class DataXceiver extends Receiver imple
blockReceiver = new BlockReceiver(block, in,
s.getRemoteSocketAddress().toString(),
s.getLocalSocketAddress().toString(),
- stage, newGs, minBytesRcvd, maxBytesRcvd,
+ stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
clientname, srcDataNode, datanode);
} else {
- datanode.data.recoverClose(block, newGs, minBytesRcvd);
+ datanode.data.recoverClose(block, latestGenerationStamp, minBytesRcvd);
}
//
@@ -380,9 +379,9 @@ class DataXceiver extends Receiver imple
SMALL_BUFFER_SIZE));
mirrorIn = new DataInputStream(NetUtils.getInputStream(mirrorSock));
- Sender.opWriteBlock(mirrorOut, originalBlock,
- pipelineSize, stage, newGs, minBytesRcvd, maxBytesRcvd, clientname,
- srcDataNode, targets, blockToken);
+ new Sender(mirrorOut).writeBlock(originalBlock, blockToken,
+ clientname, targets, srcDataNode, stage, pipelineSize,
+ minBytesRcvd, maxBytesRcvd, latestGenerationStamp);
if (blockReceiver != null) { // send checksum header
blockReceiver.writeChecksumHeader(mirrorOut);
@@ -464,7 +463,7 @@ class DataXceiver extends Receiver imple
// update its generation stamp
if (isClient &&
stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
- block.setGenerationStamp(newGs);
+ block.setGenerationStamp(latestGenerationStamp);
block.setNumBytes(minBytesRcvd);
}
@@ -499,10 +498,10 @@ class DataXceiver extends Receiver imple
}
@Override
- protected void opTransferBlock(final DataInputStream in,
- final ExtendedBlock blk, final String client,
- final DatanodeInfo[] targets,
- final Token<BlockTokenIdentifier> blockToken) throws IOException {
+ public void transferBlock(final ExtendedBlock blk,
+ final Token<BlockTokenIdentifier> blockToken,
+ final String clientName,
+ final DatanodeInfo[] targets) throws IOException {
checkAccess(null, true, blk, blockToken,
Op.TRANSFER_BLOCK, BlockTokenSecretManager.AccessMode.COPY);
@@ -511,19 +510,16 @@ class DataXceiver extends Receiver imple
final DataOutputStream out = new DataOutputStream(
NetUtils.getOutputStream(s, datanode.socketWriteTimeout));
try {
- datanode.transferReplicaForPipelineRecovery(blk, targets, client);
+ datanode.transferReplicaForPipelineRecovery(blk, targets, clientName);
writeResponse(Status.SUCCESS, out);
} finally {
IOUtils.closeStream(out);
}
}
- /**
- * Get block checksum (MD5 of CRC32).
- */
@Override
- protected void opBlockChecksum(DataInputStream in, ExtendedBlock block,
- Token<BlockTokenIdentifier> blockToken) throws IOException {
+ public void blockChecksum(final ExtendedBlock block,
+ final Token<BlockTokenIdentifier> blockToken) throws IOException {
final DataOutputStream out = new DataOutputStream(
NetUtils.getOutputStream(s, datanode.socketWriteTimeout));
checkAccess(out, true, block, blockToken,
@@ -572,12 +568,9 @@ class DataXceiver extends Receiver imple
datanode.metrics.addBlockChecksumOp(elapsed());
}
- /**
- * Read a block from the disk and then sends it to a destination.
- */
@Override
- protected void opCopyBlock(DataInputStream in, ExtendedBlock block,
- Token<BlockTokenIdentifier> blockToken) throws IOException {
+ public void copyBlock(final ExtendedBlock block,
+ final Token<BlockTokenIdentifier> blockToken) throws IOException {
updateCurrentThreadName("Copying block " + block);
// Read in the header
if (datanode.isBlockTokenEnabled) {
@@ -647,15 +640,12 @@ class DataXceiver extends Receiver imple
datanode.metrics.addCopyBlockOp(elapsed());
}
- /**
- * Receive a block and write it to disk, it then notifies the namenode to
- * remove the copy from the source.
- */
@Override
- protected void opReplaceBlock(DataInputStream in,
- ExtendedBlock block, String sourceID, DatanodeInfo proxySource,
- Token<BlockTokenIdentifier> blockToken) throws IOException {
- updateCurrentThreadName("Replacing block " + block + " from " + sourceID);
+ public void replaceBlock(final ExtendedBlock block,
+ final Token<BlockTokenIdentifier> blockToken,
+ final String delHint,
+ final DatanodeInfo proxySource) throws IOException {
+ updateCurrentThreadName("Replacing block " + block + " from " + delHint);
/* read header */
block.setNumBytes(dataXceiverServer.estimateBlockSize);
@@ -699,7 +689,7 @@ class DataXceiver extends Receiver imple
new BufferedOutputStream(baseStream, SMALL_BUFFER_SIZE));
/* send request to the proxy */
- Sender.opCopyBlock(proxyOut, block, blockToken);
+ new Sender(proxyOut).copyBlock(block, blockToken);
// receive the response from the proxy
proxyReply = new DataInputStream(new BufferedInputStream(
@@ -727,7 +717,7 @@ class DataXceiver extends Receiver imple
dataXceiverServer.balanceThrottler, null);
// notify name node
- datanode.notifyNamenodeReceivedBlock(block, sourceID);
+ datanode.notifyNamenodeReceivedBlock(block, delHint);
LOG.info("Moved block " + block +
" from " + s.getRemoteSocketAddress());
Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java?rev=1139124&r1=1139123&r2=1139124&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java Thu Jun 23 23:57:18 2011
@@ -28,12 +28,13 @@ import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.server.balancer.Balancer;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
+import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
/**
@@ -128,15 +129,20 @@ class DataXceiverServer implements Runna
DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_DEFAULT));
}
- /**
- */
+ @Override
public void run() {
while (datanode.shouldRun) {
try {
Socket s = ss.accept();
s.setTcpNoDelay(true);
- new Daemon(datanode.threadGroup,
- new DataXceiver(s, datanode, this)).start();
+ final DataXceiver exciver;
+ try {
+ exciver = new DataXceiver(s, datanode, this);
+ } catch(IOException e) {
+ IOUtils.closeSocket(s);
+ throw e;
+ }
+ new Daemon(datanode.threadGroup, exciver).start();
} catch (SocketTimeoutException ignored) {
// wake up to see if should continue to run
} catch (IOException ie) {
Modified: hadoop/common/trunk/hdfs/src/test/aop/org/apache/hadoop/hdfs/server/datanode/DataTransferProtocolAspects.aj
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/test/aop/org/apache/hadoop/hdfs/server/datanode/DataTransferProtocolAspects.aj?rev=1139124&r1=1139123&r2=1139124&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/test/aop/org/apache/hadoop/hdfs/server/datanode/DataTransferProtocolAspects.aj (original)
+++ hadoop/common/trunk/hdfs/src/test/aop/org/apache/hadoop/hdfs/server/datanode/DataTransferProtocolAspects.aj Thu Jun 23 23:57:18 2011
@@ -46,7 +46,7 @@ public aspect DataTransferProtocolAspect
*/
pointcut receiverOp(DataXceiver dataxceiver):
- call(Op Receiver.readOp(DataInputStream)) && target(dataxceiver);
+ call(Op Receiver.readOp()) && target(dataxceiver);
after(DataXceiver dataxceiver) returning(Op op): receiverOp(dataxceiver) {
LOG.info("FI: receiverOp " + op + ", datanode="
Modified: hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java?rev=1139124&r1=1139123&r2=1139124&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java (original)
+++ hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java Thu Jun 23 23:57:18 2011
@@ -683,8 +683,8 @@ public class DFSTestUtil {
final DataInputStream in = new DataInputStream(NetUtils.getInputStream(s));
// send the request
- Sender.opTransferBlock(out, b, dfsClient.clientName,
- new DatanodeInfo[]{datanodes[1]}, new Token<BlockTokenIdentifier>());
+ new Sender(out).transferBlock(b, new Token<BlockTokenIdentifier>(),
+ dfsClient.clientName, new DatanodeInfo[]{datanodes[1]});
out.flush();
return BlockOpResponseProto.parseDelimitedFrom(in);
Modified: hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java?rev=1139124&r1=1139123&r2=1139124&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java (original)
+++ hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java Thu Jun 23 23:57:18 2011
@@ -72,7 +72,8 @@ public class TestDataTransferProtocol ex
DatanodeID datanode;
InetSocketAddress dnAddr;
ByteArrayOutputStream sendBuf = new ByteArrayOutputStream(128);
- DataOutputStream sendOut = new DataOutputStream(sendBuf);
+ final DataOutputStream sendOut = new DataOutputStream(sendBuf);
+ final Sender sender = new Sender(sendOut);
ByteArrayOutputStream recvBuf = new ByteArrayOutputStream(128);
DataOutputStream recvOut = new DataOutputStream(recvBuf);
@@ -185,9 +186,9 @@ public class TestDataTransferProtocol ex
String description, Boolean eofExcepted) throws IOException {
sendBuf.reset();
recvBuf.reset();
- Sender.opWriteBlock(sendOut, block, 0,
- stage, newGS, block.getNumBytes(), block.getNumBytes(), "cl", null,
- new DatanodeInfo[1], BlockTokenSecretManager.DUMMY_TOKEN);
+ sender.writeBlock(block, BlockTokenSecretManager.DUMMY_TOKEN, "cl",
+ new DatanodeInfo[1], null, stage,
+ 0, block.getNumBytes(), block.getNumBytes(), newGS);
if (eofExcepted) {
sendResponse(Status.ERROR, null, recvOut);
sendRecvData(description, true);
@@ -372,10 +373,11 @@ public class TestDataTransferProtocol ex
/* Test OP_WRITE_BLOCK */
sendBuf.reset();
- Sender.opWriteBlock(sendOut,
- new ExtendedBlock(poolId, newBlockId), 0,
- BlockConstructionStage.PIPELINE_SETUP_CREATE, 0L, 0L, 0L, "cl", null,
- new DatanodeInfo[1], BlockTokenSecretManager.DUMMY_TOKEN);
+ sender.writeBlock(new ExtendedBlock(poolId, newBlockId),
+ BlockTokenSecretManager.DUMMY_TOKEN, "cl",
+ new DatanodeInfo[1], null,
+ BlockConstructionStage.PIPELINE_SETUP_CREATE,
+ 0, 0L, 0L, 0L);
sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
// bad bytes per checksum
@@ -386,10 +388,10 @@ public class TestDataTransferProtocol ex
sendBuf.reset();
recvBuf.reset();
- Sender.opWriteBlock(sendOut,
- new ExtendedBlock(poolId, ++newBlockId), 0,
- BlockConstructionStage.PIPELINE_SETUP_CREATE, 0L, 0L, 0L, "cl", null,
- new DatanodeInfo[1], BlockTokenSecretManager.DUMMY_TOKEN);
+ sender.writeBlock(new ExtendedBlock(poolId, ++newBlockId),
+ BlockTokenSecretManager.DUMMY_TOKEN, "cl",
+ new DatanodeInfo[1], null,
+ BlockConstructionStage.PIPELINE_SETUP_CREATE, 0, 0L, 0L, 0L);
sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
sendOut.writeInt(512);
@@ -409,10 +411,10 @@ public class TestDataTransferProtocol ex
// test for writing a valid zero size block
sendBuf.reset();
recvBuf.reset();
- Sender.opWriteBlock(sendOut,
- new ExtendedBlock(poolId, ++newBlockId), 0,
- BlockConstructionStage.PIPELINE_SETUP_CREATE, 0L, 0L, 0L, "cl", null,
- new DatanodeInfo[1], BlockTokenSecretManager.DUMMY_TOKEN);
+ sender.writeBlock(new ExtendedBlock(poolId, ++newBlockId),
+ BlockTokenSecretManager.DUMMY_TOKEN, "cl",
+ new DatanodeInfo[1], null,
+ BlockConstructionStage.PIPELINE_SETUP_CREATE, 0, 0L, 0L, 0L);
sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
sendOut.writeInt(512); // checksum size
@@ -439,22 +441,22 @@ public class TestDataTransferProtocol ex
sendBuf.reset();
recvBuf.reset();
blk.setBlockId(blkid-1);
- Sender.opReadBlock(sendOut, blk, 0L, fileLen, "cl",
- BlockTokenSecretManager.DUMMY_TOKEN);
+ sender.readBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN, "cl",
+ 0L, fileLen);
sendRecvData("Wrong block ID " + newBlockId + " for read", false);
// negative block start offset -1L
sendBuf.reset();
blk.setBlockId(blkid);
- Sender.opReadBlock(sendOut, blk, -1L, fileLen, "cl",
- BlockTokenSecretManager.DUMMY_TOKEN);
+ sender.readBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN, "cl",
+ -1L, fileLen);
sendRecvData("Negative start-offset for read for block " +
firstBlock.getBlockId(), false);
// bad block start offset
sendBuf.reset();
- Sender.opReadBlock(sendOut, blk, fileLen, fileLen, "cl",
- BlockTokenSecretManager.DUMMY_TOKEN);
+ sender.readBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN, "cl",
+ fileLen, fileLen);
sendRecvData("Wrong start-offset for reading block " +
firstBlock.getBlockId(), false);
@@ -462,8 +464,8 @@ public class TestDataTransferProtocol ex
recvBuf.reset();
sendResponse(Status.SUCCESS, null, recvOut);
sendBuf.reset();
- Sender.opReadBlock(sendOut, blk, 0L,
- -1 - random.nextInt(oneMil), "cl", BlockTokenSecretManager.DUMMY_TOKEN);
+ sender.readBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN, "cl",
+ 0L, -1L-random.nextInt(oneMil));
sendRecvData("Negative length for reading block " +
firstBlock.getBlockId(), false);
@@ -471,15 +473,15 @@ public class TestDataTransferProtocol ex
recvBuf.reset();
sendResponse(Status.ERROR, null, recvOut);
sendBuf.reset();
- Sender.opReadBlock(sendOut, blk, 0L,
- fileLen + 1, "cl", BlockTokenSecretManager.DUMMY_TOKEN);
+ sender.readBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN, "cl",
+ 0L, fileLen+1);
sendRecvData("Wrong length for reading block " +
firstBlock.getBlockId(), false);
//At the end of all this, read the file to make sure that succeeds finally.
sendBuf.reset();
- Sender.opReadBlock(sendOut, blk, 0L,
- fileLen, "cl", BlockTokenSecretManager.DUMMY_TOKEN);
+ sender.readBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN, "cl",
+ 0L, fileLen);
readFile(fileSys, file, fileLen);
} finally {
cluster.shutdown();
Modified: hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java?rev=1139124&r1=1139123&r2=1139124&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java (original)
+++ hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java Thu Jun 23 23:57:18 2011
@@ -258,8 +258,8 @@ public class TestBlockReplacement extend
sock.setKeepAlive(true);
// sendRequest
DataOutputStream out = new DataOutputStream(sock.getOutputStream());
- Sender.opReplaceBlock(out, block, source
- .getStorageID(), sourceProxy, BlockTokenSecretManager.DUMMY_TOKEN);
+ new Sender(out).replaceBlock(block, BlockTokenSecretManager.DUMMY_TOKEN,
+ source.getStorageID(), sourceProxy);
out.flush();
// receiveResponse
DataInputStream reply = new DataInputStream(sock.getInputStream());
Modified: hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java?rev=1139124&r1=1139123&r2=1139124&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java (original)
+++ hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java Thu Jun 23 23:57:18 2011
@@ -140,10 +140,10 @@ public class TestDiskError {
// write the header.
DataOutputStream out = new DataOutputStream(s.getOutputStream());
- Sender.opWriteBlock(out, block.getBlock(), 1,
- BlockConstructionStage.PIPELINE_SETUP_CREATE,
- 0L, 0L, 0L, "", null, new DatanodeInfo[0],
- BlockTokenSecretManager.DUMMY_TOKEN);
+ new Sender(out).writeBlock(block.getBlock(),
+ BlockTokenSecretManager.DUMMY_TOKEN, "",
+ new DatanodeInfo[0], null,
+ BlockConstructionStage.PIPELINE_SETUP_CREATE, 1, 0L, 0L, 0L);
// write check header
out.writeByte( 1 );