Posted to hdfs-commits@hadoop.apache.org by to...@apache.org on 2013/01/22 03:59:26 UTC

svn commit: r1436730 - in /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/server/namenode/ src/main/proto/

Author: todd
Date: Tue Jan 22 02:59:25 2013
New Revision: 1436730

URL: http://svn.apache.org/viewvc?rev=1436730&view=rev
Log:
HDFS-4403. DFSClient can infer checksum type when not provided by reading first byte. Contributed by Todd Lipcon.
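
In short: newer DataNodes report the checksum type in OpBlockChecksumResponseProto, but
DataNodes that predate the field omit it; in that case the client now issues a one-byte
OP_READ_BLOCK and takes the type from the read response. A condensed sketch of the
per-block decision this patch adds (names abbreviated from the diff below, not the
literal code):

    // Decide the checksum type for one block (condensed from the patch)
    final DataChecksum.Type ct;
    if (checksumData.hasCrcType()) {
      // The DataNode is new enough to report the type directly.
      ct = PBHelper.convert(checksumData.getCrcType());
    } else {
      // Older DataNode: read the first byte of the block; the READ_BLOCK
      // response carries the checksum type the replica was written with.
      ct = inferChecksumTypeByReading(clientName, socketFactory, socketTimeout,
          lb, datanodes[j], encryptionKey, connectToDnViaHostname);
    }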

Modified:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1436730&r1=1436729&r2=1436730&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Tue Jan 22 02:59:25 2013
@@ -494,6 +494,9 @@ Release 2.0.3-alpha - Unreleased 
     HDFS-4393. Make empty request and responses in protocol translators
     be static final members. (Brandon Li via suresh)
 
+    HDFS-4403. DFSClient can infer checksum type when not provided by reading
+    first byte (todd)
+
   OPTIMIZATIONS
 
     HDFS-3429. DataNode reads checksums even if client does not need them (todd)

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1436730&r1=1436729&r2=1436730&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java Tue Jan 22 02:59:25 2013
@@ -150,6 +150,7 @@ import org.apache.hadoop.security.token.
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenRenewer;
 import org.apache.hadoop.util.DataChecksum;
+import org.apache.hadoop.util.DataChecksum.Type;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.Time;
 
@@ -1562,7 +1563,7 @@ public class DFSClient implements java.i
    */
   public MD5MD5CRC32FileChecksum getFileChecksum(String src) throws IOException {
     checkOpen();
-    return getFileChecksum(src, namenode, socketFactory,
+    return getFileChecksum(src, clientName, namenode, socketFactory,
         dfsClientConf.socketTimeout, getDataEncryptionKey(),
         dfsClientConf.connectToDnViaHostname);
   }
@@ -1605,9 +1606,16 @@ public class DFSClient implements java.i
   /**
    * Get the checksum of a file.
    * @param src The file path
+   * @param clientName the name of the client requesting the checksum.
+   * @param namenode the RPC proxy for the namenode
+   * @param socketFactory to create sockets to connect to DNs
+   * @param socketTimeout timeout to use when connecting and waiting for a response
+   * @param encryptionKey the key needed to communicate with DNs in this cluster
+   * @param connectToDnViaHostname whether to connect to the datanode by its hostname
    * @return The checksum 
    */
-  public static MD5MD5CRC32FileChecksum getFileChecksum(String src,
+  static MD5MD5CRC32FileChecksum getFileChecksum(String src,
+      String clientName,
       ClientProtocol namenode, SocketFactory socketFactory, int socketTimeout,
       DataEncryptionKey encryptionKey, boolean connectToDnViaHostname)
       throws IOException {
@@ -1642,32 +1650,16 @@ public class DFSClient implements java.i
       final int timeout = 3000 * datanodes.length + socketTimeout;
       boolean done = false;
       for(int j = 0; !done && j < datanodes.length; j++) {
-        Socket sock = null;
         DataOutputStream out = null;
         DataInputStream in = null;
         
         try {
           //connect to a datanode
-          sock = socketFactory.createSocket();
-          String dnAddr = datanodes[j].getXferAddr(connectToDnViaHostname);
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Connecting to datanode " + dnAddr);
-          }
-          NetUtils.connect(sock, NetUtils.createSocketAddr(dnAddr), timeout);
-          sock.setSoTimeout(timeout);
-
-          OutputStream unbufOut = NetUtils.getOutputStream(sock);
-          InputStream unbufIn = NetUtils.getInputStream(sock);
-          if (encryptionKey != null) {
-            IOStreamPair encryptedStreams =
-                DataTransferEncryptor.getEncryptedStreams(
-                    unbufOut, unbufIn, encryptionKey);
-            unbufOut = encryptedStreams.out;
-            unbufIn = encryptedStreams.in;
-          }
-          out = new DataOutputStream(new BufferedOutputStream(unbufOut,
+          IOStreamPair pair = connectToDN(socketFactory, connectToDnViaHostname,
+              encryptionKey, datanodes[j], timeout);
+          out = new DataOutputStream(new BufferedOutputStream(pair.out,
               HdfsConstants.SMALL_BUFFER_SIZE));
-          in = new DataInputStream(unbufIn);
+          in = new DataInputStream(pair.in);
 
           if (LOG.isDebugEnabled()) {
             LOG.debug("write to " + datanodes[j] + ": "
@@ -1680,19 +1672,8 @@ public class DFSClient implements java.i
             BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(in));
 
           if (reply.getStatus() != Status.SUCCESS) {
-            if (reply.getStatus() == Status.ERROR_ACCESS_TOKEN
-                && i > lastRetriedIndex) {
-              if (LOG.isDebugEnabled()) {
-                LOG.debug("Got access token error in response to OP_BLOCK_CHECKSUM "
-                    + "for file " + src + " for block " + block
-                    + " from datanode " + datanodes[j]
-                    + ". Will retry the block once.");
-              }
-              lastRetriedIndex = i;
-              done = true; // actually it's not done; but we'll retry
-              i--; // repeat at i-th block
-              refetchBlocks = true;
-              break;
+            if (reply.getStatus() == Status.ERROR_ACCESS_TOKEN) {
+              throw new InvalidBlockTokenException();
             } else {
               throw new IOException("Bad response " + reply + " for block "
                   + block + " from datanode " + datanodes[j]);
@@ -1724,8 +1705,18 @@ public class DFSClient implements java.i
           md5.write(md5out);
           
           // read crc-type
-          final DataChecksum.Type ct = PBHelper.convert(checksumData
-              .getCrcType());
+          final DataChecksum.Type ct;
+          if (checksumData.hasCrcType()) {
+            ct = PBHelper.convert(checksumData
+                .getCrcType());
+          } else {
+            LOG.debug("Retrieving checksum from an earlier-version DataNode: " +
+                      "inferring checksum by reading first byte");
+            ct = inferChecksumTypeByReading(
+                clientName, socketFactory, socketTimeout, lb, datanodes[j],
+                encryptionKey, connectToDnViaHostname);
+          }
+
           if (i == 0) { // first block
             crcType = ct;
           } else if (crcType != DataChecksum.Type.MIXED
@@ -1743,12 +1734,25 @@ public class DFSClient implements java.i
             }
             LOG.debug("got reply from " + datanodes[j] + ": md5=" + md5);
           }
+        } catch (InvalidBlockTokenException ibte) {
+          if (i > lastRetriedIndex) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Got access token error in response to OP_BLOCK_CHECKSUM "
+                  + "for file " + src + " for block " + block
+                  + " from datanode " + datanodes[j]
+                  + ". Will retry the block once.");
+            }
+            lastRetriedIndex = i;
+            done = true; // actually it's not done; but we'll retry
+            i--; // repeat at i-th block
+            refetchBlocks = true;
+            break;
+          }
         } catch (IOException ie) {
           LOG.warn("src=" + src + ", datanodes["+j+"]=" + datanodes[j], ie);
         } finally {
           IOUtils.closeStream(in);
           IOUtils.closeStream(out);
-          IOUtils.closeSocket(sock);        
         }
       }
 
@@ -1781,6 +1785,90 @@ public class DFSClient implements java.i
   }
 
   /**
+   * Connect to the given datanode's data transfer port, and return
+   * the resulting IOStreamPair. This includes encryption wrapping, etc.
+   */
+  private static IOStreamPair connectToDN(
+      SocketFactory socketFactory, boolean connectToDnViaHostname,
+      DataEncryptionKey encryptionKey, DatanodeInfo dn, int timeout)
+      throws IOException
+  {
+    boolean success = false;
+    Socket sock = null;
+    try {
+      sock = socketFactory.createSocket();
+      String dnAddr = dn.getXferAddr(connectToDnViaHostname);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Connecting to datanode " + dnAddr);
+      }
+      NetUtils.connect(sock, NetUtils.createSocketAddr(dnAddr), timeout);
+      sock.setSoTimeout(timeout);
+  
+      OutputStream unbufOut = NetUtils.getOutputStream(sock);
+      InputStream unbufIn = NetUtils.getInputStream(sock);
+      IOStreamPair ret;
+      if (encryptionKey != null) {
+        ret = DataTransferEncryptor.getEncryptedStreams(
+                unbufOut, unbufIn, encryptionKey);
+      } else {
+        ret = new IOStreamPair(unbufIn, unbufOut);        
+      }
+      success = true;
+      return ret;
+    } finally {
+      if (!success) {
+        IOUtils.closeSocket(sock);
+      }
+    }
+  }
+  
+  /**
+   * Infer the checksum type for a replica by sending an OP_READ_BLOCK
+   * for the first byte of that replica. This is used for compatibility
+   * with older HDFS versions which did not include the checksum type in
+   * OpBlockChecksumResponseProto.
+   *
+   * @param clientName the name of the DFSClient requesting the checksum
+   * @param socketFactory factory used to create sockets to the datanode
+   * @param socketTimeout timeout to use when connecting and waiting for a response
+   * @param lb the located block
+   * @param dn the datanode to read from
+   * @return the inferred checksum type
+   * @throws IOException if an error occurs
+   */
+  private static Type inferChecksumTypeByReading(
+      String clientName, SocketFactory socketFactory, int socketTimeout,
+      LocatedBlock lb, DatanodeInfo dn,
+      DataEncryptionKey encryptionKey, boolean connectToDnViaHostname)
+      throws IOException {
+    IOStreamPair pair = connectToDN(socketFactory, connectToDnViaHostname,
+        encryptionKey, dn, socketTimeout);
+
+    try {
+      DataOutputStream out = new DataOutputStream(new BufferedOutputStream(pair.out,
+          HdfsConstants.SMALL_BUFFER_SIZE));
+      DataInputStream in = new DataInputStream(pair.in);
+  
+      new Sender(out).readBlock(lb.getBlock(), lb.getBlockToken(), clientName, 0, 1, true);
+      final BlockOpResponseProto reply =
+          BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(in));
+      
+      if (reply.getStatus() != Status.SUCCESS) {
+        if (reply.getStatus() == Status.ERROR_ACCESS_TOKEN) {
+          throw new InvalidBlockTokenException();
+        } else {
+          throw new IOException("Bad response " + reply + " trying to read "
+              + lb.getBlock() + " from datanode " + dn);
+        }
+      }
+      
+      return PBHelper.convert(reply.getReadOpChecksumInfo().getChecksum().getType());
+    } finally {
+      IOUtils.cleanup(null, pair.in, pair.out);
+    }
+  }
+
+  /**
    * Set permissions to a file or directory.
    * @param src path name.
    * @param permission
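
For external callers, the static getFileChecksum helper is now package-private and takes
the client name, so the public FileSystem API is the supported entry point. A minimal
usage sketch, assuming a reachable cluster (the URI and path are placeholders, not part
of this commit):

    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileChecksum;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class ChecksumExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create("hdfs://namenode:8020"), conf);
        // Internally this reaches DFSClient#getFileChecksum(String), which now
        // passes the client name along and can infer the type from old DataNodes.
        FileChecksum checksum = fs.getFileChecksum(new Path("/tmp/example.txt"));
        System.out.println(checksum);
      }
    }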

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java?rev=1436730&r1=1436729&r2=1436730&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java Tue Jan 22 02:59:25 2013
@@ -21,7 +21,6 @@ import java.io.IOException;
 import java.io.PrintWriter;
 import java.net.URL;
 
-import javax.net.SocketFactory;
 import javax.servlet.ServletContext;
 import javax.servlet.ServletException;
 import javax.servlet.http.HttpServletRequest;
@@ -33,14 +32,11 @@ import org.apache.hadoop.fs.MD5MD5CRC32F
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.common.JspHelper;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DatanodeJspHelper;
-import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.ServletUtil;
 import org.znerd.xmlenc.XMLOutputter;
@@ -116,18 +112,11 @@ public class FileChecksumServlets {
       final DataNode datanode = (DataNode) context.getAttribute("datanode");
       final Configuration conf = 
         new HdfsConfiguration(datanode.getConf());
-      final int socketTimeout = conf.getInt(
-          DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY,
-          HdfsServerConstants.READ_TIMEOUT);
-      final SocketFactory socketFactory = NetUtils.getSocketFactory(conf,
-          ClientProtocol.class);
       
       try {
         final DFSClient dfs = DatanodeJspHelper.getDFSClient(request, 
             datanode, conf, getUGI(request, conf));
-        final ClientProtocol nnproxy = dfs.getNamenode();
-        final MD5MD5CRC32FileChecksum checksum = DFSClient.getFileChecksum(
-            path, nnproxy, socketFactory, socketTimeout, dfs.getDataEncryptionKey(), false);
+        final MD5MD5CRC32FileChecksum checksum = dfs.getFileChecksum(path);
         MD5MD5CRC32FileChecksum.write(xml, checksum);
       } catch(IOException ioe) {
         writeXml(ioe, path, xml);

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto?rev=1436730&r1=1436729&r2=1436730&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto Tue Jan 22 02:59:25 2013
@@ -183,5 +183,5 @@ message OpBlockChecksumResponseProto {
   required uint32 bytesPerCrc = 1;
   required uint64 crcPerBlock = 2;
   required bytes md5 = 3;
-  optional ChecksumTypeProto crcType = 4 [default = CHECKSUM_CRC32];
+  optional ChecksumTypeProto crcType = 4;
 }
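
A note on the proto change: under proto2 semantics, hasCrcType() already distinguishes a
field set on the wire from one that is absent; the declared default only governed what
getCrcType() returned for an absent field. Dropping [default = CHECKSUM_CRC32] removes
the misleading implication that an old DataNode's silence means CRC32: absence now means
"unknown", which is exactly what triggers the read-based inference above. A hypothetical
helper showing the presence check a client must make (the class and method names are
illustrative, not from this commit):

    import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto;
    import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ChecksumTypeProto;

    final class ChecksumCompat {
      /** Returns the reported type, or null if the peer predates HDFS-4403. */
      static ChecksumTypeProto reportedCrcType(OpBlockChecksumResponseProto resp) {
        // With no declared default, an absent field must be treated as unknown,
        // never assumed to be CRC32, so check presence explicitly.
        return resp.hasCrcType() ? resp.getCrcType() : null;
      }
    }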