You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by to...@apache.org on 2013/01/22 03:59:26 UTC
svn commit: r1436730 - in
/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./
src/main/java/org/apache/hadoop/hdfs/
src/main/java/org/apache/hadoop/hdfs/server/namenode/ src/main/proto/
Author: todd
Date: Tue Jan 22 02:59:25 2013
New Revision: 1436730
URL: http://svn.apache.org/viewvc?rev=1436730&view=rev
Log:
HDFS-4403. DFSClient can infer checksum type when not provided by reading first byte. Contributed by Todd Lipcon.
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1436730&r1=1436729&r2=1436730&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Tue Jan 22 02:59:25 2013
@@ -494,6 +494,9 @@ Release 2.0.3-alpha - Unreleased
HDFS-4393. Make empty request and responses in protocol translators can be
static final members. (Brandon Li via suresh)
+ HDFS-4403. DFSClient can infer checksum type when not provided by reading
+ first byte (todd)
+
OPTIMIZATIONS
HDFS-3429. DataNode reads checksums even if client does not need them (todd)
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1436730&r1=1436729&r2=1436730&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java Tue Jan 22 02:59:25 2013
@@ -150,6 +150,7 @@ import org.apache.hadoop.security.token.
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenRenewer;
import org.apache.hadoop.util.DataChecksum;
+import org.apache.hadoop.util.DataChecksum.Type;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.Time;
@@ -1562,7 +1563,7 @@ public class DFSClient implements java.i
*/
public MD5MD5CRC32FileChecksum getFileChecksum(String src) throws IOException {
checkOpen();
- return getFileChecksum(src, namenode, socketFactory,
+ return getFileChecksum(src, clientName, namenode, socketFactory,
dfsClientConf.socketTimeout, getDataEncryptionKey(),
dfsClientConf.connectToDnViaHostname);
}
@@ -1605,9 +1606,16 @@ public class DFSClient implements java.i
/**
* Get the checksum of a file.
* @param src The file path
+ * @param clientName the name of the client requesting the checksum.
+ * @param namenode the RPC proxy for the namenode
+ * @param socketFactory to create sockets to connect to DNs
+ * @param socketTimeout timeout to use when connecting and waiting for a response
+ * @param encryptionKey the key needed to communicate with DNs in this cluster
+ * @param connectToDnViaHostname {@see #connectToDnViaHostname()}
* @return The checksum
*/
- public static MD5MD5CRC32FileChecksum getFileChecksum(String src,
+ static MD5MD5CRC32FileChecksum getFileChecksum(String src,
+ String clientName,
ClientProtocol namenode, SocketFactory socketFactory, int socketTimeout,
DataEncryptionKey encryptionKey, boolean connectToDnViaHostname)
throws IOException {
@@ -1642,32 +1650,16 @@ public class DFSClient implements java.i
final int timeout = 3000 * datanodes.length + socketTimeout;
boolean done = false;
for(int j = 0; !done && j < datanodes.length; j++) {
- Socket sock = null;
DataOutputStream out = null;
DataInputStream in = null;
try {
//connect to a datanode
- sock = socketFactory.createSocket();
- String dnAddr = datanodes[j].getXferAddr(connectToDnViaHostname);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Connecting to datanode " + dnAddr);
- }
- NetUtils.connect(sock, NetUtils.createSocketAddr(dnAddr), timeout);
- sock.setSoTimeout(timeout);
-
- OutputStream unbufOut = NetUtils.getOutputStream(sock);
- InputStream unbufIn = NetUtils.getInputStream(sock);
- if (encryptionKey != null) {
- IOStreamPair encryptedStreams =
- DataTransferEncryptor.getEncryptedStreams(
- unbufOut, unbufIn, encryptionKey);
- unbufOut = encryptedStreams.out;
- unbufIn = encryptedStreams.in;
- }
- out = new DataOutputStream(new BufferedOutputStream(unbufOut,
+ IOStreamPair pair = connectToDN(socketFactory, connectToDnViaHostname,
+ encryptionKey, datanodes[j], timeout);
+ out = new DataOutputStream(new BufferedOutputStream(pair.out,
HdfsConstants.SMALL_BUFFER_SIZE));
- in = new DataInputStream(unbufIn);
+ in = new DataInputStream(pair.in);
if (LOG.isDebugEnabled()) {
LOG.debug("write to " + datanodes[j] + ": "
@@ -1680,19 +1672,8 @@ public class DFSClient implements java.i
BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(in));
if (reply.getStatus() != Status.SUCCESS) {
- if (reply.getStatus() == Status.ERROR_ACCESS_TOKEN
- && i > lastRetriedIndex) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Got access token error in response to OP_BLOCK_CHECKSUM "
- + "for file " + src + " for block " + block
- + " from datanode " + datanodes[j]
- + ". Will retry the block once.");
- }
- lastRetriedIndex = i;
- done = true; // actually it's not done; but we'll retry
- i--; // repeat at i-th block
- refetchBlocks = true;
- break;
+ if (reply.getStatus() == Status.ERROR_ACCESS_TOKEN) {
+ throw new InvalidBlockTokenException();
} else {
throw new IOException("Bad response " + reply + " for block "
+ block + " from datanode " + datanodes[j]);
@@ -1724,8 +1705,18 @@ public class DFSClient implements java.i
md5.write(md5out);
// read crc-type
- final DataChecksum.Type ct = PBHelper.convert(checksumData
- .getCrcType());
+ final DataChecksum.Type ct;
+ if (checksumData.hasCrcType()) {
+ ct = PBHelper.convert(checksumData
+ .getCrcType());
+ } else {
+ LOG.debug("Retrieving checksum from an earlier-version DataNode: " +
+ "inferring checksum by reading first byte");
+ ct = inferChecksumTypeByReading(
+ clientName, socketFactory, socketTimeout, lb, datanodes[j],
+ encryptionKey, connectToDnViaHostname);
+ }
+
if (i == 0) { // first block
crcType = ct;
} else if (crcType != DataChecksum.Type.MIXED
@@ -1743,12 +1734,25 @@ public class DFSClient implements java.i
}
LOG.debug("got reply from " + datanodes[j] + ": md5=" + md5);
}
+ } catch (InvalidBlockTokenException ibte) {
+ if (i > lastRetriedIndex) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Got access token error in response to OP_BLOCK_CHECKSUM "
+ + "for file " + src + " for block " + block
+ + " from datanode " + datanodes[j]
+ + ". Will retry the block once.");
+ }
+ lastRetriedIndex = i;
+ done = true; // actually it's not done; but we'll retry
+ i--; // repeat at i-th block
+ refetchBlocks = true;
+ break;
+ }
} catch (IOException ie) {
LOG.warn("src=" + src + ", datanodes["+j+"]=" + datanodes[j], ie);
} finally {
IOUtils.closeStream(in);
IOUtils.closeStream(out);
- IOUtils.closeSocket(sock);
}
}
@@ -1781,6 +1785,90 @@ public class DFSClient implements java.i
}
/**
+ * Connect to the given datanode's data transfer port, and return the socket's
+ * streams (encrypted if encryptionKey is non-null). Closes the socket on failure.
+ */
+ private static IOStreamPair connectToDN(
+ SocketFactory socketFactory, boolean connectToDnViaHostname,
+ DataEncryptionKey encryptionKey, DatanodeInfo dn, int timeout)
+ throws IOException
+ {
+ boolean success = false;
+ Socket sock = null;
+ try {
+ sock = socketFactory.createSocket();
+ String dnAddr = dn.getXferAddr(connectToDnViaHostname);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Connecting to datanode " + dnAddr);
+ }
+ NetUtils.connect(sock, NetUtils.createSocketAddr(dnAddr), timeout);
+ sock.setSoTimeout(timeout);
+ // Wrap the raw socket streams for encryption only when a key was supplied.
+ OutputStream unbufOut = NetUtils.getOutputStream(sock);
+ InputStream unbufIn = NetUtils.getInputStream(sock);
+ IOStreamPair ret;
+ if (encryptionKey != null) {
+ ret = DataTransferEncryptor.getEncryptedStreams(
+ unbufOut, unbufIn, encryptionKey);
+ } else {
+ ret = new IOStreamPair(unbufIn, unbufOut);
+ }
+ success = true;
+ return ret;
+ } finally {
+ if (!success) {
+ IOUtils.closeSocket(sock);
+ }
+ }
+ }
+
+ /**
+ * Infer the checksum type for a replica by sending an OP_READ_BLOCK for the
+ * first byte of that replica, over a new connection this method opens and closes.
+ * Used with older DataNodes whose OpBlockChecksumResponseProto lacks crcType.
+ * @param clientName the name of the DFSClient requesting the checksum
+ * @param socketFactory creates the socket used to connect to {@code dn}
+ * @param socketTimeout connect and read timeout for that socket, in milliseconds
+ * @param lb the located block whose first byte is requested
+ * @param dn the datanode to read from
+ * @param encryptionKey data transfer encryption key, or null for no encryption
+ * @param connectToDnViaHostname passed to {@code getXferAddr} to pick the DN address
+ * @return the checksum type reported in the datanode's read response
+ * @throws IOException on connection failure or a non-SUCCESS reply (access-token errors as InvalidBlockTokenException)
+ */
+ private static Type inferChecksumTypeByReading(
+ String clientName, SocketFactory socketFactory, int socketTimeout,
+ LocatedBlock lb, DatanodeInfo dn,
+ DataEncryptionKey encryptionKey, boolean connectToDnViaHostname)
+ throws IOException {
+ IOStreamPair pair = connectToDN(socketFactory, connectToDnViaHostname,
+ encryptionKey, dn, socketTimeout);
+
+ try {
+ DataOutputStream out = new DataOutputStream(new BufferedOutputStream(pair.out,
+ HdfsConstants.SMALL_BUFFER_SIZE));
+ DataInputStream in = new DataInputStream(pair.in);
+ // Request only the first byte; the checksum type is taken from the reply header.
+ new Sender(out).readBlock(lb.getBlock(), lb.getBlockToken(), clientName, 0, 1, true);
+ final BlockOpResponseProto reply =
+ BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(in));
+
+ if (reply.getStatus() != Status.SUCCESS) {
+ if (reply.getStatus() == Status.ERROR_ACCESS_TOKEN) {
+ throw new InvalidBlockTokenException();
+ } else {
+ throw new IOException("Bad response " + reply + " trying to read "
+ + lb.getBlock() + " from datanode " + dn);
+ }
+ }
+ // NOTE(review): assumes a SUCCESS reply always carries readOpChecksumInfo — confirm.
+ return PBHelper.convert(reply.getReadOpChecksumInfo().getChecksum().getType());
+ } finally {
+ IOUtils.cleanup(null, pair.in, pair.out);
+ }
+ }
+
+ /**
* Set permissions to a file or directory.
* @param src path name.
* @param permission
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java?rev=1436730&r1=1436729&r2=1436730&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java Tue Jan 22 02:59:25 2013
@@ -21,7 +21,6 @@ import java.io.IOException;
import java.io.PrintWriter;
import java.net.URL;
-import javax.net.SocketFactory;
import javax.servlet.ServletContext;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
@@ -33,14 +32,11 @@ import org.apache.hadoop.fs.MD5MD5CRC32F
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.JspHelper;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DatanodeJspHelper;
-import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ServletUtil;
import org.znerd.xmlenc.XMLOutputter;
@@ -116,18 +112,11 @@ public class FileChecksumServlets {
final DataNode datanode = (DataNode) context.getAttribute("datanode");
final Configuration conf =
new HdfsConfiguration(datanode.getConf());
- final int socketTimeout = conf.getInt(
- DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY,
- HdfsServerConstants.READ_TIMEOUT);
- final SocketFactory socketFactory = NetUtils.getSocketFactory(conf,
- ClientProtocol.class);
try {
final DFSClient dfs = DatanodeJspHelper.getDFSClient(request,
datanode, conf, getUGI(request, conf));
- final ClientProtocol nnproxy = dfs.getNamenode();
- final MD5MD5CRC32FileChecksum checksum = DFSClient.getFileChecksum(
- path, nnproxy, socketFactory, socketTimeout, dfs.getDataEncryptionKey(), false);
+ final MD5MD5CRC32FileChecksum checksum = dfs.getFileChecksum(path);
MD5MD5CRC32FileChecksum.write(xml, checksum);
} catch(IOException ioe) {
writeXml(ioe, path, xml);
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto?rev=1436730&r1=1436729&r2=1436730&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto Tue Jan 22 02:59:25 2013
@@ -183,5 +183,5 @@ message OpBlockChecksumResponseProto {
required uint32 bytesPerCrc = 1;
required uint64 crcPerBlock = 2;
required bytes md5 = 3;
- optional ChecksumTypeProto crcType = 4 [default = CHECKSUM_CRC32];
+ optional ChecksumTypeProto crcType = 4;
}