svn commit: r1596937 - in /hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src: main/java/org/apache/hadoop/fs/ main/java/org/apache/hadoop/hdfs/ main/java/org/apache/hadoop/hdfs/server/datanode/ main/java/org/apache/hadoop/hdfs/server/...

Author: jing9
Date: Thu May 22 18:50:20 2014
New Revision: 1596937

URL: http://svn.apache.org/r1596937
Log:
MAPREDUCE-5899. Merge change r1596931 from trunk.

Added:
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestGetFileChecksum.java
      - copied unchanged from r1596931, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestGetFileChecksum.java
Modified:
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/Hdfs.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java
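
Taken together, these changes let a client request a checksum over the leading
range [0, length) of a file instead of only the whole file. A minimal usage
sketch against the new DistributedFileSystem overload (the URI, path, and
length below are illustrative, not part of this commit):

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileChecksum;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.DistributedFileSystem;

    public class RangedChecksumExample {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(URI.create("hdfs://namenode:8020"),
            new Configuration());
        if (fs instanceof DistributedFileSystem) {
          DistributedFileSystem dfs = (DistributedFileSystem) fs;
          Path file = new Path("/data/example.bin");
          // checksum of the first megabyte only: range [0, 1 MB)
          FileChecksum partial = dfs.getFileChecksum(file, 1024L * 1024);
          // whole-file variant, now routed through getFileChecksum(p, Long.MAX_VALUE)
          FileChecksum whole = dfs.getFileChecksum(file);
          System.out.println("partial=" + partial + "\nwhole=" + whole);
        }
      }
    }

Long.MAX_VALUE serves as the whole-file sentinel in the rewired call sites
below: it can never be smaller than a block's size, so no block is truncated
and the existing whole-file behavior is preserved.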

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/Hdfs.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/Hdfs.java?rev=1596937&r1=1596936&r2=1596937&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/Hdfs.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/Hdfs.java Thu May 22 18:50:20 2014
@@ -115,7 +115,7 @@ public class Hdfs extends AbstractFileSy
   @Override
   public FileChecksum getFileChecksum(Path f) 
       throws IOException, UnresolvedLinkException {
-    return dfs.getFileChecksum(getUriPath(f));
+    return dfs.getFileChecksum(getUriPath(f), Long.MAX_VALUE);
   }
 
   @Override

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1596937&r1=1596936&r2=1596937&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java Thu May 22 18:50:20 2014
@@ -1817,15 +1817,19 @@ public class DFSClient implements java.i
    }
 
   /**
-   * Get the checksum of a file.
+   * Get the checksum of the whole file or a range of the file. Note that the
+   * range always starts from the beginning of the file.
    * @param src The file path
+   * @param length The length of the range
    * @return The checksum 
    * @see DistributedFileSystem#getFileChecksum(Path)
    */
-  public MD5MD5CRC32FileChecksum getFileChecksum(String src) throws IOException {
+  public MD5MD5CRC32FileChecksum getFileChecksum(String src, long length)
+      throws IOException {
     checkOpen();
-    return getFileChecksum(src, clientName, namenode, socketFactory,
-        dfsClientConf.socketTimeout, getDataEncryptionKey(),
+    Preconditions.checkArgument(length >= 0);
+    return getFileChecksum(src, length, clientName, namenode,
+        socketFactory, dfsClientConf.socketTimeout, getDataEncryptionKey(),
         dfsClientConf.connectToDnViaHostname);
   }
   
@@ -1866,8 +1870,9 @@ public class DFSClient implements java.i
   }
 
   /**
-   * Get the checksum of a file.
+   * Get the checksum of the whole file or a range of the file.
    * @param src The file path
+   * @param length the length of the range, i.e., the range is [0, length)
    * @param clientName the name of the client requesting the checksum.
    * @param namenode the RPC proxy for the namenode
    * @param socketFactory to create sockets to connect to DNs
@@ -1877,12 +1882,13 @@ public class DFSClient implements java.i
    * @return The checksum 
    */
   private static MD5MD5CRC32FileChecksum getFileChecksum(String src,
-      String clientName,
-      ClientProtocol namenode, SocketFactory socketFactory, int socketTimeout,
+      long length, String clientName, ClientProtocol namenode,
+      SocketFactory socketFactory, int socketTimeout,
       DataEncryptionKey encryptionKey, boolean connectToDnViaHostname)
       throws IOException {
-    //get all block locations
-    LocatedBlocks blockLocations = callGetBlockLocations(namenode, src, 0, Long.MAX_VALUE);
+    //get block locations for the file range
+    LocatedBlocks blockLocations = callGetBlockLocations(namenode, src, 0,
+        length);
     if (null == blockLocations) {
       throw new FileNotFoundException("File does not exist: " + src);
     }
@@ -1894,10 +1900,11 @@ public class DFSClient implements java.i
     boolean refetchBlocks = false;
     int lastRetriedIndex = -1;
 
-    //get block checksum for each block
-    for(int i = 0; i < locatedblocks.size(); i++) {
+    // get block checksum for each block
+    long remaining = length;
+    for(int i = 0; i < locatedblocks.size() && remaining > 0; i++) {
       if (refetchBlocks) {  // refetch to get fresh tokens
-        blockLocations = callGetBlockLocations(namenode, src, 0, Long.MAX_VALUE);
+        blockLocations = callGetBlockLocations(namenode, src, 0, length);
         if (null == blockLocations) {
           throw new FileNotFoundException("File does not exist: " + src);
         }
@@ -1906,6 +1913,10 @@ public class DFSClient implements java.i
       }
       LocatedBlock lb = locatedblocks.get(i);
       final ExtendedBlock block = lb.getBlock();
+      if (remaining < block.getNumBytes()) {
+        block.setNumBytes(remaining);
+      }
+      remaining -= block.getNumBytes();
       final DatanodeInfo[] datanodes = lb.getLocations();
       
       //try each datanode location of the block
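
The loop above clamps the final block: once fewer bytes remain than the block
holds, block.setNumBytes(remaining) shrinks the request, and that datanode is
asked to checksum only the prefix. The same control flow reduced to plain
numbers (a sketch; the block and range sizes are illustrative):

    public class TruncateLastBlockSketch {
      public static void main(String[] args) {
        // three 128 MB blocks; the request covers only the first 300 MB
        long[] blockSizes = { 128L << 20, 128L << 20, 128L << 20 };
        long remaining = 300L << 20;          // requested range [0, length)
        for (int i = 0; i < blockSizes.length && remaining > 0; i++) {
          long effective = Math.min(blockSizes[i], remaining);
          remaining -= effective;
          // the checksum request for block i carries numBytes = effective,
          // so here the third block is truncated from 128 MB to 44 MB
          System.out.println("block " + i + ": checksum over " + effective + " bytes");
        }
      }
    }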

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java?rev=1596937&r1=1596936&r2=1596937&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java Thu May 22 18:50:20 2014
@@ -66,14 +66,12 @@ import org.apache.hadoop.hdfs.protocol.C
 import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
 import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
@@ -83,7 +81,6 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.AccessControlException;
-import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.Progressable;
 
@@ -1188,7 +1185,7 @@ public class DistributedFileSystem exten
       @Override
       public FileChecksum doCall(final Path p)
           throws IOException, UnresolvedLinkException {
-        return dfs.getFileChecksum(getPathName(p));
+        return dfs.getFileChecksum(getPathName(p), Long.MAX_VALUE);
       }
 
       @Override
@@ -1200,6 +1197,32 @@ public class DistributedFileSystem exten
   }
 
   @Override
+  public FileChecksum getFileChecksum(Path f, final long length)
+      throws IOException {
+    statistics.incrementReadOps(1);
+    Path absF = fixRelativePart(f);
+    return new FileSystemLinkResolver<FileChecksum>() {
+      @Override
+      public FileChecksum doCall(final Path p)
+          throws IOException, UnresolvedLinkException {
+        return dfs.getFileChecksum(getPathName(p), length);
+      }
+
+      @Override
+      public FileChecksum next(final FileSystem fs, final Path p)
+          throws IOException {
+        if (fs instanceof DistributedFileSystem) {
+          return ((DistributedFileSystem) fs).getFileChecksum(p, length);
+        } else {
+          throw new UnsupportedFileSystemException(
+              "getFileChecksum(Path, long) is not supported by "
+                  + fs.getClass().getSimpleName()); 
+        }
+      }
+    }.resolve(this, absF);
+  }
+
+  @Override
   public void setPermission(Path p, final FsPermission permission
       ) throws IOException {
     statistics.incrementWriteOps(1);
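
One behavioral note on the new resolver above: if a symlink in the path leads
to a file system other than DistributedFileSystem, next() throws
UnsupportedFileSystemException rather than silently checksumming the whole
file. A caller sketch (hypothetical path, reusing dfs and fs from the earlier
sketch):

    FileChecksum cs;
    try {
      // ranged checksum; fails if a symlink leads off HDFS
      cs = dfs.getFileChecksum(new Path("/links/maybe-foreign"), 4096L);
    } catch (UnsupportedFileSystemException e) {
      // the link target's file system has no ranged checksum;
      // fall back to the generic whole-file variant
      cs = fs.getFileChecksum(new Path("/links/maybe-foreign"));
    }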

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=1596937&r1=1596936&r2=1596937&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Thu May 22 18:50:20 2014
@@ -42,6 +42,7 @@ import java.net.Socket;
 import java.net.SocketException;
 import java.net.UnknownHostException;
 import java.nio.channels.ClosedChannelException;
+import java.security.MessageDigest;
 import java.util.Arrays;
 
 import org.apache.commons.logging.Log;
@@ -83,6 +84,7 @@ import org.apache.hadoop.security.token.
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.DataChecksum;
 
+import com.google.common.base.Preconditions;
 import com.google.common.net.InetAddresses;
 import com.google.protobuf.ByteString;
 
@@ -802,7 +804,44 @@ class DataXceiver extends Receiver imple
       IOUtils.closeStream(out);
     }
   }
-  
+
+  private MD5Hash calcPartialBlockChecksum(ExtendedBlock block,
+      long requestLength, DataChecksum checksum, DataInputStream checksumIn)
+      throws IOException {
+    final int bytesPerCRC = checksum.getBytesPerChecksum();
+    final int csize = checksum.getChecksumSize();
+    final byte[] buffer = new byte[4*1024];
+    MessageDigest digester = MD5Hash.getDigester();
+
+    long remaining = requestLength / bytesPerCRC * csize;
+    for (int toDigest = 0; remaining > 0; remaining -= toDigest) {
+      toDigest = checksumIn.read(buffer, 0,
+          (int) Math.min(remaining, buffer.length));
+      if (toDigest < 0) {
+        break;
+      }
+      digester.update(buffer, 0, toDigest);
+    }
+    
+    int partialLength = (int) (requestLength % bytesPerCRC);
+    if (partialLength > 0) {
+      byte[] buf = new byte[partialLength];
+      final InputStream blockIn = datanode.data.getBlockInputStream(block,
+          requestLength - partialLength);
+      try {
+        // Compute the CRC of the trailing partial chunk.
+        IOUtils.readFully(blockIn, buf, 0, partialLength);
+      } finally {
+        IOUtils.closeStream(blockIn);
+      }
+      checksum.update(buf, 0, partialLength);
+      byte[] partialCrc = new byte[csize];
+      checksum.writeValue(partialCrc, 0, true);
+      digester.update(partialCrc);
+    }
+    return new MD5Hash(digester.digest());
+  }
+
   @Override
   public void blockChecksum(final ExtendedBlock block,
       final Token<BlockTokenIdentifier> blockToken) throws IOException {
@@ -810,25 +849,32 @@ class DataXceiver extends Receiver imple
         getOutputStream());
     checkAccess(out, true, block, blockToken,
         Op.BLOCK_CHECKSUM, BlockTokenSecretManager.AccessMode.READ);
-    updateCurrentThreadName("Reading metadata for block " + block);
-    final LengthInputStream metadataIn = 
-      datanode.data.getMetaDataInputStream(block);
-    final DataInputStream checksumIn = new DataInputStream(new BufferedInputStream(
-        metadataIn, HdfsConstants.IO_FILE_BUFFER_SIZE));
+    // the client side can now specify a range of the block for checksum
+    long requestLength = block.getNumBytes();
+    Preconditions.checkArgument(requestLength >= 0);
+    long visibleLength = datanode.data.getReplicaVisibleLength(block);
+    boolean partialBlk = requestLength < visibleLength;
 
+    updateCurrentThreadName("Reading metadata for block " + block);
+    final LengthInputStream metadataIn = datanode.data
+        .getMetaDataInputStream(block);
+    
+    final DataInputStream checksumIn = new DataInputStream(
+        new BufferedInputStream(metadataIn, HdfsConstants.IO_FILE_BUFFER_SIZE));
     updateCurrentThreadName("Getting checksum for block " + block);
     try {
       //read metadata file
-      final BlockMetadataHeader header = BlockMetadataHeader.readHeader(checksumIn);
-      final DataChecksum checksum = header.getChecksum(); 
+      final BlockMetadataHeader header = BlockMetadataHeader
+          .readHeader(checksumIn);
+      final DataChecksum checksum = header.getChecksum();
+      final int csize = checksum.getChecksumSize();
       final int bytesPerCRC = checksum.getBytesPerChecksum();
-      final long crcPerBlock = checksum.getChecksumSize() > 0 
-              ? (metadataIn.getLength() - BlockMetadataHeader.getHeaderSize())/checksum.getChecksumSize()
-              : 0;
-      
-      //compute block checksum
-      final MD5Hash md5 = MD5Hash.digest(checksumIn);
+      final long crcPerBlock = csize <= 0 ? 0 : 
+        (metadataIn.getLength() - BlockMetadataHeader.getHeaderSize()) / csize;
 
+      final MD5Hash md5 = partialBlk && crcPerBlock > 0 ? 
+          calcPartialBlockChecksum(block, requestLength, checksum, checksumIn)
+            : MD5Hash.digest(checksumIn);
       if (LOG.isDebugEnabled()) {
         LOG.debug("block=" + block + ", bytesPerCRC=" + bytesPerCRC
             + ", crcPerBlock=" + crcPerBlock + ", md5=" + md5);
@@ -841,8 +887,7 @@ class DataXceiver extends Receiver imple
           .setBytesPerCrc(bytesPerCRC)
           .setCrcPerBlock(crcPerBlock)
           .setMd5(ByteString.copyFrom(md5.getDigest()))
-          .setCrcType(PBHelper.convert(checksum.getChecksumType()))
-          )
+          .setCrcType(PBHelper.convert(checksum.getChecksumType())))
         .build()
         .writeDelimitedTo(out);
       out.flush();
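
On the datanode side, calcPartialBlockChecksum composes the partial-block MD5
from two sources: the per-chunk CRCs already stored in the block's metadata
file for every whole chunk inside the range, plus one CRC recomputed from
block data for the trailing partial chunk. A self-contained sketch of the
same composition using java.util.zip.CRC32 (it recomputes the whole-chunk
CRCs too, since there is no metadata file here; the chunk size and data are
illustrative):

    import java.nio.ByteBuffer;
    import java.security.MessageDigest;
    import java.util.zip.CRC32;

    public class PartialBlockChecksumSketch {
      static byte[] crcBytes(byte[] data, int off, int len) {
        CRC32 crc = new CRC32();
        crc.update(data, off, len);
        return ByteBuffer.allocate(4).putInt((int) crc.getValue()).array();
      }

      public static void main(String[] args) throws Exception {
        byte[] block = new byte[1000];        // stand-in for block contents
        int bytesPerCrc = 512;                // chunk size
        long requestLength = 700;             // checksum range [0, 700)

        MessageDigest md5 = MessageDigest.getInstance("MD5");
        // 1. digest one CRC per whole chunk inside the range
        //    (the datanode reads these from the .meta file instead)
        long fullChunks = requestLength / bytesPerCrc;      // 1 whole chunk
        for (long c = 0; c < fullChunks; c++) {
          md5.update(crcBytes(block, (int) (c * bytesPerCrc), bytesPerCrc));
        }
        // 2. recompute the CRC of the trailing partial chunk, if any
        int partial = (int) (requestLength % bytesPerCrc);  // 188 bytes
        if (partial > 0) {
          md5.update(crcBytes(block, (int) (fullChunks * bytesPerCrc), partial));
        }
        StringBuilder hex = new StringBuilder();
        for (byte b : md5.digest()) {
          hex.append(String.format("%02x", b));
        }
        System.out.println("partial-block md5=" + hex);
      }
    }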

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java?rev=1596937&r1=1596936&r2=1596937&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java Thu May 22 18:50:20 2014
@@ -74,7 +74,6 @@ import org.apache.hadoop.hdfs.web.resour
 import org.apache.hadoop.hdfs.web.resources.ReplicationParam;
 import org.apache.hadoop.hdfs.web.resources.UriFsPathParam;
 import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
@@ -452,7 +451,7 @@ public class DatanodeWebHdfsMethods {
       MD5MD5CRC32FileChecksum checksum = null;
       DFSClient dfsclient = newDfsClient(nnId, conf);
       try {
-        checksum = dfsclient.getFileChecksum(fullpath);
+        checksum = dfsclient.getFileChecksum(fullpath, Long.MAX_VALUE);
         dfsclient.close();
         dfsclient = null;
       } finally {

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java?rev=1596937&r1=1596936&r2=1596937&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java Thu May 22 18:50:20 2014
@@ -121,7 +121,7 @@ public class FileChecksumServlets {
       try {
         final DFSClient dfs = DatanodeJspHelper.getDFSClient(request, 
             datanode, conf, getUGI(request, conf));
-        final MD5MD5CRC32FileChecksum checksum = dfs.getFileChecksum(path);
+        final MD5MD5CRC32FileChecksum checksum = dfs.getFileChecksum(path, Long.MAX_VALUE);
         MD5MD5CRC32FileChecksum.write(xml, checksum);
       } catch(IOException ioe) {
         writeXml(ioe, path, xml);