Posted to common-commits@hadoop.apache.org by um...@apache.org on 2016/03/01 06:53:42 UTC
hadoop git commit: HDFS-9733. Refactor DFSClient#getFileChecksum and DataXceiver#blockChecksum. Contributed by Kai Zheng
Repository: hadoop
Updated Branches:
refs/heads/trunk 680f3fc02 -> 307ec80ac
HDFS-9733. Refactor DFSClient#getFileChecksum and DataXceiver#blockChecksum. Contributed by Kai Zheng
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/307ec80a
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/307ec80a
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/307ec80a
Branch: refs/heads/trunk
Commit: 307ec80acae3b4a41d21b2d4b3a55032e55fcdc6
Parents: 680f3fc
Author: Uma Maheswara Rao G <um...@intel.com>
Authored: Mon Feb 29 21:52:20 2016 -0800
Committer: Uma Maheswara Rao G <um...@intel.com>
Committed: Mon Feb 29 21:52:20 2016 -0800
----------------------------------------------------------------------
.../main/java/org/apache/hadoop/io/IOUtils.java | 4 +-
.../main/java/org/apache/hadoop/io/MD5Hash.java | 11 +
.../org/apache/hadoop/util/DataChecksum.java | 12 +-
.../java/org/apache/hadoop/hdfs/DFSClient.java | 237 ++---------
.../org/apache/hadoop/hdfs/DFSUtilClient.java | 49 +++
.../apache/hadoop/hdfs/FileChecksumHelper.java | 416 +++++++++++++++++++
.../protocol/datatransfer/IOStreamPair.java | 11 +-
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 +
.../server/datanode/BlockChecksumHelper.java | 254 +++++++++++
.../hdfs/server/datanode/BlockSender.java | 1 -
.../hadoop/hdfs/server/datanode/DataNode.java | 10 +
.../hdfs/server/datanode/DataXceiver.java | 162 +++-----
12 files changed, 846 insertions(+), 324 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/307ec80a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/IOUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/IOUtils.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/IOUtils.java
index 451163c..2588bf1 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/IOUtils.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/IOUtils.java
@@ -261,7 +261,9 @@ public class IOUtils {
* @param stream the Stream to close
*/
public static void closeStream(java.io.Closeable stream) {
- cleanup(null, stream);
+ if (stream != null) {
+ cleanup(null, stream);
+ }
}
/**
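The change above makes closeStream() skip the cleanup call entirely when the argument is null, so it stays safe to call from finally blocks with a stream that may never have been assigned. A minimal hedged sketch of that usage, assuming the enclosing method declares IOException (the file path is hypothetical):

    // Hedged sketch, not part of the patch.
    java.io.InputStream in = null;
    try {
      in = new java.io.FileInputStream("/tmp/example");  // hypothetical path
      // ... read from the stream ...
    } finally {
      // Safe even when the constructor above threw and 'in' is still null.
      org.apache.hadoop.io.IOUtils.closeStream(in);
    }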
http://git-wip-us.apache.org/repos/asf/hadoop/blob/307ec80a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/MD5Hash.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/MD5Hash.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/MD5Hash.java
index 822e089..aaf3ea1 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/MD5Hash.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/MD5Hash.java
@@ -128,6 +128,17 @@ public class MD5Hash implements WritableComparable<MD5Hash> {
return new MD5Hash(digest);
}
+ /** Construct a hash value for an array of byte arrays. */
+ public static MD5Hash digest(byte[][] dataArr, int start, int len) {
+ byte[] digest;
+ MessageDigest digester = getDigester();
+ for (byte[] data : dataArr) {
+ digester.update(data, start, len);
+ }
+ digest = digester.digest();
+ return new MD5Hash(digest);
+ }
+
/** Construct a hash value for a String. */
public static MD5Hash digest(String string) {
return digest(UTF8.getBytes(string));
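The new digest(byte[][], int, int) overload runs the same [start, start + len) slice of every inner array through a single MD5 digester, in order. A hedged usage sketch (the byte arrays are made up; each inner array must hold at least start + len bytes):

    // Hedged sketch: fold the same byte range of several buffers into one MD5.
    byte[][] chunks = {
        "per-block-md5-0".getBytes(),
        "per-block-md5-1".getBytes()
    };
    // Digest bytes [0, 8) of each inner array, in the order given.
    MD5Hash combined = MD5Hash.digest(chunks, 0, 8);
    System.out.println("combined md5 = " + combined);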
http://git-wip-us.apache.org/repos/asf/hadoop/blob/307ec80a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java
index faac587..e44b64d 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java
@@ -45,7 +45,7 @@ public class DataChecksum implements Checksum {
public static final int CHECKSUM_MIXED = 4;
/** The checksum types */
- public static enum Type {
+ public enum Type {
NULL (CHECKSUM_NULL, 0),
CRC32 (CHECKSUM_CRC32, 4),
CRC32C(CHECKSUM_CRC32C, 4),
@@ -55,7 +55,7 @@ public class DataChecksum implements Checksum {
public final int id;
public final int size;
- private Type(int id, int size) {
+ Type(int id, int size) {
this.id = id;
this.size = size;
}
@@ -230,17 +230,21 @@ public class DataChecksum implements Checksum {
public Type getChecksumType() {
return type;
}
+
/** @return the size for a checksum. */
public int getChecksumSize() {
return type.size;
}
+
/** @return the required checksum size given the data length. */
public int getChecksumSize(int dataSize) {
return ((dataSize - 1)/getBytesPerChecksum() + 1) * getChecksumSize();
}
+
public int getBytesPerChecksum() {
return bytesPerChecksum;
}
+
public int getNumBytesInSum() {
return inSum;
}
@@ -249,16 +253,19 @@ public class DataChecksum implements Checksum {
static public int getChecksumHeaderSize() {
return 1 + SIZE_OF_INTEGER; // type byte, bytesPerChecksum int
}
+
//Checksum Interface. Just a wrapper around member summer.
@Override
public long getValue() {
return summer.getValue();
}
+
@Override
public void reset() {
summer.reset();
inSum = 0;
}
+
@Override
public void update( byte[] b, int off, int len ) {
if ( len > 0 ) {
@@ -266,6 +273,7 @@ public class DataChecksum implements Checksum {
inSum += len;
}
}
+
@Override
public void update( int b ) {
summer.update( b );
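The context line getChecksumSize(int dataSize) above is the formula the checksum helpers in this patch rely on: it rounds the data length up to whole checksum chunks and multiplies by the per-checksum size. A hedged worked example (the concrete numbers are illustrative only):

    // Hedged worked example, assuming CRC32C (4 bytes per checksum) and
    // 512 bytes per checksum chunk:
    //   dataSize = 1025
    //   chunks   = (1025 - 1) / 512 + 1 = 3
    //   required = 3 * 4 = 12 bytes of checksum data
    DataChecksum sum = DataChecksum.newDataChecksum(DataChecksum.Type.CRC32C, 512);
    int required = sum.getChecksumSize(1025);  // 12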
http://git-wip-us.apache.org/repos/asf/hadoop/blob/307ec80a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index 15a49f1..da3f745 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -27,12 +27,9 @@ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_LOCA
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_DEFAULT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY;
-import java.io.BufferedOutputStream;
-import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
-import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
@@ -80,9 +77,7 @@ import org.apache.hadoop.fs.FsStatus;
import org.apache.hadoop.fs.FsTracer;
import org.apache.hadoop.fs.HdfsBlockLocation;
import org.apache.hadoop.fs.InvalidPathException;
-import org.apache.hadoop.fs.MD5MD5CRC32CastagnoliFileChecksum;
import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
-import org.apache.hadoop.fs.MD5MD5CRC32GzipFileChecksum;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Options.ChecksumOpt;
import org.apache.hadoop.fs.ParentNotDirectoryException;
@@ -138,7 +133,6 @@ import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
-import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
import org.apache.hadoop.hdfs.protocol.datatransfer.ReplaceDatanodeOnFailure;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
@@ -146,20 +140,16 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactor
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
-import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
import org.apache.hadoop.hdfs.util.IOUtilsClient;
-import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.retry.LossyRetryInvocationHandler;
import org.apache.hadoop.ipc.RPC;
@@ -1293,7 +1283,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
/**
* Invoke namenode append RPC.
- * It retries in case of {@link BlockNotYetCompleteException}.
+ * It retries in case of some {@link RetriableException}.
*/
private LastBlockWithStatus callAppend(String src,
EnumSetWritable<CreateFlag> flag) throws IOException {
@@ -1695,7 +1685,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
}
/**
- * Get the checksum of the whole file of a range of the file. Note that the
+ * Get the checksum of the whole file or a range of the file. Note that the
* range always starts from the beginning of the file.
* @param src The file path
* @param length the length of the range, i.e., the range is [0, length]
@@ -1706,9 +1696,23 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
throws IOException {
checkOpen();
Preconditions.checkArgument(length >= 0);
+
+ LocatedBlocks blockLocations = getBlockLocations(src, length);
+
+ FileChecksumHelper.FileChecksumComputer maker =
+ new FileChecksumHelper.ReplicatedFileChecksumComputer(src, length,
+ blockLocations, namenode, this);
+
+ maker.compute();
+
+ return maker.getFileChecksum();
+ }
+
+ protected LocatedBlocks getBlockLocations(String src,
+ long length) throws IOException {
//get block locations for the file range
- LocatedBlocks blockLocations = callGetBlockLocations(namenode, src, 0,
- length);
+ LocatedBlocks blockLocations = callGetBlockLocations(namenode,
+ src, 0, length);
if (null == blockLocations) {
throw new FileNotFoundException("File does not exist: " + src);
}
@@ -1716,194 +1720,15 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
throw new IOException("Fail to get checksum, since file " + src
+ " is under construction.");
}
- List<LocatedBlock> locatedblocks = blockLocations.getLocatedBlocks();
- final DataOutputBuffer md5out = new DataOutputBuffer();
- int bytesPerCRC = -1;
- DataChecksum.Type crcType = DataChecksum.Type.DEFAULT;
- long crcPerBlock = 0;
- boolean refetchBlocks = false;
- int lastRetriedIndex = -1;
-
- // get block checksum for each block
- long remaining = length;
- if (src.contains(HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR_SEPARATOR)) {
- remaining = Math.min(length, blockLocations.getFileLength());
- }
- for(int i = 0; i < locatedblocks.size() && remaining > 0; i++) {
- if (refetchBlocks) { // refetch to get fresh tokens
- blockLocations = callGetBlockLocations(namenode, src, 0, length);
- if (null == blockLocations) {
- throw new FileNotFoundException("File does not exist: " + src);
- }
- if (blockLocations.isUnderConstruction()) {
- throw new IOException("Fail to get checksum, since file " + src
- + " is under construction.");
- }
- locatedblocks = blockLocations.getLocatedBlocks();
- refetchBlocks = false;
- }
- LocatedBlock lb = locatedblocks.get(i);
- final ExtendedBlock block = lb.getBlock();
- if (remaining < block.getNumBytes()) {
- block.setNumBytes(remaining);
- }
- remaining -= block.getNumBytes();
- final DatanodeInfo[] datanodes = lb.getLocations();
-
- //try each datanode location of the block
- final int timeout = 3000 * datanodes.length +
- dfsClientConf.getSocketTimeout();
- boolean done = false;
- for(int j = 0; !done && j < datanodes.length; j++) {
- DataOutputStream out = null;
- DataInputStream in = null;
-
- try {
- //connect to a datanode
- IOStreamPair pair = connectToDN(datanodes[j], timeout, lb);
- out = new DataOutputStream(new BufferedOutputStream(pair.out,
- smallBufferSize));
- in = new DataInputStream(pair.in);
-
- LOG.debug("write to {}: {}, block={}",
- datanodes[j], Op.BLOCK_CHECKSUM, block);
- // get block MD5
- new Sender(out).blockChecksum(block, lb.getBlockToken());
-
- final BlockOpResponseProto reply =
- BlockOpResponseProto.parseFrom(PBHelperClient.vintPrefixed(in));
-
- String logInfo = "for block " + block + " from datanode " +
- datanodes[j];
- DataTransferProtoUtil.checkBlockOpStatus(reply, logInfo);
-
- OpBlockChecksumResponseProto checksumData =
- reply.getChecksumResponse();
-
- //read byte-per-checksum
- final int bpc = checksumData.getBytesPerCrc();
- if (i == 0) { //first block
- bytesPerCRC = bpc;
- }
- else if (bpc != bytesPerCRC) {
- throw new IOException("Byte-per-checksum not matched: bpc=" + bpc
- + " but bytesPerCRC=" + bytesPerCRC);
- }
-
- //read crc-per-block
- final long cpb = checksumData.getCrcPerBlock();
- if (locatedblocks.size() > 1 && i == 0) {
- crcPerBlock = cpb;
- }
-
- //read md5
- final MD5Hash md5 = new MD5Hash(
- checksumData.getMd5().toByteArray());
- md5.write(md5out);
-
- // read crc-type
- final DataChecksum.Type ct;
- if (checksumData.hasCrcType()) {
- ct = PBHelperClient.convert(checksumData
- .getCrcType());
- } else {
- LOG.debug("Retrieving checksum from an earlier-version DataNode: " +
- "inferring checksum by reading first byte");
- ct = inferChecksumTypeByReading(lb, datanodes[j]);
- }
-
- if (i == 0) { // first block
- crcType = ct;
- } else if (crcType != DataChecksum.Type.MIXED
- && crcType != ct) {
- // if crc types are mixed in a file
- crcType = DataChecksum.Type.MIXED;
- }
-
- done = true;
-
- if (LOG.isDebugEnabled()) {
- if (i == 0) {
- LOG.debug("set bytesPerCRC=" + bytesPerCRC
- + ", crcPerBlock=" + crcPerBlock);
- }
- LOG.debug("got reply from " + datanodes[j] + ": md5=" + md5);
- }
- } catch (InvalidBlockTokenException ibte) {
- if (i > lastRetriedIndex) {
- LOG.debug("Got access token error in response to OP_BLOCK_CHECKSUM "
- + "for file {} for block {} from datanode {}. Will retry "
- + "the block once.",
- src, block, datanodes[j]);
- lastRetriedIndex = i;
- done = true; // actually it's not done; but we'll retry
- i--; // repeat at i-th block
- refetchBlocks = true;
- break;
- }
- } catch (IOException ie) {
- LOG.warn("src=" + src + ", datanodes["+j+"]=" + datanodes[j], ie);
- } finally {
- IOUtils.closeStream(in);
- IOUtils.closeStream(out);
- }
- }
-
- if (!done) {
- throw new IOException("Fail to get block MD5 for " + block);
- }
- }
- //compute file MD5
- final MD5Hash fileMD5 = MD5Hash.digest(md5out.getData());
- switch (crcType) {
- case CRC32:
- return new MD5MD5CRC32GzipFileChecksum(bytesPerCRC,
- crcPerBlock, fileMD5);
- case CRC32C:
- return new MD5MD5CRC32CastagnoliFileChecksum(bytesPerCRC,
- crcPerBlock, fileMD5);
- default:
- // If there is no block allocated for the file,
- // return one with the magic entry that matches what previous
- // hdfs versions return.
- if (locatedblocks.size() == 0) {
- return new MD5MD5CRC32GzipFileChecksum(0, 0, fileMD5);
- }
-
- // we should never get here since the validity was checked
- // when getCrcType() was called above.
- return null;
- }
+ return blockLocations;
}
- /**
- * Connect to the given datanode's datantrasfer port, and return
- * the resulting IOStreamPair. This includes encryption wrapping, etc.
- */
- private IOStreamPair connectToDN(DatanodeInfo dn, int timeout,
- LocatedBlock lb) throws IOException {
- boolean success = false;
- Socket sock = null;
- try {
- sock = socketFactory.createSocket();
- String dnAddr = dn.getXferAddr(getConf().isConnectToDnViaHostname());
- LOG.debug("Connecting to datanode {}", dnAddr);
- NetUtils.connect(sock, NetUtils.createSocketAddr(dnAddr), timeout);
- sock.setTcpNoDelay(dfsClientConf.getDataTransferTcpNoDelay());
- sock.setSoTimeout(timeout);
-
- OutputStream unbufOut = NetUtils.getOutputStream(sock);
- InputStream unbufIn = NetUtils.getInputStream(sock);
- IOStreamPair ret = saslClient.newSocketSend(sock, unbufOut, unbufIn, this,
- lb.getBlockToken(), dn);
- success = true;
- return ret;
- } finally {
- if (!success) {
- IOUtils.closeSocket(sock);
- }
- }
+ protected IOStreamPair connectToDN(DatanodeInfo dn, int timeout,
+ Token<BlockTokenIdentifier> blockToken)
+ throws IOException {
+ return DFSUtilClient.connectToDN(dn, timeout, conf, saslClient,
+ socketFactory, getConf().isConnectToDnViaHostname(), this, blockToken);
}
/**
@@ -1917,19 +1742,17 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
* @return the inferred checksum type
* @throws IOException if an error occurs
*/
- private Type inferChecksumTypeByReading(LocatedBlock lb, DatanodeInfo dn)
+ protected Type inferChecksumTypeByReading(LocatedBlock lb, DatanodeInfo dn)
throws IOException {
- IOStreamPair pair = connectToDN(dn, dfsClientConf.getSocketTimeout(), lb);
+ IOStreamPair pair = connectToDN(dn, dfsClientConf.getSocketTimeout(),
+ lb.getBlockToken());
try {
- DataOutputStream out = new DataOutputStream(
- new BufferedOutputStream(pair.out, smallBufferSize));
- DataInputStream in = new DataInputStream(pair.in);
-
- new Sender(out).readBlock(lb.getBlock(), lb.getBlockToken(), clientName,
+ new Sender((DataOutputStream) pair.out).readBlock(lb.getBlock(),
+ lb.getBlockToken(), clientName,
0, 1, true, CachingStrategy.newDefaultStrategy());
final BlockOpResponseProto reply =
- BlockOpResponseProto.parseFrom(PBHelperClient.vintPrefixed(in));
+ BlockOpResponseProto.parseFrom(PBHelperClient.vintPrefixed(pair.in));
String logInfo = "trying to read " + lb.getBlock() + " from datanode " +
dn;
DataTransferProtoUtil.checkBlockOpStatus(reply, logInfo);
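After this refactoring, DFSClient#getFileChecksum only resolves the block locations and delegates the per-block work to FileChecksumHelper.ReplicatedFileChecksumComputer; callers should see no behavioural change. A hedged sketch of the unchanged public entry point (the configuration and path are hypothetical):

    // Hedged sketch: the public FileSystem API is untouched by this refactoring.
    Configuration conf = new Configuration();
    try (FileSystem fs = FileSystem.get(conf)) {
      // Checksum over the whole file; ranges always start at offset 0.
      FileChecksum checksum = fs.getFileChecksum(new Path("/user/example/data.bin"));
      System.out.println(checksum.getAlgorithmName() + ": " + checksum);
    }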
http://git-wip-us.apache.org/repos/asf/hadoop/blob/307ec80a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
index d646252..880234e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.protocol.ReconfigurationProtocol;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
@@ -47,6 +48,7 @@ import org.apache.hadoop.hdfs.protocolPB.ReconfigurationProtocolTranslatorPB;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.util.IOUtilsClient;
import org.apache.hadoop.hdfs.web.WebHdfsConstants;
+import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.security.UserGroupInformation;
@@ -56,8 +58,13 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.net.SocketFactory;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.io.InterruptedIOException;
+import java.io.OutputStream;
import java.io.UnsupportedEncodingException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
@@ -717,4 +724,46 @@ public class DFSUtilClient {
return corruptionMap;
}
}
+
+ /**
+ * Connect to the given datanode's data transfer port, and return
+ * the resulting IOStreamPair. This includes encryption wrapping, etc.
+ */
+ public static IOStreamPair connectToDN(DatanodeInfo dn, int timeout,
+ Configuration conf,
+ SaslDataTransferClient saslClient,
+ SocketFactory socketFactory,
+ boolean connectToDnViaHostname,
+ DataEncryptionKeyFactory dekFactory,
+ Token<BlockTokenIdentifier> blockToken)
+ throws IOException {
+
+ boolean success = false;
+ Socket sock = null;
+ try {
+ sock = socketFactory.createSocket();
+ String dnAddr = dn.getXferAddr(connectToDnViaHostname);
+ LOG.debug("Connecting to datanode {}", dnAddr);
+ NetUtils.connect(sock, NetUtils.createSocketAddr(dnAddr), timeout);
+ sock.setSoTimeout(timeout);
+
+ OutputStream unbufOut = NetUtils.getOutputStream(sock);
+ InputStream unbufIn = NetUtils.getInputStream(sock);
+ IOStreamPair pair = saslClient.newSocketSend(sock, unbufOut,
+ unbufIn, dekFactory, blockToken, dn);
+
+ IOStreamPair result = new IOStreamPair(
+ new DataInputStream(pair.in),
+ new DataOutputStream(new BufferedOutputStream(pair.out,
+ DFSUtilClient.getSmallBufferSize(conf)))
+ );
+
+ success = true;
+ return result;
+ } finally {
+ if (!success) {
+ IOUtils.closeSocket(sock);
+ }
+ }
+ }
}
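DFSUtilClient#connectToDN now owns the socket setup and SASL wrapping that previously lived in DFSClient, and it returns an IOStreamPair whose streams are already DataInputStream/DataOutputStream instances; that is what lets FileChecksumHelper#createSender cast pair.out directly. A hedged caller sketch, assuming dn, timeout, conf, saslClient, socketFactory, dekFactory, blockToken and block are in scope as they are in the client code:

    // Hedged sketch: connect to a datanode, issue BLOCK_CHECKSUM, read the reply.
    try (IOStreamPair pair = DFSUtilClient.connectToDN(dn, timeout, conf,
        saslClient, socketFactory, /* connectToDnViaHostname */ false,
        dekFactory, blockToken)) {
      new Sender((DataOutputStream) pair.out).blockChecksum(block, blockToken);
      BlockOpResponseProto reply =
          BlockOpResponseProto.parseFrom(PBHelperClient.vintPrefixed(pair.in));
      // ... check the status and use reply.getChecksumResponse() ...
    }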
http://git-wip-us.apache.org/repos/asf/hadoop/blob/307ec80a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FileChecksumHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FileChecksumHelper.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FileChecksumHelper.java
new file mode 100644
index 0000000..d15db9f
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FileChecksumHelper.java
@@ -0,0 +1,416 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import org.apache.hadoop.fs.MD5MD5CRC32CastagnoliFileChecksum;
+import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
+import org.apache.hadoop.fs.MD5MD5CRC32GzipFileChecksum;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto;
+import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
+import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.MD5Hash;
+import org.apache.hadoop.util.DataChecksum;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Utility classes to compute file checksum for both replicated and striped
+ * files.
+ */
+final class FileChecksumHelper {
+ static final Logger LOG =
+ LoggerFactory.getLogger(FileChecksumHelper.class);
+
+ private FileChecksumHelper() {}
+
+ /**
+ * A common abstract class to compute file checksum.
+ */
+ static abstract class FileChecksumComputer {
+ private final String src;
+ private final long length;
+ private final DFSClient client;
+ private final ClientProtocol namenode;
+ private final DataOutputBuffer md5out = new DataOutputBuffer();
+
+ private MD5MD5CRC32FileChecksum fileChecksum;
+ private LocatedBlocks blockLocations;
+
+ private int timeout;
+ private List<LocatedBlock> locatedBlocks;
+ private long remaining = 0L;
+
+ private int bytesPerCRC = -1;
+ private DataChecksum.Type crcType = DataChecksum.Type.DEFAULT;
+ private long crcPerBlock = 0;
+ private boolean refetchBlocks = false;
+ private int lastRetriedIndex = -1;
+
+ /**
+ * Constructor that accepts all the input parameters for the computing.
+ */
+ FileChecksumComputer(String src, long length,
+ LocatedBlocks blockLocations,
+ ClientProtocol namenode,
+ DFSClient client) throws IOException {
+ this.src = src;
+ this.length = length;
+ this.blockLocations = blockLocations;
+ this.namenode = namenode;
+ this.client = client;
+
+ this.remaining = length;
+ if (src.contains(HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR_SEPARATOR)) {
+ this.remaining = Math.min(length, blockLocations.getFileLength());
+ }
+
+ this.locatedBlocks = blockLocations.getLocatedBlocks();
+ }
+
+ String getSrc() {
+ return src;
+ }
+
+ long getLength() {
+ return length;
+ }
+
+ DFSClient getClient() {
+ return client;
+ }
+
+ ClientProtocol getNamenode() {
+ return namenode;
+ }
+
+ DataOutputBuffer getMd5out() {
+ return md5out;
+ }
+
+ MD5MD5CRC32FileChecksum getFileChecksum() {
+ return fileChecksum;
+ }
+
+ LocatedBlocks getBlockLocations() {
+ return blockLocations;
+ }
+
+ void setBlockLocations(LocatedBlocks blockLocations) {
+ this.blockLocations = blockLocations;
+ }
+
+ int getTimeout() {
+ return timeout;
+ }
+
+ void setTimeout(int timeout) {
+ this.timeout = timeout;
+ }
+
+ List<LocatedBlock> getLocatedBlocks() {
+ return locatedBlocks;
+ }
+
+ void setLocatedBlocks(List<LocatedBlock> locatedBlocks) {
+ this.locatedBlocks = locatedBlocks;
+ }
+
+ long getRemaining() {
+ return remaining;
+ }
+
+ void setRemaining(long remaining) {
+ this.remaining = remaining;
+ }
+
+ int getBytesPerCRC() {
+ return bytesPerCRC;
+ }
+
+ void setBytesPerCRC(int bytesPerCRC) {
+ this.bytesPerCRC = bytesPerCRC;
+ }
+
+ DataChecksum.Type getCrcType() {
+ return crcType;
+ }
+
+ void setCrcType(DataChecksum.Type crcType) {
+ this.crcType = crcType;
+ }
+
+ long getCrcPerBlock() {
+ return crcPerBlock;
+ }
+
+ void setCrcPerBlock(long crcPerBlock) {
+ this.crcPerBlock = crcPerBlock;
+ }
+
+ boolean isRefetchBlocks() {
+ return refetchBlocks;
+ }
+
+ void setRefetchBlocks(boolean refetchBlocks) {
+ this.refetchBlocks = refetchBlocks;
+ }
+
+ int getLastRetriedIndex() {
+ return lastRetriedIndex;
+ }
+
+ void setLastRetriedIndex(int lastRetriedIndex) {
+ this.lastRetriedIndex = lastRetriedIndex;
+ }
+
+ /**
+ * Perform the file checksum computing. The intermediate results are stored
+ * in the object and will be used later.
+ * @throws IOException
+ */
+ void compute() throws IOException {
+ checksumBlocks();
+
+ fileChecksum = makeFinalResult();
+ }
+
+ /**
+ * Compute and aggregate block checksums block by block.
+ * @throws IOException
+ */
+ abstract void checksumBlocks() throws IOException;
+
+ /**
+ * Make final file checksum result given the computing process done.
+ */
+ MD5MD5CRC32FileChecksum makeFinalResult() {
+ //compute file MD5
+ final MD5Hash fileMD5 = MD5Hash.digest(md5out.getData());
+ switch (crcType) {
+ case CRC32:
+ return new MD5MD5CRC32GzipFileChecksum(bytesPerCRC,
+ crcPerBlock, fileMD5);
+ case CRC32C:
+ return new MD5MD5CRC32CastagnoliFileChecksum(bytesPerCRC,
+ crcPerBlock, fileMD5);
+ default:
+ // If there is no block allocated for the file,
+ // return one with the magic entry that matches what previous
+ // hdfs versions return.
+ if (locatedBlocks.isEmpty()) {
+ return new MD5MD5CRC32GzipFileChecksum(0, 0, fileMD5);
+ }
+
+ // we should never get here since the validity was checked
+ // when getCrcType() was called above.
+ return null;
+ }
+ }
+
+ /**
+ * Create and return a sender given an IO stream pair.
+ */
+ Sender createSender(IOStreamPair pair) {
+ DataOutputStream out = (DataOutputStream) pair.out;
+ return new Sender(out);
+ }
+
+ /**
+ * Close an IO stream pair.
+ */
+ void close(IOStreamPair pair) {
+ if (pair != null) {
+ IOUtils.closeStream(pair.in);
+ IOUtils.closeStream(pair.out);
+ }
+ }
+ }
+
+ /**
+ * Replicated file checksum computer.
+ */
+ static class ReplicatedFileChecksumComputer extends FileChecksumComputer {
+ private int blockIdx;
+
+ ReplicatedFileChecksumComputer(String src, long length,
+ LocatedBlocks blockLocations,
+ ClientProtocol namenode,
+ DFSClient client) throws IOException {
+ super(src, length, blockLocations, namenode, client);
+ }
+
+ @Override
+ void checksumBlocks() throws IOException {
+ // get block checksum for each block
+ for (blockIdx = 0;
+ blockIdx < getLocatedBlocks().size() && getRemaining() >= 0;
+ blockIdx++) {
+ if (isRefetchBlocks()) { // refetch to get fresh tokens
+ setBlockLocations(getClient().getBlockLocations(getSrc(),
+ getLength()));
+ setLocatedBlocks(getBlockLocations().getLocatedBlocks());
+ setRefetchBlocks(false);
+ }
+
+ LocatedBlock locatedBlock = getLocatedBlocks().get(blockIdx);
+
+ if (!checksumBlock(locatedBlock)) {
+ throw new IOException("Fail to get block MD5 for " + locatedBlock);
+ }
+ }
+ }
+
+ /**
+ * Return true when it is fine to continue or retry, false on a severe
+ * condition or total failure.
+ */
+ private boolean checksumBlock(
+ LocatedBlock locatedBlock) throws IOException {
+ ExtendedBlock block = locatedBlock.getBlock();
+ if (getRemaining() < block.getNumBytes()) {
+ block.setNumBytes(getRemaining());
+ }
+ setRemaining(getRemaining() - block.getNumBytes());
+
+ DatanodeInfo[] datanodes = locatedBlock.getLocations();
+
+ int tmpTimeout = 3000 * datanodes.length +
+ getClient().getConf().getSocketTimeout();
+ setTimeout(tmpTimeout);
+
+ //try each datanode location of the block
+ boolean done = false;
+ for (int j = 0; !done && j < datanodes.length; j++) {
+ try {
+ tryDatanode(locatedBlock, datanodes[j]);
+ done = true;
+ } catch (InvalidBlockTokenException ibte) {
+ if (blockIdx > getLastRetriedIndex()) {
+ LOG.debug("Got access token error in response to OP_BLOCK_CHECKSUM "
+ + "for file {} for block {} from datanode {}. Will retry "
+ + "the block once.",
+ getSrc(), block, datanodes[j]);
+ setLastRetriedIndex(blockIdx);
+ done = true; // actually it's not done; but we'll retry
+ blockIdx--; // repeat at blockIdx-th block
+ setRefetchBlocks(true);
+ }
+ } catch (IOException ie) {
+ LOG.warn("src={}" + ", datanodes[{}]={}",
+ getSrc(), j, datanodes[j], ie);
+ }
+ }
+
+ return done;
+ }
+
+ /**
+ * Try one replica or datanode to compute the block checksum given a block.
+ */
+ private void tryDatanode(LocatedBlock locatedBlock,
+ DatanodeInfo datanode) throws IOException {
+
+ ExtendedBlock block = locatedBlock.getBlock();
+
+ try (IOStreamPair pair = getClient().connectToDN(datanode, getTimeout(),
+ locatedBlock.getBlockToken())) {
+
+ LOG.debug("write to {}: {}, block={}", datanode,
+ Op.BLOCK_CHECKSUM, block);
+
+ // get block MD5
+ createSender(pair).blockChecksum(block,
+ locatedBlock.getBlockToken());
+
+ final BlockOpResponseProto reply = BlockOpResponseProto.parseFrom(
+ PBHelperClient.vintPrefixed(pair.in));
+
+ String logInfo = "for block " + block + " from datanode " +
+ datanode;
+ DataTransferProtoUtil.checkBlockOpStatus(reply, logInfo);
+
+ OpBlockChecksumResponseProto checksumData =
+ reply.getChecksumResponse();
+
+ //read byte-per-checksum
+ final int bpc = checksumData.getBytesPerCrc();
+ if (blockIdx == 0) { //first block
+ setBytesPerCRC(bpc);
+ } else if (bpc != getBytesPerCRC()) {
+ throw new IOException("Byte-per-checksum not matched: bpc=" + bpc
+ + " but bytesPerCRC=" + getBytesPerCRC());
+ }
+
+ //read crc-per-block
+ final long cpb = checksumData.getCrcPerBlock();
+ if (getLocatedBlocks().size() > 1 && blockIdx == 0) {
+ setCrcPerBlock(cpb);
+ }
+
+ //read md5
+ final MD5Hash md5 = new MD5Hash(
+ checksumData.getMd5().toByteArray());
+ md5.write(getMd5out());
+
+ // read crc-type
+ final DataChecksum.Type ct;
+ if (checksumData.hasCrcType()) {
+ ct = PBHelperClient.convert(checksumData
+ .getCrcType());
+ } else {
+ LOG.debug("Retrieving checksum from an earlier-version DataNode: " +
+ "inferring checksum by reading first byte");
+ ct = getClient().inferChecksumTypeByReading(locatedBlock, datanode);
+ }
+
+ if (blockIdx == 0) { // first block
+ setCrcType(ct);
+ } else if (getCrcType() != DataChecksum.Type.MIXED
+ && getCrcType() != ct) {
+ // if crc types are mixed in a file
+ setCrcType(DataChecksum.Type.MIXED);
+ }
+
+ if (LOG.isDebugEnabled()) {
+ if (blockIdx == 0) {
+ LOG.debug("set bytesPerCRC=" + getBytesPerCRC()
+ + ", crcPerBlock=" + getCrcPerBlock());
+ }
+ LOG.debug("got reply from " + datanode + ": md5=" + md5);
+ }
+ }
+ }
+ }
+}
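FileChecksumComputer keeps all of its intermediate state (bytesPerCRC, crcType, crcPerBlock, the md5out buffer) behind accessors, so adding another strategy, for example for striped files, only means implementing checksumBlocks(). A purely hypothetical sketch of such a subclass; the class name and loop body below are illustrative and not part of this commit:

    // Hypothetical sketch only: a new computer walks its blocks and appends each
    // block-level MD5 to getMd5out(); makeFinalResult() then builds the file checksum.
    class MyFileChecksumComputer extends FileChecksumHelper.FileChecksumComputer {
      MyFileChecksumComputer(String src, long length, LocatedBlocks blocks,
          ClientProtocol namenode, DFSClient client) throws IOException {
        super(src, length, blocks, namenode, client);
      }

      @Override
      void checksumBlocks() throws IOException {
        for (LocatedBlock lb : getLocatedBlocks()) {
          // ... compute a per-block MD5 and write it to getMd5out() ...
        }
      }
    }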
http://git-wip-us.apache.org/repos/asf/hadoop/blob/307ec80a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/IOStreamPair.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/IOStreamPair.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/IOStreamPair.java
index 4157a30..4ec73e3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/IOStreamPair.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/IOStreamPair.java
@@ -17,16 +17,19 @@
*/
package org.apache.hadoop.hdfs.protocol.datatransfer;
+import java.io.Closeable;
+import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.io.IOUtils;
/**
* A little struct class to wrap an InputStream and an OutputStream.
*/
@InterfaceAudience.Private
-public class IOStreamPair {
+public class IOStreamPair implements Closeable {
public final InputStream in;
public final OutputStream out;
@@ -34,4 +37,10 @@ public class IOStreamPair {
this.in = in;
this.out = out;
}
+
+ @Override
+ public void close() throws IOException {
+ IOUtils.closeStream(in);
+ IOUtils.closeStream(out);
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/307ec80a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index c3ea5ce..bb847a1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -233,6 +233,9 @@ Trunk (Unreleased)
HDFS-9838. Refactor the excessReplicateMap to a class. (szetszwo)
+ HDFS-9733. Refactor DFSClient#getFileChecksum and DataXceiver#blockChecksum
+ (Kai Zheng via umamahesh)
+
OPTIMIZATIONS
BUG FIXES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/307ec80a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockChecksumHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockChecksumHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockChecksumHelper.java
new file mode 100644
index 0000000..9a5552d
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockChecksumHelper.java
@@ -0,0 +1,254 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdfs.DFSUtilClient;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.MD5Hash;
+import org.apache.hadoop.util.DataChecksum;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.security.MessageDigest;
+
+/**
+ * Utilities for Block checksum computing, for both replicated and striped
+ * blocks.
+ */
+final class BlockChecksumHelper {
+
+ static final Logger LOG = LoggerFactory.getLogger(BlockChecksumHelper.class);
+
+ private BlockChecksumHelper() {}
+
+ /**
+ * The abstract base block checksum computer.
+ */
+ static abstract class BlockChecksumComputer {
+ private final DataNode datanode;
+ private final ExtendedBlock block;
+ // client side now can specify a range of the block for checksum
+ private final long requestLength;
+ private final LengthInputStream metadataIn;
+ private final DataInputStream checksumIn;
+ private final long visibleLength;
+ private final boolean partialBlk;
+
+ private byte[] outBytes;
+ private int bytesPerCRC = -1;
+ private DataChecksum.Type crcType = null;
+ private long crcPerBlock = -1;
+ private int checksumSize = -1;
+ private BlockMetadataHeader header;
+ private DataChecksum checksum;
+
+ BlockChecksumComputer(DataNode datanode,
+ ExtendedBlock block) throws IOException {
+ this.datanode = datanode;
+ this.block = block;
+ this.requestLength = block.getNumBytes();
+ Preconditions.checkArgument(requestLength >= 0);
+
+ this.metadataIn = datanode.data.getMetaDataInputStream(block);
+ this.visibleLength = datanode.data.getReplicaVisibleLength(block);
+ this.partialBlk = requestLength < visibleLength;
+
+ int ioFileBufferSize =
+ DFSUtilClient.getIoFileBufferSize(datanode.getConf());
+ this.checksumIn = new DataInputStream(
+ new BufferedInputStream(metadataIn, ioFileBufferSize));
+ }
+
+ protected DataNode getDatanode() {
+ return datanode;
+ }
+
+ protected ExtendedBlock getBlock() {
+ return block;
+ }
+
+ protected long getRequestLength() {
+ return requestLength;
+ }
+
+ protected LengthInputStream getMetadataIn() {
+ return metadataIn;
+ }
+
+ protected DataInputStream getChecksumIn() {
+ return checksumIn;
+ }
+
+ protected long getVisibleLength() {
+ return visibleLength;
+ }
+
+ protected boolean isPartialBlk() {
+ return partialBlk;
+ }
+
+ protected void setOutBytes(byte[] bytes) {
+ this.outBytes = bytes;
+ }
+
+ protected byte[] getOutBytes() {
+ return outBytes;
+ }
+
+ protected int getBytesPerCRC() {
+ return bytesPerCRC;
+ }
+
+ protected DataChecksum.Type getCrcType() {
+ return crcType;
+ }
+
+ protected long getCrcPerBlock() {
+ return crcPerBlock;
+ }
+
+ protected int getChecksumSize() {
+ return checksumSize;
+ }
+
+ protected BlockMetadataHeader getHeader() {
+ return header;
+ }
+
+ protected DataChecksum getChecksum() {
+ return checksum;
+ }
+
+ /**
+ * Perform the block checksum computing.
+ * @throws IOException
+ */
+ abstract void compute() throws IOException;
+
+ /**
+ * Read block metadata header.
+ * @throws IOException
+ */
+ protected void readHeader() throws IOException {
+ //read metadata file
+ header = BlockMetadataHeader.readHeader(checksumIn);
+ checksum = header.getChecksum();
+ checksumSize = checksum.getChecksumSize();
+ bytesPerCRC = checksum.getBytesPerChecksum();
+ crcPerBlock = checksumSize <= 0 ? 0 :
+ (metadataIn.getLength() -
+ BlockMetadataHeader.getHeaderSize()) / checksumSize;
+ crcType = checksum.getChecksumType();
+ }
+
+ /**
+ * Calculate partial block checksum.
+ * @return
+ * @throws IOException
+ */
+ protected byte[] crcPartialBlock() throws IOException {
+ int partialLength = (int) (requestLength % bytesPerCRC);
+ if (partialLength > 0) {
+ byte[] buf = new byte[partialLength];
+ final InputStream blockIn = datanode.data.getBlockInputStream(block,
+ requestLength - partialLength);
+ try {
+ // Get the CRC of the partialLength.
+ IOUtils.readFully(blockIn, buf, 0, partialLength);
+ } finally {
+ IOUtils.closeStream(blockIn);
+ }
+ checksum.update(buf, 0, partialLength);
+ byte[] partialCrc = new byte[checksumSize];
+ checksum.writeValue(partialCrc, 0, true);
+ return partialCrc;
+ }
+
+ return null;
+ }
+ }
+
+ /**
+ * Replicated block checksum computer.
+ */
+ static class ReplicatedBlockChecksumComputer extends BlockChecksumComputer {
+
+ ReplicatedBlockChecksumComputer(DataNode datanode,
+ ExtendedBlock block) throws IOException {
+ super(datanode, block);
+ }
+
+ @Override
+ void compute() throws IOException {
+ try {
+ readHeader();
+
+ MD5Hash md5out;
+ if (isPartialBlk() && getCrcPerBlock() > 0) {
+ md5out = checksumPartialBlock();
+ } else {
+ md5out = checksumWholeBlock();
+ }
+ setOutBytes(md5out.getDigest());
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("block=" + getBlock() + ", bytesPerCRC=" + getBytesPerCRC()
+ + ", crcPerBlock=" + getCrcPerBlock() + ", md5out=" + md5out);
+ }
+ } finally {
+ IOUtils.closeStream(getChecksumIn());
+ IOUtils.closeStream(getMetadataIn());
+ }
+ }
+
+ private MD5Hash checksumWholeBlock() throws IOException {
+ MD5Hash md5out = MD5Hash.digest(getChecksumIn());
+ return md5out;
+ }
+
+ private MD5Hash checksumPartialBlock() throws IOException {
+ byte[] buffer = new byte[4*1024];
+ MessageDigest digester = MD5Hash.getDigester();
+
+ long remaining = (getRequestLength() / getBytesPerCRC())
+ * getChecksumSize();
+ for (int toDigest = 0; remaining > 0; remaining -= toDigest) {
+ toDigest = getChecksumIn().read(buffer, 0,
+ (int) Math.min(remaining, buffer.length));
+ if (toDigest < 0) {
+ break;
+ }
+ digester.update(buffer, 0, toDigest);
+ }
+
+ byte[] partialCrc = crcPartialBlock();
+ if (partialCrc != null) {
+ digester.update(partialCrc);
+ }
+
+ return new MD5Hash(digester.digest());
+ }
+ }
+}
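On the datanode side the same pattern appears: DataXceiver#blockChecksum (below) builds a computer, runs it, and copies the results into the protobuf reply. A hedged sketch of that flow, with datanode and block assumed to be in scope as they are in DataXceiver:

    // Hedged sketch of the server-side flow now used by DataXceiver#blockChecksum.
    BlockChecksumHelper.BlockChecksumComputer maker =
        new BlockChecksumHelper.ReplicatedBlockChecksumComputer(datanode, block);
    maker.compute();  // reads the metadata header, then digests the stored CRCs

    byte[] blockMd5 = maker.getOutBytes();        // MD5-of-CRCs for this block
    int bytesPerCrc = maker.getBytesPerCRC();
    long crcPerBlock = maker.getCrcPerBlock();
    DataChecksum.Type crcType = maker.getCrcType();
    // These values populate the OpBlockChecksumResponseProto sent back to the client.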
http://git-wip-us.apache.org/repos/asf/hadoop/blob/307ec80a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
index ee079cb..773a64c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
@@ -47,7 +47,6 @@ import org.apache.hadoop.io.ReadaheadPool.ReadaheadRequest;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.net.SocketOutputStream;
import org.apache.hadoop.util.DataChecksum;
-import org.apache.htrace.core.Sampler;
import org.apache.htrace.core.TraceScope;
import static org.apache.hadoop.io.nativeio.NativeIO.POSIX.POSIX_FADV_DONTNEED;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/307ec80a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 470d3ca..3e2a25d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -3170,6 +3170,16 @@ public class DataNode extends ReconfigurableBase
return ecWorker;
}
+ IOStreamPair connectToDN(DatanodeInfo datanodeID, int timeout,
+ ExtendedBlock block,
+ Token<BlockTokenIdentifier> blockToken)
+ throws IOException {
+
+ return DFSUtilClient.connectToDN(datanodeID, timeout, conf, saslClient,
+ NetUtils.getDefaultSocketFactory(getConf()), false,
+ getDataEncryptionKeyFactoryForBlock(block), blockToken);
+ }
+
/**
* Get timeout value of each OOB type from configuration
*/
http://git-wip-us.apache.org/repos/asf/hadoop/blob/307ec80a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
index 0041cd8..1d4a79a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
@@ -17,37 +17,9 @@
*/
package org.apache.hadoop.hdfs.server.datanode;
-import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.ERROR;
-import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.ERROR_ACCESS_TOKEN;
-import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.ERROR_INVALID;
-import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.ERROR_UNSUPPORTED;
-import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.SUCCESS;
-import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitFdResponse.USE_RECEIPT_VERIFICATION;
-import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitFdResponse.DO_NOT_USE_RECEIPT_VERIFICATION;
-import static org.apache.hadoop.hdfs.server.datanode.DataNode.DN_CLIENTTRACE_FORMAT;
-import static org.apache.hadoop.util.Time.monotonicNow;
-
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.EOFException;
-import java.io.FileDescriptor;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InterruptedIOException;
-import java.io.OutputStream;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.net.SocketException;
-import java.net.SocketTimeoutException;
-import java.nio.channels.ClosedChannelException;
-import java.security.MessageDigest;
-import java.util.Arrays;
-import java.util.concurrent.TimeUnit;
-
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.protobuf.ByteString;
import org.apache.commons.logging.Log;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSUtilClient;
@@ -73,26 +45,52 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmR
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.datanode.BlockChecksumHelper.BlockChecksumComputer;
+import org.apache.hadoop.hdfs.server.datanode.BlockChecksumHelper.ReplicatedBlockChecksumComputer;
import org.apache.hadoop.hdfs.server.datanode.DataNode.ShortCircuitFdsUnsupportedException;
import org.apache.hadoop.hdfs.server.datanode.DataNode.ShortCircuitFdsVersionException;
import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry.NewShmInfo;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId;
import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.unix.DomainSocket;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.StopWatch;
-
-import com.google.common.base.Preconditions;
-import com.google.protobuf.ByteString;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.FileDescriptor;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InterruptedIOException;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
+import java.nio.channels.ClosedChannelException;
+import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitFdResponse.DO_NOT_USE_RECEIPT_VERIFICATION;
+import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitFdResponse.USE_RECEIPT_VERIFICATION;
+import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.ERROR;
+import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.ERROR_ACCESS_TOKEN;
+import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.ERROR_INVALID;
+import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.ERROR_UNSUPPORTED;
+import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.SUCCESS;
+import static org.apache.hadoop.hdfs.server.datanode.DataNode.DN_CLIENTTRACE_FORMAT;
+import static org.apache.hadoop.util.Time.monotonicNow;
+
/**
* Thread for processing incoming/outgoing data stream.
@@ -886,90 +884,32 @@ class DataXceiver extends Receiver implements Runnable {
}
}
- private MD5Hash calcPartialBlockChecksum(ExtendedBlock block,
- long requestLength, DataChecksum checksum, DataInputStream checksumIn)
- throws IOException {
- final int bytesPerCRC = checksum.getBytesPerChecksum();
- final int csize = checksum.getChecksumSize();
- final byte[] buffer = new byte[4*1024];
- MessageDigest digester = MD5Hash.getDigester();
-
- long remaining = requestLength / bytesPerCRC * csize;
- for (int toDigest = 0; remaining > 0; remaining -= toDigest) {
- toDigest = checksumIn.read(buffer, 0,
- (int) Math.min(remaining, buffer.length));
- if (toDigest < 0) {
- break;
- }
- digester.update(buffer, 0, toDigest);
- }
-
- int partialLength = (int) (requestLength % bytesPerCRC);
- if (partialLength > 0) {
- byte[] buf = new byte[partialLength];
- final InputStream blockIn = datanode.data.getBlockInputStream(block,
- requestLength - partialLength);
- try {
- // Get the CRC of the partialLength.
- IOUtils.readFully(blockIn, buf, 0, partialLength);
- } finally {
- IOUtils.closeStream(blockIn);
- }
- checksum.update(buf, 0, partialLength);
- byte[] partialCrc = new byte[csize];
- checksum.writeValue(partialCrc, 0, true);
- digester.update(partialCrc);
- }
- return new MD5Hash(digester.digest());
- }
-
@Override
- public void blockChecksum(final ExtendedBlock block,
- final Token<BlockTokenIdentifier> blockToken) throws IOException {
+ public void blockChecksum(ExtendedBlock block,
+ Token<BlockTokenIdentifier> blockToken)
+ throws IOException {
updateCurrentThreadName("Getting checksum for block " + block);
final DataOutputStream out = new DataOutputStream(
getOutputStream());
checkAccess(out, true, block, blockToken,
Op.BLOCK_CHECKSUM, BlockTokenIdentifier.AccessMode.READ);
- // client side now can specify a range of the block for checksum
- long requestLength = block.getNumBytes();
- Preconditions.checkArgument(requestLength >= 0);
- long visibleLength = datanode.data.getReplicaVisibleLength(block);
- boolean partialBlk = requestLength < visibleLength;
-
- final LengthInputStream metadataIn = datanode.data
- .getMetaDataInputStream(block);
-
- final DataInputStream checksumIn = new DataInputStream(
- new BufferedInputStream(metadataIn, ioFileBufferSize));
+
+ BlockChecksumComputer maker =
+ new ReplicatedBlockChecksumComputer(datanode, block);
+
try {
- //read metadata file
- final BlockMetadataHeader header = BlockMetadataHeader
- .readHeader(checksumIn);
- final DataChecksum checksum = header.getChecksum();
- final int csize = checksum.getChecksumSize();
- final int bytesPerCRC = checksum.getBytesPerChecksum();
- final long crcPerBlock = csize <= 0 ? 0 :
- (metadataIn.getLength() - BlockMetadataHeader.getHeaderSize()) / csize;
-
- final MD5Hash md5 = partialBlk && crcPerBlock > 0 ?
- calcPartialBlockChecksum(block, requestLength, checksum, checksumIn)
- : MD5Hash.digest(checksumIn);
- if (LOG.isDebugEnabled()) {
- LOG.debug("block=" + block + ", bytesPerCRC=" + bytesPerCRC
- + ", crcPerBlock=" + crcPerBlock + ", md5=" + md5);
- }
+ maker.compute();
//write reply
BlockOpResponseProto.newBuilder()
- .setStatus(SUCCESS)
- .setChecksumResponse(OpBlockChecksumResponseProto.newBuilder()
- .setBytesPerCrc(bytesPerCRC)
- .setCrcPerBlock(crcPerBlock)
- .setMd5(ByteString.copyFrom(md5.getDigest()))
- .setCrcType(PBHelperClient.convert(checksum.getChecksumType())))
- .build()
- .writeDelimitedTo(out);
+ .setStatus(SUCCESS)
+ .setChecksumResponse(OpBlockChecksumResponseProto.newBuilder()
+ .setBytesPerCrc(maker.getBytesPerCRC())
+ .setCrcPerBlock(maker.getCrcPerBlock())
+ .setMd5(ByteString.copyFrom(maker.getOutBytes()))
+ .setCrcType(PBHelperClient.convert(maker.getCrcType())))
+ .build()
+ .writeDelimitedTo(out);
out.flush();
} catch (IOException ioe) {
LOG.info("blockChecksum " + block + " received exception " + ioe);
@@ -977,8 +917,6 @@ class DataXceiver extends Receiver implements Runnable {
throw ioe;
} finally {
IOUtils.closeStream(out);
- IOUtils.closeStream(checksumIn);
- IOUtils.closeStream(metadataIn);
}
//update metrics
@@ -1276,7 +1214,7 @@ class DataXceiver extends Receiver implements Runnable {
/**
* Wait until the BP is registered, upto the configured amount of time.
* Throws an exception if times out, which should fail the client request.
- * @param the requested block
+ * @param block requested block
*/
void checkAndWaitForBP(final ExtendedBlock block)
throws IOException {