Posted to hdfs-commits@hadoop.apache.org by to...@apache.org on 2013/01/23 19:38:57 UTC

svn commit: r1437616 - in /hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/server/datanode/ src/test/java/org/apache/hadoop/hdfs/

Author: todd
Date: Wed Jan 23 18:38:56 2013
New Revision: 1437616

URL: http://svn.apache.org/viewvc?rev=1437616&view=rev
Log:
HDFS-4417. Fix case where local reads get disabled incorrectly. Contributed by Colin Patrick McCabe.
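
As the diffs below show, the old getBlockReader() treated any IOException on a DomainPeer as grounds for calling disableDomainSocketPath(), even when the failing peer was just a stale socket pulled from the PeerCache, so a stale cached socket could incorrectly disable short-circuit local reads. The patch keys the PeerCache on (DatanodeID, isDomain), splits the retry sequence into explicit stages (cached domain peers, a freshly created domain socket, cached TCP peers, and finally a new TCP connection), and only disables the domain socket path when a newly created DomainSocket fails. Simplified sketches of the new fallback order and of the cache key follow the DFSInputStream.java and PeerCache.java diffs below.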

Added:
    hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelShortCircuitReadUnCached.java
Modified:
    hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-347.txt
    hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
    hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
    hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/PeerCache.java
    hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
    hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java
    hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPeerCache.java

Modified: hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-347.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-347.txt?rev=1437616&r1=1437615&r2=1437616&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-347.txt (original)
+++ hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-347.txt Wed Jan 23 18:38:56 2013
@@ -30,3 +30,6 @@ HDFS-4418. increase default FileInputStr
 
 HDFS-4416. Rename dfs.datanode.domain.socket.path to dfs.domain.socket.path
 (Colin Patrick McCabe via todd)
+
+HDFS-4417. Fix case where local reads get disabled incorrectly
+(Colin Patrick McCabe and todd via todd)

Modified: hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java?rev=1437616&r1=1437615&r2=1437616&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java Wed Jan 23 18:38:56 2013
@@ -68,7 +68,7 @@ public class BlockReaderFactory {
    *                             case.
    * @param allowShortCircuitLocalReads  True if short-circuit local reads
    *                                     should be allowed.
-   * @return New BlockReader instance, or null on error.
+   * @return New BlockReader instance
    */
   @SuppressWarnings("deprecation")
   public static BlockReader newBlockReader(

Modified: hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java?rev=1437616&r1=1437615&r2=1437616&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java Wed Jan 23 18:38:56 2013
@@ -35,8 +35,8 @@ import java.util.concurrent.ConcurrentHa
 
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.fs.ByteBufferReadable;
+import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.fs.FSInputStream;
 import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.hdfs.net.DomainPeer;
@@ -56,7 +56,8 @@ import org.apache.hadoop.ipc.RemoteExcep
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.unix.DomainSocket;
 import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.hdfs.FileInputStreamCache;
+
+import com.google.common.annotations.VisibleForTesting;
 
 /****************************************************************
  * DFSInputStream provides bytes from a named file.  It handles 
@@ -64,11 +65,11 @@ import org.apache.hadoop.hdfs.FileInputS
  ****************************************************************/
 @InterfaceAudience.Private
 public class DFSInputStream extends FSInputStream implements ByteBufferReadable {
+  @VisibleForTesting
+  static boolean tcpReadsDisabledForTesting = false;
   private final PeerCache peerCache;
-
   private final DFSClient dfsClient;
   private boolean closed = false;
-
   private final String src;
   private final long prefetchSize;
   private BlockReader blockReader = null;
@@ -853,33 +854,23 @@ public class DFSInputStream extends FSIn
     }
   }
 
-  private Peer newPeer(InetSocketAddress addr) throws IOException {
+  private Peer newTcpPeer(InetSocketAddress addr) throws IOException {
     Peer peer = null;
     boolean success = false;
     Socket sock = null;
-    DomainSocket domSock = null;
-
     try {
-      domSock = dfsClient.getDomainSocketFactory().create(addr, this);
-      if (domSock != null) {
-        // Create a UNIX Domain peer.
-        peer = new DomainPeer(domSock);
-      } else {
-        // Create a conventional TCP-based Peer.
-        sock = dfsClient.socketFactory.createSocket();
-        NetUtils.connect(sock, addr,
-          dfsClient.getRandomLocalInterfaceAddr(),
-          dfsClient.getConf().socketTimeout);
-        peer = TcpPeerServer.peerFromSocketAndKey(sock, 
-            dfsClient.getDataEncryptionKey());
-      }
+      sock = dfsClient.socketFactory.createSocket();
+      NetUtils.connect(sock, addr,
+        dfsClient.getRandomLocalInterfaceAddr(),
+        dfsClient.getConf().socketTimeout);
+      peer = TcpPeerServer.peerFromSocketAndKey(sock, 
+          dfsClient.getDataEncryptionKey());
       success = true;
       return peer;
     } finally {
       if (!success) {
         IOUtils.closeQuietly(peer);
         IOUtils.closeQuietly(sock);
-        IOUtils.closeQuietly(domSock);
       }
     }
   }
@@ -888,6 +879,9 @@ public class DFSInputStream extends FSIn
    * Retrieve a BlockReader suitable for reading.
    * This method will reuse the cached connection to the DN if appropriate.
    * Otherwise, it will create a new connection.
+   * Throwing an IOException from this method is basically equivalent to 
+   * declaring the DataNode bad, so we try to connect a lot of different ways
+   * before doing that.
    *
    * @param dnAddr  Address of the datanode
    * @param chosenNode Chosen datanode information
@@ -912,9 +906,6 @@ public class DFSInputStream extends FSIn
                                        boolean verifyChecksum,
                                        String clientName)
       throws IOException {
-    
-    IOException err = null;
-
     // Firstly, we check to see if we have cached any file descriptors for
     // local blocks.  If so, we can just re-use those file descriptors.
     FileInputStream fis[] = fileInputStreamCache.get(chosenNode, block);
@@ -927,67 +918,84 @@ public class DFSInputStream extends FSIn
         block, startOffset, len, fis[0], fis[1], chosenNode, verifyChecksum);
     }
 
-    // We retry several times here.
-    // On the first nCachedConnRetry times, we try to fetch a socket from
-    // the socketCache and use it.  This may fail, since the old socket may
-    // have been closed by the peer.
-    // After that, we try to create a new socket using newPeer().
-    // This may create either a TCP socket or a UNIX domain socket, depending
-    // on the configuration and whether the peer is remote.
-    // If we try to create a UNIX domain socket and fail, we will not try that 
-    // again.  Instead, we'll try to create a TCP socket.  Only after we've 
-    // failed to create a TCP-based BlockReader will we throw an IOException
-    // from this function.  Throwing an IOException from here is basically
-    // equivalent to declaring the DataNode bad.
-    boolean triedNonDomainSocketReader = false;
-    for (int retries = 0;
-          retries < nCachedConnRetry || (!triedNonDomainSocketReader);
-          ++retries) {
-      Peer peer = null;
-      if (retries < nCachedConnRetry) {
-        peer = peerCache.get(chosenNode);
-      }
-      if (peer == null) {
-        peer = newPeer(dnAddr);
-        if (peer.getDomainSocket() == null) {
-          triedNonDomainSocketReader = true;
+    // Look for cached domain peers.
+    int cacheTries = 0;
+    DomainSocketFactory dsFactory = dfsClient.getDomainSocketFactory();
+    BlockReader reader = null;
+    for (; cacheTries < nCachedConnRetry; ++cacheTries) {
+      Peer peer = peerCache.get(chosenNode, true);
+      if (peer == null) break;
+      try {
+        boolean allowShortCircuitLocalReads = dfsClient.getConf().
+            shortCircuitLocalReads && (!shortCircuitForbidden());
+        reader = BlockReaderFactory.newBlockReader(
+            dfsClient.conf, file, block, blockToken, startOffset,
+            len, verifyChecksum, clientName, peer, chosenNode, 
+            dsFactory, allowShortCircuitLocalReads);
+        return reader;
+      } catch (IOException ex) {
+        DFSClient.LOG.debug("Error making BlockReader with DomainSocket. " +
+            "Closing stale " + peer, ex);
+      } finally {
+        if (reader == null) {
+          IOUtils.closeQuietly(peer);
         }
       }
-      boolean success = false;
+    }
+
+    // Try to create a DomainPeer.
+    DomainSocket domSock = dsFactory.create(dnAddr, this);
+    if (domSock != null) {
+      Peer peer = new DomainPeer(domSock);
       try {
-        boolean allowShortCircuitLocalReads =
-          (peer.getDomainSocket() != null) &&
-          dfsClient.getConf().shortCircuitLocalReads && 
-          (!shortCircuitForbidden());
-        // Here we will try to send either an OP_READ_BLOCK request or an 
-        // OP_REQUEST_SHORT_CIRCUIT_FDS, depending on what kind of block reader 
-        // we're trying to create.
-        BlockReader blockReader = BlockReaderFactory.newBlockReader(
+        boolean allowShortCircuitLocalReads = dfsClient.getConf().
+            shortCircuitLocalReads && (!shortCircuitForbidden());
+        reader = BlockReaderFactory.newBlockReader(
             dfsClient.conf, file, block, blockToken, startOffset,
             len, verifyChecksum, clientName, peer, chosenNode, 
-            dfsClient.getDomainSocketFactory(), allowShortCircuitLocalReads);
-        success = true;
-        return blockReader;
-       } catch (IOException ex) {
-         // Our socket is no good.
-        DFSClient.LOG.debug("Error making BlockReader. " +
-            "Closing stale " + peer, ex);
-        if (peer.getDomainSocket() != null) {
-          // If the Peer that we got the error from was a DomainPeer,
-          // mark the socket path as bad, so that newDataSocket will not try 
-          // to re-open this socket for a while.
-          dfsClient.getDomainSocketFactory().
-              disableDomainSocketPath(peer.getDomainSocket().getPath());
+            dsFactory, allowShortCircuitLocalReads);
+        return reader;
+      } catch (IOException e) {
+        DFSClient.LOG.warn("failed to connect to " + domSock, e);
+      } finally {
+        if (reader == null) {
+         // If the Peer that we got the error from was a DomainPeer,
+         // mark the socket path as bad, so that newDataSocket will not try 
+         // to re-open this socket for a while.
+         dsFactory.disableDomainSocketPath(domSock.getPath());
+         IOUtils.closeQuietly(peer);
         }
-        err = ex;
+      }
+    }
+
+    // Look for cached peers.
+    for (; cacheTries < nCachedConnRetry; ++cacheTries) {
+      Peer peer = peerCache.get(chosenNode, false);
+      if (peer == null) break;
+      try {
+        reader = BlockReaderFactory.newBlockReader(
+            dfsClient.conf, file, block, blockToken, startOffset,
+            len, verifyChecksum, clientName, peer, chosenNode, 
+            dsFactory, false);
+        return reader;
+      } catch (IOException ex) {
+        DFSClient.LOG.debug("Error making BlockReader. Closing stale " +
+          peer, ex);
       } finally {
-        if (!success) {
+        if (reader == null) {
           IOUtils.closeQuietly(peer);
         }
       }
     }
-
-    throw err;
+    if (tcpReadsDisabledForTesting) {
+      throw new IOException("TCP reads are disabled.");
+    }
+    // Try to create a new remote peer.
+    Peer peer = newTcpPeer(dnAddr);
+    return BlockReaderFactory.newBlockReader(
+        dfsClient.conf, file, block, blockToken, startOffset,
+        len, verifyChecksum, clientName, peer, chosenNode, 
+        dsFactory, false);
   }
 
 

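To make the new connection logic easier to follow, here is a minimal, self-contained sketch of the fallback order that the rewritten getBlockReader() uses. The class and helper names (FallbackOrderSketch, PeerLike, createDomainPeer, and so on) are hypothetical stand-ins rather than HDFS APIs; only the ordering, and the single point at which the domain socket path gets disabled, mirror the patch.

// Hypothetical sketch, not HDFS code: the stand-in types below exist only to
// show the retry order used by the new getBlockReader().
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Deque;

public class FallbackOrderSketch {
  interface PeerLike {
    boolean isDomain();
    void close();
  }

  // Stand-ins for PeerCache.get(dnId, true) and PeerCache.get(dnId, false):
  // cached domain peers and cached TCP peers are now kept apart.
  static final Deque<PeerLike> cachedDomainPeers = new ArrayDeque<PeerLike>();
  static final Deque<PeerLike> cachedTcpPeers = new ArrayDeque<PeerLike>();

  // Stand-in for DomainSocketFactory.create(); null means no usable local path.
  static PeerLike createDomainPeer() { return null; }

  // Stand-in for DomainSocketFactory.disableDomainSocketPath().
  static void disableDomainSocketPath() { }

  // Pretend to build a reader over a peer; a stale peer makes this throw.
  static String newBlockReader(PeerLike peer) throws IOException {
    return "reader over " + (peer.isDomain() ? "domain" : "tcp") + " peer";
  }

  static String getBlockReader() throws IOException {
    PeerLike p;
    // 1. Cached domain peers.  A failure here only discards the stale peer;
    //    it no longer disables the domain socket path (the HDFS-4417 bug).
    while ((p = cachedDomainPeers.poll()) != null) {
      try { return newBlockReader(p); } catch (IOException e) { p.close(); }
    }
    // 2. A freshly created domain socket.  Only a failure at this stage marks
    //    the socket path bad before falling through to TCP.
    PeerLike domain = createDomainPeer();
    if (domain != null) {
      try {
        return newBlockReader(domain);
      } catch (IOException e) {
        disableDomainSocketPath();
        domain.close();
      }
    }
    // 3. Cached TCP peers, again just discarding stale ones on failure.
    while ((p = cachedTcpPeers.poll()) != null) {
      try { return newBlockReader(p); } catch (IOException e) { p.close(); }
    }
    // 4. The real method would now call newTcpPeer() and build a reader over
    //    it; here we simply fail, since an IOException from this last step is
    //    what finally declares the DataNode bad.
    throw new IOException("could not connect over TCP");
  }

  public static void main(String[] args) throws IOException {
    cachedTcpPeers.add(new PeerLike() {
      public boolean isDomain() { return false; }
      public void close() { }
    });
    System.out.println(getBlockReader());  // prints: reader over tcp peer
  }
}
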
Modified: hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/PeerCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/PeerCache.java?rev=1437616&r1=1437615&r2=1437616&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/PeerCache.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/PeerCache.java Wed Jan 23 18:38:56 2013
@@ -39,6 +39,30 @@ import org.apache.hadoop.util.Time;
 class PeerCache {
   private static final Log LOG = LogFactory.getLog(PeerCache.class);
   
+  private static class Key {
+    final DatanodeID dnID;
+    final boolean isDomain;
+    
+    Key(DatanodeID dnID, boolean isDomain) {
+      this.dnID = dnID;
+      this.isDomain = isDomain;
+    }
+    
+    @Override
+    public boolean equals(Object o) {
+      if (!(o instanceof Key)) {
+        return false;
+      }
+      Key other = (Key)o;
+      return dnID.equals(other.dnID) && isDomain == other.isDomain;
+    }
+
+    @Override
+    public int hashCode() {
+      return dnID.hashCode() ^ (isDomain ? 1 : 0);
+    }
+  }
+  
   private static class Value {
     private final Peer peer;
     private final long time;
@@ -59,7 +83,7 @@ class PeerCache {
 
   private Daemon daemon;
   /** A map for per user per datanode. */
-  private static LinkedListMultimap<DatanodeID, Value> multimap =
+  private static LinkedListMultimap<Key, Value> multimap =
     LinkedListMultimap.create();
   private static int capacity;
   private static long expiryPeriod;
@@ -124,16 +148,18 @@ class PeerCache {
   /**
    * Get a cached peer connected to the given DataNode.
    * @param dnId         The DataNode to get a Peer for.
+   * @param isDomain     Whether to retrieve a DomainPeer or not.
+   *
    * @return             An open Peer connected to the DN, or null if none
    *                     was found. 
    */
-  public synchronized Peer get(DatanodeID dnId) {
+  public synchronized Peer get(DatanodeID dnId, boolean isDomain) {
 
     if (capacity <= 0) { // disabled
       return null;
     }
 
-    List<Value> sockStreamList = multimap.get(dnId);
+    List<Value> sockStreamList = multimap.get(new Key(dnId, isDomain));
     if (sockStreamList == null) {
       return null;
     }
@@ -168,7 +194,8 @@ class PeerCache {
     if (capacity == multimap.size()) {
       evictOldest();
     }
-    multimap.put(dnId, new Value(peer, Time.monotonicNow()));
+    multimap.put(new Key(dnId, peer.getDomainSocket() != null),
+        new Value(peer, Time.monotonicNow()));
   }
 
   public synchronized int size() {
@@ -180,9 +207,9 @@ class PeerCache {
    */
   private synchronized void evictExpired(long expiryPeriod) {
     while (multimap.size() != 0) {
-      Iterator<Entry<DatanodeID, Value>> iter =
+      Iterator<Entry<Key, Value>> iter =
         multimap.entries().iterator();
-      Entry<DatanodeID, Value> entry = iter.next();
+      Entry<Key, Value> entry = iter.next();
       // if oldest socket expired, remove it
       if (entry == null || 
         Time.monotonicNow() - entry.getValue().getTime() <
@@ -201,13 +228,13 @@ class PeerCache {
     // We can get the oldest element immediately, because of an interesting
     // property of LinkedListMultimap: its iterator traverses entries in the
     // order that they were added.
-    Iterator<Entry<DatanodeID, Value>> iter =
+    Iterator<Entry<Key, Value>> iter =
       multimap.entries().iterator();
     if (!iter.hasNext()) {
       throw new IllegalStateException("Cannot evict from empty cache! " +
         "capacity: " + capacity);
     }
-    Entry<DatanodeID, Value> entry = iter.next();
+    Entry<Key, Value> entry = iter.next();
     IOUtils.cleanup(LOG, entry.getValue().getPeer());
     iter.remove();
   }

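The PeerCache side of the change keys the multimap on (DatanodeID, isDomain), so a caller can ask specifically for a cached DomainPeer or for a cached TCP peer. A minimal standalone illustration of that composite key, using a plain String in place of DatanodeID and assuming Guava is on the classpath (as it already is for hadoop-hdfs), could look like the following; PeerCacheKeySketch is a hypothetical name, not the HDFS class.

import com.google.common.collect.LinkedListMultimap;

import java.util.List;

public class PeerCacheKeySketch {
  // Mirrors PeerCache.Key: equality and hashing cover both fields, so domain
  // and TCP peers for the same DataNode land in different multimap buckets.
  static final class Key {
    final String dnId;       // stands in for DatanodeID
    final boolean isDomain;  // true for DomainPeer entries, false for TCP

    Key(String dnId, boolean isDomain) {
      this.dnId = dnId;
      this.isDomain = isDomain;
    }

    @Override
    public boolean equals(Object o) {
      if (!(o instanceof Key)) {
        return false;
      }
      Key other = (Key) o;
      return dnId.equals(other.dnId) && isDomain == other.isDomain;
    }

    @Override
    public int hashCode() {
      return dnId.hashCode() ^ (isDomain ? 1 : 0);
    }
  }

  public static void main(String[] args) {
    LinkedListMultimap<Key, String> cache = LinkedListMultimap.create();
    cache.put(new Key("dn1", true), "domain peer to dn1");
    cache.put(new Key("dn1", false), "tcp peer to dn1");

    // get(dnId, isDomain=true) only ever sees the domain entries...
    List<String> domainPeers = cache.get(new Key("dn1", true));
    System.out.println(domainPeers);                        // [domain peer to dn1]
    // ...and isDomain=false only sees the TCP entries for the same DataNode.
    System.out.println(cache.get(new Key("dn1", false)));   // [tcp peer to dn1]
  }
}
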
Modified: hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=1437616&r1=1437615&r2=1437616&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Wed Jan 23 18:38:56 2013
@@ -70,7 +70,6 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.MD5Hash;
 import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.net.unix.DomainSocket;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.DataChecksum;

Modified: hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java?rev=1437616&r1=1437615&r2=1437616&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java Wed Jan 23 18:38:56 2013
@@ -114,7 +114,7 @@ public class TestDataTransferKeepalive {
     
     // Take it out of the cache - reading should
     // give an EOF.
-    Peer peer = dfsClient.peerCache.get(dn.getDatanodeId());
+    Peer peer = dfsClient.peerCache.get(dn.getDatanodeId(), false);
     assertNotNull(peer);
     assertEquals(-1, peer.getInputStream().read());
   }

Added: hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelShortCircuitReadUnCached.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelShortCircuitReadUnCached.java?rev=1437616&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelShortCircuitReadUnCached.java (added)
+++ hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelShortCircuitReadUnCached.java Wed Jan 23 18:38:56 2013
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.File;
+import org.apache.hadoop.net.unix.DomainSocket;
+import org.apache.hadoop.net.unix.TemporarySocketDirectory;
+import org.junit.AfterClass;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import static org.hamcrest.CoreMatchers.*;
+
+/**
+ * This class tests short-circuit local reads without any FileInputStream or
+ * Socket caching.  This is a regression test for HDFS-4417.
+ */
+public class TestParallelShortCircuitReadUnCached extends TestParallelReadUtil {
+  private static TemporarySocketDirectory sockDir;
+
+  @BeforeClass
+  static public void setupCluster() throws Exception {
+    if (DomainSocket.getLoadingFailureReason() != null) return;
+    sockDir = new TemporarySocketDirectory();
+    HdfsConfiguration conf = new HdfsConfiguration();
+    conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
+      new File(sockDir.getDir(), 
+        "TestParallelShortCircuitReadUnCached._PORT.sock").getAbsolutePath());
+    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
+    conf.setBoolean(DFSConfigKeys.
+        DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, false);
+    conf.setBoolean(DFSConfigKeys.
+        DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC, true);
+    // We want to test reading from stale sockets.
+    conf.setInt(DFSConfigKeys.DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_KEY, 1);
+    conf.setLong(DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY,
+        5 * 60 * 1000);
+    conf.setInt(DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY, 32);
+    // Avoid using the FileInputStreamCache.
+    conf.setInt(DFSConfigKeys.
+        DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_SIZE_KEY, 0);
+    DomainSocket.disableBindPathValidation();
+    DFSInputStream.tcpReadsDisabledForTesting = true;
+    setupCluster(1, conf);
+  }
+
+  @Before
+  public void before() {
+    Assume.assumeThat(DomainSocket.getLoadingFailureReason(), equalTo(null));
+  }
+
+  @AfterClass
+  static public void teardownCluster() throws Exception {
+    if (DomainSocket.getLoadingFailureReason() != null) return;
+    sockDir.close();
+    TestParallelReadUtil.teardownCluster();
+  }
+}
\ No newline at end of file

Modified: hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPeerCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPeerCache.java?rev=1437616&r1=1437615&r2=1437616&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPeerCache.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPeerCache.java Wed Jan 23 18:38:56 2013
@@ -33,22 +33,25 @@ import org.apache.hadoop.hdfs.protocol.D
 import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.net.unix.DomainSocket;
 import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 public class TestPeerCache {
   static final Log LOG = LogFactory.getLog(TestPeerCache.class);
 
   private static final int CAPACITY = 3;
   private static final int EXPIRY_PERIOD = 20;
-  private static PeerCache cache =
-      PeerCache.getInstance(CAPACITY, EXPIRY_PERIOD);
 
   private static class FakePeer implements Peer {
     private boolean closed = false;
+    private final boolean hasDomain;
 
     private DatanodeID dnId;
 
-    public FakePeer(DatanodeID dnId) {
+    public FakePeer(DatanodeID dnId, boolean hasDomain) {
       this.dnId = dnId;
+      this.hasDomain = hasDomain;
     }
 
     @Override
@@ -118,39 +121,50 @@ public class TestPeerCache {
 
     @Override
     public DomainSocket getDomainSocket() {
-      return null;
+      if (!hasDomain) return null;
+      // Return a mock which throws an exception whenever any function is
+      // called.
+      return Mockito.mock(DomainSocket.class,
+          new Answer<Object>() {
+            @Override
+            public Object answer(InvocationOnMock invocation)
+                throws Throwable {
+              throw new RuntimeException("injected fault.");
+          } });
     }
   }
 
   @Test
   public void testAddAndRetrieve() throws Exception {
+    PeerCache cache = PeerCache.getInstance(3, 100000);
     DatanodeID dnId = new DatanodeID("192.168.0.1",
           "fakehostname", "fake_storage_id",
           100, 101, 102);
-    FakePeer peer = new FakePeer(dnId);
+    FakePeer peer = new FakePeer(dnId, false);
     cache.put(dnId, peer);
     assertTrue(!peer.isClosed());
     assertEquals(1, cache.size());
-    assertEquals(peer, cache.get(dnId));
+    assertEquals(peer, cache.get(dnId, false));
     assertEquals(0, cache.size());
     cache.clear();
   }
 
   @Test
   public void testExpiry() throws Exception {
+    final int CAPACITY = 3;
+    final int EXPIRY_PERIOD = 10;
+    PeerCache cache = PeerCache.getInstance(CAPACITY, EXPIRY_PERIOD);
     DatanodeID dnIds[] = new DatanodeID[CAPACITY];
     FakePeer peers[] = new FakePeer[CAPACITY];
     for (int i = 0; i < CAPACITY; ++i) {
       dnIds[i] = new DatanodeID("192.168.0.1",
           "fakehostname_" + i, "fake_storage_id",
           100, 101, 102);
-      peers[i] = new FakePeer(dnIds[i]);
+      peers[i] = new FakePeer(dnIds[i], false);
     }
     for (int i = 0; i < CAPACITY; ++i) {
       cache.put(dnIds[i], peers[i]);
     }
-    // Check that the peers are cached
-    assertEquals(CAPACITY, cache.size());
 
     // Wait for the peers to expire
     Thread.sleep(EXPIRY_PERIOD * 50);
@@ -169,13 +183,15 @@ public class TestPeerCache {
 
   @Test
   public void testEviction() throws Exception {
+    final int CAPACITY = 3;
+    PeerCache cache = PeerCache.getInstance(CAPACITY, 100000);
     DatanodeID dnIds[] = new DatanodeID[CAPACITY + 1];
     FakePeer peers[] = new FakePeer[CAPACITY + 1];
     for (int i = 0; i < dnIds.length; ++i) {
       dnIds[i] = new DatanodeID("192.168.0.1",
           "fakehostname_" + i, "fake_storage_id_" + i,
           100, 101, 102);
-      peers[i] = new FakePeer(dnIds[i]);
+      peers[i] = new FakePeer(dnIds[i], false);
     }
     for (int i = 0; i < CAPACITY; ++i) {
       cache.put(dnIds[i], peers[i]);
@@ -186,11 +202,11 @@ public class TestPeerCache {
     // Add another entry and check that the first entry was evicted
     cache.put(dnIds[CAPACITY], peers[CAPACITY]);
     assertEquals(CAPACITY, cache.size());
-    assertSame(null, cache.get(dnIds[0]));
+    assertSame(null, cache.get(dnIds[0], false));
 
     // Make sure that the other entries are still there
     for (int i = 1; i < CAPACITY; ++i) {
-      Peer peer = cache.get(dnIds[i]);
+      Peer peer = cache.get(dnIds[i], false);
       assertSame(peers[i], peer);
       assertTrue(!peer.isClosed());
       peer.close();
@@ -201,19 +217,56 @@ public class TestPeerCache {
 
   @Test
   public void testMultiplePeersWithSameDnId() throws Exception {
+    final int CAPACITY = 3;
+    PeerCache cache = PeerCache.getInstance(CAPACITY, 100000);
+    DatanodeID dnId = new DatanodeID("192.168.0.1",
+          "fakehostname", "fake_storage_id",
+          100, 101, 102);
+    HashSet<FakePeer> peers = new HashSet<FakePeer>(CAPACITY);
+    for (int i = 0; i < CAPACITY; ++i) {
+      FakePeer peer = new FakePeer(dnId, false);
+      peers.add(peer);
+      cache.put(dnId, peer);
+    }
+    // Check that all of the peers ended up in the cache
+    assertEquals(CAPACITY, cache.size());
+    while (!peers.isEmpty()) {
+      Peer peer = cache.get(dnId, false);
+      assertTrue(peer != null);
+      assertTrue(!peer.isClosed());
+      peers.remove(peer);
+    }
+    assertEquals(0, cache.size());
+    cache.clear();
+  }
+
+  @Test
+  public void testDomainSocketPeers() throws Exception {
+    final int CAPACITY = 3;
+    PeerCache cache = PeerCache.getInstance(CAPACITY, 100000);
     DatanodeID dnId = new DatanodeID("192.168.0.1",
           "fakehostname", "fake_storage_id",
           100, 101, 102);
     HashSet<FakePeer> peers = new HashSet<FakePeer>(CAPACITY);
     for (int i = 0; i < CAPACITY; ++i) {
-      FakePeer peer = new FakePeer(dnId);
+      FakePeer peer = new FakePeer(dnId, i == CAPACITY - 1);
       peers.add(peer);
       cache.put(dnId, peer);
     }
     // Check that all of the peers ended up in the cache
     assertEquals(CAPACITY, cache.size());
+    // Test that get(requireDomainPeer=true) finds the peer with the 
+    // domain socket.
+    Peer peer = cache.get(dnId, true);
+    assertTrue(peer.getDomainSocket() != null);
+    peers.remove(peer);
+    // Test that get(requireDomainPeer=true) returns null when there are
+    // no more peers with domain sockets.
+    peer = cache.get(dnId, true);
+    assertTrue(peer == null);
+    // Check that all of the other peers ended up in the cache.
     while (!peers.isEmpty()) {
-      Peer peer = cache.get(dnId);
+      peer = cache.get(dnId, false);
       assertTrue(peer != null);
       assertTrue(!peer.isClosed());
       peers.remove(peer);