Posted to hdfs-commits@hadoop.apache.org by sz...@apache.org on 2013/04/14 01:06:00 UTC
svn commit: r1467713 [1/4] - in
/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs: ./
dev-support/ src/main/java/ src/main/java/org/apache/hadoop/hdfs/
src/main/java/org/apache/hadoop/hdfs/net/
src/main/java/org/apache/hadoop/hdfs/proto...
Author: szetszwo
Date: Sat Apr 13 23:05:54 2013
New Revision: 1467713
URL: http://svn.apache.org/r1467713
Log:
Merging r1466653 through r1467712 from trunk.
Added:
hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java
- copied unchanged from r1467712, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java
hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DomainSocketFactory.java
- copied unchanged from r1467712, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DomainSocketFactory.java
hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/FileInputStreamCache.java
- copied unchanged from r1467712, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/FileInputStreamCache.java
hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/PeerCache.java
- copied unchanged from r1467712, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/PeerCache.java
hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/
- copied from r1467712, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/
hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/BasicInetPeer.java
- copied unchanged from r1467712, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/BasicInetPeer.java
hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DomainPeer.java
- copied unchanged from r1467712, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DomainPeer.java
hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DomainPeerServer.java
- copied unchanged from r1467712, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DomainPeerServer.java
hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/EncryptedPeer.java
- copied unchanged from r1467712, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/EncryptedPeer.java
hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/NioInetPeer.java
- copied unchanged from r1467712, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/NioInetPeer.java
hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/Peer.java
- copied unchanged from r1467712, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/Peer.java
hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/PeerServer.java
- copied unchanged from r1467712, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/PeerServer.java
hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/TcpPeerServer.java
- copied unchanged from r1467712, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/TcpPeerServer.java
hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/site/apt/ShortCircuitLocalReads.apt.vm
- copied unchanged from r1467712, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/site/apt/ShortCircuitLocalReads.apt.vm
hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocalLegacy.java
- copied unchanged from r1467712, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocalLegacy.java
hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDisableConnCache.java
- copied unchanged from r1467712, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDisableConnCache.java
hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileInputStreamCache.java
- copied unchanged from r1467712, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileInputStreamCache.java
hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelShortCircuitLegacyRead.java
- copied unchanged from r1467712, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelShortCircuitLegacyRead.java
hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelShortCircuitRead.java
- copied unchanged from r1467712, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelShortCircuitRead.java
hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelShortCircuitReadNoChecksum.java
- copied unchanged from r1467712, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelShortCircuitReadNoChecksum.java
hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelShortCircuitReadUnCached.java
- copied unchanged from r1467712, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelShortCircuitReadUnCached.java
hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelUnixDomainRead.java
- copied unchanged from r1467712, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelUnixDomainRead.java
hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPeerCache.java
- copied unchanged from r1467712, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPeerCache.java
Removed:
hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/SocketCache.java
hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelLocalRead.java
hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestSocketCache.java
Modified:
hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/ (props changed)
hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml
hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/pom.xml
hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/ (props changed)
hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java
hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java
hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/delegation/DelegationTokenSecretManager.java
hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/native/ (props changed)
hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientDatanodeProtocol.proto
hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/datanode/ (props changed)
hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/ (props changed)
hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/secondary/ (props changed)
hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/hdfs/ (props changed)
hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java
hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java
hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientBlockVerification.java
hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java
hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUpgrade.java
hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java
hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelRead.java
hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelReadUtil.java
hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestSafeMode.java
hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java
hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java
hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java
hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestSecurityTokenEditLog.java
Propchange: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/
------------------------------------------------------------------------------
Merged /hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs:r1430995-1467533
Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs:r1466653-1467712
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1467713&r1=1467712&r2=1467713&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Sat Apr 13 23:05:54 2013
@@ -187,6 +187,9 @@ Trunk (Unreleased)
HDFS-4633 TestDFSClientExcludedNodes fails sporadically if excluded nodes
cache expires too quickly (Chris Nauroth via Sanjay)
+ HDFS-347. DFS read performance suboptimal when client co-located on nodes
+ with data. (Colin Patrick McCabe via todd and atm)
+
OPTIMIZATIONS
BUG FIXES
@@ -355,6 +358,62 @@ Trunk (Unreleased)
HDFS-4674. TestBPOfferService fails on Windows due to failure parsing
datanode data directory as URI. (Chris Nauroth via suresh)
+ BREAKDOWN OF HDFS-347 SUBTASKS AND RELATED JIRAS
+
+ HDFS-4353. Encapsulate connections to peers in Peer and PeerServer classes.
+ (Colin Patrick McCabe via todd)
+
+ HDFS-4354. Create DomainSocket and DomainPeer and associated unit tests.
+ (Colin Patrick McCabe via todd)
+
+ HDFS-4356. BlockReaderLocal should use passed file descriptors rather than paths.
+ (Colin Patrick McCabe via todd)
+
+ HDFS-4388. DomainSocket should throw AsynchronousCloseException when appropriate.
+ (Colin Patrick McCabe via todd)
+
+ HDFS-4390. Bypass UNIX domain socket unit tests when they cannot be run.
+ (Colin Patrick McCabe via todd)
+
+ HDFS-4400. DFSInputStream#getBlockReader: last retries should ignore the cache
+ (Colin Patrick McCabe via todd)
+
+ HDFS-4401. Fix bug in DomainSocket path validation
+ (Colin Patrick McCabe via todd)
+
+ HDFS-4402. Some small DomainSocket fixes: avoid findbugs warning, change
+ log level, etc. (Colin Patrick McCabe via todd)
+
+ HDFS-4418. increase default FileInputStreamCache size (todd)
+
+ HDFS-4416. Rename dfs.datanode.domain.socket.path to dfs.domain.socket.path
+ (Colin Patrick McCabe via todd)
+
+ HDFS-4417. Fix case where local reads get disabled incorrectly
+ (Colin Patrick McCabe and todd via todd)
+
+ HDFS-4433. Make TestPeerCache not flaky (Colin Patrick McCabe via todd)
+
+ HDFS-4438. TestDomainSocket fails when system umask is set to 0002. (Colin
+ Patrick McCabe via atm)
+
+ HDFS-4440. Avoid annoying log message when dfs.domain.socket.path is not
+ set. (Colin Patrick McCabe via atm)
+
+ HDFS-4473. Don't create domain socket unless we need it. (Colin Patrick McCabe via atm)
+
+ HDFS-4485. DN should chmod socket path a+w. (Colin Patrick McCabe via atm)
+
+ HDFS-4453. Make a simple doc to describe the usage and design of the
+ shortcircuit read feature. (Colin Patrick McCabe via atm)
+
+ HDFS-4496. DFSClient: don't create a domain socket unless we need it (Colin
+ Patrick McCabe via todd)
+
+ HDFS-347: style cleanups (Colin Patrick McCabe via atm)
+
+ HDFS-4538. Allow use of legacy blockreader (Colin Patrick McCabe via todd)
+
Release 2.0.5-beta - UNRELEASED
INCOMPATIBLE CHANGES
@@ -399,6 +458,9 @@ Release 2.0.5-beta - UNRELEASED
HDFS-3940. Add Gset#clear method and clear the block map when namenode is
shutdown. (suresh)
+ HDFS-4679. Namenode operation checks should be done in a consistent
+ manner. (suresh)
+
OPTIMIZATIONS
BUG FIXES
@@ -500,6 +562,9 @@ Release 2.0.5-beta - UNRELEASED
HDFS-4643. Fix flakiness in TestQuorumJournalManager. (todd)
+ HDFS-4639. startFileInternal() should not increment generation stamp.
+ (Plamen Jeliazkov via shv)
+
Release 2.0.4-alpha - UNRELEASED
INCOMPATIBLE CHANGES
@@ -2473,6 +2538,20 @@ Release 2.0.0-alpha - 05-23-2012
HDFS-3039. Address findbugs and javadoc warnings on branch. (todd via atm)
+Release 0.23.8 - UNRELEASED
+
+ INCOMPATIBLE CHANGES
+
+ NEW FEATURES
+
+ IMPROVEMENTS
+
+ OPTIMIZATIONS
+
+ BUG FIXES
+
+ HDFS-4477. Secondary namenode may retain old tokens (daryn via kihwal)
+
Release 0.23.7 - UNRELEASED
INCOMPATIBLE CHANGES
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml?rev=1467713&r1=1467712&r2=1467713&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml Sat Apr 13 23:05:54 2013
@@ -290,6 +290,14 @@
<Method name="persistPaxosData" />
<Bug pattern="OS_OPEN_STREAM" />
</Match>
+
+ <!-- getShortCircuitFdsForRead is supposed to return open streams. -->
+ <Match>
+ <Class name="org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetImpl" />
+ <Method name="getShortCircuitFdsForRead" />
+ <Bug pattern="OBL_UNSATISFIED_OBLIGATION" />
+ </Match>
+
<!-- Don't complain about LocalDatanodeInfo's anonymous class -->
<Match>
<Class name="org.apache.hadoop.hdfs.BlockReaderLocal$LocalDatanodeInfo$1" />
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/pom.xml?rev=1467713&r1=1467712&r2=1467713&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/pom.xml (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/pom.xml Sat Apr 13 23:05:54 2013
@@ -515,6 +515,7 @@ http://maven.apache.org/xsd/maven-4.0.0.
<excludes>
<exclude>CHANGES.txt</exclude>
<exclude>CHANGES.HDFS-1623.txt</exclude>
+ <exclude>CHANGES.HDFS-347.txt</exclude>
<exclude>.idea/**</exclude>
<exclude>src/main/conf/*</exclude>
<exclude>src/main/docs/**</exclude>
Propchange: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java:r1466653-1467712
Merged /hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java:r1430995-1467533
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java?rev=1467713&r1=1467712&r2=1467713&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java Sat Apr 13 23:05:54 2013
@@ -18,10 +18,8 @@
package org.apache.hadoop.hdfs;
import java.io.IOException;
-import java.net.Socket;
import org.apache.hadoop.fs.ByteBufferReadable;
-import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
/**
* A BlockReader is responsible for reading a single block
@@ -43,7 +41,29 @@ public interface BlockReader extends Byt
*/
long skip(long n) throws IOException;
- void close() throws IOException;
+ /**
+ * Returns an estimate of the number of bytes that can be read
+ * (or skipped over) from this input stream without performing
+ * network I/O.
+ */
+ int available() throws IOException;
+
+ /**
+ * Close the block reader.
+ *
+ * @param peerCache The PeerCache to put the Peer we're using back
+ * into, or null if we should simply close the Peer
+ * we're using (along with its Socket).
+ * Ignored by Readers that don't maintain Peers.
+ * @param fisCache The FileInputStreamCache to put our FileInputStreams
+ * back into, or null if we should simply close them.
+ * Ignored by Readers that don't maintain
+ * FileInputStreams.
+ *
+ * @throws IOException
+ */
+ void close(PeerCache peerCache, FileInputStreamCache fisCache)
+ throws IOException;
/**
* Read exactly the given amount of data, throwing an exception
@@ -60,20 +80,4 @@ public interface BlockReader extends Byt
* filled or the next call will return EOF.
*/
int readAll(byte[] buf, int offset, int len) throws IOException;
-
- /**
- * Take the socket used to talk to the DN.
- */
- Socket takeSocket();
-
- /**
- * Whether the BlockReader has reached the end of its input stream
- * and successfully sent a status code back to the datanode.
- */
- boolean hasSentStatusCode();
-
- /**
- * @return a reference to the streams this block reader is using.
- */
- IOStreamPair getStreams();
}
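
[The close() contract above replaces takeSocket(), hasSentStatusCode() and
getStreams(): instead of the caller harvesting the socket from the reader,
the reader hands its Peer or FileInputStreams back to the caches itself. A
minimal caller sketch, assuming a hypothetical helper named finishRead that
lives in the org.apache.hadoop.hdfs package alongside the types it touches;
everything else comes from the interface above:

    // Hypothetical helper illustrating the new close() contract.  Non-null
    // caches let the reader recycle its Peer / FileInputStreams for later
    // reads; nulls make it close them outright.  Readers that hold neither
    // resource simply ignore the corresponding argument.
    static void finishRead(BlockReader reader, PeerCache peerCache,
        FileInputStreamCache fisCache, boolean recycle) throws IOException {
      if (recycle) {
        reader.close(peerCache, fisCache); // hand resources back to the caches
      } else {
        reader.close(null, null);          // close the resources for good
      }
    }
]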
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java?rev=1467713&r1=1467712&r2=1467713&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java Sat Apr 13 23:05:54 2013
@@ -17,20 +17,31 @@
*/
package org.apache.hadoop.hdfs;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.FileInputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
-import java.net.Socket;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.DFSClient.Conf;
+import org.apache.hadoop.hdfs.net.Peer;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
-import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
-import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
+import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
-import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.net.unix.DomainSocket;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.Token;
@@ -40,74 +51,181 @@ import org.apache.hadoop.security.token.
@InterfaceAudience.Private
public class BlockReaderFactory {
/**
- * @see #newBlockReader(Conf, Socket, String, ExtendedBlock, Token, long, long, int, boolean, String)
- */
- public static BlockReader newBlockReader(
- Configuration conf,
- Socket sock, String file,
- ExtendedBlock block, Token<BlockTokenIdentifier> blockToken,
- long startOffset, long len, DataEncryptionKey encryptionKey)
- throws IOException {
- int bufferSize = conf.getInt(DFSConfigKeys.IO_FILE_BUFFER_SIZE_KEY,
- DFSConfigKeys.IO_FILE_BUFFER_SIZE_DEFAULT);
- return newBlockReader(new Conf(conf),
- sock, file, block, blockToken, startOffset,
- len, bufferSize, true, "", encryptionKey, null);
- }
-
- /**
* Create a new BlockReader specifically to satisfy a read.
* This method also sends the OP_READ_BLOCK request.
*
* @param conf the DFSClient configuration
- * @param sock An established Socket to the DN. The BlockReader will not close it normally
* @param file File location
* @param block The block object
* @param blockToken The block token for security
* @param startOffset The read offset, relative to block head
- * @param len The number of bytes to read
+ * @param len The number of bytes to read, or -1 to read as many as
+ * possible.
* @param bufferSize The IO buffer size (not the client buffer size)
+ * Ignored except on the legacy BlockReader.
* @param verifyChecksum Whether to verify checksum
- * @param clientName Client name
- * @return New BlockReader instance, or null on error.
+ * @param clientName Client name. Used for log messages.
+ * @param peer The peer
+ * @param datanodeID The datanode that the Peer is connected to
+ * @param domainSocketFactory The DomainSocketFactory to notify if the Peer
+ * is a DomainPeer which turns out to be faulty.
+ * If null, no factory will be notified in this
+ * case.
+ * @param allowShortCircuitLocalReads True if short-circuit local reads
+ * should be allowed.
+ * @return New BlockReader instance
*/
@SuppressWarnings("deprecation")
public static BlockReader newBlockReader(
- Conf conf,
- Socket sock, String file,
+ Configuration conf,
+ String file,
ExtendedBlock block,
Token<BlockTokenIdentifier> blockToken,
long startOffset, long len,
- int bufferSize, boolean verifyChecksum,
+ boolean verifyChecksum,
String clientName,
- DataEncryptionKey encryptionKey,
- IOStreamPair ioStreams)
- throws IOException {
-
- if (conf.useLegacyBlockReader) {
- if (encryptionKey != null) {
- throw new RuntimeException("Encryption is not supported with the legacy block reader.");
+ Peer peer,
+ DatanodeID datanodeID,
+ DomainSocketFactory domSockFactory,
+ boolean allowShortCircuitLocalReads)
+ throws IOException {
+ peer.setReadTimeout(conf.getInt(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY,
+ HdfsServerConstants.READ_TIMEOUT));
+ peer.setWriteTimeout(HdfsServerConstants.WRITE_TIMEOUT);
+
+ if (peer.getDomainSocket() != null) {
+ if (allowShortCircuitLocalReads &&
+ (!conf.getBoolean(DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL,
+ DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL_DEFAULT))) {
+ // If this is a domain socket, and short-circuit local reads are
+ // enabled, try to set up a BlockReaderLocal.
+ BlockReader reader = newShortCircuitBlockReader(conf, file,
+ block, blockToken, startOffset, len, peer, datanodeID,
+ domSockFactory, verifyChecksum);
+ if (reader != null) {
+ // Once we've constructed the short-circuit block reader, we don't
+ // need the socket any more. So let's return it to the cache.
+ PeerCache peerCache = PeerCache.getInstance(
+ conf.getInt(DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY,
+ DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT),
+ conf.getLong(DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY,
+ DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT));
+ peerCache.put(datanodeID, peer);
+ return reader;
+ }
+ }
+ // If this is a domain socket and we couldn't (or didn't want to) set
+ // up a BlockReaderLocal, check that we are allowed to pass data traffic
+ // over the socket before proceeding.
+ if (!conf.getBoolean(DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC,
+ DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT)) {
+ throw new IOException("Because we can't do short-circuit access, " +
+ "and data traffic over domain sockets is disabled, " +
+ "we cannot use this socket to talk to " + datanodeID);
}
- return RemoteBlockReader.newBlockReader(
- sock, file, block, blockToken, startOffset, len, bufferSize, verifyChecksum, clientName);
+ }
+
+ if (conf.getBoolean(DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER,
+ DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER_DEFAULT)) {
+ return RemoteBlockReader.newBlockReader(file,
+ block, blockToken, startOffset, len,
+ conf.getInt(DFSConfigKeys.IO_FILE_BUFFER_SIZE_KEY,
+ DFSConfigKeys.IO_FILE_BUFFER_SIZE_DEFAULT),
+ verifyChecksum, clientName, peer, datanodeID);
} else {
- if (ioStreams == null) {
- ioStreams = new IOStreamPair(NetUtils.getInputStream(sock),
- NetUtils.getOutputStream(sock, HdfsServerConstants.WRITE_TIMEOUT));
- if (encryptionKey != null) {
- IOStreamPair encryptedStreams =
- DataTransferEncryptor.getEncryptedStreams(
- ioStreams.out, ioStreams.in, encryptionKey);
- ioStreams = encryptedStreams;
+ return RemoteBlockReader2.newBlockReader(
+ file, block, blockToken, startOffset, len,
+ verifyChecksum, clientName, peer, datanodeID);
+ }
+ }
+
+ /**
+ * Create a new short-circuit BlockReader.
+ *
+ * Here, we ask the DataNode to pass us file descriptors over our
+ * DomainSocket. If the DataNode agrees, we'll return the BlockReaderLocal;
+ * if it declines, we'll return null and inform the DomainSocketFactory that
+ * short-circuit local reads are disabled for this DataNode, so that we don't
+ * ask again.
+ *
+ * @param conf the configuration.
+ * @param file the file name. Used in log messages.
+ * @param block The block object.
+ * @param blockToken The block token for security.
+ * @param startOffset The read offset, relative to block head.
+ * @param len The number of bytes to read, or -1 to read
+ * as many as possible.
+ * @param peer The peer to use.
+ * @param datanodeID The datanode that the Peer is connected to.
+ * @param domSockFactory The DomainSocketFactory to notify if the Peer
+ * is a DomainPeer which turns out to be faulty.
+ * If null, no factory will be notified in this
+ * case.
+ * @param verifyChecksum True if we should verify the checksums.
+ * Note: even if this is true, when
+ * DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY is
+ * set, we will skip checksums.
+ *
+ * @return The BlockReaderLocal, or null if the
+ * DataNode declined to provide short-circuit
+ * access.
+ * @throws IOException If there was a communication error.
+ */
+ private static BlockReaderLocal newShortCircuitBlockReader(
+ Configuration conf, String file, ExtendedBlock block,
+ Token<BlockTokenIdentifier> blockToken, long startOffset,
+ long len, Peer peer, DatanodeID datanodeID,
+ DomainSocketFactory domSockFactory, boolean verifyChecksum)
+ throws IOException {
+ final DataOutputStream out =
+ new DataOutputStream(new BufferedOutputStream(
+ peer.getOutputStream()));
+ new Sender(out).requestShortCircuitFds(block, blockToken, 1);
+ DataInputStream in =
+ new DataInputStream(peer.getInputStream());
+ BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
+ PBHelper.vintPrefixed(in));
+ DomainSocket sock = peer.getDomainSocket();
+ switch (resp.getStatus()) {
+ case SUCCESS:
+ BlockReaderLocal reader = null;
+ byte buf[] = new byte[1];
+ FileInputStream fis[] = new FileInputStream[2];
+ sock.recvFileInputStreams(fis, buf, 0, buf.length);
+ try {
+ reader = new BlockReaderLocal(conf, file, block,
+ startOffset, len, fis[0], fis[1], datanodeID, verifyChecksum);
+ } finally {
+ if (reader == null) {
+ IOUtils.cleanup(DFSClient.LOG, fis[0], fis[1]);
}
}
-
- return RemoteBlockReader2.newBlockReader(
- sock, file, block, blockToken, startOffset, len, bufferSize,
- verifyChecksum, clientName, encryptionKey, ioStreams);
+ return reader;
+ case ERROR_UNSUPPORTED:
+ if (!resp.hasShortCircuitAccessVersion()) {
+ DFSClient.LOG.warn("short-circuit read access is disabled for " +
+ "DataNode " + datanodeID + ". reason: " + resp.getMessage());
+ domSockFactory.disableShortCircuitForPath(sock.getPath());
+ } else {
+ DFSClient.LOG.warn("short-circuit read access for the file " +
+ file + " is disabled for DataNode " + datanodeID +
+ ". reason: " + resp.getMessage());
+ }
+ return null;
+ case ERROR_ACCESS_TOKEN:
+ String msg = "access control error while " +
+ "attempting to set up short-circuit access to " +
+ file + resp.getMessage();
+ DFSClient.LOG.debug(msg);
+ throw new InvalidBlockTokenException(msg);
+ default:
+ DFSClient.LOG.warn("error while attempting to set up short-circuit " +
+ "access to " + file + ": " + resp.getMessage());
+ domSockFactory.disableShortCircuitForPath(sock.getPath());
+ return null;
}
}
-
+
/**
* File name to print when accessing a block directly (from servlets)
* @param s Address of the block location
@@ -119,4 +237,24 @@ public class BlockReaderFactory {
final String poolId, final long blockId) {
return s.toString() + ":" + poolId + ":" + blockId;
}
+
+ /**
+ * Get {@link BlockReaderLocalLegacy} for short circuited local reads.
+ * This block reader implements the path-based style of local reads
+ * first introduced in HDFS-2246.
+ */
+ static BlockReader getLegacyBlockReaderLocal(UserGroupInformation ugi,
+ Configuration conf, String src, ExtendedBlock blk,
+ Token<BlockTokenIdentifier> accessToken, DatanodeInfo chosenNode,
+ int socketTimeout, long offsetIntoBlock,
+ boolean connectToDnViaHostname) throws InvalidToken, IOException {
+ try {
+ return BlockReaderLocalLegacy.newBlockReader(ugi, conf, src,
+ blk, accessToken, chosenNode, socketTimeout, offsetIntoBlock,
+ blk.getNumBytes() - offsetIntoBlock, connectToDnViaHostname);
+ } catch (RemoteException re) {
+ throw re.unwrapRemoteException(InvalidToken.class,
+ AccessControlException.class);
+ }
+ }
}
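
[newShortCircuitBlockReader above is a small handshake: the client writes a
requestShortCircuitFds message with Sender, reads back a BlockOpResponseProto,
and on SUCCESS receives the block's data and metadata file descriptors in-band
over the domain socket. A condensed sketch of just that exchange, with the
variable names as in the method above and error/cache handling omitted:

    // Condensed sketch of the FD-passing exchange; all calls are the ones
    // used in newShortCircuitBlockReader above.
    DataOutputStream out = new DataOutputStream(
        new BufferedOutputStream(peer.getOutputStream()));
    new Sender(out).requestShortCircuitFds(block, blockToken, 1); // ask for FDs

    DataInputStream in = new DataInputStream(peer.getInputStream());
    BlockOpResponseProto resp =
        BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(in));

    if (resp.getStatus() == Status.SUCCESS) {   // DataTransferProtos.Status
      // The data-file and checksum-file descriptors arrive as ancillary
      // data alongside a one-byte payload.
      FileInputStream[] fis = new FileInputStream[2];
      byte[] buf = new byte[1];
      peer.getDomainSocket().recvFileInputStreams(fis, buf, 0, buf.length);
    }
]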
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java?rev=1467713&r1=1467712&r2=1467713&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java Sat Apr 13 23:05:54 2013
@@ -18,33 +18,20 @@
package org.apache.hadoop.hdfs;
import java.io.DataInputStream;
-import java.io.File;
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.IOException;
-import java.net.Socket;
import java.nio.ByteBuffer;
-import java.security.PrivilegedExceptionAction;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
-import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.hdfs.util.DirectBufferPool;
-import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;
/**
@@ -56,84 +43,19 @@ import org.apache.hadoop.util.DataChecks
* <ul>
* <li>The client performing short circuit reads must be configured at the
* datanode.</li>
- * <li>The client gets the path to the file where block is stored using
- * {@link org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol#getBlockLocalPathInfo(ExtendedBlock, Token)}
- * RPC call</li>
- * <li>Client uses kerberos authentication to connect to the datanode over RPC,
- * if security is enabled.</li>
+ * <li>The client gets the file descriptors for the metadata file and the data
+ * file for the block using
+ * {@link org.apache.hadoop.hdfs.server.datanode.DataXceiver#requestShortCircuitFds}.
+ * </li>
+ * <li>The client reads the file descriptors.</li>
* </ul>
*/
class BlockReaderLocal implements BlockReader {
- private static final Log LOG = LogFactory.getLog(DFSClient.class);
-
- //Stores the cache and proxy for a local datanode.
- private static class LocalDatanodeInfo {
- private ClientDatanodeProtocol proxy = null;
- private final Map<ExtendedBlock, BlockLocalPathInfo> cache;
-
- LocalDatanodeInfo() {
- final int cacheSize = 10000;
- final float hashTableLoadFactor = 0.75f;
- int hashTableCapacity = (int) Math.ceil(cacheSize / hashTableLoadFactor) + 1;
- cache = Collections
- .synchronizedMap(new LinkedHashMap<ExtendedBlock, BlockLocalPathInfo>(
- hashTableCapacity, hashTableLoadFactor, true) {
- private static final long serialVersionUID = 1;
-
- @Override
- protected boolean removeEldestEntry(
- Map.Entry<ExtendedBlock, BlockLocalPathInfo> eldest) {
- return size() > cacheSize;
- }
- });
- }
-
- private synchronized ClientDatanodeProtocol getDatanodeProxy(
- UserGroupInformation ugi, final DatanodeInfo node,
- final Configuration conf, final int socketTimeout,
- final boolean connectToDnViaHostname) throws IOException {
- if (proxy == null) {
- try {
- proxy = ugi.doAs(new PrivilegedExceptionAction<ClientDatanodeProtocol>() {
- @Override
- public ClientDatanodeProtocol run() throws Exception {
- return DFSUtil.createClientDatanodeProtocolProxy(node, conf,
- socketTimeout, connectToDnViaHostname);
- }
- });
- } catch (InterruptedException e) {
- LOG.warn("encountered exception ", e);
- }
- }
- return proxy;
- }
-
- private synchronized void resetDatanodeProxy() {
- if (null != proxy) {
- RPC.stopProxy(proxy);
- proxy = null;
- }
- }
-
- private BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock b) {
- return cache.get(b);
- }
-
- private void setBlockLocalPathInfo(ExtendedBlock b, BlockLocalPathInfo info) {
- cache.put(b, info);
- }
-
- private void removeBlockLocalPathInfo(ExtendedBlock b) {
- cache.remove(b);
- }
- }
-
- // Multiple datanodes could be running on the local machine. Store proxies in
- // a map keyed by the ipc port of the datanode.
- private static Map<Integer, LocalDatanodeInfo> localDatanodeInfoMap = new HashMap<Integer, LocalDatanodeInfo>();
+ static final Log LOG = LogFactory.getLog(BlockReaderLocal.class);
private final FileInputStream dataIn; // reader for the data file
private final FileInputStream checksumIn; // reader for the checksum file
+ private final boolean verifyChecksum;
/**
* Offset from the most recent chunk boundary at which the next read should
@@ -153,7 +75,6 @@ class BlockReaderLocal implements BlockR
private ByteBuffer slowReadBuff = null;
private ByteBuffer checksumBuff = null;
private DataChecksum checksum;
- private final boolean verifyChecksum;
private static DirectBufferPool bufferPool = new DirectBufferPool();
@@ -163,187 +84,92 @@ class BlockReaderLocal implements BlockR
/** offset in block where reader wants to actually read */
private long startOffset;
private final String filename;
-
- /**
- * The only way this object can be instantiated.
- */
- static BlockReaderLocal newBlockReader(UserGroupInformation ugi,
- Configuration conf, String file, ExtendedBlock blk,
- Token<BlockTokenIdentifier> token, DatanodeInfo node, int socketTimeout,
- long startOffset, long length, boolean connectToDnViaHostname)
- throws IOException {
-
- LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node
- .getIpcPort());
- // check the cache first
- BlockLocalPathInfo pathinfo = localDatanodeInfo.getBlockLocalPathInfo(blk);
- if (pathinfo == null) {
- pathinfo = getBlockPathInfo(ugi, blk, node, conf, socketTimeout, token,
- connectToDnViaHostname);
- }
-
- // check to see if the file exists. It may so happen that the
- // HDFS file has been deleted and this block-lookup is occurring
- // on behalf of a new HDFS file. This time, the block file could
- // be residing in a different portion of the fs.data.dir directory.
- // In this case, we remove this entry from the cache. The next
- // call to this method will re-populate the cache.
- FileInputStream dataIn = null;
- FileInputStream checksumIn = null;
- BlockReaderLocal localBlockReader = null;
- boolean skipChecksumCheck = skipChecksumCheck(conf);
- try {
- // get a local file system
- File blkfile = new File(pathinfo.getBlockPath());
- dataIn = new FileInputStream(blkfile);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("New BlockReaderLocal for file " + blkfile + " of size "
- + blkfile.length() + " startOffset " + startOffset + " length "
- + length + " short circuit checksum " + !skipChecksumCheck);
- }
- if (!skipChecksumCheck) {
- // get the metadata file
- File metafile = new File(pathinfo.getMetaPath());
- checksumIn = new FileInputStream(metafile);
-
- // read and handle the common header here. For now just a version
- BlockMetadataHeader header = BlockMetadataHeader
- .readHeader(new DataInputStream(checksumIn));
- short version = header.getVersion();
- if (version != BlockMetadataHeader.VERSION) {
- LOG.warn("Wrong version (" + version + ") for metadata file for "
- + blk + " ignoring ...");
- }
- DataChecksum checksum = header.getChecksum();
- long firstChunkOffset = startOffset
- - (startOffset % checksum.getBytesPerChecksum());
- localBlockReader = new BlockReaderLocal(conf, file, blk, token,
- startOffset, length, pathinfo, checksum, true, dataIn,
- firstChunkOffset, checksumIn);
- } else {
- localBlockReader = new BlockReaderLocal(conf, file, blk, token,
- startOffset, length, pathinfo, dataIn);
- }
- } catch (IOException e) {
- // remove from cache
- localDatanodeInfo.removeBlockLocalPathInfo(blk);
- DFSClient.LOG.warn("BlockReaderLocal: Removing " + blk
- + " from cache because local file " + pathinfo.getBlockPath()
- + " could not be opened.");
- throw e;
- } finally {
- if (localBlockReader == null) {
- if (dataIn != null) {
- dataIn.close();
- }
- if (checksumIn != null) {
- checksumIn.close();
- }
- }
- }
- return localBlockReader;
- }
+ private final DatanodeID datanodeID;
+ private final ExtendedBlock block;
- private static synchronized LocalDatanodeInfo getLocalDatanodeInfo(int port) {
- LocalDatanodeInfo ldInfo = localDatanodeInfoMap.get(port);
- if (ldInfo == null) {
- ldInfo = new LocalDatanodeInfo();
- localDatanodeInfoMap.put(port, ldInfo);
- }
- return ldInfo;
- }
-
- private static BlockLocalPathInfo getBlockPathInfo(UserGroupInformation ugi,
- ExtendedBlock blk, DatanodeInfo node, Configuration conf, int timeout,
- Token<BlockTokenIdentifier> token, boolean connectToDnViaHostname)
- throws IOException {
- LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node.getIpcPort());
- BlockLocalPathInfo pathinfo = null;
- ClientDatanodeProtocol proxy = localDatanodeInfo.getDatanodeProxy(ugi, node,
- conf, timeout, connectToDnViaHostname);
- try {
- // make RPC to local datanode to find local pathnames of blocks
- pathinfo = proxy.getBlockLocalPathInfo(blk, token);
- if (pathinfo != null) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Cached location of block " + blk + " as " + pathinfo);
- }
- localDatanodeInfo.setBlockLocalPathInfo(blk, pathinfo);
- }
- } catch (IOException e) {
- localDatanodeInfo.resetDatanodeProxy(); // Reset proxy on error
- throw e;
- }
- return pathinfo;
- }
-
- private static boolean skipChecksumCheck(Configuration conf) {
- return conf.getBoolean(
- DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY,
- DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_DEFAULT);
- }
-
- private static int getSlowReadBufferNumChunks(Configuration conf, int bytesPerChecksum) {
- int bufferSizeBytes = conf.getInt(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY,
- DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_DEFAULT);
-
- if (bufferSizeBytes < bytesPerChecksum) {
- throw new IllegalArgumentException("Configured BlockReaderLocal buffer size (" + bufferSizeBytes + ") " +
- "is not large enough to hold a single chunk (" + bytesPerChecksum + "). Please configure " +
+ private static int getSlowReadBufferNumChunks(Configuration conf,
+ int bytesPerChecksum) {
+
+ int bufSize =
+ conf.getInt(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY,
+ DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_DEFAULT);
+
+ if (bufSize < bytesPerChecksum) {
+ throw new IllegalArgumentException("Configured BlockReaderLocal buffer size (" +
+ bufSize + ") is not large enough to hold a single chunk (" +
+ bytesPerChecksum + "). Please configure " +
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY + " appropriately");
}
// Round down to nearest chunk size
- return bufferSizeBytes / bytesPerChecksum;
- }
-
- private BlockReaderLocal(Configuration conf, String hdfsfile,
- ExtendedBlock block, Token<BlockTokenIdentifier> token, long startOffset,
- long length, BlockLocalPathInfo pathinfo, FileInputStream dataIn)
- throws IOException {
- this(conf, hdfsfile, block, token, startOffset, length, pathinfo,
- DataChecksum.newDataChecksum(DataChecksum.Type.NULL, 4), false,
- dataIn, startOffset, null);
+ return bufSize / bytesPerChecksum;
}
- private BlockReaderLocal(Configuration conf, String hdfsfile,
- ExtendedBlock block, Token<BlockTokenIdentifier> token, long startOffset,
- long length, BlockLocalPathInfo pathinfo, DataChecksum checksum,
- boolean verifyChecksum, FileInputStream dataIn, long firstChunkOffset,
- FileInputStream checksumIn) throws IOException {
- this.filename = hdfsfile;
- this.checksum = checksum;
- this.verifyChecksum = verifyChecksum;
- this.startOffset = Math.max(startOffset, 0);
-
- bytesPerChecksum = this.checksum.getBytesPerChecksum();
- checksumSize = this.checksum.getChecksumSize();
-
+ public BlockReaderLocal(Configuration conf, String filename,
+ ExtendedBlock block, long startOffset, long length,
+ FileInputStream dataIn, FileInputStream checksumIn,
+ DatanodeID datanodeID, boolean verifyChecksum) throws IOException {
this.dataIn = dataIn;
this.checksumIn = checksumIn;
- this.offsetFromChunkBoundary = (int) (startOffset-firstChunkOffset);
-
- int chunksPerChecksumRead = getSlowReadBufferNumChunks(conf, bytesPerChecksum);
- slowReadBuff = bufferPool.getBuffer(bytesPerChecksum * chunksPerChecksumRead);
- checksumBuff = bufferPool.getBuffer(checksumSize * chunksPerChecksumRead);
- // Initially the buffers have nothing to read.
- slowReadBuff.flip();
- checksumBuff.flip();
+ this.startOffset = Math.max(startOffset, 0);
+ this.filename = filename;
+ this.datanodeID = datanodeID;
+ this.block = block;
+
+ // read and handle the common header here. For now just a version
+ checksumIn.getChannel().position(0);
+ BlockMetadataHeader header = BlockMetadataHeader
+ .readHeader(new DataInputStream(
+ new BufferedInputStream(checksumIn,
+ BlockMetadataHeader.getHeaderSize())));
+ short version = header.getVersion();
+ if (version != BlockMetadataHeader.VERSION) {
+ throw new IOException("Wrong version (" + version + ") of the " +
+ "metadata file for " + filename + ".");
+ }
+ if (!verifyChecksum) {
+ this.verifyChecksum = false;
+ } else {
+ this.verifyChecksum = !conf.getBoolean(DFSConfigKeys.
+ DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY,
+ DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_DEFAULT);
+ }
+ long firstChunkOffset;
+ if (this.verifyChecksum) {
+ this.checksum = header.getChecksum();
+ this.bytesPerChecksum = this.checksum.getBytesPerChecksum();
+ this.checksumSize = this.checksum.getChecksumSize();
+ firstChunkOffset = startOffset
+ - (startOffset % checksum.getBytesPerChecksum());
+ this.offsetFromChunkBoundary = (int) (startOffset - firstChunkOffset);
+
+ int chunksPerChecksumRead = getSlowReadBufferNumChunks(conf, bytesPerChecksum);
+ slowReadBuff = bufferPool.getBuffer(bytesPerChecksum * chunksPerChecksumRead);
+ checksumBuff = bufferPool.getBuffer(checksumSize * chunksPerChecksumRead);
+ // Initially the buffers have nothing to read.
+ slowReadBuff.flip();
+ checksumBuff.flip();
+ long checkSumOffset = (firstChunkOffset / bytesPerChecksum) * checksumSize;
+ IOUtils.skipFully(checksumIn, checkSumOffset);
+ } else {
+ firstChunkOffset = startOffset;
+ this.checksum = null;
+ this.bytesPerChecksum = 0;
+ this.checksumSize = 0;
+ this.offsetFromChunkBoundary = 0;
+ }
+
boolean success = false;
try {
- // Skip both input streams to beginning of the chunk containing startOffset
- IOUtils.skipFully(dataIn, firstChunkOffset);
- if (checksumIn != null) {
- long checkSumOffset = (firstChunkOffset / bytesPerChecksum) * checksumSize;
- IOUtils.skipFully(checksumIn, checkSumOffset);
- }
+ // Reposition both input streams to the beginning of the chunk
+ // containing startOffset
+ this.dataIn.getChannel().position(firstChunkOffset);
success = true;
} finally {
if (!success) {
- bufferPool.returnBuffer(slowReadBuff);
- bufferPool.returnBuffer(checksumBuff);
+ if (slowReadBuff != null) bufferPool.returnBuffer(slowReadBuff);
+ if (checksumBuff != null) bufferPool.returnBuffer(checksumBuff);
}
}
}
@@ -663,10 +489,17 @@ class BlockReaderLocal implements BlockR
}
@Override
- public synchronized void close() throws IOException {
- dataIn.close();
- if (checksumIn != null) {
- checksumIn.close();
+ public synchronized void close(PeerCache peerCache,
+ FileInputStreamCache fisCache) throws IOException {
+ if (fisCache != null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("putting FileInputStream for " + filename +
+ " back into FileInputStreamCache");
+ }
+ fisCache.put(datanodeID, block, new FileInputStream[] {dataIn, checksumIn});
+ } else {
+ LOG.debug("closing FileInputStream for " + filename);
+ IOUtils.cleanup(LOG, dataIn, checksumIn);
}
if (slowReadBuff != null) {
bufferPool.returnBuffer(slowReadBuff);
@@ -691,17 +524,8 @@ class BlockReaderLocal implements BlockR
}
@Override
- public Socket takeSocket() {
- return null;
- }
-
- @Override
- public boolean hasSentStatusCode() {
- return false;
- }
-
- @Override
- public IOStreamPair getStreams() {
- return null;
+ public int available() throws IOException {
+ // We never do network I/O in BlockReaderLocal.
+ return Integer.MAX_VALUE;
}
}
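
[The alignment arithmetic in the constructor above is easiest to see with
concrete numbers. A worked example, assuming a typical CRC32 metadata file
(512 bytes per checksum chunk, 4-byte checksums); the specific values are
only illustrative:

    // Illustrative numbers for the chunk-alignment math in the constructor.
    long startOffset = 1300;        // where the caller wants to start reading
    int bytesPerChecksum = 512;     // chunk size, from the metadata header
    int checksumSize = 4;           // each CRC32 checksum is 4 bytes

    // Reads must begin on a chunk boundary so checksums can be verified.
    long firstChunkOffset = startOffset - (startOffset % bytesPerChecksum);
    // firstChunkOffset == 1024
    int offsetFromChunkBoundary = (int) (startOffset - firstChunkOffset);
    // offsetFromChunkBoundary == 276 bytes to discard after verification
    long checkSumOffset = (firstChunkOffset / bytesPerChecksum) * checksumSize;
    // checkSumOffset == 8: skip the two checksums covering chunks 0 and 1
]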
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1467713&r1=1467712&r2=1467713&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java Sat Apr 13 23:05:54 2013
@@ -131,7 +131,6 @@ import org.apache.hadoop.hdfs.protocol.p
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
@@ -196,12 +195,13 @@ public class DFSClient implements java.i
final FileSystem.Statistics stats;
final int hdfsTimeout; // timeout value for a DFS operation.
private final String authority;
- final SocketCache socketCache;
+ final PeerCache peerCache;
final Conf dfsClientConf;
private Random r = new Random();
private SocketAddress[] localInterfaceAddrs;
private DataEncryptionKey encryptionKey;
-
+ private boolean shouldUseLegacyBlockReaderLocal;
+
/**
* DFSClient configuration
*/
@@ -228,11 +228,16 @@ public class DFSClient implements java.i
final short defaultReplication;
final String taskId;
final FsPermission uMask;
- final boolean useLegacyBlockReader;
+ final boolean useLegacyBlockReaderLocal;
final boolean connectToDnViaHostname;
final boolean getHdfsBlocksMetadataEnabled;
final int getFileBlockStorageLocationsNumThreads;
final int getFileBlockStorageLocationsTimeout;
+ final String domainSocketPath;
+ final boolean skipShortCircuitChecksums;
+ final int shortCircuitBufferSize;
+ final boolean shortCircuitLocalReads;
+ final boolean domainSocketDataTraffic;
Conf(Configuration conf) {
maxFailoverAttempts = conf.getInt(
@@ -283,9 +288,9 @@ public class DFSClient implements java.i
.getInt(DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY,
DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_DEFAULT);
uMask = FsPermission.getUMask(conf);
- useLegacyBlockReader = conf.getBoolean(
- DFS_CLIENT_USE_LEGACY_BLOCKREADER,
- DFS_CLIENT_USE_LEGACY_BLOCKREADER_DEFAULT);
+ useLegacyBlockReaderLocal = conf.getBoolean(
+ DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL,
+ DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL_DEFAULT);
connectToDnViaHostname = conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME,
DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT);
getHdfsBlocksMetadataEnabled = conf.getBoolean(
@@ -297,6 +302,20 @@ public class DFSClient implements java.i
getFileBlockStorageLocationsTimeout = conf.getInt(
DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT,
DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_DEFAULT);
+ domainSocketPath = conf.getTrimmed(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
+ DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_DEFAULT);
+ skipShortCircuitChecksums = conf.getBoolean(
+ DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY,
+ DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_DEFAULT);
+ shortCircuitBufferSize = conf.getInt(
+ DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY,
+ DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_DEFAULT);
+ shortCircuitLocalReads = conf.getBoolean(
+ DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY,
+ DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT);
+ domainSocketDataTraffic = conf.getBoolean(
+ DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC,
+ DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT);
}
private DataChecksum.Type getChecksumType(Configuration conf) {
@@ -354,7 +373,7 @@ public class DFSClient implements java.i
private final Map<String, DFSOutputStream> filesBeingWritten
= new HashMap<String, DFSOutputStream>();
- private boolean shortCircuitLocalReads;
+ private final DomainSocketFactory domainSocketFactory;
/**
* Same as this(NameNode.getAddress(conf), conf);
@@ -398,6 +417,11 @@ public class DFSClient implements java.i
throws IOException {
// Copy only the required DFSClient configuration
this.dfsClientConf = new Conf(conf);
+ this.shouldUseLegacyBlockReaderLocal =
+ this.dfsClientConf.useLegacyBlockReaderLocal;
+ if (this.dfsClientConf.useLegacyBlockReaderLocal) {
+ LOG.debug("Using legacy short-circuit local reads.");
+ }
this.conf = conf;
this.stats = stats;
this.socketFactory = NetUtils.getSocketFactory(conf, ClientProtocol.class);
@@ -427,12 +451,8 @@ public class DFSClient implements java.i
}
// read directly from the block file if configured.
- this.shortCircuitLocalReads = conf.getBoolean(
- DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY,
- DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Short circuit read is " + shortCircuitLocalReads);
- }
+ this.domainSocketFactory = new DomainSocketFactory(dfsClientConf);
+
String localInterfaces[] =
conf.getTrimmedStrings(DFSConfigKeys.DFS_CLIENT_LOCAL_INTERFACES);
localInterfaceAddrs = getLocalInterfaceAddrs(localInterfaces);
@@ -442,7 +462,7 @@ public class DFSClient implements java.i
Joiner.on(',').join(localInterfaceAddrs) + "]");
}
- this.socketCache = SocketCache.getInstance(dfsClientConf.socketCacheCapacity, dfsClientConf.socketCacheExpiry);
+ this.peerCache = PeerCache.getInstance(dfsClientConf.socketCacheCapacity, dfsClientConf.socketCacheExpiry);
}
/**
@@ -797,29 +817,11 @@ public class DFSClient implements java.i
AccessControlException.class);
}
}
-
- /**
- * Get {@link BlockReader} for short circuited local reads.
- */
- static BlockReader getLocalBlockReader(UserGroupInformation ugi,
- Configuration conf, String src, ExtendedBlock blk,
- Token<BlockTokenIdentifier> accessToken, DatanodeInfo chosenNode,
- int socketTimeout, long offsetIntoBlock, boolean connectToDnViaHostname)
- throws InvalidToken, IOException {
- try {
- return BlockReaderLocal.newBlockReader(ugi, conf, src, blk, accessToken,
- chosenNode, socketTimeout, offsetIntoBlock, blk.getNumBytes()
- - offsetIntoBlock, connectToDnViaHostname);
- } catch (RemoteException re) {
- throw re.unwrapRemoteException(InvalidToken.class,
- AccessControlException.class);
- }
- }
private static Map<String, Boolean> localAddrMap = Collections
.synchronizedMap(new HashMap<String, Boolean>());
- private static boolean isLocalAddress(InetSocketAddress targetAddr) {
+ static boolean isLocalAddress(InetSocketAddress targetAddr) {
InetAddress addr = targetAddr.getAddress();
Boolean cached = localAddrMap.get(addr.getHostAddress());
if (cached != null) {
@@ -2319,10 +2321,6 @@ public class DFSClient implements java.i
super(in);
}
}
-
- boolean shouldTryShortCircuitRead(InetSocketAddress targetAddr) {
- return shortCircuitLocalReads && isLocalAddress(targetAddr);
- }
void reportChecksumFailure(String file, ExtendedBlock blk, DatanodeInfo dn) {
DatanodeInfo [] dnArr = { dn };
@@ -2346,13 +2344,15 @@ public class DFSClient implements java.i
+ ", ugi=" + ugi + "]";
}
- void disableShortCircuit() {
- LOG.info("Short circuit is disabled");
- shortCircuitLocalReads = false;
+ public DomainSocketFactory getDomainSocketFactory() {
+ return domainSocketFactory;
}
-
- @VisibleForTesting
- boolean getShortCircuitLocalReads() {
- return shortCircuitLocalReads;
+
+ public void disableLegacyBlockReaderLocal() {
+ shouldUseLegacyBlockReaderLocal = false;
+ }
+
+ public boolean useLegacyBlockReaderLocal() {
+ return shouldUseLegacyBlockReaderLocal;
}
}
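Note that disableLegacyBlockReaderLocal() is a one-way switch for the lifetime of the client: nothing ever sets shouldUseLegacyBlockReaderLocal back to true. The same idiom in isolation (the volatile modifier is an addition here for cross-thread visibility; the patch itself declares a plain boolean field):

    class OneWaySwitch {
      // Starts at the configured value and can only ever be turned off.
      private volatile boolean enabled;

      OneWaySwitch(boolean configured) { this.enabled = configured; }

      void disable() { enabled = false; }      // never re-enabled
      boolean isEnabled() { return enabled; }
    }

DFSInputStream below relies on this: once a legacy BlockReaderLocal fails to construct, every later read falls through to the newer code paths.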
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1467713&r1=1467712&r2=1467713&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Sat Apr 13 23:05:54 2013
@@ -265,6 +265,8 @@ public class DFSConfigKeys extends Commo
public static final int DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_DEFAULT = 3;
public static final String DFS_CLIENT_USE_LEGACY_BLOCKREADER = "dfs.client.use.legacy.blockreader";
public static final boolean DFS_CLIENT_USE_LEGACY_BLOCKREADER_DEFAULT = false;
+ public static final String DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL = "dfs.client.use.legacy.blockreader.local";
+ public static final boolean DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL_DEFAULT = false;
public static final String DFS_BALANCER_MOVEDWINWIDTH_KEY = "dfs.balancer.movedWinWidth";
public static final long DFS_BALANCER_MOVEDWINWIDTH_DEFAULT = 5400*1000L;
public static final String DFS_DATANODE_ADDRESS_KEY = "dfs.datanode.address";
@@ -347,7 +349,13 @@ public class DFSConfigKeys extends Commo
public static final String DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY = "dfs.client.read.shortcircuit.skip.checksum";
public static final boolean DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_DEFAULT = false;
public static final String DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY = "dfs.client.read.shortcircuit.buffer.size";
public static final int DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_DEFAULT = 1024 * 1024;
+ public static final String DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_SIZE_KEY = "dfs.client.read.shortcircuit.streams.cache.size";
+ public static final int DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_SIZE_DEFAULT = 100;
+ public static final String DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_KEY = "dfs.client.read.shortcircuit.streams.cache.expiry.ms";
+ public static final long DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_DEFAULT = 5000;
+ public static final String DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC = "dfs.client.domain.socket.data.traffic";
+ public static final boolean DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT = false;
// property for fsimage compression
public static final String DFS_IMAGE_COMPRESS_KEY = "dfs.image.compress";
@@ -404,6 +412,8 @@ public class DFSConfigKeys extends Commo
public static final int DFS_NAMENODE_MAX_OP_SIZE_DEFAULT = 50 * 1024 * 1024;
public static final String DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY = "dfs.block.local-path-access.user";
+ public static final String DFS_DOMAIN_SOCKET_PATH_KEY = "dfs.domain.socket.path";
+ public static final String DFS_DOMAIN_SOCKET_PATH_DEFAULT = "";
// HA related configuration
public static final String DFS_HA_NAMENODES_KEY_PREFIX = "dfs.ha.namenodes";
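The two streams.cache keys above size the FileInputStreamCache that DFSInputStream constructs below: at most 100 cached descriptor pairs, expiring after 5 seconds, by default. A tuning sketch (values illustrative only, reusing the Configuration idiom from the earlier sketch):

    Configuration conf = new Configuration();
    // Keep more short-circuit file descriptors around, for longer.
    conf.setInt("dfs.client.read.shortcircuit.streams.cache.size", 256);
    conf.setLong("dfs.client.read.shortcircuit.streams.cache.expiry.ms",
        30000L);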
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java?rev=1467713&r1=1467712&r2=1467713&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java Sat Apr 13 23:05:54 2013
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdfs;
+import java.io.FileInputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
@@ -32,18 +33,20 @@ import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import org.apache.commons.io.IOUtils;
import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.ByteBufferReadable;
+import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.UnresolvedLinkException;
-import org.apache.hadoop.hdfs.SocketCache.SocketAndStreams;
+import org.apache.hadoop.hdfs.net.DomainPeer;
+import org.apache.hadoop.hdfs.net.Peer;
+import org.apache.hadoop.hdfs.net.TcpPeerServer;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
-import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
@@ -51,20 +54,23 @@ import org.apache.hadoop.hdfs.server.dat
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.net.unix.DomainSocket;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.token.Token;
+import com.google.common.annotations.VisibleForTesting;
+
/****************************************************************
* DFSInputStream provides bytes from a named file. It handles
* negotiation with the namenode and various datanodes as necessary.
****************************************************************/
@InterfaceAudience.Private
public class DFSInputStream extends FSInputStream implements ByteBufferReadable {
- private final SocketCache socketCache;
-
+ @VisibleForTesting
+ static boolean tcpReadsDisabledForTesting = false;
+ private final PeerCache peerCache;
private final DFSClient dfsClient;
private boolean closed = false;
-
private final String src;
private final long prefetchSize;
private BlockReader blockReader = null;
@@ -76,6 +82,8 @@ public class DFSInputStream extends FSIn
private long pos = 0;
private long blockEnd = -1;
+ private final FileInputStreamCache fileInputStreamCache;
+
/**
* This variable tracks the number of failures since the start of the
* most recent user-facing operation. That is to say, it should be reset
@@ -110,7 +118,14 @@ public class DFSInputStream extends FSIn
this.verifyChecksum = verifyChecksum;
this.buffersize = buffersize;
this.src = src;
- this.socketCache = dfsClient.socketCache;
+ this.peerCache = dfsClient.peerCache;
+ this.fileInputStreamCache = new FileInputStreamCache(
+ dfsClient.conf.getInt(DFSConfigKeys.
+ DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_SIZE_KEY,
+ DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_SIZE_DEFAULT),
+ dfsClient.conf.getLong(DFSConfigKeys.
+ DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_KEY,
+ DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_DEFAULT));
prefetchSize = dfsClient.getConf().prefetchSize;
timeWindow = dfsClient.getConf().timeWindow;
nCachedConnRetry = dfsClient.getConf().nCachedConnRetry;
@@ -243,7 +258,9 @@ public class DFSInputStream extends FSIn
locatedBlocks.getFileLength() + lastBlockBeingWrittenLength;
}
- private synchronized boolean blockUnderConstruction() {
+ // Short circuit local reads are forbidden for files that are
+ // under construction. See HDFS-2757.
+ synchronized boolean shortCircuitForbidden() {
return locatedBlocks.isUnderConstruction();
}
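The rename from blockUnderConstruction() makes the policy explicit at each call site: as the getBlockReader hunk below shows, both the cached-peer and fresh-DomainPeer paths compute

    boolean allowShortCircuitLocalReads = dfsClient.getConf().
        shortCircuitLocalReads && (!shortCircuitForbidden());

so a file still under construction is read over the regular datanode protocol even when short-circuit reads are enabled.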
@@ -424,7 +441,7 @@ public class DFSInputStream extends FSIn
// Will be getting a new BlockReader.
if (blockReader != null) {
- closeBlockReader(blockReader);
+ blockReader.close(peerCache, fileInputStreamCache);
blockReader = null;
}
@@ -462,7 +479,7 @@ public class DFSInputStream extends FSIn
return chosenNode;
} catch (AccessControlException ex) {
DFSClient.LOG.warn("Short circuit access failed " + ex);
- dfsClient.disableShortCircuit();
+ dfsClient.disableLegacyBlockReaderLocal();
continue;
} catch (IOException ex) {
if (ex instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
@@ -510,10 +527,11 @@ public class DFSInputStream extends FSIn
dfsClient.checkOpen();
if (blockReader != null) {
- closeBlockReader(blockReader);
+ blockReader.close(peerCache, fileInputStreamCache);
blockReader = null;
}
super.close();
+ fileInputStreamCache.close();
closed = true;
}
@@ -811,7 +829,7 @@ public class DFSInputStream extends FSIn
addIntoCorruptedBlockMap(block.getBlock(), chosenNode, corruptedBlockMap);
} catch (AccessControlException ex) {
DFSClient.LOG.warn("Short circuit access failed " + ex);
- dfsClient.disableShortCircuit();
+ dfsClient.disableLegacyBlockReaderLocal();
continue;
} catch (IOException e) {
if (e instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
@@ -837,7 +855,7 @@ public class DFSInputStream extends FSIn
}
} finally {
if (reader != null) {
- closeBlockReader(reader);
+ reader.close(peerCache, fileInputStreamCache);
}
}
// Put chosen node into dead list, continue
@@ -845,22 +863,34 @@ public class DFSInputStream extends FSIn
}
}
- /**
- * Close the given BlockReader and cache its socket.
- */
- private void closeBlockReader(BlockReader reader) throws IOException {
- if (reader.hasSentStatusCode()) {
- IOStreamPair ioStreams = reader.getStreams();
- Socket oldSock = reader.takeSocket();
- socketCache.put(oldSock, ioStreams);
+ private Peer newTcpPeer(InetSocketAddress addr) throws IOException {
+ Peer peer = null;
+ boolean success = false;
+ Socket sock = null;
+ try {
+ sock = dfsClient.socketFactory.createSocket();
+ NetUtils.connect(sock, addr,
+ dfsClient.getRandomLocalInterfaceAddr(),
+ dfsClient.getConf().socketTimeout);
+ peer = TcpPeerServer.peerFromSocketAndKey(sock,
+ dfsClient.getDataEncryptionKey());
+ success = true;
+ return peer;
+ } finally {
+ if (!success) {
+ IOUtils.closeQuietly(peer);
+ IOUtils.closeQuietly(sock);
+ }
}
- reader.close();
}
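newTcpPeer uses the success-flag idiom so that a half-built connection never leaks: if wrapping the Socket in a Peer fails, both the Peer (if any) and the raw Socket are closed. The same pattern in isolation (address handling and timeout are illustrative):

    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.net.Socket;
    import org.apache.commons.io.IOUtils;

    static Socket connectOrCleanUp(InetSocketAddress addr) throws IOException {
      boolean success = false;
      Socket sock = new Socket();
      try {
        sock.connect(addr, 60000);
        success = true;
        return sock;
      } finally {
        if (!success) {
          IOUtils.closeQuietly(sock);   // commons-io, as in the hunk above
        }
      }
    }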
/**
* Retrieve a BlockReader suitable for reading.
* This method will reuse the cached connection to the DN if appropriate.
* Otherwise, it will create a new connection.
+ * Throwing an IOException from this method is basically equivalent to
+ * declaring the DataNode bad, so we try several different ways of
+ * connecting before doing that.
*
* @param dnAddr Address of the datanode
* @param chosenNode Chosen datanode information
@@ -885,82 +915,113 @@ public class DFSInputStream extends FSIn
boolean verifyChecksum,
String clientName)
throws IOException {
-
- // Can't local read a block under construction, see HDFS-2757
- if (dfsClient.shouldTryShortCircuitRead(dnAddr) &&
- !blockUnderConstruction()) {
- return DFSClient.getLocalBlockReader(dfsClient.ugi, dfsClient.conf,
- src, block, blockToken, chosenNode, dfsClient.hdfsTimeout,
- startOffset, dfsClient.connectToDnViaHostname());
+ // Firstly, we check to see if we have cached any file descriptors for
+ // local blocks. If so, we can just re-use those file descriptors.
+ FileInputStream fis[] = fileInputStreamCache.get(chosenNode, block);
+ if (fis != null) {
+ if (DFSClient.LOG.isDebugEnabled()) {
+ DFSClient.LOG.debug("got FileInputStreams for " + block + " from " +
+ "the FileInputStreamCache.");
+ }
+ return new BlockReaderLocal(dfsClient.conf, file,
+ block, startOffset, len, fis[0], fis[1], chosenNode, verifyChecksum);
}
- IOException err = null;
- boolean fromCache = true;
+ // If the legacy local block reader is enabled and we are reading a local
+ // block, try to create a BlockReaderLocalLegacy. The legacy local block
+ // reader implements local reads in the style first introduced by HDFS-2246.
+ if ((dfsClient.useLegacyBlockReaderLocal()) &&
+ DFSClient.isLocalAddress(dnAddr) &&
+ (!shortCircuitForbidden())) {
+ try {
+ return BlockReaderFactory.getLegacyBlockReaderLocal(dfsClient.ugi,
+ dfsClient.conf, clientName, block, blockToken, chosenNode,
+ dfsClient.hdfsTimeout, startOffset, dfsClient.connectToDnViaHostname());
+ } catch (IOException e) {
+ DFSClient.LOG.warn("error creating legacy BlockReaderLocal. " +
+ "Disabling legacy local reads.", e);
+ dfsClient.disableLegacyBlockReaderLocal();
+ }
+ }
- // Allow retry since there is no way of knowing whether the cached socket
- // is good until we actually use it.
- for (int retries = 0; retries <= nCachedConnRetry && fromCache; ++retries) {
- SocketAndStreams sockAndStreams = null;
- // Don't use the cache on the last attempt - it's possible that there
- // are arbitrarily many unusable sockets in the cache, but we don't
- // want to fail the read.
- if (retries < nCachedConnRetry) {
- sockAndStreams = socketCache.get(dnAddr);
- }
- Socket sock;
- if (sockAndStreams == null) {
- fromCache = false;
+ // Look for cached domain peers.
+ int cacheTries = 0;
+ DomainSocketFactory dsFactory = dfsClient.getDomainSocketFactory();
+ BlockReader reader = null;
+ for (; cacheTries < nCachedConnRetry; ++cacheTries) {
+ Peer peer = peerCache.get(chosenNode, true);
+ if (peer == null) break;
+ try {
+ boolean allowShortCircuitLocalReads = dfsClient.getConf().
+ shortCircuitLocalReads && (!shortCircuitForbidden());
+ reader = BlockReaderFactory.newBlockReader(
+ dfsClient.conf, file, block, blockToken, startOffset,
+ len, verifyChecksum, clientName, peer, chosenNode,
+ dsFactory, allowShortCircuitLocalReads);
+ return reader;
+ } catch (IOException ex) {
+ DFSClient.LOG.debug("Error making BlockReader with DomainSocket. " +
+ "Closing stale " + peer, ex);
+ } finally {
+ if (reader == null) {
+ IOUtils.closeQuietly(peer);
+ }
+ }
+ }
- sock = dfsClient.socketFactory.createSocket();
-
- // TCP_NODELAY is crucial here because of bad interactions between
- // Nagle's Algorithm and Delayed ACKs. With connection keepalive
- // between the client and DN, the conversation looks like:
- // 1. Client -> DN: Read block X
- // 2. DN -> Client: data for block X
- // 3. Client -> DN: Status OK (successful read)
- // 4. Client -> DN: Read block Y
- // The fact that step #3 and #4 are both in the client->DN direction
- // triggers Nagling. If the DN is using delayed ACKs, this results
- // in a delay of 40ms or more.
- //
- // TCP_NODELAY disables nagling and thus avoids this performance
- // disaster.
- sock.setTcpNoDelay(true);
-
- NetUtils.connect(sock, dnAddr,
- dfsClient.getRandomLocalInterfaceAddr(),
- dfsClient.getConf().socketTimeout);
- sock.setSoTimeout(dfsClient.getConf().socketTimeout);
- } else {
- sock = sockAndStreams.sock;
+ // Try to create a DomainPeer.
+ DomainSocket domSock = dsFactory.create(dnAddr, this);
+ if (domSock != null) {
+ Peer peer = new DomainPeer(domSock);
+ try {
+ boolean allowShortCircuitLocalReads = dfsClient.getConf().
+ shortCircuitLocalReads && (!shortCircuitForbidden());
+ reader = BlockReaderFactory.newBlockReader(
+ dfsClient.conf, file, block, blockToken, startOffset,
+ len, verifyChecksum, clientName, peer, chosenNode,
+ dsFactory, allowShortCircuitLocalReads);
+ return reader;
+ } catch (IOException e) {
+ DFSClient.LOG.warn("failed to connect to " + domSock, e);
+ } finally {
+ if (reader == null) {
+ // The error came from a DomainPeer, so mark the socket path as bad;
+ // the DomainSocketFactory will not try to re-open this path for a
+ // while.
+ dsFactory.disableDomainSocketPath(domSock.getPath());
+ IOUtils.closeQuietly(peer);
+ }
}
+ }
+ // Look for cached peers.
+ for (; cacheTries < nCachedConnRetry; ++cacheTries) {
+ Peer peer = peerCache.get(chosenNode, false);
+ if (peer == null) break;
try {
- // The OP_READ_BLOCK request is sent as we make the BlockReader
- BlockReader reader =
- BlockReaderFactory.newBlockReader(dfsClient.getConf(),
- sock, file, block,
- blockToken,
- startOffset, len,
- bufferSize, verifyChecksum,
- clientName,
- dfsClient.getDataEncryptionKey(),
- sockAndStreams == null ? null : sockAndStreams.ioStreams);
+ reader = BlockReaderFactory.newBlockReader(
+ dfsClient.conf, file, block, blockToken, startOffset,
+ len, verifyChecksum, clientName, peer, chosenNode,
+ dsFactory, false);
return reader;
} catch (IOException ex) {
- // Our socket is no good.
- DFSClient.LOG.debug("Error making BlockReader. Closing stale " + sock, ex);
- if (sockAndStreams != null) {
- sockAndStreams.close();
- } else {
- sock.close();
+ DFSClient.LOG.debug("Error making BlockReader. Closing stale " +
+ peer, ex);
+ } finally {
+ if (reader == null) {
+ IOUtils.closeQuietly(peer);
}
- err = ex;
}
}
-
- throw err;
+ if (tcpReadsDisabledForTesting) {
+ throw new IOException("TCP reads are disabled.");
+ }
+ // Try to create a new remote peer.
+ Peer peer = newTcpPeer(dnAddr);
+ return BlockReaderFactory.newBlockReader(
+ dfsClient.conf, file, block, blockToken, startOffset,
+ len, verifyChecksum, clientName, peer, chosenNode,
+ dsFactory, false);
}
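Taken together, the hunk above replaces the old single retry loop with a fixed order of attempts:

1. FileInputStreamCache: re-use cached block and checksum file descriptors for a local replica and construct a BlockReaderLocal directly.
2. The legacy local reader (HDFS-2246 style), only when it is enabled, the address is local, and short-circuit is not forbidden; a failure here calls disableLegacyBlockReaderLocal() and shuts this path off for good.
3. Cached domain-socket peers from the PeerCache.
4. A fresh DomainPeer from the DomainSocketFactory; a failure here marks the socket path bad via disableDomainSocketPath().
5. Cached TCP peers from the PeerCache. Note that steps 3 and 5 share a single cacheTries counter, so together they make at most nCachedConnRetry cached attempts.
6. A brand-new TCP connection via newTcpPeer() (unless tcpReadsDisabledForTesting is set).

Only if the final step also throws does the caller treat the DataNode as bad, per the javadoc above.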
@@ -1094,7 +1155,7 @@ public class DFSInputStream extends FSIn
// the TCP buffer, then just eat up the intervening data.
//
int diff = (int)(targetPos - pos);
- if (diff <= DFSClient.TCP_WINDOW_SIZE) {
+ if (diff <= blockReader.available()) {
try {
pos += blockReader.skip(diff);
if (pos == targetPos) {