You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by to...@apache.org on 2011/11/01 06:17:09 UTC
svn commit: r1195829 [1/3] - in
/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./
src/main/java/org/apache/hadoop/hdfs/
src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/
src/main/java/org/apache/hadoop/hdfs/protocol/proto/ src/main/jav...
Author: todd
Date: Tue Nov 1 05:17:08 2011
New Revision: 1195829
URL: http://svn.apache.org/viewvc?rev=1195829&view=rev
Log:
HDFS-2521. Remove custom checksum headers from data transfer protocol. Contributed by Todd Lipcon.
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/proto/DataTransferProtos.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/proto/datatransfer.proto
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1195829&r1=1195828&r2=1195829&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Tue Nov 1 05:17:08 2011
@@ -839,6 +839,9 @@ Release 0.23.0 - Unreleased
HDFS-2512. Add textual error message to data transfer protocol responses
(todd)
+ HDFS-2521. Remove custom checksum headers from data transfer protocol
+ (todd)
+
OPTIMIZATIONS
HDFS-1458. Improve checkpoint performance by avoiding unnecessary image
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java?rev=1195829&r1=1195828&r2=1195829&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java Tue Nov 1 05:17:08 2011
@@ -1033,9 +1033,7 @@ class DFSOutputStream extends FSOutputSu
// send the request
new Sender(out).writeBlock(block, accessToken, dfsClient.clientName,
nodes, null, recoveryFlag? stage.getRecoveryStage() : stage,
- nodes.length, block.getNumBytes(), bytesSent, newGS);
- checksum.writeHeader(out);
- out.flush();
+ nodes.length, block.getNumBytes(), bytesSent, newGS, checksum);
// receive ack for connect
BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java?rev=1195829&r1=1195828&r2=1195829&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java Tue Nov 1 05:17:08 2011
@@ -33,10 +33,13 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.fs.FSInputChecker;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
@@ -408,11 +411,14 @@ public class RemoteBlockReader extends F
BlockOpResponseProto status = BlockOpResponseProto.parseFrom(
vintPrefixed(in));
checkSuccess(status, sock, block, file);
- DataChecksum checksum = DataChecksum.newDataChecksum( in );
+ ReadOpChecksumInfoProto checksumInfo =
+ status.getReadOpChecksumInfo();
+ DataChecksum checksum = DataTransferProtoUtil.fromProto(
+ checksumInfo.getChecksum());
//Warning when we get CHECKSUM_NULL?
// Read the first chunk offset.
- long firstChunkOffset = in.readLong();
+ long firstChunkOffset = checksumInfo.getChunkOffset();
if ( firstChunkOffset < 0 || firstChunkOffset > startOffset ||
firstChunkOffset >= (startOffset + checksum.getBytesPerChecksum())) {
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java?rev=1195829&r1=1195828&r2=1195829&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java Tue Nov 1 05:17:08 2011
@@ -23,10 +23,16 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BaseHeaderProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto.ChecksumType;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.DataChecksum;
+
+import com.google.common.collect.BiMap;
+import com.google.common.collect.ImmutableBiMap;
/**
@@ -35,8 +41,20 @@ import org.apache.hadoop.security.token.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
-abstract class DataTransferProtoUtil {
+public abstract class DataTransferProtoUtil {
+
+ /**
+ * Map between the internal DataChecksum identifiers and the protobuf-
+ * generated identifiers on the wire.
+ */
+ static BiMap<Integer, ChecksumProto.ChecksumType> checksumTypeMap =
+ ImmutableBiMap.<Integer, ChecksumProto.ChecksumType>builder()
+ .put(DataChecksum.CHECKSUM_CRC32, ChecksumProto.ChecksumType.CRC32)
+ .put(DataChecksum.CHECKSUM_CRC32C, ChecksumProto.ChecksumType.CRC32C)
+ .put(DataChecksum.CHECKSUM_NULL, ChecksumProto.ChecksumType.NULL)
+ .build();
+
static BlockConstructionStage fromProto(
OpWriteBlockProto.BlockConstructionStage stage) {
return BlockConstructionStage.valueOf(BlockConstructionStage.class,
@@ -49,6 +67,28 @@ abstract class DataTransferProtoUtil {
stage.name());
}
+ public static ChecksumProto toProto(DataChecksum checksum) {
+ ChecksumType type = checksumTypeMap.get(checksum.getChecksumType());
+ if (type == null) {
+ throw new IllegalArgumentException(
+ "Can't convert checksum to protobuf: " + checksum);
+ }
+
+ return ChecksumProto.newBuilder()
+ .setBytesPerChecksum(checksum.getBytesPerChecksum())
+ .setType(type)
+ .build();
+ }
+
+ public static DataChecksum fromProto(ChecksumProto proto) {
+ if (proto == null) return null;
+
+ int bytesPerChecksum = proto.getBytesPerChecksum();
+ int type = checksumTypeMap.inverse().get(proto.getType());
+
+ return DataChecksum.newDataChecksum(type, bytesPerChecksum);
+ }
+
static ClientOperationHeaderProto buildClientHeader(ExtendedBlock blk,
String client, Token<BlockTokenIdentifier> blockToken) {
ClientOperationHeaderProto header =
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java?rev=1195829&r1=1195828&r2=1195829&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java Tue Nov 1 05:17:08 2011
@@ -27,6 +27,7 @@ import org.apache.hadoop.hdfs.protocol.D
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.DataChecksum;
/**
* Transfer data to/from datanode using a streaming protocol.
@@ -84,7 +85,8 @@ public interface DataTransferProtocol {
final int pipelineSize,
final long minBytesRcvd,
final long maxBytesRcvd,
- final long latestGenerationStamp) throws IOException;
+ final long latestGenerationStamp,
+ final DataChecksum requestedChecksum) throws IOException;
/**
* Transfer a block to another datanode.
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java?rev=1195829&r1=1195828&r2=1195829&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java Tue Nov 1 05:17:08 2011
@@ -103,7 +103,8 @@ public abstract class Receiver implement
fromProto(proto.getStage()),
proto.getPipelineSize(),
proto.getMinBytesRcvd(), proto.getMaxBytesRcvd(),
- proto.getLatestGenerationStamp());
+ proto.getLatestGenerationStamp(),
+ fromProto(proto.getRequestedChecksum()));
}
/** Receive {@link Op#TRANSFER_BLOCK} */
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java?rev=1195829&r1=1195828&r2=1195829&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java Tue Nov 1 05:17:08 2011
@@ -29,6 +29,7 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockProto;
@@ -38,6 +39,7 @@ import org.apache.hadoop.hdfs.protocol.p
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.DataChecksum;
import com.google.protobuf.Message;
@@ -93,10 +95,14 @@ public class Sender implements DataTrans
final int pipelineSize,
final long minBytesRcvd,
final long maxBytesRcvd,
- final long latestGenerationStamp) throws IOException {
+ final long latestGenerationStamp,
+ DataChecksum requestedChecksum) throws IOException {
ClientOperationHeaderProto header = DataTransferProtoUtil.buildClientHeader(
blk, clientName, blockToken);
+ ChecksumProto checksumProto =
+ DataTransferProtoUtil.toProto(requestedChecksum);
+
OpWriteBlockProto.Builder proto = OpWriteBlockProto.newBuilder()
.setHeader(header)
.addAllTargets(toProtos(targets, 1))
@@ -104,7 +110,8 @@ public class Sender implements DataTrans
.setPipelineSize(pipelineSize)
.setMinBytesRcvd(minBytesRcvd)
.setMaxBytesRcvd(maxBytesRcvd)
- .setLatestGenerationStamp(latestGenerationStamp);
+ .setLatestGenerationStamp(latestGenerationStamp)
+ .setRequestedChecksum(checksumProto);
if (source != null) {
proto.setSource(toProto(source));