You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by to...@apache.org on 2013/01/23 19:38:57 UTC
svn commit: r1437616 - in
/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs: ./
src/main/java/org/apache/hadoop/hdfs/
src/main/java/org/apache/hadoop/hdfs/server/datanode/
src/test/java/org/apache/hadoop/hdfs/
Author: todd
Date: Wed Jan 23 18:38:56 2013
New Revision: 1437616
URL: http://svn.apache.org/viewvc?rev=1437616&view=rev
Log:
HDFS-4417. Fix case where local reads get disabled incorrectly. Contributed by Colin Patrick McCabe.
Added:
hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelShortCircuitReadUnCached.java
Modified:
hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-347.txt
hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/PeerCache.java
hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java
hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPeerCache.java
Modified: hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-347.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-347.txt?rev=1437616&r1=1437615&r2=1437616&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-347.txt (original)
+++ hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-347.txt Wed Jan 23 18:38:56 2013
@@ -30,3 +30,6 @@ HDFS-4418. increase default FileInputStr
HDFS-4416. Rename dfs.datanode.domain.socket.path to dfs.domain.socket.path
(Colin Patrick McCabe via todd)
+
+HDFS-4417. Fix case where local reads get disabled incorrectly
+(Colin Patrick McCabe and todd via todd)
Modified: hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java?rev=1437616&r1=1437615&r2=1437616&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java Wed Jan 23 18:38:56 2013
@@ -68,7 +68,7 @@ public class BlockReaderFactory {
* case.
* @param allowShortCircuitLocalReads True if short-circuit local reads
* should be allowed.
- * @return New BlockReader instance, or null on error.
+ * @return New BlockReader instance
*/
@SuppressWarnings("deprecation")
public static BlockReader newBlockReader(
Modified: hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java?rev=1437616&r1=1437615&r2=1437616&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java Wed Jan 23 18:38:56 2013
@@ -35,8 +35,8 @@ import java.util.concurrent.ConcurrentHa
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.ByteBufferReadable;
+import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.hdfs.net.DomainPeer;
@@ -56,7 +56,8 @@ import org.apache.hadoop.ipc.RemoteExcep
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.unix.DomainSocket;
import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.hdfs.FileInputStreamCache;
+
+import com.google.common.annotations.VisibleForTesting;
/****************************************************************
* DFSInputStream provides bytes from a named file. It handles
@@ -64,11 +65,11 @@ import org.apache.hadoop.hdfs.FileInputS
****************************************************************/
@InterfaceAudience.Private
public class DFSInputStream extends FSInputStream implements ByteBufferReadable {
+ @VisibleForTesting
+ static boolean tcpReadsDisabledForTesting = false;
private final PeerCache peerCache;
-
private final DFSClient dfsClient;
private boolean closed = false;
-
private final String src;
private final long prefetchSize;
private BlockReader blockReader = null;
@@ -853,33 +854,23 @@ public class DFSInputStream extends FSIn
}
}
- private Peer newPeer(InetSocketAddress addr) throws IOException {
+ private Peer newTcpPeer(InetSocketAddress addr) throws IOException {
Peer peer = null;
boolean success = false;
Socket sock = null;
- DomainSocket domSock = null;
-
try {
- domSock = dfsClient.getDomainSocketFactory().create(addr, this);
- if (domSock != null) {
- // Create a UNIX Domain peer.
- peer = new DomainPeer(domSock);
- } else {
- // Create a conventional TCP-based Peer.
- sock = dfsClient.socketFactory.createSocket();
- NetUtils.connect(sock, addr,
- dfsClient.getRandomLocalInterfaceAddr(),
- dfsClient.getConf().socketTimeout);
- peer = TcpPeerServer.peerFromSocketAndKey(sock,
- dfsClient.getDataEncryptionKey());
- }
+ sock = dfsClient.socketFactory.createSocket();
+ NetUtils.connect(sock, addr,
+ dfsClient.getRandomLocalInterfaceAddr(),
+ dfsClient.getConf().socketTimeout);
+ peer = TcpPeerServer.peerFromSocketAndKey(sock,
+ dfsClient.getDataEncryptionKey());
success = true;
return peer;
} finally {
if (!success) {
IOUtils.closeQuietly(peer);
IOUtils.closeQuietly(sock);
- IOUtils.closeQuietly(domSock);
}
}
}
@@ -888,6 +879,9 @@ public class DFSInputStream extends FSIn
* Retrieve a BlockReader suitable for reading.
* This method will reuse the cached connection to the DN if appropriate.
* Otherwise, it will create a new connection.
+ * Throwing an IOException from this method is basically equivalent to
+ * declaring the DataNode bad, so we try to connect a lot of different ways
+ * before doing that.
*
* @param dnAddr Address of the datanode
* @param chosenNode Chosen datanode information
@@ -912,9 +906,6 @@ public class DFSInputStream extends FSIn
boolean verifyChecksum,
String clientName)
throws IOException {
-
- IOException err = null;
-
// Firstly, we check to see if we have cached any file descriptors for
// local blocks. If so, we can just re-use those file descriptors.
FileInputStream fis[] = fileInputStreamCache.get(chosenNode, block);
@@ -927,67 +918,84 @@ public class DFSInputStream extends FSIn
block, startOffset, len, fis[0], fis[1], chosenNode, verifyChecksum);
}
- // We retry several times here.
- // On the first nCachedConnRetry times, we try to fetch a socket from
- // the socketCache and use it. This may fail, since the old socket may
- // have been closed by the peer.
- // After that, we try to create a new socket using newPeer().
- // This may create either a TCP socket or a UNIX domain socket, depending
- // on the configuration and whether the peer is remote.
- // If we try to create a UNIX domain socket and fail, we will not try that
- // again. Instead, we'll try to create a TCP socket. Only after we've
- // failed to create a TCP-based BlockReader will we throw an IOException
- // from this function. Throwing an IOException from here is basically
- // equivalent to declaring the DataNode bad.
- boolean triedNonDomainSocketReader = false;
- for (int retries = 0;
- retries < nCachedConnRetry || (!triedNonDomainSocketReader);
- ++retries) {
- Peer peer = null;
- if (retries < nCachedConnRetry) {
- peer = peerCache.get(chosenNode);
- }
- if (peer == null) {
- peer = newPeer(dnAddr);
- if (peer.getDomainSocket() == null) {
- triedNonDomainSocketReader = true;
+ // Look for cached domain peers.
+ int cacheTries = 0;
+ DomainSocketFactory dsFactory = dfsClient.getDomainSocketFactory();
+ BlockReader reader = null;
+ for (; cacheTries < nCachedConnRetry; ++cacheTries) {
+ Peer peer = peerCache.get(chosenNode, true);
+ if (peer == null) break;
+ try {
+ boolean allowShortCircuitLocalReads = dfsClient.getConf().
+ shortCircuitLocalReads && (!shortCircuitForbidden());
+ reader = BlockReaderFactory.newBlockReader(
+ dfsClient.conf, file, block, blockToken, startOffset,
+ len, verifyChecksum, clientName, peer, chosenNode,
+ dsFactory, allowShortCircuitLocalReads);
+ return reader;
+ } catch (IOException ex) {
+ DFSClient.LOG.debug("Error making BlockReader with DomainSocket. " +
+ "Closing stale " + peer, ex);
+ } finally {
+ if (reader == null) {
+ IOUtils.closeQuietly(peer);
}
}
- boolean success = false;
+ }
+
+ // Try to create a DomainPeer.
+ DomainSocket domSock = dsFactory.create(dnAddr, this);
+ if (domSock != null) {
+ Peer peer = new DomainPeer(domSock);
try {
- boolean allowShortCircuitLocalReads =
- (peer.getDomainSocket() != null) &&
- dfsClient.getConf().shortCircuitLocalReads &&
- (!shortCircuitForbidden());
- // Here we will try to send either an OP_READ_BLOCK request or an
- // OP_REQUEST_SHORT_CIRCUIT_FDS, depending on what kind of block reader
- // we're trying to create.
- BlockReader blockReader = BlockReaderFactory.newBlockReader(
+ boolean allowShortCircuitLocalReads = dfsClient.getConf().
+ shortCircuitLocalReads && (!shortCircuitForbidden());
+ reader = BlockReaderFactory.newBlockReader(
dfsClient.conf, file, block, blockToken, startOffset,
len, verifyChecksum, clientName, peer, chosenNode,
- dfsClient.getDomainSocketFactory(), allowShortCircuitLocalReads);
- success = true;
- return blockReader;
- } catch (IOException ex) {
- // Our socket is no good.
- DFSClient.LOG.debug("Error making BlockReader. " +
- "Closing stale " + peer, ex);
- if (peer.getDomainSocket() != null) {
- // If the Peer that we got the error from was a DomainPeer,
- // mark the socket path as bad, so that newDataSocket will not try
- // to re-open this socket for a while.
- dfsClient.getDomainSocketFactory().
- disableDomainSocketPath(peer.getDomainSocket().getPath());
+ dsFactory, allowShortCircuitLocalReads);
+ return reader;
+ } catch (IOException e) {
+ DFSClient.LOG.warn("failed to connect to " + domSock, e);
+ } finally {
+ if (reader == null) {
+ // The DomainPeer we just created could not produce a BlockReader.
+ // Mark the socket path as bad, so that we will not try
+ // to re-open this socket for a while.
+ dsFactory.disableDomainSocketPath(domSock.getPath());
+ IOUtils.closeQuietly(peer);
}
- err = ex;
+ }
+ }
+
+ // Look for cached peers.
+ for (; cacheTries < nCachedConnRetry; ++cacheTries) {
+ Peer peer = peerCache.get(chosenNode, false);
+ if (peer == null) break;
+ try {
+ reader = BlockReaderFactory.newBlockReader(
+ dfsClient.conf, file, block, blockToken, startOffset,
+ len, verifyChecksum, clientName, peer, chosenNode,
+ dsFactory, false);
+ return reader;
+ } catch (IOException ex) {
+ DFSClient.LOG.debug("Error making BlockReader. Closing stale " +
+ peer, ex);
} finally {
- if (!success) {
+ if (reader == null) {
IOUtils.closeQuietly(peer);
}
}
}
-
- throw err;
+ if (tcpReadsDisabledForTesting) {
+ throw new IOException("TCP reads are disabled.");
+ }
+ // Try to create a new remote peer.
+ Peer peer = newTcpPeer(dnAddr);
+ return BlockReaderFactory.newBlockReader(
+ dfsClient.conf, file, block, blockToken, startOffset,
+ len, verifyChecksum, clientName, peer, chosenNode,
+ dsFactory, false);
}
Modified: hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/PeerCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/PeerCache.java?rev=1437616&r1=1437615&r2=1437616&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/PeerCache.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/PeerCache.java Wed Jan 23 18:38:56 2013
@@ -39,6 +39,30 @@ import org.apache.hadoop.util.Time;
class PeerCache {
private static final Log LOG = LogFactory.getLog(PeerCache.class);
+ private static class Key {
+ final DatanodeID dnID;
+ final boolean isDomain;
+
+ Key(DatanodeID dnID, boolean isDomain) {
+ this.dnID = dnID;
+ this.isDomain = isDomain;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (!(o instanceof Key)) {
+ return false;
+ }
+ Key other = (Key)o;
+ return dnID.equals(other.dnID) && isDomain == other.isDomain;
+ }
+
+ @Override
+ public int hashCode() {
+ return dnID.hashCode() ^ (isDomain ? 1 : 0);
+ }
+ }
+
private static class Value {
private final Peer peer;
private final long time;
@@ -59,7 +83,7 @@ class PeerCache {
private Daemon daemon;
/** A map for per user per datanode. */
- private static LinkedListMultimap<DatanodeID, Value> multimap =
+ private static LinkedListMultimap<Key, Value> multimap =
LinkedListMultimap.create();
private static int capacity;
private static long expiryPeriod;
@@ -124,16 +148,18 @@ class PeerCache {
/**
* Get a cached peer connected to the given DataNode.
* @param dnId The DataNode to get a Peer for.
+ * @param isDomain Whether to retrieve a DomainPeer or not.
+ *
* @return An open Peer connected to the DN, or null if none
* was found.
*/
- public synchronized Peer get(DatanodeID dnId) {
+ public synchronized Peer get(DatanodeID dnId, boolean isDomain) {
if (capacity <= 0) { // disabled
return null;
}
- List<Value> sockStreamList = multimap.get(dnId);
+ List<Value> sockStreamList = multimap.get(new Key(dnId, isDomain));
if (sockStreamList == null) {
return null;
}
@@ -168,7 +194,8 @@ class PeerCache {
if (capacity == multimap.size()) {
evictOldest();
}
- multimap.put(dnId, new Value(peer, Time.monotonicNow()));
+ multimap.put(new Key(dnId, peer.getDomainSocket() != null),
+ new Value(peer, Time.monotonicNow()));
}
public synchronized int size() {
@@ -180,9 +207,9 @@ class PeerCache {
*/
private synchronized void evictExpired(long expiryPeriod) {
while (multimap.size() != 0) {
- Iterator<Entry<DatanodeID, Value>> iter =
+ Iterator<Entry<Key, Value>> iter =
multimap.entries().iterator();
- Entry<DatanodeID, Value> entry = iter.next();
+ Entry<Key, Value> entry = iter.next();
// if oldest socket expired, remove it
if (entry == null ||
Time.monotonicNow() - entry.getValue().getTime() <
@@ -201,13 +228,13 @@ class PeerCache {
// We can get the oldest element immediately, because of an interesting
// property of LinkedListMultimap: its iterator traverses entries in the
// order that they were added.
- Iterator<Entry<DatanodeID, Value>> iter =
+ Iterator<Entry<Key, Value>> iter =
multimap.entries().iterator();
if (!iter.hasNext()) {
throw new IllegalStateException("Cannot evict from empty cache! " +
"capacity: " + capacity);
}
- Entry<DatanodeID, Value> entry = iter.next();
+ Entry<Key, Value> entry = iter.next();
IOUtils.cleanup(LOG, entry.getValue().getPeer());
iter.remove();
}
Modified: hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=1437616&r1=1437615&r2=1437616&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Wed Jan 23 18:38:56 2013
@@ -70,7 +70,6 @@ import org.apache.hadoop.hdfs.server.pro
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.net.unix.DomainSocket;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;
Modified: hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java?rev=1437616&r1=1437615&r2=1437616&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java Wed Jan 23 18:38:56 2013
@@ -114,7 +114,7 @@ public class TestDataTransferKeepalive {
// Take it out of the cache - reading should
// give an EOF.
- Peer peer = dfsClient.peerCache.get(dn.getDatanodeId());
+ Peer peer = dfsClient.peerCache.get(dn.getDatanodeId(), false);
assertNotNull(peer);
assertEquals(-1, peer.getInputStream().read());
}
Added: hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelShortCircuitReadUnCached.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelShortCircuitReadUnCached.java?rev=1437616&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelShortCircuitReadUnCached.java (added)
+++ hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelShortCircuitReadUnCached.java Wed Jan 23 18:38:56 2013
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.File;
+import org.apache.hadoop.net.unix.DomainSocket;
+import org.apache.hadoop.net.unix.TemporarySocketDirectory;
+import org.junit.AfterClass;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import static org.hamcrest.CoreMatchers.*;
+
+/**
+ * This class tests short-circuit local reads with the FileInputStreamCache
+ * disabled and TCP reads turned off. This is a regression test for HDFS-4417.
+ */
+public class TestParallelShortCircuitReadUnCached extends TestParallelReadUtil {
+ private static TemporarySocketDirectory sockDir;
+
+ @BeforeClass
+ static public void setupCluster() throws Exception {
+ if (DomainSocket.getLoadingFailureReason() != null) return;
+ sockDir = new TemporarySocketDirectory();
+ HdfsConfiguration conf = new HdfsConfiguration();
+ conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
+ new File(sockDir.getDir(),
+ "TestParallelShortCircuitReadUnCached._PORT.sock").getAbsolutePath());
+ conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
+ conf.setBoolean(DFSConfigKeys.
+ DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, false);
+ conf.setBoolean(DFSConfigKeys.
+ DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC, true);
+ // We want to test reading from stale sockets.
+ conf.setInt(DFSConfigKeys.DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_KEY, 1);
+ conf.setLong(DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY,
+ 5 * 60 * 1000);
+ conf.setInt(DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY, 32);
+ // Avoid using the FileInputStreamCache.
+ conf.setInt(DFSConfigKeys.
+ DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_SIZE_KEY, 0);
+ DomainSocket.disableBindPathValidation();
+ DFSInputStream.tcpReadsDisabledForTesting = true;
+ setupCluster(1, conf);
+ }
+
+ @Before
+ public void before() {
+ Assume.assumeThat(DomainSocket.getLoadingFailureReason(), equalTo(null));
+ }
+
+ @AfterClass
+ static public void teardownCluster() throws Exception {
+ if (DomainSocket.getLoadingFailureReason() != null) return;
+ sockDir.close();
+ TestParallelReadUtil.teardownCluster();
+ }
+}
\ No newline at end of file
Modified: hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPeerCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPeerCache.java?rev=1437616&r1=1437615&r2=1437616&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPeerCache.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPeerCache.java Wed Jan 23 18:38:56 2013
@@ -33,22 +33,25 @@ import org.apache.hadoop.hdfs.protocol.D
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.net.unix.DomainSocket;
import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
public class TestPeerCache {
static final Log LOG = LogFactory.getLog(TestPeerCache.class);
private static final int CAPACITY = 3;
private static final int EXPIRY_PERIOD = 20;
- private static PeerCache cache =
- PeerCache.getInstance(CAPACITY, EXPIRY_PERIOD);
private static class FakePeer implements Peer {
private boolean closed = false;
+ private final boolean hasDomain;
private DatanodeID dnId;
- public FakePeer(DatanodeID dnId) {
+ public FakePeer(DatanodeID dnId, boolean hasDomain) {
this.dnId = dnId;
+ this.hasDomain = hasDomain;
}
@Override
@@ -118,39 +121,50 @@ public class TestPeerCache {
@Override
public DomainSocket getDomainSocket() {
- return null;
+ if (!hasDomain) return null;
+ // Return a mock which throws an exception whenever any function is
+ // called.
+ return Mockito.mock(DomainSocket.class,
+ new Answer<Object>() {
+ @Override
+ public Object answer(InvocationOnMock invocation)
+ throws Throwable {
+ throw new RuntimeException("injected fault.");
+ } });
}
}
@Test
public void testAddAndRetrieve() throws Exception {
+ PeerCache cache = PeerCache.getInstance(3, 100000);
DatanodeID dnId = new DatanodeID("192.168.0.1",
"fakehostname", "fake_storage_id",
100, 101, 102);
- FakePeer peer = new FakePeer(dnId);
+ FakePeer peer = new FakePeer(dnId, false);
cache.put(dnId, peer);
assertTrue(!peer.isClosed());
assertEquals(1, cache.size());
- assertEquals(peer, cache.get(dnId));
+ assertEquals(peer, cache.get(dnId, false));
assertEquals(0, cache.size());
cache.clear();
}
@Test
public void testExpiry() throws Exception {
+ final int CAPACITY = 3;
+ final int EXPIRY_PERIOD = 10;
+ PeerCache cache = PeerCache.getInstance(CAPACITY, EXPIRY_PERIOD);
DatanodeID dnIds[] = new DatanodeID[CAPACITY];
FakePeer peers[] = new FakePeer[CAPACITY];
for (int i = 0; i < CAPACITY; ++i) {
dnIds[i] = new DatanodeID("192.168.0.1",
"fakehostname_" + i, "fake_storage_id",
100, 101, 102);
- peers[i] = new FakePeer(dnIds[i]);
+ peers[i] = new FakePeer(dnIds[i], false);
}
for (int i = 0; i < CAPACITY; ++i) {
cache.put(dnIds[i], peers[i]);
}
- // Check that the peers are cached
- assertEquals(CAPACITY, cache.size());
// Wait for the peers to expire
Thread.sleep(EXPIRY_PERIOD * 50);
@@ -169,13 +183,15 @@ public class TestPeerCache {
@Test
public void testEviction() throws Exception {
+ final int CAPACITY = 3;
+ PeerCache cache = PeerCache.getInstance(CAPACITY, 100000);
DatanodeID dnIds[] = new DatanodeID[CAPACITY + 1];
FakePeer peers[] = new FakePeer[CAPACITY + 1];
for (int i = 0; i < dnIds.length; ++i) {
dnIds[i] = new DatanodeID("192.168.0.1",
"fakehostname_" + i, "fake_storage_id_" + i,
100, 101, 102);
- peers[i] = new FakePeer(dnIds[i]);
+ peers[i] = new FakePeer(dnIds[i], false);
}
for (int i = 0; i < CAPACITY; ++i) {
cache.put(dnIds[i], peers[i]);
@@ -186,11 +202,11 @@ public class TestPeerCache {
// Add another entry and check that the first entry was evicted
cache.put(dnIds[CAPACITY], peers[CAPACITY]);
assertEquals(CAPACITY, cache.size());
- assertSame(null, cache.get(dnIds[0]));
+ assertSame(null, cache.get(dnIds[0], false));
// Make sure that the other entries are still there
for (int i = 1; i < CAPACITY; ++i) {
- Peer peer = cache.get(dnIds[i]);
+ Peer peer = cache.get(dnIds[i], false);
assertSame(peers[i], peer);
assertTrue(!peer.isClosed());
peer.close();
@@ -201,19 +217,56 @@ public class TestPeerCache {
@Test
public void testMultiplePeersWithSameDnId() throws Exception {
+ final int CAPACITY = 3;
+ PeerCache cache = PeerCache.getInstance(CAPACITY, 100000);
+ DatanodeID dnId = new DatanodeID("192.168.0.1",
+ "fakehostname", "fake_storage_id",
+ 100, 101, 102);
+ HashSet<FakePeer> peers = new HashSet<FakePeer>(CAPACITY);
+ for (int i = 0; i < CAPACITY; ++i) {
+ FakePeer peer = new FakePeer(dnId, false);
+ peers.add(peer);
+ cache.put(dnId, peer);
+ }
+ // Check that all of the peers ended up in the cache
+ assertEquals(CAPACITY, cache.size());
+ while (!peers.isEmpty()) {
+ Peer peer = cache.get(dnId, false);
+ assertTrue(peer != null);
+ assertTrue(!peer.isClosed());
+ peers.remove(peer);
+ }
+ assertEquals(0, cache.size());
+ cache.clear();
+ }
+
+ @Test
+ public void testDomainSocketPeers() throws Exception {
+ final int CAPACITY = 3;
+ PeerCache cache = PeerCache.getInstance(CAPACITY, 100000);
DatanodeID dnId = new DatanodeID("192.168.0.1",
"fakehostname", "fake_storage_id",
100, 101, 102);
HashSet<FakePeer> peers = new HashSet<FakePeer>(CAPACITY);
for (int i = 0; i < CAPACITY; ++i) {
- FakePeer peer = new FakePeer(dnId);
+ FakePeer peer = new FakePeer(dnId, i == CAPACITY - 1);
peers.add(peer);
cache.put(dnId, peer);
}
// Check that all of the peers ended up in the cache
assertEquals(CAPACITY, cache.size());
+ // Test that get(isDomain=true) finds the peer with the
+ // domain socket.
+ Peer peer = cache.get(dnId, true);
+ assertTrue(peer.getDomainSocket() != null);
+ peers.remove(peer);
+ // Test that get(isDomain=true) returns null when there are
+ // no more peers with domain sockets.
+ peer = cache.get(dnId, true);
+ assertTrue(peer == null);
+ // Check that all of the other peers ended up in the cache.
while (!peers.isEmpty()) {
- Peer peer = cache.get(dnId);
+ peer = cache.get(dnId, false);
assertTrue(peer != null);
assertTrue(!peer.isClosed());
peers.remove(peer);