Posted to hdfs-commits@hadoop.apache.org by to...@apache.org on 2013/01/12 00:52:24 UTC
svn commit: r1432335 [1/2] - in
/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs: ./
dev-support/ src/main/java/org/apache/hadoop/hdfs/
src/main/java/org/apache/hadoop/hdfs/net/
src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer...
Author: todd
Date: Fri Jan 11 23:52:22 2013
New Revision: 1432335
URL: http://svn.apache.org/viewvc?rev=1432335&view=rev
Log:
HDFS-4356. BlockReaderLocal should use passed file descriptors rather than paths. Contributed by Colin Patrick McCabe.
Added:
hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DomainSocketFactory.java
hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/FileInputStreamCache.java
hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileInputStreamCache.java
hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelShortCircuitRead.java
hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelShortCircuitReadNoChecksum.java
hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelUnixDomainRead.java
Removed:
hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelLocalRead.java
Modified:
hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-347.txt
hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml
hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java
hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DomainPeerServer.java
hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java
hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientDatanodeProtocol.proto
hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java
hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java
hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientBlockVerification.java
hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelRead.java
hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelReadUtil.java
hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java
hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java
hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
Modified: hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-347.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-347.txt?rev=1432335&r1=1432334&r2=1432335&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-347.txt (original)
+++ hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-347.txt Fri Jan 11 23:52:22 2013
@@ -7,3 +7,6 @@ HDFS-4353. Encapsulate connections to pe
HDFS-4354. Create DomainSocket and DomainPeer and associated unit tests.
(Colin Patrick McCabe via todd)
+
+HDFS-4356. BlockReaderLocal should use passed file descriptors rather than paths.
+(Colin Patrick McCabe via todd)
Modified: hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml?rev=1432335&r1=1432334&r2=1432335&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml (original)
+++ hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml Fri Jan 11 23:52:22 2013
@@ -290,6 +290,14 @@
<Method name="persistPaxosData" />
<Bug pattern="OS_OPEN_STREAM" />
</Match>
+
+ <!-- getShortCircuitFdsForRead is supposed to return open streams. -->
+ <Match>
+ <Class name="org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetImpl" />
+ <Method name="getShortCircuitFdsForRead" />
+ <Bug pattern="OBL_UNSATISFIED_OBLIGATION" />
+ </Match>
+
<!-- Don't complain about LocalDatanodeInfo's anonymous class -->
<Match>
<Class name="org.apache.hadoop.hdfs.BlockReaderLocal$LocalDatanodeInfo$1" />
Modified: hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java?rev=1432335&r1=1432334&r2=1432335&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java Fri Jan 11 23:52:22 2013
@@ -42,17 +42,28 @@ public interface BlockReader extends Byt
long skip(long n) throws IOException;
/**
+ * Returns an estimate of the number of bytes that can be read
+ * (or skipped over) from this input stream without performing
+ * network I/O.
+ */
+ int available() throws IOException;
+
+ /**
* Close the block reader.
*
* @param peerCache The PeerCache to put the Peer we're using back
* into, or null if we should simply close the Peer
* we're using (along with its Socket).
- * Some block readers, like BlockReaderLocal, may
- * not make use of this parameter.
+ * Ignored by Readers that don't maintain Peers.
+ * @param fisCache The FileInputStreamCache to put our FileInputStreams
+ * back into, or null if we should simply close them.
+ * Ignored by Readers that don't maintain
+ * FileInputStreams.
*
* @throws IOException
*/
- void close(PeerCache peerCache) throws IOException;
+ void close(PeerCache peerCache, FileInputStreamCache fisCache)
+ throws IOException;
/**
* Read exactly the given amount of data, throwing an exception
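For context, the revised interface is driven like this: a caller reads, may consult available() to decide whether a forward skip is cheap, and finally hands its Peer and FileInputStreams back to the caches on close. A minimal sketch (the class and helper below are illustrative, not part of this commit):

    import java.io.IOException;

    class BlockReaderUsageSketch {
      // Illustrative helper, not part of this commit: drives the revised
      // BlockReader interface. Either cache may be null, in which case
      // close() closes the Peer / FileInputStreams instead of recycling
      // them.
      static void readAndRecycle(BlockReader reader, PeerCache peerCache,
          FileInputStreamCache fisCache) throws IOException {
        byte[] buf = new byte[4096];
        reader.readFully(buf, 0, buf.length);
        // available() is an estimate of what can be read without network
        // I/O; BlockReaderLocal (below) always reports Integer.MAX_VALUE.
        if (reader.available() > 0) {
          reader.skip(1);
        }
        reader.close(peerCache, fisCache);
      }
    }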
Modified: hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java?rev=1432335&r1=1432334&r2=1432335&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java Fri Jan 11 23:52:22 2013
@@ -17,22 +17,26 @@
*/
package org.apache.hadoop.hdfs;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.FileInputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
-import java.net.Socket;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.DFSClient.Conf;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
-import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
-import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
+import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
-import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.net.unix.DomainSocket;
import org.apache.hadoop.security.token.Token;
@@ -58,6 +62,12 @@ public class BlockReaderFactory {
* @param clientName Client name. Used for log messages.
* @param peer The peer
* @param datanodeID The datanode that the Peer is connected to
+ * @param domainSocketFactory The DomainSocketFactory to notify if the Peer
+ * is a DomainPeer which turns out to be faulty.
+ * If null, no factory will be notified in this
+ * case.
+ * @param allowShortCircuitLocalReads True if short-circuit local reads
+ * should be allowed.
* @return New BlockReader instance, or null on error.
*/
@SuppressWarnings("deprecation")
@@ -70,11 +80,44 @@ public class BlockReaderFactory {
boolean verifyChecksum,
String clientName,
Peer peer,
- DatanodeID datanodeID)
- throws IOException {
+ DatanodeID datanodeID,
+ DomainSocketFactory domSockFactory,
+ boolean allowShortCircuitLocalReads)
+ throws IOException {
peer.setReadTimeout(conf.getInt(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY,
HdfsServerConstants.READ_TIMEOUT));
peer.setWriteTimeout(HdfsServerConstants.WRITE_TIMEOUT);
+
+ if (peer.getDomainSocket() != null) {
+ if (allowShortCircuitLocalReads) {
+ // If this is a domain socket, and short-circuit local reads are
+ // enabled, try to set up a BlockReaderLocal.
+ BlockReader reader = newShortCircuitBlockReader(conf, file,
+ block, blockToken, startOffset, len, peer, datanodeID,
+ domSockFactory, verifyChecksum);
+ if (reader != null) {
+ // Once we've constructed the short-circuit block reader, we don't
+ // need the socket any more. So let's return it to the cache.
+ PeerCache peerCache = PeerCache.getInstance(
+ conf.getInt(DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY,
+ DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT),
+ conf.getLong(DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY,
+ DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT));
+ peerCache.put(datanodeID, peer);
+ return reader;
+ }
+ }
+ // If this is a domain socket and we couldn't (or didn't want to) set
+ // up a BlockReaderLocal, check that we are allowed to pass data traffic
+ // over the socket before proceeding.
+ if (!conf.getBoolean(DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC,
+ DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT)) {
+ throw new IOException("Because we can't do short-circuit access, " +
+ "and data traffic over domain sockets is disabled, " +
+ "we cannot use this socket to talk to " + datanodeID);
+ }
+ }
+
if (conf.getBoolean(DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER,
DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER_DEFAULT)) {
return RemoteBlockReader.newBlockReader(file,
@@ -88,7 +131,94 @@ public class BlockReaderFactory {
verifyChecksum, clientName, peer, datanodeID);
}
}
-
+
+ /**
+ * Create a new short-circuit BlockReader.
+ *
+ * Here, we ask the DataNode to pass us file descriptors over our
+ * DomainSocket. If the DataNode declines to do so, we'll return null here;
+ * otherwise, we'll return the BlockReaderLocal. If the DataNode declines,
+ * this function will inform the DomainSocketFactory that short-circuit local
+ * reads are disabled for this DataNode, so that we don't ask again.
+ *
+ * @param conf the configuration.
+ * @param file the file name. Used in log messages.
+ * @param block The block object.
+ * @param blockToken The block token for security.
+ * @param startOffset The read offset, relative to block head.
+ * @param len The number of bytes to read, or -1 to read
+ * as many as possible.
+ * @param peer The peer to use.
+ * @param datanodeID The datanode that the Peer is connected to.
+ * @param domSockFactory The DomainSocketFactory to notify if the Peer
+ * is a DomainPeer which turns out to be faulty.
+ * If null, no factory will be notified in this
+ * case.
+ * @param verifyChecksum True if we should verify the checksums.
+ * Note: even if this is true, when
+ * DFS_CLIENT_READ_CHECKSUM_SKIP_CHECKSUM_KEY is
+ * set, we will skip checksums.
+ *
+ * @return The BlockReaderLocal, or null if the
+ * DataNode declined to provide short-circuit
+ * access.
+ * @throws IOException If there was a communication error.
+ */
+ private static BlockReaderLocal newShortCircuitBlockReader(
+ Configuration conf, String file, ExtendedBlock block,
+ Token<BlockTokenIdentifier> blockToken, long startOffset,
+ long len, Peer peer, DatanodeID datanodeID,
+ DomainSocketFactory domSockFactory, boolean verifyChecksum)
+ throws IOException {
+ final DataOutputStream out =
+ new DataOutputStream(new BufferedOutputStream(
+ peer.getOutputStream()));
+ new Sender(out).requestShortCircuitFds(block, blockToken, 1);
+ DataInputStream in =
+ new DataInputStream(peer.getInputStream());
+ BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
+ HdfsProtoUtil.vintPrefixed(in));
+ DomainSocket sock = peer.getDomainSocket();
+ switch (resp.getStatus()) {
+ case SUCCESS:
+ BlockReaderLocal reader = null;
+ byte buf[] = new byte[1];
+ FileInputStream fis[] = new FileInputStream[2];
+ sock.recvFileInputStreams(fis, buf, 0, buf.length);
+ try {
+ reader = new BlockReaderLocal(conf, file, block,
+ startOffset, len, fis[0], fis[1], datanodeID, verifyChecksum);
+ } finally {
+ if (reader == null) {
+ IOUtils.cleanup(DFSClient.LOG, fis[0], fis[1]);
+ }
+ }
+ return reader;
+ case ERROR_UNSUPPORTED:
+ if (!resp.hasShortCircuitAccessVersion()) {
+ DFSClient.LOG.warn("short-circuit read access is disabled for " +
+ "DataNode " + datanodeID + ". reason: " + resp.getMessage());
+ domSockFactory.disableShortCircuitForPath(sock.getPath());
+ } else {
+ DFSClient.LOG.warn("short-circuit read access for the file " +
+ file + " is disabled for DataNode " + datanodeID +
+ ". reason: " + resp.getMessage());
+ }
+ return null;
+ case ERROR_ACCESS_TOKEN:
+ String msg = "access control error while " +
+ "attempting to set up short-circuit access to " +
+ file + ": " + resp.getMessage();
+ DFSClient.LOG.debug(msg);
+ throw new InvalidBlockTokenException(msg);
+ default:
+ DFSClient.LOG.warn("error while attempting to set up short-circuit " +
+ "access to " + file + ": " + resp.getMessage());
+ domSockFactory.disableShortCircuitForPath(sock.getPath());
+ return null;
+ }
+ }
+
/**
* File name to print when accessing a block directly (from servlets)
* @param s Address of the block location
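Condensing the SUCCESS path of newShortCircuitBlockReader above: the client sends an OP_REQUEST_SHORT_CIRCUIT_FDS request, reads the usual vint-prefixed BlockOpResponseProto, and then receives the two descriptors (data file, then metadata file) as ancillary data on the domain socket. A trimmed restatement, with the error handling and non-SUCCESS branches omitted:

    // Trimmed restatement of the SUCCESS path above; not new behavior.
    DataOutputStream out = new DataOutputStream(
        new BufferedOutputStream(peer.getOutputStream()));
    new Sender(out).requestShortCircuitFds(block, blockToken, 1);
    BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
        HdfsProtoUtil.vintPrefixed(new DataInputStream(peer.getInputStream())));
    if (resp.getStatus() == Status.SUCCESS) {
      byte[] buf = new byte[1];
      FileInputStream[] fis = new FileInputStream[2]; // [0]=data, [1]=metadata
      // The descriptors ride along as ancillary data (SCM_RIGHTS) with a
      // one-byte payload; recvFileInputStreams unpacks them for us.
      peer.getDomainSocket().recvFileInputStreams(fis, buf, 0, buf.length);
      return new BlockReaderLocal(conf, file, block, startOffset, len,
          fis[0], fis[1], datanodeID, verifyChecksum);
    }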
Modified: hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java?rev=1432335&r1=1432334&r2=1432335&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java Fri Jan 11 23:52:22 2013
@@ -18,30 +18,18 @@
package org.apache.hadoop.hdfs;
import java.io.DataInputStream;
-import java.io.File;
+import org.apache.hadoop.conf.Configuration;
import java.io.FileInputStream;
import java.io.IOException;
-import java.net.Socket;
import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
-import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.hdfs.util.DirectBufferPool;
-import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;
/**
@@ -53,74 +41,19 @@ import org.apache.hadoop.util.DataChecks
* <ul>
* <li>The client performing short circuit reads must be configured at the
* datanode.</li>
- * <li>The client gets the path to the file where block is stored using
- * {@link org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol#getBlockLocalPathInfo(ExtendedBlock, Token)}
- * RPC call</li>
- * <li>Client uses kerberos authentication to connect to the datanode over RPC,
- * if security is enabled.</li>
+ * <li>The client gets the file descriptors for the metadata file and the data
+ * file for the block using
+ * {@link org.apache.hadoop.hdfs.server.datanode.DataXceiver#requestShortCircuitFds}.
+ * </li>
+ * <li>The client reads the file descriptors.</li>
* </ul>
*/
class BlockReaderLocal implements BlockReader {
- private static final Log LOG = LogFactory.getLog(DFSClient.class);
-
- //Stores the cache and proxy for a local datanode.
- private static class LocalDatanodeInfo {
- private ClientDatanodeProtocol proxy = null;
- private final Map<ExtendedBlock, BlockLocalPathInfo> cache;
-
- LocalDatanodeInfo() {
- final int cacheSize = 10000;
- final float hashTableLoadFactor = 0.75f;
- int hashTableCapacity = (int) Math.ceil(cacheSize / hashTableLoadFactor) + 1;
- cache = Collections
- .synchronizedMap(new LinkedHashMap<ExtendedBlock, BlockLocalPathInfo>(
- hashTableCapacity, hashTableLoadFactor, true) {
- private static final long serialVersionUID = 1;
-
- @Override
- protected boolean removeEldestEntry(
- Map.Entry<ExtendedBlock, BlockLocalPathInfo> eldest) {
- return size() > cacheSize;
- }
- });
- }
-
- private synchronized ClientDatanodeProtocol getDatanodeProxy(
- DatanodeInfo node, Configuration conf, int socketTimeout,
- boolean connectToDnViaHostname) throws IOException {
- if (proxy == null) {
- proxy = DFSUtil.createClientDatanodeProtocolProxy(node, conf,
- socketTimeout, connectToDnViaHostname);
- }
- return proxy;
- }
-
- private synchronized void resetDatanodeProxy() {
- if (null != proxy) {
- RPC.stopProxy(proxy);
- proxy = null;
- }
- }
-
- private BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock b) {
- return cache.get(b);
- }
-
- private void setBlockLocalPathInfo(ExtendedBlock b, BlockLocalPathInfo info) {
- cache.put(b, info);
- }
-
- private void removeBlockLocalPathInfo(ExtendedBlock b) {
- cache.remove(b);
- }
- }
-
- // Multiple datanodes could be running on the local machine. Store proxies in
- // a map keyed by the ipc port of the datanode.
- private static Map<Integer, LocalDatanodeInfo> localDatanodeInfoMap = new HashMap<Integer, LocalDatanodeInfo>();
+ static final Log LOG = LogFactory.getLog(DFSClient.class);
private final FileInputStream dataIn; // reader for the data file
private final FileInputStream checksumIn; // reader for the checksum file
+ private final boolean verifyChecksum;
/**
* Offset from the most recent chunk boundary at which the next read should
@@ -140,7 +73,6 @@ class BlockReaderLocal implements BlockR
private ByteBuffer slowReadBuff = null;
private ByteBuffer checksumBuff = null;
private DataChecksum checksum;
- private final boolean verifyChecksum;
private static DirectBufferPool bufferPool = new DirectBufferPool();
@@ -150,186 +82,90 @@ class BlockReaderLocal implements BlockR
/** offset in block where reader wants to actually read */
private long startOffset;
private final String filename;
-
- /**
- * The only way this object can be instantiated.
- */
- static BlockReaderLocal newBlockReader(Configuration conf, String file,
- ExtendedBlock blk, Token<BlockTokenIdentifier> token, DatanodeInfo node,
- int socketTimeout, long startOffset, long length,
- boolean connectToDnViaHostname) throws IOException {
-
- LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node
- .getIpcPort());
- // check the cache first
- BlockLocalPathInfo pathinfo = localDatanodeInfo.getBlockLocalPathInfo(blk);
- if (pathinfo == null) {
- pathinfo = getBlockPathInfo(blk, node, conf, socketTimeout, token,
- connectToDnViaHostname);
- }
-
- // check to see if the file exists. It may so happen that the
- // HDFS file has been deleted and this block-lookup is occurring
- // on behalf of a new HDFS file. This time, the block file could
- // be residing in a different portion of the fs.data.dir directory.
- // In this case, we remove this entry from the cache. The next
- // call to this method will re-populate the cache.
- FileInputStream dataIn = null;
- FileInputStream checksumIn = null;
- BlockReaderLocal localBlockReader = null;
- boolean skipChecksumCheck = skipChecksumCheck(conf);
- try {
- // get a local file system
- File blkfile = new File(pathinfo.getBlockPath());
- dataIn = new FileInputStream(blkfile);
- if (LOG.isDebugEnabled()) {
- LOG.debug("New BlockReaderLocal for file " + blkfile + " of size "
- + blkfile.length() + " startOffset " + startOffset + " length "
- + length + " short circuit checksum " + !skipChecksumCheck);
- }
-
- if (!skipChecksumCheck) {
- // get the metadata file
- File metafile = new File(pathinfo.getMetaPath());
- checksumIn = new FileInputStream(metafile);
-
- // read and handle the common header here. For now just a version
- BlockMetadataHeader header = BlockMetadataHeader
- .readHeader(new DataInputStream(checksumIn));
- short version = header.getVersion();
- if (version != BlockMetadataHeader.VERSION) {
- LOG.warn("Wrong version (" + version + ") for metadata file for "
- + blk + " ignoring ...");
- }
- DataChecksum checksum = header.getChecksum();
- long firstChunkOffset = startOffset
- - (startOffset % checksum.getBytesPerChecksum());
- localBlockReader = new BlockReaderLocal(conf, file, blk, token,
- startOffset, length, pathinfo, checksum, true, dataIn,
- firstChunkOffset, checksumIn);
- } else {
- localBlockReader = new BlockReaderLocal(conf, file, blk, token,
- startOffset, length, pathinfo, dataIn);
- }
- } catch (IOException e) {
- // remove from cache
- localDatanodeInfo.removeBlockLocalPathInfo(blk);
- DFSClient.LOG.warn("BlockReaderLocal: Removing " + blk
- + " from cache because local file " + pathinfo.getBlockPath()
- + " could not be opened.");
- throw e;
- } finally {
- if (localBlockReader == null) {
- if (dataIn != null) {
- dataIn.close();
- }
- if (checksumIn != null) {
- checksumIn.close();
- }
- }
- }
- return localBlockReader;
- }
-
- private static synchronized LocalDatanodeInfo getLocalDatanodeInfo(int port) {
- LocalDatanodeInfo ldInfo = localDatanodeInfoMap.get(port);
- if (ldInfo == null) {
- ldInfo = new LocalDatanodeInfo();
- localDatanodeInfoMap.put(port, ldInfo);
- }
- return ldInfo;
- }
-
- private static BlockLocalPathInfo getBlockPathInfo(ExtendedBlock blk,
- DatanodeInfo node, Configuration conf, int timeout,
- Token<BlockTokenIdentifier> token, boolean connectToDnViaHostname)
- throws IOException {
- LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node.getIpcPort());
- BlockLocalPathInfo pathinfo = null;
- ClientDatanodeProtocol proxy = localDatanodeInfo.getDatanodeProxy(node,
- conf, timeout, connectToDnViaHostname);
- try {
- // make RPC to local datanode to find local pathnames of blocks
- pathinfo = proxy.getBlockLocalPathInfo(blk, token);
- if (pathinfo != null) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Cached location of block " + blk + " as " + pathinfo);
- }
- localDatanodeInfo.setBlockLocalPathInfo(blk, pathinfo);
- }
- } catch (IOException e) {
- localDatanodeInfo.resetDatanodeProxy(); // Reset proxy on error
- throw e;
- }
- return pathinfo;
- }
-
- private static boolean skipChecksumCheck(Configuration conf) {
- return conf.getBoolean(
- DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY,
- DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_DEFAULT);
- }
+ private final DatanodeID datanodeID;
+ private final ExtendedBlock block;
- private static int getSlowReadBufferNumChunks(Configuration conf, int bytesPerChecksum) {
- int bufferSizeBytes = conf.getInt(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY,
- DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_DEFAULT);
-
- if (bufferSizeBytes < bytesPerChecksum) {
- throw new IllegalArgumentException("Configured BlockReaderLocal buffer size (" + bufferSizeBytes + ") " +
- "is not large enough to hold a single chunk (" + bytesPerChecksum + "). Please configure " +
+ private static int getSlowReadBufferNumChunks(Configuration conf,
+ int bytesPerChecksum) {
+
+ int bufSize =
+ conf.getInt(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY,
+ DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_DEFAULT);
+
+ if (bufSize < bytesPerChecksum) {
+ throw new IllegalArgumentException("Configured BlockReaderLocal buffer size (" +
+ bufSize + ") is not large enough to hold a single chunk (" +
+ bytesPerChecksum + "). Please configure " +
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY + " appropriately");
}
// Round down to nearest chunk size
- return bufferSizeBytes / bytesPerChecksum;
- }
-
- private BlockReaderLocal(Configuration conf, String hdfsfile,
- ExtendedBlock block, Token<BlockTokenIdentifier> token, long startOffset,
- long length, BlockLocalPathInfo pathinfo, FileInputStream dataIn)
- throws IOException {
- this(conf, hdfsfile, block, token, startOffset, length, pathinfo,
- DataChecksum.newDataChecksum(DataChecksum.Type.NULL, 4), false,
- dataIn, startOffset, null);
+ return bufSize / bytesPerChecksum;
}
- private BlockReaderLocal(Configuration conf, String hdfsfile,
- ExtendedBlock block, Token<BlockTokenIdentifier> token, long startOffset,
- long length, BlockLocalPathInfo pathinfo, DataChecksum checksum,
- boolean verifyChecksum, FileInputStream dataIn, long firstChunkOffset,
- FileInputStream checksumIn) throws IOException {
- this.filename = hdfsfile;
- this.checksum = checksum;
- this.verifyChecksum = verifyChecksum;
- this.startOffset = Math.max(startOffset, 0);
-
- bytesPerChecksum = this.checksum.getBytesPerChecksum();
- checksumSize = this.checksum.getChecksumSize();
-
+ public BlockReaderLocal(Configuration conf, String filename,
+ ExtendedBlock block, long startOffset, long length,
+ FileInputStream dataIn, FileInputStream checksumIn,
+ DatanodeID datanodeID, boolean verifyChecksum) throws IOException {
this.dataIn = dataIn;
this.checksumIn = checksumIn;
- this.offsetFromChunkBoundary = (int) (startOffset-firstChunkOffset);
-
- int chunksPerChecksumRead = getSlowReadBufferNumChunks(conf, bytesPerChecksum);
- slowReadBuff = bufferPool.getBuffer(bytesPerChecksum * chunksPerChecksumRead);
- checksumBuff = bufferPool.getBuffer(checksumSize * chunksPerChecksumRead);
- // Initially the buffers have nothing to read.
- slowReadBuff.flip();
- checksumBuff.flip();
+ this.startOffset = Math.max(startOffset, 0);
+ this.filename = filename;
+ this.datanodeID = datanodeID;
+ this.block = block;
+
+ // read and handle the common header here. For now just a version
+ checksumIn.getChannel().position(0);
+ BlockMetadataHeader header = BlockMetadataHeader
+ .readHeader(new DataInputStream(checksumIn));
+ short version = header.getVersion();
+ if (version != BlockMetadataHeader.VERSION) {
+ throw new IOException("Wrong version (" + version + ") of the " +
+ "metadata file for " + filename + ".");
+ }
+ if (!verifyChecksum) {
+ this.verifyChecksum = false;
+ } else {
+ this.verifyChecksum = !conf.getBoolean(DFSConfigKeys.
+ DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY,
+ DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_DEFAULT);
+ }
+ long firstChunkOffset;
+ if (this.verifyChecksum) {
+ this.checksum = header.getChecksum();
+ this.bytesPerChecksum = this.checksum.getBytesPerChecksum();
+ this.checksumSize = this.checksum.getChecksumSize();
+ firstChunkOffset = startOffset
+ - (startOffset % checksum.getBytesPerChecksum());
+ this.offsetFromChunkBoundary = (int) (startOffset - firstChunkOffset);
+
+ int chunksPerChecksumRead = getSlowReadBufferNumChunks(conf, bytesPerChecksum);
+ slowReadBuff = bufferPool.getBuffer(bytesPerChecksum * chunksPerChecksumRead);
+ checksumBuff = bufferPool.getBuffer(checksumSize * chunksPerChecksumRead);
+ // Initially the buffers have nothing to read.
+ slowReadBuff.flip();
+ checksumBuff.flip();
+ long checkSumOffset = (firstChunkOffset / bytesPerChecksum) * checksumSize;
+ IOUtils.skipFully(checksumIn, checkSumOffset);
+ } else {
+ firstChunkOffset = startOffset;
+ this.checksum = null;
+ this.bytesPerChecksum = 0;
+ this.checksumSize = 0;
+ this.offsetFromChunkBoundary = 0;
+ }
+
boolean success = false;
try {
- // Skip both input streams to beginning of the chunk containing startOffset
- IOUtils.skipFully(dataIn, firstChunkOffset);
- if (checksumIn != null) {
- long checkSumOffset = (firstChunkOffset / bytesPerChecksum) * checksumSize;
- IOUtils.skipFully(checksumIn, checkSumOffset);
- }
+ // Reposition both input streams to the beginning of the chunk
+ // containing startOffset
+ this.dataIn.getChannel().position(firstChunkOffset);
success = true;
} finally {
if (!success) {
- bufferPool.returnBuffer(slowReadBuff);
- bufferPool.returnBuffer(checksumBuff);
+ if (slowReadBuff != null) bufferPool.returnBuffer(slowReadBuff);
+ if (checksumBuff != null) bufferPool.returnBuffer(checksumBuff);
}
}
}
@@ -649,9 +485,17 @@ class BlockReaderLocal implements BlockR
}
@Override
- public synchronized void close(PeerCache peerCache) throws IOException {
- dataIn.close();
- if (checksumIn != null) {
+ public synchronized void close(PeerCache peerCache,
+ FileInputStreamCache fisCache) throws IOException {
+ if (fisCache != null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("putting FileInputStream for " + filename +
+ " back into FileInputStreamCache");
+ }
+ fisCache.put(datanodeID, block, new FileInputStream[] {dataIn, checksumIn});
+ } else {
+ LOG.debug("closing FileInputStream for " + filename);
+ dataIn.close();
checksumIn.close();
}
if (slowReadBuff != null) {
@@ -675,4 +519,10 @@ class BlockReaderLocal implements BlockR
public void readFully(byte[] buf, int off, int len) throws IOException {
BlockReaderUtil.readFully(this, buf, off, len);
}
+
+ @Override
+ public int available() throws IOException {
+ // We never do network I/O in BlockReaderLocal.
+ return Integer.MAX_VALUE;
+ }
}
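The chunk-alignment arithmetic in the new constructor is easiest to see with concrete numbers. Assuming the common 512-byte chunk with 4-byte CRC checksums (illustrative values, not taken from this diff):

    class ChunkMathSketch {
      static void example() {
        long startOffset = 1300;    // illustrative read position
        int bytesPerChecksum = 512; // bytes covered by one checksum
        int checksumSize = 4;       // bytes per CRC
        long firstChunkOffset = startOffset - (startOffset % bytesPerChecksum);
        // == 1024: back up to the chunk boundary so the whole chunk can
        // be verified.
        long checkSumOffset =
            (firstChunkOffset / bytesPerChecksum) * checksumSize;
        // == 8: skip the checksums of the two preceding chunks (taken
        // after the metadata header has already been read).
        int offsetFromChunkBoundary = (int) (startOffset - firstChunkOffset);
        // == 276: these bytes are read and verified but not returned to
        // the caller.
      }
    }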
Modified: hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1432335&r1=1432334&r2=1432335&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java Fri Jan 11 23:52:22 2013
@@ -128,7 +128,6 @@ import org.apache.hadoop.hdfs.protocol.d
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
@@ -227,6 +226,11 @@ public class DFSClient implements java.i
final boolean getHdfsBlocksMetadataEnabled;
final int getFileBlockStorageLocationsNumThreads;
final int getFileBlockStorageLocationsTimeout;
+ final String domainSocketPath;
+ final boolean skipShortCircuitChecksums;
+ final int shortCircuitBufferSize;
+ final boolean shortCircuitLocalReads;
+ final boolean domainSocketDataTraffic;
Conf(Configuration conf) {
maxFailoverAttempts = conf.getInt(
@@ -288,6 +292,19 @@ public class DFSClient implements java.i
getFileBlockStorageLocationsTimeout = conf.getInt(
DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT,
DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_DEFAULT);
+ domainSocketPath = conf.get(DFSConfigKeys.DFS_DATANODE_DOMAIN_SOCKET_PATH_KEY);
+ skipShortCircuitChecksums = conf.getBoolean(
+ DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY,
+ DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_DEFAULT);
+ shortCircuitBufferSize = conf.getInt(
+ DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY,
+ DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_DEFAULT);
+ shortCircuitLocalReads = conf.getBoolean(
+ DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY,
+ DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT);
+ domainSocketDataTraffic = conf.getBoolean(
+ DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC,
+ DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT);
}
private DataChecksum.Type getChecksumType(Configuration conf) {
@@ -345,7 +362,7 @@ public class DFSClient implements java.i
private final Map<String, DFSOutputStream> filesBeingWritten
= new HashMap<String, DFSOutputStream>();
- private boolean shortCircuitLocalReads;
+ private final DomainSocketFactory domainSocketFactory;
/**
* Same as this(NameNode.getAddress(conf), conf);
@@ -417,12 +434,8 @@ public class DFSClient implements java.i
}
// read directly from the block file if configured.
- this.shortCircuitLocalReads = conf.getBoolean(
- DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY,
- DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Short circuit read is " + shortCircuitLocalReads);
- }
+ this.domainSocketFactory = new DomainSocketFactory(dfsClientConf);
+
String localInterfaces[] =
conf.getTrimmedStrings(DFSConfigKeys.DFS_CLIENT_LOCAL_INTERFACES);
localInterfaceAddrs = getLocalInterfaceAddrs(localInterfaces);
@@ -787,28 +800,11 @@ public class DFSClient implements java.i
AccessControlException.class);
}
}
-
- /**
- * Get {@link BlockReader} for short circuited local reads.
- */
- static BlockReader getLocalBlockReader(Configuration conf,
- String src, ExtendedBlock blk, Token<BlockTokenIdentifier> accessToken,
- DatanodeInfo chosenNode, int socketTimeout, long offsetIntoBlock,
- boolean connectToDnViaHostname) throws InvalidToken, IOException {
- try {
- return BlockReaderLocal.newBlockReader(conf, src, blk, accessToken,
- chosenNode, socketTimeout, offsetIntoBlock, blk.getNumBytes()
- - offsetIntoBlock, connectToDnViaHostname);
- } catch (RemoteException re) {
- throw re.unwrapRemoteException(InvalidToken.class,
- AccessControlException.class);
- }
- }
private static Map<String, Boolean> localAddrMap = Collections
.synchronizedMap(new HashMap<String, Boolean>());
- private static boolean isLocalAddress(InetSocketAddress targetAddr) {
+ static boolean isLocalAddress(InetSocketAddress targetAddr) {
InetAddress addr = targetAddr.getAddress();
Boolean cached = localAddrMap.get(addr.getHostAddress());
if (cached != null) {
@@ -2108,10 +2104,6 @@ public class DFSClient implements java.i
super(in);
}
}
-
- boolean shouldTryShortCircuitRead(InetSocketAddress targetAddr) {
- return shortCircuitLocalReads && isLocalAddress(targetAddr);
- }
void reportChecksumFailure(String file, ExtendedBlock blk, DatanodeInfo dn) {
DatanodeInfo [] dnArr = { dn };
@@ -2135,7 +2127,7 @@ public class DFSClient implements java.i
+ ", ugi=" + ugi + "]";
}
- void disableShortCircuit() {
- shortCircuitLocalReads = false;
+ public DomainSocketFactory getDomainSocketFactory() {
+ return domainSocketFactory;
}
}
Modified: hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1432335&r1=1432334&r2=1432335&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Fri Jan 11 23:52:22 2013
@@ -342,7 +342,13 @@ public class DFSConfigKeys extends Commo
public static final String DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY = "dfs.client.read.shortcircuit.skip.checksum";
public static final boolean DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_DEFAULT = false;
public static final String DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY = "dfs.client.read.shortcircuit.buffer.size";
+ public static final String DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_SIZE_KEY = "dfs.client.read.shortcircuit.streams.cache.size";
+ public static final int DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_SIZE_DEFAULT = 10;
+ public static final String DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_KEY = "dfs.client.read.shortcircuit.streams.cache.expiry.ms";
+ public static final long DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_DEFAULT = 60000;
public static final int DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_DEFAULT = 1024 * 1024;
+ public static final String DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC = "dfs.client.domain.socket.data.traffic";
+ public static final boolean DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT = false;
// property for fsimage compression
public static final String DFS_IMAGE_COMPRESS_KEY = "dfs.image.compress";
@@ -393,6 +399,7 @@ public class DFSConfigKeys extends Commo
public static final String DFS_WEB_AUTHENTICATION_KERBEROS_KEYTAB_KEY = "dfs.web.authentication.kerberos.keytab";
public static final String DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY = "dfs.block.local-path-access.user";
+ public static final String DFS_DATANODE_DOMAIN_SOCKET_PATH_KEY = "dfs.datanode.domain.socket.path";
// HA related configuration
public static final String DFS_HA_NAMENODES_KEY_PREFIX = "dfs.ha.namenodes";
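Putting the new keys together, a client opting into fd-based short-circuit reads would carry configuration along these lines (a sketch: the socket path value is only an example, since the new key deliberately has no default; the remaining values are the defaults defined above):

    import org.apache.hadoop.conf.Configuration;

    class ShortCircuitConfSketch {
      static Configuration create() {
        Configuration conf = new Configuration();
        conf.setBoolean("dfs.client.read.shortcircuit", true);
        // Must match the path the DataNode listens on; example value.
        conf.set("dfs.datanode.domain.socket.path",
            "/var/run/hdfs/dn_socket");
        // Optional fallback: plain data traffic over the domain socket
        // when fd passing is refused. Off by default.
        conf.setBoolean("dfs.client.domain.socket.data.traffic", false);
        // Per-stream cache of passed descriptors (defaults shown).
        conf.setInt("dfs.client.read.shortcircuit.streams.cache.size", 10);
        conf.setLong(
            "dfs.client.read.shortcircuit.streams.cache.expiry.ms", 60000);
        return conf;
      }
    }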
Modified: hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java?rev=1432335&r1=1432334&r2=1432335&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java Fri Jan 11 23:52:22 2013
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdfs;
+import java.io.FileInputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
@@ -38,7 +39,7 @@ import org.apache.hadoop.fs.ChecksumExce
import org.apache.hadoop.fs.ByteBufferReadable;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.UnresolvedLinkException;
-import org.apache.hadoop.hdfs.net.EncryptedPeer;
+import org.apache.hadoop.hdfs.net.DomainPeer;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.net.TcpPeerServer;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
@@ -46,17 +47,16 @@ import org.apache.hadoop.hdfs.protocol.D
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
-import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.net.unix.DomainSocket;
import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.hdfs.FileInputStreamCache;
/****************************************************************
* DFSInputStream provides bytes from a named file. It handles
@@ -80,6 +80,8 @@ public class DFSInputStream extends FSIn
private long pos = 0;
private long blockEnd = -1;
+ private final FileInputStreamCache fileInputStreamCache;
+
/**
* This variable tracks the number of failures since the start of the
* most recent user-facing operation. That is to say, it should be reset
@@ -115,6 +117,13 @@ public class DFSInputStream extends FSIn
this.buffersize = buffersize;
this.src = src;
this.peerCache = dfsClient.peerCache;
+ this.fileInputStreamCache = new FileInputStreamCache(
+ dfsClient.conf.getInt(DFSConfigKeys.
+ DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_SIZE_KEY,
+ DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_SIZE_DEFAULT),
+ dfsClient.conf.getLong(DFSConfigKeys.
+ DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_KEY,
+ DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_DEFAULT));
prefetchSize = dfsClient.getConf().prefetchSize;
timeWindow = dfsClient.getConf().timeWindow;
nCachedConnRetry = dfsClient.getConf().nCachedConnRetry;
@@ -247,7 +256,9 @@ public class DFSInputStream extends FSIn
locatedBlocks.getFileLength() + lastBlockBeingWrittenLength;
}
- private synchronized boolean blockUnderConstruction() {
+ // Short circuit local reads are forbidden for files that are
+ // under construction. See HDFS-2757.
+ synchronized boolean shortCircuitForbidden() {
return locatedBlocks.isUnderConstruction();
}
@@ -428,7 +439,7 @@ public class DFSInputStream extends FSIn
// Will be getting a new BlockReader.
if (blockReader != null) {
- blockReader.close(peerCache);
+ blockReader.close(peerCache, fileInputStreamCache);
blockReader = null;
}
@@ -510,10 +521,11 @@ public class DFSInputStream extends FSIn
dfsClient.checkOpen();
if (blockReader != null) {
- blockReader.close(peerCache);
+ blockReader.close(peerCache, fileInputStreamCache);
blockReader = null;
}
super.close();
+ fileInputStreamCache.close();
closed = true;
}
@@ -809,10 +821,6 @@ public class DFSInputStream extends FSIn
e.getPos() + " from " + chosenNode);
// we want to remember what we have tried
addIntoCorruptedBlockMap(block.getBlock(), chosenNode, corruptedBlockMap);
- } catch (AccessControlException ex) {
- DFSClient.LOG.warn("Short circuit access failed ", ex);
- dfsClient.disableShortCircuit();
- continue;
} catch (IOException e) {
if (e instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
DFSClient.LOG.info("Will fetch a new encryption key and retry, "
@@ -837,7 +845,7 @@ public class DFSInputStream extends FSIn
}
} finally {
if (reader != null) {
- reader.close(peerCache);
+ reader.close(peerCache, fileInputStreamCache);
}
}
// Put chosen node into dead list, continue
@@ -849,19 +857,29 @@ public class DFSInputStream extends FSIn
Peer peer = null;
boolean success = false;
Socket sock = null;
+ DomainSocket domSock = null;
+
try {
- sock = dfsClient.socketFactory.createSocket();
- NetUtils.connect(sock, addr,
- dfsClient.getRandomLocalInterfaceAddr(),
- dfsClient.getConf().socketTimeout);
- peer = TcpPeerServer.peerFromSocketAndKey(sock,
- dfsClient.getDataEncryptionKey());
+ domSock = dfsClient.getDomainSocketFactory().create(addr, this);
+ if (domSock != null) {
+ // Create a UNIX Domain peer.
+ peer = new DomainPeer(domSock);
+ } else {
+ // Create a conventional TCP-based Peer.
+ sock = dfsClient.socketFactory.createSocket();
+ NetUtils.connect(sock, addr,
+ dfsClient.getRandomLocalInterfaceAddr(),
+ dfsClient.getConf().socketTimeout);
+ peer = TcpPeerServer.peerFromSocketAndKey(sock,
+ dfsClient.getDataEncryptionKey());
+ }
success = true;
return peer;
} finally {
if (!success) {
IOUtils.closeQuietly(peer);
IOUtils.closeQuietly(sock);
+ IOUtils.closeQuietly(domSock);
}
}
}
@@ -895,49 +913,77 @@ public class DFSInputStream extends FSIn
String clientName)
throws IOException {
- // Can't local read a block under construction, see HDFS-2757
- if (dfsClient.shouldTryShortCircuitRead(dnAddr) &&
- !blockUnderConstruction()) {
- return DFSClient.getLocalBlockReader(dfsClient.conf, src, block,
- blockToken, chosenNode, dfsClient.hdfsTimeout, startOffset,
- dfsClient.connectToDnViaHostname());
- }
-
IOException err = null;
- boolean fromCache = true;
- // Allow retry since there is no way of knowing whether the cached socket
- // is good until we actually use it.
- for (int retries = 0; retries <= nCachedConnRetry && fromCache; ++retries) {
+ // Firstly, we check to see if we have cached any file descriptors for
+ // local blocks. If so, we can just re-use those file descriptors.
+ FileInputStream fis[] = fileInputStreamCache.get(chosenNode, block);
+ if (fis != null) {
+ if (DFSClient.LOG.isDebugEnabled()) {
+ DFSClient.LOG.debug("got FileInputStreams for " + block + " from " +
+ "the FileInputStreamCache.");
+ }
+ return new BlockReaderLocal(dfsClient.conf, file,
+ block, startOffset, len, fis[0], fis[1], chosenNode, verifyChecksum);
+ }
+
+ // We retry several times here.
+ // On the first nCachedConnRetry times, we try to fetch a socket from
+ // the socketCache and use it. This may fail, since the old socket may
+ // have been closed by the peer.
+ // After that, we try to create a new socket using newPeer().
+ // This may create either a TCP socket or a UNIX domain socket, depending
+ // on the configuration and whether the peer is remote.
+ // If we try to create a UNIX domain socket and fail, we will not try that
+ // again. Instead, we'll try to create a TCP socket. Only after we've
+ // failed to create a TCP-based BlockReader will we throw an IOException
+ // from this function. Throwing an IOException from here is basically
+ // equivalent to declaring the DataNode bad.
+ boolean triedNonDomainSocketReader = false;
+ for (int retries = 0;
+ retries < nCachedConnRetry && (!triedNonDomainSocketReader);
+ ++retries) {
Peer peer = null;
- // Don't use the cache on the last attempt - it's possible that there
- // are arbitrarily many unusable sockets in the cache, but we don't
- // want to fail the read.
if (retries < nCachedConnRetry) {
peer = peerCache.get(chosenNode);
}
if (peer == null) {
peer = newPeer(dnAddr);
- fromCache = false;
+ if (peer.getDomainSocket() == null) {
+ triedNonDomainSocketReader = true;
+ }
}
-
+ boolean success = false;
try {
- // The OP_READ_BLOCK request is sent as we make the BlockReader
- BlockReader reader =
- BlockReaderFactory.newBlockReader(dfsClient.conf,
- file, block,
- blockToken,
- startOffset, len,
- verifyChecksum,
- clientName,
- peer,
- chosenNode);
- return reader;
- } catch (IOException ex) {
- // Our socket is no good.
- DFSClient.LOG.debug("Error making BlockReader. Closing stale " + peer, ex);
- IOUtils.closeQuietly(peer);
+ boolean allowShortCircuitLocalReads =
+ (peer.getDomainSocket() != null) &&
+ dfsClient.getConf().shortCircuitLocalReads &&
+ (!shortCircuitForbidden());
+ // Here we will try to send either an OP_READ_BLOCK request or an
+ // OP_REQUEST_SHORT_CIRCUIT_FDS, depending on what kind of block reader
+ // we're trying to create.
+ BlockReader blockReader = BlockReaderFactory.newBlockReader(
+ dfsClient.conf, file, block, blockToken, startOffset,
+ len, verifyChecksum, clientName, peer, chosenNode,
+ dfsClient.getDomainSocketFactory(), allowShortCircuitLocalReads);
+ success = true;
+ return blockReader;
+ } catch (IOException ex) {
+ // Our socket is no good.
+ DFSClient.LOG.debug("Error making BlockReader. " +
+ "Closing stale " + peer, ex);
+ if (peer.getDomainSocket() != null) {
+ // If the Peer that we got the error from was a DomainPeer,
+ // mark the socket path as bad, so that newDataSocket will not try
+ // to re-open this socket for a while.
+ dfsClient.getDomainSocketFactory().
+ disableDomainSocketPath(peer.getDomainSocket().getPath());
+ }
err = ex;
+ } finally {
+ if (!success) {
+ IOUtils.closeQuietly(peer);
+ }
}
}
@@ -1075,7 +1121,7 @@ public class DFSInputStream extends FSIn
// the TCP buffer, then just eat up the intervening data.
//
int diff = (int)(targetPos - pos);
- if (diff <= DFSClient.TCP_WINDOW_SIZE) {
+ if (diff <= blockReader.available()) {
try {
pos += blockReader.skip(diff);
if (pos == targetPos) {
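One consequence of the available() change just above: the old heuristic compared the seek distance to a fixed TCP window size, while the new code asks the reader itself. A sketch of the decision (the remote readers' available() implementations are not shown in this part of the diff, so the TCP-window comment below is an assumption):

    import java.io.IOException;

    class SeekSketch {
      static boolean canSkipInPlace(BlockReader blockReader, long pos,
          long targetPos) throws IOException {
        int diff = (int) (targetPos - pos);
        // Remote readers presumably bound this by what is already
        // buffered (roughly a TCP window); BlockReaderLocal returns
        // Integer.MAX_VALUE, so in-block forward seeks on local readers
        // always skip within the open file descriptors.
        return diff > 0 && diff <= blockReader.available();
      }
    }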
Added: hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DomainSocketFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DomainSocketFactory.java?rev=1432335&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DomainSocketFactory.java (added)
+++ hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DomainSocketFactory.java Fri Jan 11 23:52:22 2013
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.DFSClient.Conf;
+
+import org.apache.hadoop.net.unix.DomainSocket;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+
+class DomainSocketFactory {
+ public static final Log LOG = LogFactory.getLog(DomainSocketFactory.class);
+ private final Conf conf;
+
+ enum PathStatus {
+ UNUSABLE,
+ SHORT_CIRCUIT_DISABLED,
+ }
+
+ /**
+ * Information about domain socket paths.
+ */
+ Cache<String, PathStatus> pathInfo =
+ CacheBuilder.newBuilder()
+ .expireAfterWrite(10, TimeUnit.MINUTES)
+ .build();
+
+ public DomainSocketFactory(Conf conf) {
+ this.conf = conf;
+
+ String feature = null;
+ if (conf.shortCircuitLocalReads) {
+ feature = "The short-circuit local reads feature";
+ } else if (conf.domainSocketDataTraffic) {
+ feature = "UNIX domain socket data traffic";
+ }
+ if (feature != null) {
+ if (conf.domainSocketPath == null) {
+ LOG.warn(feature + " is disabled because you have not set " +
+ DFSConfigKeys.DFS_DATANODE_DOMAIN_SOCKET_PATH_KEY);
+ } else if (DomainSocket.getLoadingFailureReason() != null) {
+ LOG.error(feature + " is disabled because " +
+ DomainSocket.getLoadingFailureReason());
+ } else {
+        LOG.debug(feature + " is enabled.");
+ }
+ }
+ }
+
+ /**
+ * Create a DomainSocket.
+ *
+ * @param addr The address of the DataNode
+ * @param stream The DFSInputStream the socket will be created for.
+ *
+ * @return null if the socket could not be created; the
+ * socket otherwise. If there was an error while
+ * creating the socket, we will add the socket path
+ * to our list of failed domain socket paths.
+ */
+ DomainSocket create(InetSocketAddress addr, DFSInputStream stream) {
+ // If there is no domain socket path configured, we can't use domain
+ // sockets.
+ if (conf.domainSocketPath == null) return null;
+ // UNIX domain sockets can only be used to talk to local peers
+ if (!DFSClient.isLocalAddress(addr)) return null;
+ // If the DomainSocket code is not loaded, we can't create
+ // DomainSocket objects.
+ if (DomainSocket.getLoadingFailureReason() != null) return null;
+ String escapedPath = DomainSocket.
+ getEffectivePath(conf.domainSocketPath, addr.getPort());
+ PathStatus info = pathInfo.getIfPresent(escapedPath);
+ if (info == PathStatus.UNUSABLE) {
+ // We tried to connect to this domain socket before, and it was totally
+ // unusable.
+ return null;
+ }
+ if ((!conf.domainSocketDataTraffic) &&
+ ((info == PathStatus.SHORT_CIRCUIT_DISABLED) ||
+ stream.shortCircuitForbidden())) {
+ // If we don't want to pass data over domain sockets, and we don't want
+ // to pass file descriptors over them either, we have no use for domain
+ // sockets.
+ return null;
+ }
+ boolean success = false;
+ DomainSocket sock = null;
+ try {
+ sock = DomainSocket.connect(escapedPath);
+ sock.setAttribute(DomainSocket.RCV_TIMEO, conf.socketTimeout);
+ success = true;
+ } catch (IOException e) {
+ LOG.error("error creating DomainSocket", e);
+ // fall through
+ } finally {
+ if (!success) {
+ if (sock != null) {
+ IOUtils.closeQuietly(sock);
+ }
+ pathInfo.put(escapedPath, PathStatus.UNUSABLE);
+ sock = null;
+ }
+ }
+ return sock;
+ }
+
+ public void disableShortCircuitForPath(String path) {
+ pathInfo.put(path, PathStatus.SHORT_CIRCUIT_DISABLED);
+ }
+
+ public void disableDomainSocketPath(String path) {
+ pathInfo.put(path, PathStatus.UNUSABLE);
+ }
+}
\ No newline at end of file
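For illustration, a minimal sketch of how a client uses this factory. The conf, datanodeAddr and dfsIn variables are hypothetical stand-ins; create() and the ten-minute negative cache are from the class above.

    // Hypothetical caller-side sketch; not part of this patch.
    DomainSocketFactory factory = new DomainSocketFactory(conf);
    DomainSocket sock = factory.create(datanodeAddr, dfsIn);
    if (sock == null) {
      // Fall back to a TCP peer. If connect() failed, the factory has
      // cached the path as UNUSABLE, so it will not be retried for the
      // next ten minutes.
    }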
Added: hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/FileInputStreamCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/FileInputStreamCache.java?rev=1432335&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/FileInputStreamCache.java (added)
+++ hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/FileInputStreamCache.java Fri Jan 11 23:52:22 2013
@@ -0,0 +1,265 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.FileInputStream;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.util.Time;
+
+import com.google.common.collect.LinkedListMultimap;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * FileInputStreamCache caches FileInputStream objects received from the
+ * DataNode, so that open file descriptors can be reused by a later read
+ * of the same block.
+ */
+class FileInputStreamCache {
+ private final static Log LOG = LogFactory.getLog(FileInputStreamCache.class);
+
+ /**
+ * The executor service that runs the cacheCleaner. There is only one of
+ * these per VM.
+ */
+ private final static ScheduledThreadPoolExecutor executor
+ = new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder().
+ setDaemon(true).setNameFormat("FileInputStreamCache Cleaner").
+ build());
+
+ /**
+ * The CacheCleaner for this FileInputStreamCache. We don't create this
+ * and schedule it until it becomes necessary.
+ */
+ private CacheCleaner cacheCleaner;
+
+ /**
+ * Maximum number of entries to allow in the cache.
+ */
+ private final int maxCacheSize;
+
+ /**
+ * The minimum time in milliseconds to preserve an element in the cache.
+ */
+ private final long expiryTimeMs;
+
+ /**
+ * True if the FileInputStreamCache is closed.
+ */
+ private boolean closed = false;
+
+ /**
+ * Cache entries.
+ */
+ private final LinkedListMultimap<Key, Value> map = LinkedListMultimap.create();
+
+ /**
+ * Periodic task that closes and evicts entries which have been in the
+ * cache for longer than expiryTimeMs.
+ */
+ class CacheCleaner implements Runnable {
+ @Override
+ public void run() {
+ synchronized(FileInputStreamCache.this) {
+ if (closed) return;
+ long curTime = Time.monotonicNow();
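+ // map keeps entries in insertion order, so they come out oldest
+ // first; restart the iterator after each removal and stop at the
+ // first entry that has not yet expired.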
+ for (Iterator<Entry<Key, Value>> iter = map.entries().iterator();
+ iter.hasNext();
+ iter = map.entries().iterator()) {
+ Entry<Key, Value> entry = iter.next();
+ if (entry.getValue().getTime() + expiryTimeMs >= curTime) {
+ break;
+ }
+ entry.getValue().close();
+ iter.remove();
+ }
+ }
+ }
+ }
+
+ /**
+ * The key identifying a FileInputStream array.
+ */
+ static class Key {
+ private final DatanodeID datanodeID;
+ private final ExtendedBlock block;
+
+ public Key(DatanodeID datanodeID, ExtendedBlock block) {
+ this.datanodeID = datanodeID;
+ this.block = block;
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (!(other instanceof FileInputStreamCache.Key)) {
+ return false;
+ }
+ FileInputStreamCache.Key otherKey = (FileInputStreamCache.Key)other;
+ return (block.equals(otherKey.block) &&
+ (block.getGenerationStamp() == otherKey.block.getGenerationStamp()) &&
+ datanodeID.equals(otherKey.datanodeID));
+ }
+
+ @Override
+ public int hashCode() {
+ return block.hashCode();
+ }
+ }
+
+ /**
+ * The value containing a FileInputStream array and the time it was added to
+ * the cache.
+ */
+ static class Value {
+ private final FileInputStream[] fis;
+ private final long time;
+
+ public Value(FileInputStream[] fis) {
+ this.fis = fis;
+ this.time = Time.monotonicNow();
+ }
+
+ public FileInputStream[] getFileInputStreams() {
+ return fis;
+ }
+
+ public long getTime() {
+ return time;
+ }
+
+ public void close() {
+ IOUtils.cleanup(LOG, fis);
+ }
+ }
+
+ /**
+ * Create a new FileInputStreamCache.
+ *
+ * @param maxCacheSize The maximum number of elements to allow in
+ * the cache.
+ * @param expiryTimeMs The minimum time in milliseconds to preserve
+ * elements in the cache.
+ */
+ public FileInputStreamCache(int maxCacheSize, long expiryTimeMs) {
+ this.maxCacheSize = maxCacheSize;
+ this.expiryTimeMs = expiryTimeMs;
+ }
+
+ /**
+ * Put an array of FileInputStream objects into the cache.
+ *
+ * @param datanodeID The DatanodeID to store the streams under.
+ * @param block The Block to store the streams under.
+ * @param fis The streams.
+ */
+ public void put(DatanodeID datanodeID, ExtendedBlock block,
+ FileInputStream[] fis) {
+ boolean inserted = false;
+ try {
+ synchronized(this) {
+ if (closed) return;
+ if (map.size() + 1 > maxCacheSize) {
+ Iterator<Entry<Key, Value>> iter = map.entries().iterator();
+ if (!iter.hasNext()) return;
+ Entry<Key, Value> entry = iter.next();
+ entry.getValue().close();
+ iter.remove();
+ }
+ if (cacheCleaner == null) {
+ cacheCleaner = new CacheCleaner();
+ executor.scheduleAtFixedRate(cacheCleaner, expiryTimeMs, expiryTimeMs,
+ TimeUnit.MILLISECONDS);
+ }
+ map.put(new Key(datanodeID, block), new Value(fis));
+ inserted = true;
+ }
+ } finally {
+ if (!inserted) {
+ IOUtils.cleanup(LOG, fis);
+ }
+ }
+ }
+
+ /**
+ * Find and remove an array of FileInputStream objects from the cache.
+ *
+ * @param datanodeID The DatanodeID to search for.
+ * @param block The Block to search for.
+ *
+ * @return null if no streams can be found; the
+ * array otherwise. If this is non-null, the
+ * array will have been removed from the cache.
+ */
+ public synchronized FileInputStream[] get(DatanodeID datanodeID,
+ ExtendedBlock block) {
+ Key key = new Key(datanodeID, block);
+ List<Value> ret = map.get(key);
+ if (ret.isEmpty()) return null;
+ Value val = ret.get(0);
+ map.remove(key, val);
+ return val.getFileInputStreams();
+ }
+
+ /**
+ * Close the cache and free all associated resources.
+ */
+ public synchronized void close() {
+ if (closed) return;
+ closed = true;
+ if (cacheCleaner != null) {
+ executor.remove(cacheCleaner);
+ }
+ for (Iterator<Entry<Key, Value>> iter = map.entries().iterator();
+ iter.hasNext();
+ iter = map.entries().iterator()) {
+ Entry<Key, Value> entry = iter.next();
+ entry.getValue().close();
+ iter.remove();
+ }
+ }
+
+ @Override
+ public synchronized String toString() {
+ StringBuilder bld = new StringBuilder();
+ bld.append("FileInputStreamCache(");
+ String prefix = "";
+ for (Entry<Key, Value> entry : map.entries()) {
+ bld.append(prefix);
+ bld.append(entry.getKey());
+ prefix = ", ";
+ }
+ bld.append(")");
+ return bld.toString();
+ }
+
+ public long getExpiryTimeMs() {
+ return expiryTimeMs;
+ }
+
+ public int getMaxCacheSize() {
+ return maxCacheSize;
+ }
+}
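For illustration, a minimal usage sketch of the cache above. The datanodeID, block and stream variables are hypothetical stand-ins; the constructor, put() and get() are from this file.

    // Hypothetical sketch; in the client these values come from the read path.
    FileInputStreamCache cache = new FileInputStreamCache(16, 5 * 60 * 1000);
    cache.put(datanodeID, block, new FileInputStream[] { blockIn, metaIn });

    // A later read of the same block reclaims the open descriptors instead
    // of re-requesting them from the DataNode. get() removes the entry, so
    // the caller owns the returned streams.
    FileInputStream[] fises = cache.get(datanodeID, block);
    if (fises == null) {
      // Expired, evicted, or never cached: request new descriptors.
    }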
Modified: hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java?rev=1432335&r1=1432334&r2=1432335&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java Fri Jan 11 23:52:22 2013
@@ -413,7 +413,8 @@ public class RemoteBlockReader extends F
}
@Override
- public synchronized void close(PeerCache peerCache) throws IOException {
+ public synchronized void close(PeerCache peerCache,
+ FileInputStreamCache fisCache) throws IOException {
startOffset = -1;
checksum = null;
if (peerCache != null && sentStatusCode) {
@@ -470,4 +471,11 @@ public class RemoteBlockReader extends F
public int read(ByteBuffer buf) throws IOException {
throw new UnsupportedOperationException("readDirect unsupported in RemoteBlockReader");
}
+
+ @Override
+ public int available() throws IOException {
+ // An optimistic estimate of how much data is available
+ // to us without doing network I/O.
+ return DFSClient.TCP_WINDOW_SIZE;
+ }
}
Modified: hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java?rev=1432335&r1=1432334&r2=1432335&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java Fri Jan 11 23:52:22 2013
@@ -275,7 +275,8 @@ public class RemoteBlockReader2 impleme
@Override
- public synchronized void close(PeerCache peerCache) throws IOException {
+ public synchronized void close(PeerCache peerCache,
+ FileInputStreamCache fisCache) throws IOException {
packetReceiver.close();
startOffset = -1;
checksum = null;
@@ -422,4 +423,11 @@ public class RemoteBlockReader2 impleme
}
}
}
+
+ @Override
+ public int available() throws IOException {
+ // An optimistic estimate of how much data is available
+ // to us without doing network I/O.
+ return DFSClient.TCP_WINDOW_SIZE;
+ }
}
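Both remote readers now answer available() with the same fixed, optimistic estimate. A sketch of how a caller can use it as a cheap read-size hint; the reader and remaining variables are hypothetical:

    // available() does no network I/O; TCP_WINDOW_SIZE is an optimistic
    // guess at what can be read without blocking on the network.
    int hint = reader.available();
    byte[] buf = new byte[Math.min(hint, remaining)];
    int n = reader.read(buf, 0, buf.length);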
Modified: hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DomainPeerServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DomainPeerServer.java?rev=1432335&r1=1432334&r2=1432335&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DomainPeerServer.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DomainPeerServer.java Fri Jan 11 23:52:22 2013
@@ -18,7 +18,6 @@
package org.apache.hadoop.hdfs.net;
-import java.io.Closeable;
import java.io.IOException;
import java.net.SocketTimeoutException;
@@ -27,8 +26,10 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.hdfs.net.PeerServer;
import org.apache.hadoop.net.unix.DomainSocket;
+import org.apache.hadoop.classification.InterfaceAudience;
-class DomainPeerServer implements PeerServer {
+@InterfaceAudience.Private
+public class DomainPeerServer implements PeerServer {
static Log LOG = LogFactory.getLog(DomainPeerServer.class);
private final DomainSocket sock;
Modified: hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java?rev=1432335&r1=1432334&r2=1432335&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java Fri Jan 11 23:52:22 2013
@@ -105,6 +105,18 @@ public interface DataTransferProtocol {
final DatanodeInfo[] targets) throws IOException;
/**
+ * Request short circuit access file descriptors from a DataNode.
+ *
+ * @param blk The block to get file descriptors for.
+ * @param blockToken Security token for accessing the block.
+ * @param maxVersion Maximum version of the block data the client
+ * can understand.
+ */
+ public void requestShortCircuitFds(final ExtendedBlock blk,
+ final Token<BlockTokenIdentifier> blockToken,
+ int maxVersion) throws IOException;
+
+ /**
* Receive a block from a source datanode
* and then notify the namenode
* to remove the copy from the original datanode.
Modified: hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java?rev=1432335&r1=1432334&r2=1432335&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java Fri Jan 11 23:52:22 2013
@@ -34,7 +34,8 @@ public enum Op {
REPLACE_BLOCK((byte)83),
COPY_BLOCK((byte)84),
BLOCK_CHECKSUM((byte)85),
- TRANSFER_BLOCK((byte)86);
+ TRANSFER_BLOCK((byte)86),
+ REQUEST_SHORT_CIRCUIT_FDS((byte)87);
/** The code for this operation. */
public final byte code;
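Each op is a single byte on the wire. A hypothetical helper, not part of this patch, showing how a received byte maps back to an Op via the public code field:

    // Hypothetical, for illustration only.
    static Op fromByte(byte code) {
      for (Op op : Op.values()) {
        if (op.code == code) {
          return op;
        }
      }
      throw new IllegalArgumentException("Unknown op code " + code);
    }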
Modified: hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java?rev=1432335&r1=1432334&r2=1432335&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java Fri Jan 11 23:52:22 2013
@@ -32,6 +32,7 @@ import org.apache.hadoop.hdfs.protocol.p
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReadBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReplaceBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpTransferBlockProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpRequestShortCircuitAccessProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
/** Receiver */
@@ -77,6 +78,9 @@ public abstract class Receiver implement
case TRANSFER_BLOCK:
opTransferBlock(in);
break;
+ case REQUEST_SHORT_CIRCUIT_FDS:
+ opRequestShortCircuitFds(in);
+ break;
default:
throw new IOException("Unknown op " + op + " in data stream");
}
@@ -117,6 +121,15 @@ public abstract class Receiver implement
fromProtos(proto.getTargetsList()));
}
+ /** Receive {@link Op#REQUEST_SHORT_CIRCUIT_FDS} */
+ private void opRequestShortCircuitFds(DataInputStream in) throws IOException {
+ final OpRequestShortCircuitAccessProto proto =
+ OpRequestShortCircuitAccessProto.parseFrom(vintPrefixed(in));
+ requestShortCircuitFds(fromProto(proto.getHeader().getBlock()),
+ fromProto(proto.getHeader().getToken()),
+ proto.getMaxVersion());
+ }
+
/** Receive OP_REPLACE_BLOCK */
private void opReplaceBlock(DataInputStream in) throws IOException {
OpReplaceBlockProto proto = OpReplaceBlockProto.parseFrom(vintPrefixed(in));
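The Receiver only parses the proto and dispatches; the abstract requestShortCircuitFds() is implemented on the DataNode side, which is not part of this hunk. A hypothetical stub showing the expected shape:

    // Hypothetical implementation outline, not from this patch.
    @Override
    public void requestShortCircuitFds(ExtendedBlock blk,
        Token<BlockTokenIdentifier> blockToken,
        int maxVersion) throws IOException {
      // Validate the block token, open the block and meta files, then
      // pass their file descriptors back over the domain socket.
    }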
Modified: hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java?rev=1432335&r1=1432334&r2=1432335&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java Fri Jan 11 23:52:22 2013
@@ -36,6 +36,7 @@ import org.apache.hadoop.hdfs.protocol.p
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReadBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReplaceBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpTransferBlockProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpRequestShortCircuitAccessProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.security.token.Token;
@@ -136,6 +137,17 @@ public class Sender implements DataTrans
}
@Override
+ public void requestShortCircuitFds(final ExtendedBlock blk,
+ final Token<BlockTokenIdentifier> blockToken,
+ int maxVersion) throws IOException {
+ OpRequestShortCircuitAccessProto proto =
+ OpRequestShortCircuitAccessProto.newBuilder()
+ .setHeader(DataTransferProtoUtil.buildBaseHeader(
+ blk, blockToken)).setMaxVersion(maxVersion).build();
+ send(out, Op.REQUEST_SHORT_CIRCUIT_FDS, proto);
+ }
+
+ @Override
public void replaceBlock(final ExtendedBlock blk,
final Token<BlockTokenIdentifier> blockToken,
final String delHint,
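On the client side the request travels over a connected DomainSocket. A minimal sketch; the socket path and maxVersion value are hypothetical, while Sender and requestShortCircuitFds() are from this patch:

    // Hypothetical client-side sketch; error handling elided.
    DomainSocket sock = DomainSocket.connect("/var/run/hdfs/dn_socket");
    DataOutputStream out = new DataOutputStream(sock.getOutputStream());
    new Sender(out).requestShortCircuitFds(blk, blockToken, 1);
    // On success, the DataNode passes the block and meta file
    // descriptors back over this same socket.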
Modified: hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java?rev=1432335&r1=1432334&r2=1432335&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java Fri Jan 11 23:52:22 2013
@@ -213,7 +213,7 @@ public class JspHelper {
offsetIntoBlock, amtToRead, true,
"JspHelper", TcpPeerServer.peerFromSocketAndKey(s, encryptionKey),
new DatanodeID(addr.getAddress().toString(),
- addr.getHostName(), poolId, addr.getPort(), 0, 0));
+ addr.getHostName(), poolId, addr.getPort(), 0, 0), null, false);
byte[] buf = new byte[(int)amtToRead];
int readOffset = 0;
@@ -232,8 +232,7 @@ public class JspHelper {
amtToRead -= numRead;
readOffset += numRead;
}
- blockReader = null;
- s.close();
+ blockReader.close(null, null);
out.print(HtmlQuoting.quoteHtmlChars(new String(buf)));
}