Posted to hdfs-commits@hadoop.apache.org by bo...@apache.org on 2012/09/28 18:44:21 UTC
svn commit: r1391542 - in
/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs: ./
src/main/java/org/apache/hadoop/hdfs/ src/test/java/org/apache/hadoop/hdfs/
Author: bobby
Date: Fri Sep 28 16:44:20 2012
New Revision: 1391542
URL: http://svn.apache.org/viewvc?rev=1391542&view=rev
Log:
HDFS-3373. FileContext HDFS implementation can leak socket caches (John George via bobby)
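In short: SocketCache changes from a per-DFSClient object to a single JVM-wide
cache with a background expiry daemon, so FileContext clients that are never
explicitly closed can no longer leak cached sockets. As a rough sketch of the
client-facing effect (the Configuration calls are standard Hadoop API; the
values shown are simply this patch's defaults):

  import org.apache.hadoop.conf.Configuration;

  Configuration conf = new Configuration();
  // Pre-existing key: maximum number of cached sockets.
  conf.setInt("dfs.client.socketcache.capacity", 16);
  // New key from this patch: idle sockets are closed after this many msec.
  conf.setLong("dfs.client.socketcache.expiryMsec", 2 * 60 * 1000L);
  // Every DFSClient created with this conf now shares one SocketCache;
  // the expiry daemon closes idle cached sockets after ~2 minutes.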
Modified:
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/SocketCache.java
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1391542&r1=1391541&r2=1391542&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Fri Sep 28 16:44:20 2012
@@ -24,6 +24,9 @@ Release 0.23.4 - UNRELEASED
HDFS-3731. Release upgrade must handle blocks being written from 1.0
(Kihwal Lee via daryn)
+ HDFS-3373. FileContext HDFS implementation can leak socket caches (John
+ George via bobby)
+
Release 0.23.3 - UNRELEASED
INCOMPATIBLE CHANGES
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1391542&r1=1391541&r2=1391542&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java Fri Sep 28 16:44:20 2012
@@ -34,6 +34,8 @@ import static org.apache.hadoop.hdfs.DFS
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER_DEFAULT;
@@ -178,6 +180,7 @@ public class DFSClient implements java.i
final int writePacketSize;
final int socketTimeout;
final int socketCacheCapacity;
+ final long socketCacheExpiry;
/** Wait time window (in msec) if BlockMissingException is caught */
final int timeWindow;
final int nCachedConnRetry;
@@ -212,6 +215,8 @@ public class DFSClient implements java.i
taskId = conf.get("mapreduce.task.attempt.id", "NONMAPREDUCE");
socketCacheCapacity = conf.getInt(DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY,
DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT);
+ socketCacheExpiry = conf.getLong(DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY,
+ DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT);
prefetchSize = conf.getLong(DFS_CLIENT_READ_PREFETCH_SIZE_KEY,
10 * defaultBlockSize);
timeWindow = conf
@@ -336,7 +341,7 @@ public class DFSClient implements java.i
nameNodeAddr.getHostName() + ":" + nameNodeAddr.getPort();
this.clientName = "DFSClient_" + dfsClientConf.taskId + "_" +
DFSUtil.getRandom().nextInt() + "_" + Thread.currentThread().getId();
- this.socketCache = new SocketCache(dfsClientConf.socketCacheCapacity);
+ this.socketCache = SocketCache.getInstance(dfsClientConf.socketCacheCapacity, dfsClientConf.socketCacheExpiry);
if (nameNodeAddr != null && rpcNamenode == null) {
this.rpcNamenode = DFSUtil.createRPCNamenode(nameNodeAddr, conf, ugi);
this.namenode = DFSUtil.createNamenode(this.rpcNamenode);
@@ -506,7 +511,6 @@ public class DFSClient implements java.i
void abort() {
clientRunning = false;
closeAllFilesBeingWritten(true);
- socketCache.clear();
try {
// remove reference to this client and stop the renewer,
// if there is no more clients under the renewer.
@@ -551,7 +555,6 @@ public class DFSClient implements java.i
public synchronized void close() throws IOException {
if(clientRunning) {
closeAllFilesBeingWritten(false);
- socketCache.clear();
clientRunning = false;
getLeaseRenewer().closeClient(this);
// close connections to the namenode
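Note the two removed socketCache.clear() calls above: because the cache is now
shared by every DFSClient in the JVM, one client's abort() or close() must not
discard sockets that other clients may still reuse. Cleanup is instead left to
the expiry daemon introduced in SocketCache.java below.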
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1391542&r1=1391541&r2=1391542&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Fri Sep 28 16:44:20 2012
@@ -49,6 +49,8 @@ public class DFSConfigKeys extends Commo
public static final String DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY = "dfs.client.socketcache.capacity";
public static final int DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT = 16;
+ public static final String DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY = "dfs.client.socketcache.expiryMsec";
+ public static final long DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT = 2 * 60 * 1000;
public static final String DFS_NAMENODE_BACKUP_ADDRESS_KEY = "dfs.namenode.backup.address";
public static final String DFS_NAMENODE_BACKUP_ADDRESS_DEFAULT = "localhost:50100";
public static final String DFS_NAMENODE_BACKUP_HTTP_ADDRESS_KEY = "dfs.namenode.backup.http-address";
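For deployments that tune this in hdfs-site.xml rather than in code, the
equivalent entry would look like the following (a sketch; the key name and
default value come from the constants above):

  <property>
    <name>dfs.client.socketcache.expiryMsec</name>
    <value>120000</value>
  </property>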
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/SocketCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/SocketCache.java?rev=1391542&r1=1391541&r2=1391542&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/SocketCache.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/SocketCache.java Fri Sep 28 16:44:20 2012
@@ -25,28 +25,103 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
+import java.io.IOException;
import com.google.common.base.Preconditions;
import com.google.common.collect.LinkedListMultimap;
import org.apache.commons.logging.Log;
+import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.util.StringUtils;
/**
- * A cache of sockets.
+ * A cache of input stream sockets to DataNodes.
*/
class SocketCache {
- static final Log LOG = LogFactory.getLog(SocketCache.class);
+ private static final Log LOG = LogFactory.getLog(SocketCache.class);
+ private Daemon daemon;
+ /** Cached sockets, keyed by remote DataNode address. */
+ private static LinkedListMultimap<SocketAddress, SocketProp> multimap =
+ LinkedListMultimap.create();
+ private static int capacity;
+ private static long expiryPeriod;
+ private static SocketCache scInstance = new SocketCache();
- private final LinkedListMultimap<SocketAddress, Socket> multimap;
- private final int capacity;
+ private static class SocketProp {
+ Socket s;
+ long createTime;
+ public SocketProp(Socket s) {
+ this.s = s;
+ this.createTime = System.currentTimeMillis();
+ }
- /**
- * Create a SocketCache with the given capacity.
- * @param capacity Max cache size.
- */
- public SocketCache(int capacity) {
- multimap = LinkedListMultimap.create();
- this.capacity = capacity;
+ public long getCreateTime() {
+ return this.createTime;
+ }
+
+ public Socket getSocket() {
+ return this.s;
+ }
+ }
+
+ // capacity and expiryPeriod are only initialized once.
+ private static boolean isInitedOnce() {
+ return capacity != 0 && expiryPeriod != 0;
+ }
+
+ public static synchronized SocketCache getInstance(int c, long e) {
+
+ if (c == 0 || e == 0) {
+ throw new IllegalStateException("Cannot initialize ZERO capacity " +
+ "or expiryPeriod");
+ }
+
+ // capacity is only initialized once
+ if (!isInitedOnce()) {
+ capacity = c;
+ expiryPeriod = e;
+ } else if (capacity != c || expiryPeriod != e) {
+ LOG.info("capacity and expiry periods already set to " + capacity +
+ " and " + expiryPeriod + " respectively. Cannot set them to " + c +
+ " and " + e);
+ }
+
+ return scInstance;
+ }
+
+ private boolean isDaemonStarted() {
+ return daemon != null;
+ }
+
+ private synchronized void startExpiryDaemon() {
+ // start daemon only if not already started
+ if (isDaemonStarted()) {
+ return;
+ }
+
+ daemon = new Daemon(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ SocketCache.this.run();
+ } catch(InterruptedException e) {
+ //noop
+ } finally {
+ SocketCache.this.clear();
+ }
+ }
+
+ @Override
+ public String toString() {
+ return String.valueOf(SocketCache.this);
+ }
+ });
+ daemon.start();
}
/**
@@ -55,14 +130,14 @@ class SocketCache {
* @return A socket with unknown state, possibly closed underneath. Or null.
*/
public synchronized Socket get(SocketAddress remote) {
- List<Socket> socklist = multimap.get(remote);
- if (socklist == null) {
+ List<SocketProp> sockPropList = multimap.get(remote);
+ if (sockPropList == null) {
return null;
}
- Iterator<Socket> iter = socklist.iterator();
+ Iterator<SocketProp> iter = sockPropList.iterator();
while (iter.hasNext()) {
- Socket candidate = iter.next();
+ Socket candidate = iter.next().getSocket();
iter.remove();
if (!candidate.isClosed()) {
return candidate;
@@ -76,7 +151,9 @@ class SocketCache {
* @param sock socket not used by anyone.
*/
public synchronized void put(Socket sock) {
+
Preconditions.checkNotNull(sock);
+ startExpiryDaemon();
SocketAddress remoteAddr = sock.getRemoteSocketAddress();
if (remoteAddr == null) {
@@ -89,7 +166,7 @@ class SocketCache {
if (capacity == multimap.size()) {
evictOldest();
}
- multimap.put(remoteAddr, sock);
+ multimap.put(remoteAddr, new SocketProp(sock));
}
public synchronized int size() {
@@ -97,32 +174,67 @@ class SocketCache {
}
/**
+ * Evict and close sockets older than expiry period from the cache.
+ */
+ private synchronized void evictExpired(long expiryPeriod) {
+ while (multimap.size() != 0) {
+ Iterator<Entry<SocketAddress, SocketProp>> iter =
+ multimap.entries().iterator();
+ Entry<SocketAddress, SocketProp> entry = iter.next();
+
+ // entries are oldest-first; stop once the oldest one has not yet expired
+ if (entry == null ||
+ System.currentTimeMillis() - entry.getValue().getCreateTime() <
+ expiryPeriod) {
+ break;
+ }
+ iter.remove();
+ Socket sock = entry.getValue().getSocket();
+ IOUtils.closeSocket(sock);
+ }
+ }
+
+ /**
* Evict the oldest entry in the cache.
*/
private synchronized void evictOldest() {
- Iterator<Entry<SocketAddress, Socket>> iter =
+ Iterator<Entry<SocketAddress, SocketProp>> iter =
multimap.entries().iterator();
if (!iter.hasNext()) {
throw new IllegalStateException("Cannot evict from empty cache!");
}
- Entry<SocketAddress, Socket> entry = iter.next();
+ Entry<SocketAddress, SocketProp> entry = iter.next();
iter.remove();
- Socket sock = entry.getValue();
+ Socket sock = entry.getValue().getSocket();
IOUtils.closeSocket(sock);
}
/**
- * Empty the cache, and close all sockets.
+ * Periodically check the cache and expire entries
+ * older than expiryPeriod milliseconds.
*/
- public synchronized void clear() {
- for (Socket sock : multimap.values()) {
- IOUtils.closeSocket(sock);
+ private void run() throws InterruptedException {
+ for (long lastExpiryTime = System.currentTimeMillis();
+ !Thread.interrupted();
+ Thread.sleep(expiryPeriod)) {
+ final long elapsed = System.currentTimeMillis() - lastExpiryTime;
+ if (elapsed >= expiryPeriod) {
+ evictExpired(expiryPeriod);
+ lastExpiryTime = System.currentTimeMillis();
+ }
}
- multimap.clear();
+ clear();
+ throw new InterruptedException("Daemon Interrupted");
}
- protected void finalize() {
- clear();
+ /**
+ * Empty the cache, and close all sockets.
+ */
+ private synchronized void clear() {
+ for (SocketProp sockProp : multimap.values()) {
+ IOUtils.closeSocket(sockProp.getSocket());
+ }
+ multimap.clear();
}
}
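The getInstance() contract above is worth spelling out with a usage sketch
(only meaningful inside org.apache.hadoop.hdfs, since SocketCache is
package-private):

  // The first caller fixes capacity and expiry for the whole JVM.
  SocketCache cache = SocketCache.getInstance(16, 2 * 60 * 1000L);
  // A later caller asking for different values gets the same instance back;
  // the mismatch is only logged, not honored.
  SocketCache same = SocketCache.getInstance(4, 1000L);
  assert cache == same;
  // Zero capacity or zero expiry is rejected:
  // SocketCache.getInstance(0, 0) throws IllegalStateException.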
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java?rev=1391542&r1=1391541&r2=1391542&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java Fri Sep 28 16:44:20 2012
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -56,10 +57,12 @@ public class TestConnCache {
static final int BLOCK_SIZE = 4096;
static final int FILE_SIZE = 3 * BLOCK_SIZE;
-
+ final static int CACHE_SIZE = 4;
+ final static long CACHE_EXPIRY_MS = 200;
static Configuration conf = null;
static MiniDFSCluster cluster = null;
static FileSystem fs = null;
+ static SocketCache cache;
static final Path testFile = new Path("/testConnCache.dat");
static byte authenticData[] = null;
@@ -94,6 +97,9 @@ public class TestConnCache {
public static void setupCluster() throws Exception {
final int REPLICATION_FACTOR = 1;
+ /* Create the socket cache; there is only one socket cache per JVM. */
+ cache = SocketCache.getInstance(CACHE_SIZE, CACHE_EXPIRY_MS);
+
util = new BlockReaderTestUtil(REPLICATION_FACTOR);
cluster = util.getCluster();
conf = util.getConf();
@@ -143,10 +149,7 @@ public class TestConnCache {
* Test the SocketCache itself.
*/
@Test
- public void testSocketCache() throws IOException {
- final int CACHE_SIZE = 4;
- SocketCache cache = new SocketCache(CACHE_SIZE);
-
+ public void testSocketCache() throws Exception {
// Make a client
InetSocketAddress nnAddr =
new InetSocketAddress("localhost", cluster.getNameNodePort());
@@ -160,6 +163,7 @@ public class TestConnCache {
DataNode dn = util.getDataNode(block);
InetSocketAddress dnAddr = dn.getSelfAddr();
+
// Make some sockets to the DN
Socket[] dnSockets = new Socket[CACHE_SIZE];
for (int i = 0; i < dnSockets.length; ++i) {
@@ -167,6 +171,7 @@ public class TestConnCache {
dnAddr.getAddress(), dnAddr.getPort());
}
+
// Insert a socket to the NN
Socket nnSock = new Socket(nnAddr.getAddress(), nnAddr.getPort());
cache.put(nnSock);
@@ -180,7 +185,7 @@ public class TestConnCache {
assertEquals("NN socket evicted", null, cache.get(nnAddr));
assertTrue("Evicted socket closed", nnSock.isClosed());
-
+
// Lookup the DN socks
for (Socket dnSock : dnSockets) {
assertEquals("Retrieve cached sockets", dnSock, cache.get(dnAddr));
@@ -190,6 +195,51 @@ public class TestConnCache {
assertEquals("Cache is empty", 0, cache.size());
}
+
+ /**
+ * Test the SocketCache expiry.
+ * Verify that socket cache entries expire after the set
+ * expiry time.
+ */
+ @Test
+ public void testSocketCacheExpiry() throws Exception {
+ // Make a client
+ InetSocketAddress nnAddr =
+ new InetSocketAddress("localhost", cluster.getNameNodePort());
+ DFSClient client = new DFSClient(nnAddr, conf);
+
+ // Find out the DN addr
+ LocatedBlock block =
+ client.getNamenode().getBlockLocations(
+ testFile.toString(), 0, FILE_SIZE)
+ .getLocatedBlocks().get(0);
+ DataNode dn = util.getDataNode(block);
+ InetSocketAddress dnAddr = dn.getSelfAddr();
+
+
+ // Make some sockets to the DN and put in cache
+ Socket[] dnSockets = new Socket[CACHE_SIZE];
+ for (int i = 0; i < dnSockets.length; ++i) {
+ dnSockets[i] = client.socketFactory.createSocket(
+ dnAddr.getAddress(), dnAddr.getPort());
+ cache.put(dnSockets[i]);
+ }
+
+ // Client side still has the sockets cached
+ assertEquals(CACHE_SIZE, client.socketCache.size());
+
+ // sleep past the expiry period and verify the entries expired
+ Thread.sleep(CACHE_EXPIRY_MS + 1000);
+
+ // Client side has no sockets cached
+ assertEquals(0, client.socketCache.size());
+
+ // wait another expiry period to ensure the daemon
+ // keeps running cleanly even when the cache is empty
+ Thread.sleep(CACHE_EXPIRY_MS + 1000);
+ }
+
+
/**
* Read a file served entirely from one DN. Seek around and read from
* different offsets. And verify that they all use the same socket.
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java?rev=1391542&r1=1391541&r2=1391542&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java Fri Sep 28 16:44:20 2012
@@ -89,7 +89,6 @@ public class TestDistributedFileSystem {
/**
* Tests DFSClient.close throws no ConcurrentModificationException if
* multiple files are open.
- * Also tests that any cached sockets are closed. (HDFS-3359)
*/
@Test
public void testDFSClose() throws Exception {
@@ -109,12 +108,9 @@ public class TestDistributedFileSystem {
DFSTestUtil.readFile(fileSys, p);
DFSClient client = ((DistributedFileSystem)fileSys).dfs;
- SocketCache cache = client.socketCache;
- assertEquals(1, cache.size());
fileSys.close();
- assertEquals(0, cache.size());
} finally {
if (cluster != null) {cluster.shutdown();}
}