You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by sz...@apache.org on 2012/09/26 15:23:21 UTC

svn commit: r1390466 - in /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/ src/test/java/org/apache/hadoop/hdfs/

Author: szetszwo
Date: Wed Sep 26 13:23:21 2012
New Revision: 1390466

URL: http://svn.apache.org/viewvc?rev=1390466&view=rev
Log:
HDFS-3373. Change DFSClient input stream socket cache to global static and add a thread to cleanup expired cache entries.  Contributed by John George

Added:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestSocketCache.java
Modified:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/SocketCache.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1390466&r1=1390465&r2=1390466&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Wed Sep 26 13:23:21 2012
@@ -235,6 +235,9 @@ Release 2.0.3-alpha - Unreleased 
 
     HDFS-3939. NN RPC address cleanup. (eli)
 
+    HDFS-3373. Change DFSClient input stream socket cache to global static and add
+    a thread to cleanup expired cache entries. (John George via szetszwo)
+
   OPTIMIZATIONS
 
   BUG FIXES

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1390466&r1=1390465&r2=1390466&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java Wed Sep 26 13:23:21 2012
@@ -39,6 +39,8 @@ import static org.apache.hadoop.hdfs.DFS
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER_DEFAULT;
@@ -209,6 +211,7 @@ public class DFSClient implements java.i
     final int writePacketSize;
     final int socketTimeout;
     final int socketCacheCapacity;
+    final long socketCacheExpiry;
     /** Wait time window (in msec) if BlockMissingException is caught */
     final int timeWindow;
     final int nCachedConnRetry;
@@ -257,6 +260,8 @@ public class DFSClient implements java.i
       taskId = conf.get("mapreduce.task.attempt.id", "NONMAPREDUCE");
       socketCacheCapacity = conf.getInt(DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY,
           DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT);
+      socketCacheExpiry = conf.getLong(DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY,
+          DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT);
       prefetchSize = conf.getLong(DFS_CLIENT_READ_PREFETCH_SIZE_KEY,
           10 * defaultBlockSize);
       timeWindow = conf
@@ -427,7 +432,7 @@ public class DFSClient implements java.i
       Joiner.on(',').join(localInterfaceAddrs) + "]");
     }
     
