You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by ji...@apache.org on 2014/05/22 20:50:21 UTC
svn commit: r1596937 - in
/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src:
main/java/org/apache/hadoop/fs/ main/java/org/apache/hadoop/hdfs/
main/java/org/apache/hadoop/hdfs/server/datanode/
main/java/org/apache/hadoop/hdfs/server/...
Author: jing9
Date: Thu May 22 18:50:20 2014
New Revision: 1596937
URL: http://svn.apache.org/r1596937
Log:
MAPREDUCE-5899. Merge change r1596931 from trunk.
Added:
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestGetFileChecksum.java
- copied unchanged from r1596931, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestGetFileChecksum.java
Modified:
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/Hdfs.java
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/Hdfs.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/Hdfs.java?rev=1596937&r1=1596936&r2=1596937&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/Hdfs.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/Hdfs.java Thu May 22 18:50:20 2014
@@ -115,7 +115,7 @@ public class Hdfs extends AbstractFileSy
@Override
public FileChecksum getFileChecksum(Path f)
throws IOException, UnresolvedLinkException {
- return dfs.getFileChecksum(getUriPath(f));
+ return dfs.getFileChecksum(getUriPath(f), Long.MAX_VALUE);
}
@Override
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1596937&r1=1596936&r2=1596937&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java Thu May 22 18:50:20 2014
@@ -1817,15 +1817,19 @@ public class DFSClient implements java.i
}
/**
- * Get the checksum of a file.
+ * Get the checksum of the whole file or a range of the file. Note that the
+ * range always starts from the beginning of the file.
* @param src The file path
+ * @param length The length of the range
* @return The checksum
* @see DistributedFileSystem#getFileChecksum(Path)
*/
- public MD5MD5CRC32FileChecksum getFileChecksum(String src) throws IOException {
+ public MD5MD5CRC32FileChecksum getFileChecksum(String src, long length)
+ throws IOException {
checkOpen();
- return getFileChecksum(src, clientName, namenode, socketFactory,
- dfsClientConf.socketTimeout, getDataEncryptionKey(),
+ Preconditions.checkArgument(length >= 0);
+ return getFileChecksum(src, length, clientName, namenode,
+ socketFactory, dfsClientConf.socketTimeout, getDataEncryptionKey(),
dfsClientConf.connectToDnViaHostname);
}
@@ -1866,8 +1870,9 @@ public class DFSClient implements java.i
}
/**
- * Get the checksum of a file.
+ * Get the checksum of the whole file or a range of the file.
* @param src The file path
+ * @param length the length of the range, i.e., the range is [0, length)
* @param clientName the name of the client requesting the checksum.
* @param namenode the RPC proxy for the namenode
* @param socketFactory to create sockets to connect to DNs
@@ -1877,12 +1882,13 @@ public class DFSClient implements java.i
* @return The checksum
*/
private static MD5MD5CRC32FileChecksum getFileChecksum(String src,
- String clientName,
- ClientProtocol namenode, SocketFactory socketFactory, int socketTimeout,
+ long length, String clientName, ClientProtocol namenode,
+ SocketFactory socketFactory, int socketTimeout,
DataEncryptionKey encryptionKey, boolean connectToDnViaHostname)
throws IOException {
- //get all block locations
- LocatedBlocks blockLocations = callGetBlockLocations(namenode, src, 0, Long.MAX_VALUE);
+ //get block locations for the file range
+ LocatedBlocks blockLocations = callGetBlockLocations(namenode, src, 0,
+ length);
if (null == blockLocations) {
throw new FileNotFoundException("File does not exist: " + src);
}
@@ -1894,10 +1900,11 @@ public class DFSClient implements java.i
boolean refetchBlocks = false;
int lastRetriedIndex = -1;
- //get block checksum for each block
- for(int i = 0; i < locatedblocks.size(); i++) {
+ // get block checksum for each block
+ long remaining = length;
+ for(int i = 0; i < locatedblocks.size() && remaining > 0; i++) {
if (refetchBlocks) { // refetch to get fresh tokens
- blockLocations = callGetBlockLocations(namenode, src, 0, Long.MAX_VALUE);
+ blockLocations = callGetBlockLocations(namenode, src, 0, length);
if (null == blockLocations) {
throw new FileNotFoundException("File does not exist: " + src);
}
@@ -1906,6 +1913,10 @@ public class DFSClient implements java.i
}
LocatedBlock lb = locatedblocks.get(i);
final ExtendedBlock block = lb.getBlock();
+ if (remaining < block.getNumBytes()) {
+ block.setNumBytes(remaining);
+ }
+ remaining -= block.getNumBytes();
final DatanodeInfo[] datanodes = lb.getLocations();
//try each datanode location of the block
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java?rev=1596937&r1=1596936&r2=1596937&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java Thu May 22 18:50:20 2014
@@ -66,14 +66,12 @@ import org.apache.hadoop.hdfs.protocol.C
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
@@ -83,7 +81,6 @@ import org.apache.hadoop.hdfs.server.nam
import org.apache.hadoop.io.Text;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.AccessControlException;
-import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Progressable;
@@ -1188,7 +1185,7 @@ public class DistributedFileSystem exten
@Override
public FileChecksum doCall(final Path p)
throws IOException, UnresolvedLinkException {
- return dfs.getFileChecksum(getPathName(p));
+ return dfs.getFileChecksum(getPathName(p), Long.MAX_VALUE);
}
@Override
@@ -1200,6 +1197,32 @@ public class DistributedFileSystem exten
}
@Override
+ public FileChecksum getFileChecksum(Path f, final long length)
+ throws IOException {
+ statistics.incrementReadOps(1);
+ Path absF = fixRelativePart(f);
+ return new FileSystemLinkResolver<FileChecksum>() {
+ @Override
+ public FileChecksum doCall(final Path p)
+ throws IOException, UnresolvedLinkException {
+ return dfs.getFileChecksum(getPathName(p), length);
+ }
+
+ @Override
+ public FileChecksum next(final FileSystem fs, final Path p)
+ throws IOException {
+ if (fs instanceof DistributedFileSystem) {
+ return ((DistributedFileSystem) fs).getFileChecksum(p, length);
+ } else {
+ throw new UnsupportedFileSystemException(
+ "getFileChecksum(Path, long) is not supported by "
+ + fs.getClass().getSimpleName());
+ }
+ }
+ }.resolve(this, absF);
+ }
+
+ @Override
public void setPermission(Path p, final FsPermission permission
) throws IOException {
statistics.incrementWriteOps(1);
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=1596937&r1=1596936&r2=1596937&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Thu May 22 18:50:20 2014
@@ -42,6 +42,7 @@ import java.net.Socket;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.nio.channels.ClosedChannelException;
+import java.security.MessageDigest;
import java.util.Arrays;
import org.apache.commons.logging.Log;
@@ -83,6 +84,7 @@ import org.apache.hadoop.security.token.
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;
+import com.google.common.base.Preconditions;
import com.google.common.net.InetAddresses;
import com.google.protobuf.ByteString;
@@ -802,7 +804,44 @@ class DataXceiver extends Receiver imple
IOUtils.closeStream(out);
}
}
-
+
+ private MD5Hash calcPartialBlockChecksum(ExtendedBlock block,
+ long requestLength, DataChecksum checksum, DataInputStream checksumIn)
+ throws IOException {
+ final int bytesPerCRC = checksum.getBytesPerChecksum();
+ final int csize = checksum.getChecksumSize();
+ final byte[] buffer = new byte[4*1024];
+ MessageDigest digester = MD5Hash.getDigester();
+
+ long remaining = requestLength / bytesPerCRC * csize;
+ for (int toDigest = 0; remaining > 0; remaining -= toDigest) {
+ toDigest = checksumIn.read(buffer, 0,
+ (int) Math.min(remaining, buffer.length));
+ if (toDigest < 0) {
+ break;
+ }
+ digester.update(buffer, 0, toDigest);
+ }
+
+ int partialLength = (int) (requestLength % bytesPerCRC);
+ if (partialLength > 0) {
+ byte[] buf = new byte[partialLength];
+ final InputStream blockIn = datanode.data.getBlockInputStream(block,
+ requestLength - partialLength);
+ try {
+ // Get the CRC of the partialLength.
+ IOUtils.readFully(blockIn, buf, 0, partialLength);
+ } finally {
+ IOUtils.closeStream(blockIn);
+ }
+ checksum.update(buf, 0, partialLength);
+ byte[] partialCrc = new byte[csize];
+ checksum.writeValue(partialCrc, 0, true);
+ digester.update(partialCrc);
+ }
+ return new MD5Hash(digester.digest());
+ }
+
@Override
public void blockChecksum(final ExtendedBlock block,
final Token<BlockTokenIdentifier> blockToken) throws IOException {
@@ -810,25 +849,32 @@ class DataXceiver extends Receiver imple
getOutputStream());
checkAccess(out, true, block, blockToken,
Op.BLOCK_CHECKSUM, BlockTokenSecretManager.AccessMode.READ);
- updateCurrentThreadName("Reading metadata for block " + block);
- final LengthInputStream metadataIn =
- datanode.data.getMetaDataInputStream(block);
- final DataInputStream checksumIn = new DataInputStream(new BufferedInputStream(
- metadataIn, HdfsConstants.IO_FILE_BUFFER_SIZE));
+ // client side now can specify a range of the block for checksum
+ long requestLength = block.getNumBytes();
+ Preconditions.checkArgument(requestLength >= 0);
+ long visibleLength = datanode.data.getReplicaVisibleLength(block);
+ boolean partialBlk = requestLength < visibleLength;
+ updateCurrentThreadName("Reading metadata for block " + block);
+ final LengthInputStream metadataIn = datanode.data
+ .getMetaDataInputStream(block);
+
+ final DataInputStream checksumIn = new DataInputStream(
+ new BufferedInputStream(metadataIn, HdfsConstants.IO_FILE_BUFFER_SIZE));
updateCurrentThreadName("Getting checksum for block " + block);
try {
//read metadata file
- final BlockMetadataHeader header = BlockMetadataHeader.readHeader(checksumIn);
- final DataChecksum checksum = header.getChecksum();
+ final BlockMetadataHeader header = BlockMetadataHeader
+ .readHeader(checksumIn);
+ final DataChecksum checksum = header.getChecksum();
+ final int csize = checksum.getChecksumSize();
final int bytesPerCRC = checksum.getBytesPerChecksum();
- final long crcPerBlock = checksum.getChecksumSize() > 0
- ? (metadataIn.getLength() - BlockMetadataHeader.getHeaderSize())/checksum.getChecksumSize()
- : 0;
-
- //compute block checksum
- final MD5Hash md5 = MD5Hash.digest(checksumIn);
+ final long crcPerBlock = csize <= 0 ? 0 :
+ (metadataIn.getLength() - BlockMetadataHeader.getHeaderSize()) / csize;
+ final MD5Hash md5 = partialBlk && crcPerBlock > 0 ?
+ calcPartialBlockChecksum(block, requestLength, checksum, checksumIn)
+ : MD5Hash.digest(checksumIn);
if (LOG.isDebugEnabled()) {
LOG.debug("block=" + block + ", bytesPerCRC=" + bytesPerCRC
+ ", crcPerBlock=" + crcPerBlock + ", md5=" + md5);
@@ -841,8 +887,7 @@ class DataXceiver extends Receiver imple
.setBytesPerCrc(bytesPerCRC)
.setCrcPerBlock(crcPerBlock)
.setMd5(ByteString.copyFrom(md5.getDigest()))
- .setCrcType(PBHelper.convert(checksum.getChecksumType()))
- )
+ .setCrcType(PBHelper.convert(checksum.getChecksumType())))
.build()
.writeDelimitedTo(out);
out.flush();
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java?rev=1596937&r1=1596936&r2=1596937&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java Thu May 22 18:50:20 2014
@@ -74,7 +74,6 @@ import org.apache.hadoop.hdfs.web.resour
import org.apache.hadoop.hdfs.web.resources.ReplicationParam;
import org.apache.hadoop.hdfs.web.resources.UriFsPathParam;
import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
@@ -452,7 +451,7 @@ public class DatanodeWebHdfsMethods {
MD5MD5CRC32FileChecksum checksum = null;
DFSClient dfsclient = newDfsClient(nnId, conf);
try {
- checksum = dfsclient.getFileChecksum(fullpath);
+ checksum = dfsclient.getFileChecksum(fullpath, Long.MAX_VALUE);
dfsclient.close();
dfsclient = null;
} finally {
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java?rev=1596937&r1=1596936&r2=1596937&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java Thu May 22 18:50:20 2014
@@ -121,7 +121,7 @@ public class FileChecksumServlets {
try {
final DFSClient dfs = DatanodeJspHelper.getDFSClient(request,
datanode, conf, getUGI(request, conf));
- final MD5MD5CRC32FileChecksum checksum = dfs.getFileChecksum(path);
+ final MD5MD5CRC32FileChecksum checksum = dfs.getFileChecksum(path, Long.MAX_VALUE);
MD5MD5CRC32FileChecksum.write(xml, checksum);
} catch(IOException ioe) {
writeXml(ioe, path, xml);