You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by sz...@apache.org on 2011/06/02 03:12:40 UTC
svn commit: r1130367 - in /hadoop/hdfs/trunk: CHANGES.txt
src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
Author: szetszwo
Date: Thu Jun 2 01:12:40 2011
New Revision: 1130367
URL: http://svn.apache.org/viewvc?rev=1130367&view=rev
Log:
HDFS-1966. Encapsulate individual DataTransferProtocol op headers.
Modified:
hadoop/hdfs/trunk/CHANGES.txt
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
Modified: hadoop/hdfs/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=1130367&r1=1130366&r2=1130367&view=diff
==============================================================================
--- hadoop/hdfs/trunk/CHANGES.txt (original)
+++ hadoop/hdfs/trunk/CHANGES.txt Thu Jun 2 01:12:40 2011
@@ -468,6 +468,9 @@ Trunk (unreleased changes)
HDFS-1636. If dfs.name.dir points to an empty dir, namenode format
shouldn't require confirmation. (Harsh J Chouraria via todd)
+ HDFS-1966. Encapsulate individual DataTransferProtocol op headers.
+ (szetszwo)
+
OPTIMIZATIONS
HDFS-1458. Improve checkpoint performance by avoiding unnecessary image
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java?rev=1130367&r1=1130366&r2=1130367&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java Thu Jun 2 01:12:40 2011
@@ -51,10 +51,10 @@ public interface DataTransferProtocol {
* when protocol changes. It is not very obvious.
*/
/*
- * Version 24:
- * Remove deprecated fields.
+ * Version 25:
+ * Encapsulate individual operation headers.
*/
- public static final int DATA_TRANSFER_VERSION = 24;
+ public static final int DATA_TRANSFER_VERSION = 25;
/** Operation */
public enum Op {
@@ -89,7 +89,332 @@ public interface DataTransferProtocol {
public void write(DataOutput out) throws IOException {
out.write(code);
}
- };
+
+ /** Base class for all headers. */
+ private static abstract class BaseHeader implements Writable {
+ private ExtendedBlock block;
+ private Token<BlockTokenIdentifier> blockToken;
+
+ private BaseHeader() {}
+
+ private BaseHeader(
+ final ExtendedBlock block,
+ final Token<BlockTokenIdentifier> blockToken) {
+ this.block = block;
+ this.blockToken = blockToken;
+ }
+
+ /** @return the extended block. */
+ public final ExtendedBlock getBlock() {
+ return block;
+ }
+
+ /** @return the block token. */
+ public final Token<BlockTokenIdentifier> getBlockToken() {
+ return blockToken;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ block.writeId(out);
+ blockToken.write(out);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ block = new ExtendedBlock();
+ block.readId(in);
+
+ blockToken = new Token<BlockTokenIdentifier>();
+ blockToken.readFields(in);
+ }
+ }
+
+ /** Base header for all client operation. */
+ private static abstract class ClientOperationHeader extends BaseHeader {
+ private String clientName;
+
+ private ClientOperationHeader() {}
+
+ private ClientOperationHeader(
+ final ExtendedBlock block,
+ final Token<BlockTokenIdentifier> blockToken,
+ final String clientName) {
+ super(block, blockToken);
+ this.clientName = clientName;
+ }
+
+ /** @return client name. */
+ public final String getClientName() {
+ return clientName;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ super.write(out);
+ Text.writeString(out, clientName);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ super.readFields(in);
+ clientName = Text.readString(in);
+ }
+ }
+
+ /** {@link Op#READ_BLOCK} header. */
+ public static class ReadBlockHeader extends ClientOperationHeader {
+ private long offset;
+ private long length;
+
+ /** Default constructor */
+ public ReadBlockHeader() {}
+
+ /** Constructor with all parameters */
+ public ReadBlockHeader(
+ final ExtendedBlock blk,
+ final Token<BlockTokenIdentifier> blockToken,
+ final String clientName,
+ final long offset,
+ final long length) {
+ super(blk, blockToken, clientName);
+ this.offset = offset;
+ this.length = length;
+ }
+
+ /** @return the offset */
+ public long getOffset() {
+ return offset;
+ }
+
+ /** @return the length */
+ public long getLength() {
+ return length;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ super.write(out);
+ out.writeLong(offset);
+ out.writeLong(length);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ super.readFields(in);
+ offset = in.readLong();
+ length = in.readLong();
+ }
+ }
+
+ /** {@link Op#WRITE_BLOCK} header. */
+ public static class WriteBlockHeader extends ClientOperationHeader {
+ private DatanodeInfo[] targets;
+
+ private DatanodeInfo source;
+ private BlockConstructionStage stage;
+ private int pipelineSize;
+ private long minBytesRcvd;
+ private long maxBytesRcvd;
+ private long latestGenerationStamp;
+
+ /** Default constructor */
+ public WriteBlockHeader() {}
+
+ /** Constructor with all parameters */
+ public WriteBlockHeader(
+ final ExtendedBlock blk,
+ final Token<BlockTokenIdentifier> blockToken,
+ final String clientName,
+ final DatanodeInfo[] targets,
+ final DatanodeInfo source,
+ final BlockConstructionStage stage,
+ final int pipelineSize,
+ final long minBytesRcvd,
+ final long maxBytesRcvd,
+ final long latestGenerationStamp
+ ) throws IOException {
+ super(blk, blockToken, clientName);
+ this.targets = targets;
+ this.source = source;
+ this.stage = stage;
+ this.pipelineSize = pipelineSize;
+ this.minBytesRcvd = minBytesRcvd;
+ this.maxBytesRcvd = maxBytesRcvd;
+ this.latestGenerationStamp = latestGenerationStamp;
+ }
+
+ /** @return targets. */
+ public DatanodeInfo[] getTargets() {
+ return targets;
+ }
+
+ /** @return the source */
+ public DatanodeInfo getSource() {
+ return source;
+ }
+
+ /** @return the stage */
+ public BlockConstructionStage getStage() {
+ return stage;
+ }
+
+ /** @return the pipeline size */
+ public int getPipelineSize() {
+ return pipelineSize;
+ }
+
+ /** @return the minimum bytes received. */
+ public long getMinBytesRcvd() {
+ return minBytesRcvd;
+ }
+
+ /** @return the maximum bytes received. */
+ public long getMaxBytesRcvd() {
+ return maxBytesRcvd;
+ }
+
+ /** @return the latest generation stamp */
+ public long getLatestGenerationStamp() {
+ return latestGenerationStamp;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ super.write(out);
+ Sender.write(out, 1, targets);
+
+ out.writeBoolean(source != null);
+ if (source != null) {
+ source.write(out);
+ }
+
+ stage.write(out);
+ out.writeInt(pipelineSize);
+ WritableUtils.writeVLong(out, minBytesRcvd);
+ WritableUtils.writeVLong(out, maxBytesRcvd);
+ WritableUtils.writeVLong(out, latestGenerationStamp);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ super.readFields(in);
+ targets = Receiver.readDatanodeInfos(in);
+
+ source = in.readBoolean()? DatanodeInfo.read(in): null;
+ stage = BlockConstructionStage.readFields(in);
+ pipelineSize = in.readInt(); // num of datanodes in entire pipeline
+ minBytesRcvd = WritableUtils.readVLong(in);
+ maxBytesRcvd = WritableUtils.readVLong(in);
+ latestGenerationStamp = WritableUtils.readVLong(in);
+ }
+ }
+
+ /** {@link Op#TRANSFER_BLOCK} header. */
+ public static class TransferBlockHeader extends ClientOperationHeader {
+ private DatanodeInfo[] targets;
+
+ /** Default constructor */
+ public TransferBlockHeader() {}
+
+ /** Constructor with all parameters */
+ public TransferBlockHeader(
+ final ExtendedBlock blk,
+ final Token<BlockTokenIdentifier> blockToken,
+ final String clientName,
+ final DatanodeInfo[] targets) throws IOException {
+ super(blk, blockToken, clientName);
+ this.targets = targets;
+ }
+
+ /** @return targets. */
+ public DatanodeInfo[] getTargets() {
+ return targets;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ super.write(out);
+ Sender.write(out, 0, targets);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ super.readFields(in);
+ targets = Receiver.readDatanodeInfos(in);
+ }
+ }
+
+ /** {@link Op#REPLACE_BLOCK} header. */
+ public static class ReplaceBlockHeader extends BaseHeader {
+ private String delHint;
+ private DatanodeInfo source;
+
+ /** Default constructor */
+ public ReplaceBlockHeader() {}
+
+ /** Constructor with all parameters */
+ public ReplaceBlockHeader(final ExtendedBlock blk,
+ final Token<BlockTokenIdentifier> blockToken,
+ final String storageId,
+ final DatanodeInfo src) throws IOException {
+ super(blk, blockToken);
+ this.delHint = storageId;
+ this.source = src;
+ }
+
+ /** @return delete-hint. */
+ public String getDelHint() {
+ return delHint;
+ }
+
+ /** @return source datanode. */
+ public DatanodeInfo getSource() {
+ return source;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ super.write(out);
+ Text.writeString(out, delHint);
+ source.write(out);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ super.readFields(in);
+ delHint = Text.readString(in);
+ source = DatanodeInfo.read(in);
+ }
+ }
+
+ /** {@link Op#COPY_BLOCK} header. */
+ public static class CopyBlockHeader extends BaseHeader {
+ /** Default constructor */
+ public CopyBlockHeader() {}
+
+ /** Constructor with all parameters */
+ public CopyBlockHeader(
+ final ExtendedBlock block,
+ final Token<BlockTokenIdentifier> blockToken) {
+ super(block, blockToken);
+ }
+ }
+
+ /** {@link Op#BLOCK_CHECKSUM} header. */
+ public static class BlockChecksumHeader extends BaseHeader {
+ /** Default constructor */
+ public BlockChecksumHeader() {}
+
+ /** Constructor with all parameters */
+ public BlockChecksumHeader(
+ final ExtendedBlock block,
+ final Token<BlockTokenIdentifier> blockToken) {
+ super(block, blockToken);
+ }
+ }
+ }
+
/** Status */
public enum Status {
@@ -189,24 +514,27 @@ public interface DataTransferProtocol {
@InterfaceStability.Evolving
public static class Sender {
/** Initialize a operation. */
- public static void op(DataOutputStream out, Op op) throws IOException {
+ private static void op(final DataOutput out, final Op op
+ ) throws IOException {
out.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION);
op.write(out);
}
+ /** Send an operation request. */
+ private static void send(final DataOutputStream out, final Op opcode,
+ final Op.BaseHeader parameters) throws IOException {
+ op(out, opcode);
+ parameters.write(out);
+ out.flush();
+ }
+
/** Send OP_READ_BLOCK */
public static void opReadBlock(DataOutputStream out, ExtendedBlock blk,
long blockOffset, long blockLen, String clientName,
Token<BlockTokenIdentifier> blockToken)
throws IOException {
- op(out, Op.READ_BLOCK);
-
- blk.writeId(out);
- out.writeLong(blockOffset);
- out.writeLong(blockLen);
- Text.writeString(out, clientName);
- blockToken.write(out);
- out.flush();
+ send(out, Op.READ_BLOCK, new Op.ReadBlockHeader(blk, blockToken,
+ clientName, blockOffset, blockLen));
}
/** Send OP_WRITE_BLOCK */
@@ -215,74 +543,43 @@ public interface DataTransferProtocol {
long minBytesRcvd, long maxBytesRcvd, String client, DatanodeInfo src,
DatanodeInfo[] targets, Token<BlockTokenIdentifier> blockToken)
throws IOException {
- op(out, Op.WRITE_BLOCK);
-
- blk.writeId(out);
- out.writeInt(pipelineSize);
- stage.write(out);
- WritableUtils.writeVLong(out, newGs);
- WritableUtils.writeVLong(out, minBytesRcvd);
- WritableUtils.writeVLong(out, maxBytesRcvd);
- Text.writeString(out, client);
-
- out.writeBoolean(src != null);
- if (src != null) {
- src.write(out);
- }
- write(out, 1, targets);
- blockToken.write(out);
+ send(out, Op.WRITE_BLOCK, new Op.WriteBlockHeader(blk, blockToken,
+ client, targets, src, stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
+ newGs));
}
/** Send {@link Op#TRANSFER_BLOCK} */
public static void opTransferBlock(DataOutputStream out, ExtendedBlock blk,
String client, DatanodeInfo[] targets,
Token<BlockTokenIdentifier> blockToken) throws IOException {
- op(out, Op.TRANSFER_BLOCK);
-
- blk.writeId(out);
- Text.writeString(out, client);
- write(out, 0, targets);
- blockToken.write(out);
- out.flush();
+ send(out, Op.TRANSFER_BLOCK, new Op.TransferBlockHeader(blk, blockToken,
+ client, targets));
}
/** Send OP_REPLACE_BLOCK */
public static void opReplaceBlock(DataOutputStream out,
- ExtendedBlock blk, String storageId, DatanodeInfo src,
+ ExtendedBlock blk, String delHint, DatanodeInfo src,
Token<BlockTokenIdentifier> blockToken) throws IOException {
- op(out, Op.REPLACE_BLOCK);
-
- blk.writeId(out);
- Text.writeString(out, storageId);
- src.write(out);
- blockToken.write(out);
- out.flush();
+ send(out, Op.REPLACE_BLOCK, new Op.ReplaceBlockHeader(blk, blockToken,
+ delHint, src));
}
/** Send OP_COPY_BLOCK */
public static void opCopyBlock(DataOutputStream out, ExtendedBlock blk,
Token<BlockTokenIdentifier> blockToken)
throws IOException {
- op(out, Op.COPY_BLOCK);
-
- blk.writeId(out);
- blockToken.write(out);
- out.flush();
+ send(out, Op.COPY_BLOCK, new Op.CopyBlockHeader(blk, blockToken));
}
/** Send OP_BLOCK_CHECKSUM */
public static void opBlockChecksum(DataOutputStream out, ExtendedBlock blk,
Token<BlockTokenIdentifier> blockToken)
throws IOException {
- op(out, Op.BLOCK_CHECKSUM);
-
- blk.writeId(out);
- blockToken.write(out);
- out.flush();
+ send(out, Op.BLOCK_CHECKSUM, new Op.BlockChecksumHeader(blk, blockToken));
}
/** Write an array of {@link DatanodeInfo} */
- private static void write(final DataOutputStream out,
+ private static void write(final DataOutput out,
final int start,
final DatanodeInfo[] datanodeinfos) throws IOException {
out.writeInt(datanodeinfos.length - start);
@@ -334,14 +631,10 @@ public interface DataTransferProtocol {
/** Receive OP_READ_BLOCK */
private void opReadBlock(DataInputStream in) throws IOException {
- final ExtendedBlock blk = new ExtendedBlock();
- blk.readId(in);
- final long offset = in.readLong();
- final long length = in.readLong();
- final String client = Text.readString(in);
- final Token<BlockTokenIdentifier> blockToken = readBlockToken(in);
-
- opReadBlock(in, blk, offset, length, client, blockToken);
+ final Op.ReadBlockHeader h = new Op.ReadBlockHeader();
+ h.readFields(in);
+ opReadBlock(in, h.getBlock(), h.getOffset(), h.getLength(),
+ h.getClientName(), h.getBlockToken());
}
/**
@@ -353,22 +646,12 @@ public interface DataTransferProtocol {
/** Receive OP_WRITE_BLOCK */
private void opWriteBlock(DataInputStream in) throws IOException {
- final ExtendedBlock blk = new ExtendedBlock();
- blk.readId(in);
- final int pipelineSize = in.readInt(); // num of datanodes in entire pipeline
- final BlockConstructionStage stage =
- BlockConstructionStage.readFields(in);
- final long newGs = WritableUtils.readVLong(in);
- final long minBytesRcvd = WritableUtils.readVLong(in);
- final long maxBytesRcvd = WritableUtils.readVLong(in);
- final String client = Text.readString(in); // working on behalf of this client
- final DatanodeInfo src = in.readBoolean()? DatanodeInfo.read(in): null;
-
- final DatanodeInfo targets[] = readDatanodeInfos(in);
- final Token<BlockTokenIdentifier> blockToken = readBlockToken(in);
-
- opWriteBlock(in, blk, pipelineSize, stage,
- newGs, minBytesRcvd, maxBytesRcvd, client, src, targets, blockToken);
+ final Op.WriteBlockHeader h = new Op.WriteBlockHeader();
+ h.readFields(in);
+ opWriteBlock(in, h.getBlock(), h.getPipelineSize(), h.getStage(),
+ h.getLatestGenerationStamp(),
+ h.getMinBytesRcvd(), h.getMaxBytesRcvd(),
+ h.getClientName(), h.getSource(), h.getTargets(), h.getBlockToken());
}
/**
@@ -383,13 +666,10 @@ public interface DataTransferProtocol {
/** Receive {@link Op#TRANSFER_BLOCK} */
private void opTransferBlock(DataInputStream in) throws IOException {
- final ExtendedBlock blk = new ExtendedBlock();
- blk.readId(in);
- final String client = Text.readString(in);
- final DatanodeInfo targets[] = readDatanodeInfos(in);
- final Token<BlockTokenIdentifier> blockToken = readBlockToken(in);
-
- opTransferBlock(in, blk, client, targets, blockToken);
+ final Op.TransferBlockHeader h = new Op.TransferBlockHeader();
+ h.readFields(in);
+ opTransferBlock(in, h.getBlock(), h.getClientName(), h.getTargets(),
+ h.getBlockToken());
}
/**
@@ -404,13 +684,10 @@ public interface DataTransferProtocol {
/** Receive OP_REPLACE_BLOCK */
private void opReplaceBlock(DataInputStream in) throws IOException {
- final ExtendedBlock blk = new ExtendedBlock();
- blk.readId(in);
- final String sourceId = Text.readString(in); // read del hint
- final DatanodeInfo src = DatanodeInfo.read(in); // read proxy source
- final Token<BlockTokenIdentifier> blockToken = readBlockToken(in);
-
- opReplaceBlock(in, blk, sourceId, src, blockToken);
+ final Op.ReplaceBlockHeader h = new Op.ReplaceBlockHeader();
+ h.readFields(in);
+ opReplaceBlock(in, h.getBlock(), h.getDelHint(), h.getSource(),
+ h.getBlockToken());
}
/**
@@ -418,16 +695,14 @@ public interface DataTransferProtocol {
* It is used for balancing purpose; send to a destination
*/
protected abstract void opReplaceBlock(DataInputStream in,
- ExtendedBlock blk, String sourceId, DatanodeInfo src,
+ ExtendedBlock blk, String delHint, DatanodeInfo src,
Token<BlockTokenIdentifier> blockToken) throws IOException;
/** Receive OP_COPY_BLOCK */
private void opCopyBlock(DataInputStream in) throws IOException {
- final ExtendedBlock blk = new ExtendedBlock();
- blk.readId(in);
- final Token<BlockTokenIdentifier> blockToken = readBlockToken(in);
-
- opCopyBlock(in, blk, blockToken);
+ final Op.CopyBlockHeader h = new Op.CopyBlockHeader();
+ h.readFields(in);
+ opCopyBlock(in, h.getBlock(), h.getBlockToken());
}
/**
@@ -440,11 +715,9 @@ public interface DataTransferProtocol {
/** Receive OP_BLOCK_CHECKSUM */
private void opBlockChecksum(DataInputStream in) throws IOException {
- final ExtendedBlock blk = new ExtendedBlock();
- blk.readId(in);
- final Token<BlockTokenIdentifier> blockToken = readBlockToken(in);
-
- opBlockChecksum(in, blk, blockToken);
+ final Op.BlockChecksumHeader h = new Op.BlockChecksumHeader();
+ h.readFields(in);
+ opBlockChecksum(in, h.getBlock(), h.getBlockToken());
}
/**
@@ -456,7 +729,7 @@ public interface DataTransferProtocol {
throws IOException;
/** Read an array of {@link DatanodeInfo} */
- private static DatanodeInfo[] readDatanodeInfos(final DataInputStream in
+ private static DatanodeInfo[] readDatanodeInfos(final DataInput in
) throws IOException {
final int n = in.readInt();
if (n < 0) {
@@ -469,14 +742,6 @@ public interface DataTransferProtocol {
}
return datanodeinfos;
}
-
- /** Read an AccessToken */
- static private Token<BlockTokenIdentifier> readBlockToken(DataInputStream in
- ) throws IOException {
- final Token<BlockTokenIdentifier> t = new Token<BlockTokenIdentifier>();
- t.readFields(in);
- return t;
- }
}
/** reply **/