Posted to hdfs-commits@hadoop.apache.org by bo...@apache.org on 2012/09/28 18:44:21 UTC

svn commit: r1391542 - in /hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/ src/test/java/org/apache/hadoop/hdfs/

Author: bobby
Date: Fri Sep 28 16:44:20 2012
New Revision: 1391542

URL: http://svn.apache.org/viewvc?rev=1391542&view=rev
Log:
HDFS-3373. FileContext HDFS implementation can leak socket caches (John George via bobby)
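
The root cause: FileContext can create DFSClient instances that are never
closed, and each client owned its own SocketCache that was emptied only on
close(), so abandoned clients stranded open sockets. The change below makes
the cache a single JVM-wide instance and adds a daemon that closes cached
sockets once they outlive an expiry period. A minimal sketch of that
pattern (hypothetical names ExpiringSocketCache/Entry, structure simplified
from the committed SocketCache):

    import java.io.IOException;
    import java.net.Socket;
    import java.util.ArrayDeque;
    import java.util.Deque;

    class ExpiringSocketCache {
      // One instance per JVM, shared by every client.
      private static final ExpiringSocketCache INSTANCE =
          new ExpiringSocketCache();
      private static final long EXPIRY_MS = 2 * 60 * 1000; // the new default

      private static class Entry {
        final Socket sock;
        final long createTime = System.currentTimeMillis();
        Entry(Socket sock) { this.sock = sock; }
      }

      // Insertion order is age order, so eviction can stop at the first
      // entry that has not yet expired.
      private final Deque<Entry> entries = new ArrayDeque<Entry>();
      private Thread daemon; // started lazily by the first put()

      static ExpiringSocketCache getInstance() { return INSTANCE; }

      synchronized void put(Socket sock) {
        startDaemonIfNeeded();
        entries.addLast(new Entry(sock));
      }

      private void startDaemonIfNeeded() {
        if (daemon != null) return;
        daemon = new Thread(new Runnable() {
          public void run() {
            try {
              while (!Thread.interrupted()) {
                Thread.sleep(EXPIRY_MS);
                evictExpired();
              }
            } catch (InterruptedException ignored) { }
          }
        });
        daemon.setDaemon(true); // never keeps the JVM alive
        daemon.start();
      }

      private synchronized void evictExpired() {
        long now = System.currentTimeMillis();
        while (!entries.isEmpty()
            && now - entries.peekFirst().createTime >= EXPIRY_MS) {
          Entry oldest = entries.removeFirst();
          try { oldest.sock.close(); } catch (IOException ignored) { }
        }
      }
    }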

Modified:
    hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
    hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/SocketCache.java
    hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java
    hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1391542&r1=1391541&r2=1391542&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Fri Sep 28 16:44:20 2012
@@ -24,6 +24,9 @@ Release 0.23.4 - UNRELEASED
     HDFS-3731. Release upgrade must handle blocks being written from 1.0
     (Kihwal Lee via daryn)
 
+    HDFS-3373. FileContext HDFS implementation can leak socket caches (John
+    George via bobby)
+
 Release 0.23.3 - UNRELEASED
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1391542&r1=1391541&r2=1391542&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java Fri Sep 28 16:44:20 2012
@@ -34,6 +34,8 @@ import static org.apache.hadoop.hdfs.DFS
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER_DEFAULT;
@@ -178,6 +180,7 @@ public class DFSClient implements java.i
     final int writePacketSize;
     final int socketTimeout;
     final int socketCacheCapacity;
+    final long socketCacheExpiry;
     /** Wait time window (in msec) if BlockMissingException is caught */
     final int timeWindow;
     final int nCachedConnRetry;
@@ -212,6 +215,8 @@ public class DFSClient implements java.i
       taskId = conf.get("mapreduce.task.attempt.id", "NONMAPREDUCE");
       socketCacheCapacity = conf.getInt(DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY,
           DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT);
+      socketCacheExpiry = conf.getLong(DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY,
+          DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT);
       prefetchSize = conf.getLong(DFS_CLIENT_READ_PREFETCH_SIZE_KEY,
           10 * defaultBlockSize);
       timeWindow = conf
@@ -336,7 +341,7 @@ public class DFSClient implements java.i
         nameNodeAddr.getHostName() + ":" + nameNodeAddr.getPort();
     this.clientName = "DFSClient_" + dfsClientConf.taskId + "_" + 
          DFSUtil.getRandom().nextInt()  + "_" + Thread.currentThread().getId();
-    this.socketCache = new SocketCache(dfsClientConf.socketCacheCapacity);
+    this.socketCache = SocketCache.getInstance(dfsClientConf.socketCacheCapacity, dfsClientConf.socketCacheExpiry);
     if (nameNodeAddr != null && rpcNamenode == null) {
       this.rpcNamenode = DFSUtil.createRPCNamenode(nameNodeAddr, conf, ugi);
       this.namenode = DFSUtil.createNamenode(this.rpcNamenode);
@@ -506,7 +511,6 @@ public class DFSClient implements java.i
   void abort() {
     clientRunning = false;
     closeAllFilesBeingWritten(true);
-    socketCache.clear();
     try {
       // remove reference to this client and stop the renewer,
       // if there is no more clients under the renewer.
@@ -551,7 +555,6 @@ public class DFSClient implements java.i
   public synchronized void close() throws IOException {
     if(clientRunning) {
       closeAllFilesBeingWritten(false);
-      socketCache.clear();
       clientRunning = false;
       getLeaseRenewer().closeClient(this);
       // close connections to the namenode
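
With the hunks above, a DFSClient no longer owns or clears the cache; it
just looks up the shared instance using the configured capacity and expiry.
A hedged sketch of that wiring (it would have to live in the
org.apache.hadoop.hdfs package, since SocketCache is package-private; key
strings as added to DFSConfigKeys below):

    package org.apache.hadoop.hdfs;

    import org.apache.hadoop.conf.Configuration;

    public class SocketCacheWiring {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Defaults mirror DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT and
        // DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT.
        int capacity = conf.getInt("dfs.client.socketcache.capacity", 16);
        long expiryMs = conf.getLong("dfs.client.socketcache.expiryMsec",
            2 * 60 * 1000L);
        // The first caller fixes capacity and expiry; later callers that
        // pass different values get the same instance plus a log message.
        SocketCache cache = SocketCache.getInstance(capacity, expiryMs);
      }
    }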

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1391542&r1=1391541&r2=1391542&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Fri Sep 28 16:44:20 2012
@@ -49,6 +49,8 @@ public class DFSConfigKeys extends Commo
   public static final String  DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY = "dfs.client.socketcache.capacity";
   public static final int     DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT = 16;
   
+  public static final String  DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY = "dfs.client.socketcache.expiryMsec";
+  public static final long    DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT = 2 * 60 * 1000;
   public static final String  DFS_NAMENODE_BACKUP_ADDRESS_KEY = "dfs.namenode.backup.address";
   public static final String  DFS_NAMENODE_BACKUP_ADDRESS_DEFAULT = "localhost:50100";
   public static final String  DFS_NAMENODE_BACKUP_HTTP_ADDRESS_KEY = "dfs.namenode.backup.http-address";
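
The expiry can be tuned without recompiling. A sketch of a programmatic
override (setting dfs.client.socketcache.expiryMsec in hdfs-site.xml has
the same effect; the one-minute value is illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;

    public class ExpiryOverride {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Expire cached sockets after 1 minute instead of the 2-minute
        // default; getInstance() rejects zero for either setting.
        conf.setLong(DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY,
            60 * 1000L);
        conf.setInt(DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY, 16);
      }
    }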

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/SocketCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/SocketCache.java?rev=1391542&r1=1391541&r2=1391542&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/SocketCache.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/SocketCache.java Fri Sep 28 16:44:20 2012
@@ -25,28 +25,103 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map.Entry;
 
+import java.io.IOException;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.LinkedListMultimap;
 import org.apache.commons.logging.Log;
+import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.util.StringUtils;
 
 /**
- * A cache of sockets.
+ * A cache of input stream sockets to DataNodes.
  */
 class SocketCache {
-  static final Log LOG = LogFactory.getLog(SocketCache.class);
+  private static final Log LOG = LogFactory.getLog(SocketCache.class);
+  private Daemon daemon;
+  /** Cached sockets, keyed by DataNode socket address. */
+  private static LinkedListMultimap<SocketAddress, SocketProp> multimap =
+    LinkedListMultimap.create();
+  private static int capacity;
+  private static long expiryPeriod;
+  private static SocketCache scInstance = new SocketCache();
 
-  private final LinkedListMultimap<SocketAddress, Socket> multimap;
-  private final int capacity;
+  private static class SocketProp {
+    Socket s;
+    long createTime;
+    public SocketProp(Socket s)
+    {
+      this.s = s;
+      this.createTime = System.currentTimeMillis();
+    }
 
-  /**
-   * Create a SocketCache with the given capacity.
-   * @param capacity  Max cache size.
-   */
-  public SocketCache(int capacity) {
-    multimap = LinkedListMultimap.create();
-    this.capacity = capacity;
+    public long getCreateTime() {
+      return this.createTime;
+    }
+
+    public Socket getSocket() {
+      return this.s;
+    }
+  }
+
+  // capacity and expiryPeriod are only initialized once.
+  private static boolean isInitedOnce() {
+    if (capacity == 0 || expiryPeriod == 0) {
+      return false;
+    }
+    return true;
+  }
+
+  public static synchronized SocketCache getInstance(int c, long e) {
+
+    if (c == 0 || e == 0) {
+      throw new IllegalStateException("Cannot initialize ZERO capacity " +
+        "or expiryPeriod");
+    }
+
+      // capacity is only initialized once
+    if (!isInitedOnce()) {
+      capacity = c;
+      expiryPeriod = e;
+    } else if (capacity != c || expiryPeriod != e) {
+      LOG.info("capacity and expiry periods already set to " + capacity +
+        " and " + expiryPeriod + " respectively. Cannot set it to " + c +
+        " and " + e);
+    }
+
+    return scInstance;
+  }
+
+  private boolean isDaemonStarted() {
+    return daemon != null;
+  }
+
+  private synchronized void startExpiryDaemon() {
+    // start daemon only if not already started
+    if (isDaemonStarted()) {
+      return;
+    }
+    
+    daemon = new Daemon(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          SocketCache.this.run();
+        } catch(InterruptedException e) {
+          //noop
+        } finally {
+          SocketCache.this.clear();
+        }
+      }
+
+      @Override
+      public String toString() {
+        return String.valueOf(SocketCache.this);
+      }
+    });
+    daemon.start();
   }
 
   /**
@@ -55,14 +130,14 @@ class SocketCache {
    * @return  A socket with unknown state, possibly closed underneath. Or null.
    */
   public synchronized Socket get(SocketAddress remote) {
-    List<Socket> socklist = multimap.get(remote);
-    if (socklist == null) {
+    List<SocketProp> sockPropList = multimap.get(remote);
+    if (sockPropList == null) {
       return null;
     }
 
-    Iterator<Socket> iter = socklist.iterator();
+    Iterator<SocketProp> iter = sockPropList.iterator();
     while (iter.hasNext()) {
-      Socket candidate = iter.next();
+      Socket candidate = iter.next().getSocket();
       iter.remove();
       if (!candidate.isClosed()) {
         return candidate;
@@ -76,7 +151,9 @@ class SocketCache {
    * @param sock socket not used by anyone.
    */
   public synchronized void put(Socket sock) {
+
     Preconditions.checkNotNull(sock);
+    startExpiryDaemon();
 
     SocketAddress remoteAddr = sock.getRemoteSocketAddress();
     if (remoteAddr == null) {
@@ -89,7 +166,7 @@ class SocketCache {
     if (capacity == multimap.size()) {
       evictOldest();
     }
-    multimap.put(remoteAddr, sock);
+    multimap.put(remoteAddr, new SocketProp(sock));
   }
 
   public synchronized int size() {
@@ -97,32 +174,67 @@ class SocketCache {
   }
 
   /**
+   * Evict and close sockets older than expiry period from the cache.
+   */
+  private synchronized void evictExpired(long expiryPeriod) {
+    while (multimap.size() != 0) {
+      Iterator<Entry<SocketAddress, SocketProp>> iter =
+        multimap.entries().iterator();
+      Entry<SocketAddress, SocketProp> entry = iter.next();
+
+      // stop when the oldest entry has not yet expired
+      if (entry == null || 
+        System.currentTimeMillis() - entry.getValue().getCreateTime() < 
+        expiryPeriod) {
+        break;
+      }
+      iter.remove();
+      Socket sock = entry.getValue().getSocket();
+      IOUtils.closeSocket(sock);
+    }
+  }
+
+  /**
    * Evict the oldest entry in the cache.
    */
   private synchronized void evictOldest() {
-    Iterator<Entry<SocketAddress, Socket>> iter =
+    Iterator<Entry<SocketAddress, SocketProp>> iter =
       multimap.entries().iterator();
     if (!iter.hasNext()) {
       throw new IllegalStateException("Cannot evict from empty cache!");
     }
-    Entry<SocketAddress, Socket> entry = iter.next();
+    Entry<SocketAddress, SocketProp> entry = iter.next();
     iter.remove();
-    Socket sock = entry.getValue();
+    Socket sock = entry.getValue().getSocket();
     IOUtils.closeSocket(sock);
   }
 
   /**
-   * Empty the cache, and close all sockets.
+   * Periodically check the cache and expire entries
+   * older than expiryPeriod milliseconds.
    */
-  public synchronized void clear() {
-    for (Socket sock : multimap.values()) {
-      IOUtils.closeSocket(sock);
+  private void run() throws InterruptedException {
+    for(long lastExpiryTime = System.currentTimeMillis();
+        !Thread.interrupted();
+        Thread.sleep(expiryPeriod)) {
+      final long elapsed = System.currentTimeMillis() - lastExpiryTime;
+      if (elapsed >= expiryPeriod) {
+        evictExpired(expiryPeriod);
+        lastExpiryTime = System.currentTimeMillis();
+      }
     }
-    multimap.clear();
+    clear();
+    throw new InterruptedException("Daemon Interrupted");
   }
 
-  protected void finalize() {
-    clear();
+  /**
+   * Empty the cache, and close all sockets.
+   */
+  private synchronized void clear() {
+    for (SocketProp sockProp : multimap.values()) {
+      IOUtils.closeSocket(sockProp.getSocket());
+    }
+    multimap.clear();
   }
 
 }
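
Taken together, the new lifecycle looks like this hedged usage sketch
(placed in org.apache.hadoop.hdfs because SocketCache is package-private;
the DataNode address is illustrative):

    package org.apache.hadoop.hdfs;

    import java.net.Socket;

    public class SocketCacheDemo {
      public static void main(String[] args) throws Exception {
        // Capacity and expiry are fixed by the first getInstance() call.
        SocketCache cache = SocketCache.getInstance(16, 2 * 60 * 1000L);
        Socket s = new Socket("127.0.0.1", 50010); // illustrative address
        cache.put(s); // the first put() lazily starts the expiry daemon
        // Returns s if it is still open, else null.
        Socket again = cache.get(s.getRemoteSocketAddress());
      }
    }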

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java?rev=1391542&r1=1391541&r2=1391542&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java Fri Sep 28 16:44:20 2012
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -56,10 +57,12 @@ public class TestConnCache {
 
   static final int BLOCK_SIZE = 4096;
   static final int FILE_SIZE = 3 * BLOCK_SIZE;
-
+  final static int CACHE_SIZE = 4;
+  final static long CACHE_EXPIRY_MS = 200;
   static Configuration conf = null;
   static MiniDFSCluster cluster = null;
   static FileSystem fs = null;
+  static SocketCache cache;
 
   static final Path testFile = new Path("/testConnCache.dat");
   static byte authenticData[] = null;
@@ -94,6 +97,9 @@ public class TestConnCache {
   public static void setupCluster() throws Exception {
     final int REPLICATION_FACTOR = 1;
 
+    /* Create the socket cache. There is only one socket cache per JVM. */
+    cache = SocketCache.getInstance(CACHE_SIZE, CACHE_EXPIRY_MS);
+
     util = new BlockReaderTestUtil(REPLICATION_FACTOR);
     cluster = util.getCluster();
     conf = util.getConf();
@@ -143,10 +149,7 @@ public class TestConnCache {
    * Test the SocketCache itself.
    */
   @Test
-  public void testSocketCache() throws IOException {
-    final int CACHE_SIZE = 4;
-    SocketCache cache = new SocketCache(CACHE_SIZE);
-
+  public void testSocketCache() throws Exception {
     // Make a client
     InetSocketAddress nnAddr =
         new InetSocketAddress("localhost", cluster.getNameNodePort());
@@ -160,6 +163,7 @@ public class TestConnCache {
     DataNode dn = util.getDataNode(block);
     InetSocketAddress dnAddr = dn.getSelfAddr();
 
+
     // Make some sockets to the DN
     Socket[] dnSockets = new Socket[CACHE_SIZE];
     for (int i = 0; i < dnSockets.length; ++i) {
@@ -167,6 +171,7 @@ public class TestConnCache {
           dnAddr.getAddress(), dnAddr.getPort());
     }
 
+
     // Insert a socket to the NN
     Socket nnSock = new Socket(nnAddr.getAddress(), nnAddr.getPort());
     cache.put(nnSock);
@@ -180,7 +185,7 @@ public class TestConnCache {
 
     assertEquals("NN socket evicted", null, cache.get(nnAddr));
     assertTrue("Evicted socket closed", nnSock.isClosed());
-
+ 
     // Lookup the DN socks
     for (Socket dnSock : dnSockets) {
       assertEquals("Retrieve cached sockets", dnSock, cache.get(dnAddr));
@@ -190,6 +195,51 @@ public class TestConnCache {
     assertEquals("Cache is empty", 0, cache.size());
   }
 
+
+  /**
+   * Test the SocketCache expiry.
+   * Verify that socket cache entries expire after the set
+   * expiry time.
+   */
+  @Test
+  public void testSocketCacheExpiry() throws Exception {
+    // Make a client
+    InetSocketAddress nnAddr =
+        new InetSocketAddress("localhost", cluster.getNameNodePort());
+    DFSClient client = new DFSClient(nnAddr, conf);
+
+    // Find out the DN addr
+    LocatedBlock block =
+        client.getNamenode().getBlockLocations(
+            testFile.toString(), 0, FILE_SIZE)
+        .getLocatedBlocks().get(0);
+    DataNode dn = util.getDataNode(block);
+    InetSocketAddress dnAddr = dn.getSelfAddr();
+
+
+    // Make some sockets to the DN and put in cache
+    Socket[] dnSockets = new Socket[CACHE_SIZE];
+    for (int i = 0; i < dnSockets.length; ++i) {
+      dnSockets[i] = client.socketFactory.createSocket(
+          dnAddr.getAddress(), dnAddr.getPort());
+      cache.put(dnSockets[i]);
+    }
+
+    // Client side still has the sockets cached
+    assertEquals(CACHE_SIZE, client.socketCache.size());
+
+    // sleep past the expiry period and verify the entries expired
+    Thread.sleep(CACHE_EXPIRY_MS + 1000);
+    
+    // Client side has no sockets cached
+    assertEquals(0, client.socketCache.size());
+
+    // wait another expiry period to ensure the daemon thread
+    // runs cleanly even when the cache is empty
+    Thread.sleep(CACHE_EXPIRY_MS + 1000);
+  }
+
+
   /**
    * Read a file served entirely from one DN. Seek around and read from
    * different offsets. And verify that they all use the same socket.
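
One caveat on testSocketCacheExpiry above: it relies on fixed sleeps, which
can be timing-sensitive on a loaded machine. A hedged alternative (not part
of this commit) would poll inside the test until the daemon has run:

    // Drop-in replacement for the first fixed sleep in
    // testSocketCacheExpiry: poll until expiry instead of sleeping once.
    long deadline = System.currentTimeMillis() + CACHE_EXPIRY_MS + 1000;
    while (client.socketCache.size() != 0 &&
        System.currentTimeMillis() < deadline) {
      Thread.sleep(50);
    }
    assertEquals("Cache empty after expiry", 0, client.socketCache.size());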

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java?rev=1391542&r1=1391541&r2=1391542&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java Fri Sep 28 16:44:20 2012
@@ -89,7 +89,6 @@ public class TestDistributedFileSystem {
   /**
    * Tests DFSClient.close throws no ConcurrentModificationException if 
    * multiple files are open.
-   * Also tests that any cached sockets are closed. (HDFS-3359)
    */
   @Test
   public void testDFSClose() throws Exception {
@@ -109,12 +108,9 @@ public class TestDistributedFileSystem {
       DFSTestUtil.readFile(fileSys, p);
       
       DFSClient client = ((DistributedFileSystem)fileSys).dfs;
-      SocketCache cache = client.socketCache;
-      assertEquals(1, cache.size());
 
       fileSys.close();
       
-      assertEquals(0, cache.size());
     } finally {
       if (cluster != null) {cluster.shutdown();}
     }