-    this.socketCache = new SocketCache(dfsClientConf.socketCacheCapacity);
+    this.socketCache = SocketCache.getInstance(dfsClientConf.socketCacheCapacity, dfsClientConf.socketCacheExpiry);
   }
 
   /**
@@ -641,7 +646,6 @@ public class DFSClient implements java.i
   void abort() {
     clientRunning = false;
     closeAllFilesBeingWritten(true);
-    socketCache.clear();
 
     try {
       // remove reference to this client and stop the renewer,
@@ -688,7 +692,6 @@ public class DFSClient implements java.i
   public synchronized void close() throws IOException {
     if(clientRunning) {
       closeAllFilesBeingWritten(false);
-      socketCache.clear();
       clientRunning = false;
       getLeaseRenewer().closeClient(this);
       // close connections to the namenode

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1390466&r1=1390465&r2=1390466&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Wed Sep 26 13:23:21 2012
@@ -74,6 +74,8 @@ public class DFSConfigKeys extends Commo
   public static final String  DFS_CLIENT_FAILOVER_CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_KEY = "dfs.client.failover.connection.retries.on.timeouts";
   public static final int     DFS_CLIENT_FAILOVER_CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_DEFAULT = 0;
   
+  public static final String  DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY = "dfs.client.socketcache.expiryMsec";
+  public static final long    DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT = 2 * 60 * 1000;
   public static final String  DFS_NAMENODE_BACKUP_ADDRESS_KEY = "dfs.namenode.backup.address";
   public static final String  DFS_NAMENODE_BACKUP_ADDRESS_DEFAULT = "localhost:50100";
   public static final String  DFS_NAMENODE_BACKUP_HTTP_ADDRESS_KEY = "dfs.namenode.backup.http-address";

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/SocketCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/SocketCache.java?rev=1390466&r1=1390465&r2=1390466&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/SocketCache.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/SocketCache.java Wed Sep 26 13:23:21 2012
@@ -26,51 +26,131 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map.Entry;
 
+import java.io.IOException;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.LinkedListMultimap;
 import org.apache.commons.logging.Log;
+import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.util.StringUtils;
 
 /**
- * A cache of sockets.
+ * A cache of input stream sockets to Data Node.
  */
 class SocketCache {
-  static final Log LOG = LogFactory.getLog(SocketCache.class);
+  private static final Log LOG = LogFactory.getLog(SocketCache.class);
 
-  private final LinkedListMultimap<SocketAddress, SocketAndStreams> multimap;
-  private final int capacity;
+  @InterfaceAudience.Private
+  static class SocketAndStreams implements Closeable {
+    public final Socket sock;
+    public final IOStreamPair ioStreams;
+    long createTime;
+    
+    public SocketAndStreams(Socket s, IOStreamPair ioStreams) {
+      this.sock = s;
+      this.ioStreams = ioStreams;
+      this.createTime = System.currentTimeMillis();
+    }
+    
+    @Override
+    public void close() {
+      if (ioStreams != null) { 
+        IOUtils.closeStream(ioStreams.in);
+        IOUtils.closeStream(ioStreams.out);
+      }
+      IOUtils.closeSocket(sock);
+    }
 
-  /**
-   * Create a SocketCache with the given capacity.
-   * @param capacity  Max cache size.
-   */
-  public SocketCache(int capacity) {
-    multimap = LinkedListMultimap.create();
-    this.capacity = capacity;
-    if (capacity <= 0) {
-      LOG.debug("SocketCache disabled in configuration.");
+    public long getCreateTime() {
+      return this.createTime;
     }
   }
 
+  private Daemon daemon;
+  /** A map for per user per datanode. */
+  private static LinkedListMultimap<SocketAddress, SocketAndStreams> multimap =
+    LinkedListMultimap.create();
+  private static int capacity;
+  private static long expiryPeriod;
+  private static SocketCache scInstance = new SocketCache();
+  private static boolean isInitedOnce = false;
+ 
+  public static synchronized SocketCache getInstance(int c, long e) {
+    // capacity is only initialized once
+    if (isInitedOnce == false) {
+      capacity = c;
+      expiryPeriod = e;
+
+      if (capacity == 0 ) {
+        LOG.info("SocketCache disabled.");
+      }
+      else if (expiryPeriod == 0) {
+        throw new IllegalStateException("Cannot initialize expiryPeriod to " +
+           expiryPeriod + "when cache is enabled.");
+      }
+      isInitedOnce = true;
+    } else { //already initialized once
+      if (capacity != c || expiryPeriod != e) {
+        LOG.info("capacity and expiry periods already set to " + capacity + 
+          " and " + expiryPeriod + " respectively. Cannot set it to " + c + 
+          " and " + e);
+      }
+    }
+
+    return scInstance;
+  }
+
+  private boolean isDaemonStarted() {
+    return (daemon == null)? false: true;
+  }
+
+  private synchronized void startExpiryDaemon() {
+    // start daemon only if not already started
+    if (isDaemonStarted() == true) {
+      return;
+    }
+    
+    daemon = new Daemon(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          SocketCache.this.run();
+        } catch(InterruptedException e) {
+          //noop
+        } finally {
+          SocketCache.this.clear();
+        }
+      }
+
+      @Override
+      public String toString() {
+        return String.valueOf(SocketCache.this);
+      }
+    });
+    daemon.start();
+  }
+
   /**
    * Get a cached socket to the given address.
    * @param remote  Remote address the socket is connected to.
    * @return  A socket with unknown state, possibly closed underneath. Or null.
    */
   public synchronized SocketAndStreams get(SocketAddress remote) {
+
     if (capacity <= 0) { // disabled
       return null;
     }
-    
-    List<SocketAndStreams> socklist = multimap.get(remote);
-    if (socklist == null) {
+
+    List<SocketAndStreams> sockStreamList = multimap.get(remote);
+    if (sockStreamList == null) {
       return null;
     }
 
-    Iterator<SocketAndStreams> iter = socklist.iterator();
+    Iterator<SocketAndStreams> iter = sockStreamList.iterator();
     while (iter.hasNext()) {
       SocketAndStreams candidate = iter.next();
       iter.remove();
@@ -86,14 +166,16 @@ class SocketCache {
    * @param sock socket not used by anyone.
    */
   public synchronized void put(Socket sock, IOStreamPair ioStreams) {
+
+    Preconditions.checkNotNull(sock);
     SocketAndStreams s = new SocketAndStreams(sock, ioStreams);
     if (capacity <= 0) {
       // Cache disabled.
       s.close();
       return;
     }
-    
-    Preconditions.checkNotNull(sock);
+ 
+    startExpiryDaemon();
 
     SocketAddress remoteAddr = sock.getRemoteSocketAddress();
     if (remoteAddr == null) {
@@ -106,7 +188,7 @@ class SocketCache {
     if (capacity == multimap.size()) {
       evictOldest();
     }
-    multimap.put(remoteAddr, new SocketAndStreams(sock, ioStreams));
+    multimap.put(remoteAddr, s);
   }
 
   public synchronized int size() {
@@ -114,13 +196,34 @@ class SocketCache {
   }
 
   /**
+   * Evict and close sockets older than expiry period from the cache.
+   */
+  private synchronized void evictExpired(long expiryPeriod) {
+    while (multimap.size() != 0) {
+      Iterator<Entry<SocketAddress, SocketAndStreams>> iter =
+        multimap.entries().iterator();
+      Entry<SocketAddress, SocketAndStreams> entry = iter.next();
+      // if oldest socket expired, remove it
+      if (entry == null || 
+        System.currentTimeMillis() - entry.getValue().getCreateTime() < 
+        expiryPeriod) {
+        break;
+      }
+      iter.remove();
+      SocketAndStreams s = entry.getValue();
+      s.close();
+    }
+  }
+
+  /**
    * Evict the oldest entry in the cache.
    */
   private synchronized void evictOldest() {
     Iterator<Entry<SocketAddress, SocketAndStreams>> iter =
       multimap.entries().iterator();
     if (!iter.hasNext()) {
-      throw new IllegalStateException("Cannot evict from empty cache!");
+      throw new IllegalStateException("Cannot evict from empty cache! " +
+        "capacity: " + capacity);
     }
     Entry<SocketAddress, SocketAndStreams> entry = iter.next();
     iter.remove();
@@ -129,38 +232,31 @@ class SocketCache {
   }
 
   /**
-   * Empty the cache, and close all sockets.
+   * Periodically check in the cache and expire the entries
+   * older than expiryPeriod minutes
    */
-  public synchronized void clear() {
-    for (SocketAndStreams s : multimap.values()) {
-      s.close();
+  private void run() throws InterruptedException {
+    for(long lastExpiryTime = System.currentTimeMillis();
+        !Thread.interrupted();
+        Thread.sleep(expiryPeriod)) {
+      final long elapsed = System.currentTimeMillis() - lastExpiryTime;
+      if (elapsed >= expiryPeriod) {
+        evictExpired(expiryPeriod);
+        lastExpiryTime = System.currentTimeMillis();
+      }
     }
-    multimap.clear();
-  }
-
-  @Override
-  protected void finalize() {
     clear();
+    throw new InterruptedException("Daemon Interrupted");
   }
-  
-  @InterfaceAudience.Private
-  static class SocketAndStreams implements Closeable {
-    public final Socket sock;
-    public final IOStreamPair ioStreams;
-    
-    public SocketAndStreams(Socket s, IOStreamPair ioStreams) {
-      this.sock = s;
-      this.ioStreams = ioStreams;
-    }
-    
-    @Override
-    public void close() {
-      if (ioStreams != null) { 
-        IOUtils.closeStream(ioStreams.in);
-        IOUtils.closeStream(ioStreams.out);
-      }
-      IOUtils.closeSocket(sock);
+
+  /**
+   * Empty the cache, and close all sockets.
+   */
+  private synchronized void clear() {
+    for (SocketAndStreams sockAndStream : multimap.values()) {
+      sockAndStream.close();
     }
+    multimap.clear();
   }
 
 }

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java?rev=1390466&r1=1390465&r2=1390466&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java Wed Sep 26 13:23:21 2012
@@ -25,6 +25,7 @@ import static org.mockito.Mockito.spy;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.Socket;
+import java.security.PrivilegedExceptionAction;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -54,10 +55,12 @@ public class TestConnCache {
 
   static final int BLOCK_SIZE = 4096;
   static final int FILE_SIZE = 3 * BLOCK_SIZE;
-
+  final static int CACHE_SIZE = 4;
+  final static long CACHE_EXPIRY_MS = 200;
   static Configuration conf = null;
   static MiniDFSCluster cluster = null;
   static FileSystem fs = null;
+  static SocketCache cache;
 
   static final Path testFile = new Path("/testConnCache.dat");
   static byte authenticData[] = null;
@@ -93,6 +96,9 @@ public class TestConnCache {
   public static void setupCluster() throws Exception {
     final int REPLICATION_FACTOR = 1;
 
+    /* create a socket cache. There is only one socket cache per jvm */
+    cache = SocketCache.getInstance(CACHE_SIZE, CACHE_EXPIRY_MS);
+
     util = new BlockReaderTestUtil(REPLICATION_FACTOR);
     cluster = util.getCluster();
     conf = util.getConf();
@@ -142,10 +148,7 @@ public class TestConnCache {
    * Test the SocketCache itself.
    */
   @Test
-  public void testSocketCache() throws IOException {
-    final int CACHE_SIZE = 4;
-    SocketCache cache = new SocketCache(CACHE_SIZE);
-
+  public void testSocketCache() throws Exception {
     // Make a client
     InetSocketAddress nnAddr =
         new InetSocketAddress("localhost", cluster.getNameNodePort());
@@ -159,6 +162,7 @@ public class TestConnCache {
     DataNode dn = util.getDataNode(block);
     InetSocketAddress dnAddr = dn.getXferAddress();
 
+
     // Make some sockets to the DN
     Socket[] dnSockets = new Socket[CACHE_SIZE];
     for (int i = 0; i < dnSockets.length; ++i) {
@@ -166,6 +170,7 @@ public class TestConnCache {
           dnAddr.getAddress(), dnAddr.getPort());
     }
 
+
     // Insert a socket to the NN
     Socket nnSock = new Socket(nnAddr.getAddress(), nnAddr.getPort());
     cache.put(nnSock, null);
@@ -179,7 +184,7 @@ public class TestConnCache {
 
     assertEquals("NN socket evicted", null, cache.get(nnAddr));
     assertTrue("Evicted socket closed", nnSock.isClosed());
-
+ 
     // Lookup the DN socks
     for (Socket dnSock : dnSockets) {
       assertEquals("Retrieve cached sockets", dnSock, cache.get(dnAddr).sock);
@@ -189,6 +194,51 @@ public class TestConnCache {
     assertEquals("Cache is empty", 0, cache.size());
   }
 
+
+  /**
+   * Test the SocketCache expiry.
+   * Verify that socket cache entries expire after the set
+   * expiry time.
+   */
+  @Test
+  public void testSocketCacheExpiry() throws Exception {
+    // Make a client
+    InetSocketAddress nnAddr =
+        new InetSocketAddress("localhost", cluster.getNameNodePort());
+    DFSClient client = new DFSClient(nnAddr, conf);
+
+    // Find out the DN addr
+    LocatedBlock block =
+        client.getNamenode().getBlockLocations(
+            testFile.toString(), 0, FILE_SIZE)
+        .getLocatedBlocks().get(0);
+    DataNode dn = util.getDataNode(block);
+    InetSocketAddress dnAddr = dn.getXferAddress();
+
+
+    // Make some sockets to the DN and put in cache
+    Socket[] dnSockets = new Socket[CACHE_SIZE];
+    for (int i = 0; i < dnSockets.length; ++i) {
+      dnSockets[i] = client.socketFactory.createSocket(
+          dnAddr.getAddress(), dnAddr.getPort());
+      cache.put(dnSockets[i], null);
+    }
+
+    // Client side still has the sockets cached
+    assertEquals(CACHE_SIZE, client.socketCache.size());
+
+    //sleep for a second and see if it expired
+    Thread.sleep(CACHE_EXPIRY_MS + 1000);
+    
+    // Client side has no sockets cached
+    assertEquals(0, client.socketCache.size());
+
+    //sleep for another second and see if 
+    //the daemon thread runs fine on empty cache
+    Thread.sleep(CACHE_EXPIRY_MS + 1000);
+  }
+
+
   /**
    * Read a file served entirely from one DN. Seek around and read from
    * different offsets. And verify that they all use the same socket.
@@ -229,33 +279,6 @@ public class TestConnCache {
 
     in.close();
   }
-  
-  /**
-   * Test that the socket cache can be disabled by setting the capacity to
-   * 0. Regression test for HDFS-3365.
-   */
-  @Test
-  public void testDisableCache() throws IOException {
-    LOG.info("Starting testDisableCache()");
-
-    // Reading with the normally configured filesystem should
-    // cache a socket.
-    DFSTestUtil.readFile(fs, testFile);
-    assertEquals(1, ((DistributedFileSystem)fs).dfs.socketCache.size());
-    
-    // Configure a new instance with no caching, ensure that it doesn't
-    // cache anything
-    Configuration confWithoutCache = new Configuration(fs.getConf());
-    confWithoutCache.setInt(
-        DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY, 0);
-    FileSystem fsWithoutCache = FileSystem.newInstance(confWithoutCache);
-    try {
-      DFSTestUtil.readFile(fsWithoutCache, testFile);
-      assertEquals(0, ((DistributedFileSystem)fsWithoutCache).dfs.socketCache.size());
-    } finally {
-      fsWithoutCache.close();
-    }
-  }
 
   @AfterClass
   public static void teardownCluster() throws Exception {

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java?rev=1390466&r1=1390465&r2=1390466&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java Wed Sep 26 13:23:21 2012
@@ -120,12 +120,9 @@ public class TestDistributedFileSystem {
       DFSTestUtil.readFile(fileSys, p);
       
       DFSClient client = ((DistributedFileSystem)fileSys).dfs;
-      SocketCache cache = client.socketCache;
-      assertEquals(1, cache.size());
 
       fileSys.close();
       
-      assertEquals(0, cache.size());
     } finally {
       if (cluster != null) {cluster.shutdown();}
     }

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestSocketCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestSocketCache.java?rev=1390466&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestSocketCache.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestSocketCache.java Wed Sep 26 13:23:21 2012
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.spy;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.security.PrivilegedExceptionAction;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.security.token.Token;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Matchers;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+/**
+ * This class tests the client connection caching in a single node
+ * mini-cluster.
+ */
+public class TestSocketCache {
+  static final Log LOG = LogFactory.getLog(TestSocketCache.class);
+
+  static final int BLOCK_SIZE = 4096;
+  static final int FILE_SIZE = 3 * BLOCK_SIZE;
+  final static int CACHE_SIZE = 4;
+  final static long CACHE_EXPIRY_MS = 200;
+  static Configuration conf = null;
+  static MiniDFSCluster cluster = null;
+  static FileSystem fs = null;
+  static SocketCache cache;
+
+  static final Path testFile = new Path("/testConnCache.dat");
+  static byte authenticData[] = null;
+
+  static BlockReaderTestUtil util = null;
+
+
+  /**
+   * A mock Answer to remember the BlockReader used.
+   *
+   * It verifies that all invocation to DFSInputStream.getBlockReader()
+   * use the same socket.
+   */
+  private class MockGetBlockReader implements Answer<RemoteBlockReader2> {
+    public RemoteBlockReader2 reader = null;
+    private Socket sock = null;
+
+    @Override
+    public RemoteBlockReader2 answer(InvocationOnMock invocation) throws Throwable {
+      RemoteBlockReader2 prevReader = reader;
+      reader = (RemoteBlockReader2) invocation.callRealMethod();
+      if (sock == null) {
+        sock = reader.dnSock;
+      } else if (prevReader != null) {
+        assertSame("DFSInputStream should use the same socket",
+                   sock, reader.dnSock);
+      }
+      return reader;
+    }
+  }
+
+  @BeforeClass
+  public static void setupCluster() throws Exception {
+    final int REPLICATION_FACTOR = 1;
+
+    HdfsConfiguration confWithoutCache = new HdfsConfiguration();
+    confWithoutCache.setInt(
+        DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY, 0);
+    util = new BlockReaderTestUtil(REPLICATION_FACTOR, confWithoutCache);
+    cluster = util.getCluster();
+    conf = util.getConf();
+
+    authenticData = util.writeFile(testFile, FILE_SIZE / 1024);
+  }
+
+
+  /**
+   * (Optionally) seek to position, read and verify data.
+   *
+   * Seek to specified position if pos is non-negative.
+   */
+  private void pread(DFSInputStream in,
+                     long pos,
+                     byte[] buffer,
+                     int offset,
+                     int length)
+      throws IOException {
+    assertTrue("Test buffer too small", buffer.length >= offset + length);
+
+    if (pos >= 0)
+      in.seek(pos);
+
+    LOG.info("Reading from file of size " + in.getFileLength() +
+             " at offset " + in.getPos());
+
+    while (length > 0) {
+      int cnt = in.read(buffer, offset, length);
+      assertTrue("Error in read", cnt > 0);
+      offset += cnt;
+      length -= cnt;
+    }
+
+    // Verify
+    for (int i = 0; i < length; ++i) {
+      byte actual = buffer[i];
+      byte expect = authenticData[(int)pos + i];
+      assertEquals("Read data mismatch at file offset " + (pos + i) +
+                   ". Expects " + expect + "; got " + actual,
+                   actual, expect);
+    }
+  }
+
+  
+  /**
+   * Test that the socket cache can be disabled by setting the capacity to
+   * 0. Regression test for HDFS-3365.
+   */
+  @Test
+  public void testDisableCache() throws IOException {
+    LOG.info("Starting testDisableCache()");
+
+    // Configure a new instance with no caching, ensure that it doesn't
+    // cache anything
+
+    FileSystem fsWithoutCache = FileSystem.newInstance(conf);
+    try {
+      DFSTestUtil.readFile(fsWithoutCache, testFile);
+      assertEquals(0, ((DistributedFileSystem)fsWithoutCache).dfs.socketCache.size());
+    } finally {
+      fsWithoutCache.close();
+    }
+  }
+
+  @AfterClass
+  public static void teardownCluster() throws Exception {
+    util.shutdown();
+  }
+}