Posted to hdfs-commits@hadoop.apache.org by sz...@apache.org on 2012/09/26 15:23:21 UTC
svn commit: r1390466 - in
/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./
src/main/java/org/apache/hadoop/hdfs/ src/test/java/org/apache/hadoop/hdfs/
Author: szetszwo
Date: Wed Sep 26 13:23:21 2012
New Revision: 1390466
URL: http://svn.apache.org/viewvc?rev=1390466&view=rev
Log:
HDFS-3373. Change DFSClient input stream socket cache to global static and add a thread to clean up expired cache entries. Contributed by John George
Added:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestSocketCache.java
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/SocketCache.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1390466&r1=1390465&r2=1390466&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Wed Sep 26 13:23:21 2012
@@ -235,6 +235,9 @@ Release 2.0.3-alpha - Unreleased
HDFS-3939. NN RPC address cleanup. (eli)
+ HDFS-3373. Change DFSClient input stream socket cache to global static and add
+ a thread to clean up expired cache entries. (John George via szetszwo)
+
OPTIMIZATIONS
BUG FIXES
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1390466&r1=1390465&r2=1390466&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java Wed Sep 26 13:23:21 2012
@@ -39,6 +39,8 @@ import static org.apache.hadoop.hdfs.DFS
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER_DEFAULT;
@@ -209,6 +211,7 @@ public class DFSClient implements java.i
final int writePacketSize;
final int socketTimeout;
final int socketCacheCapacity;
+ final long socketCacheExpiry;
/** Wait time window (in msec) if BlockMissingException is caught */
final int timeWindow;
final int nCachedConnRetry;
@@ -257,6 +260,8 @@ public class DFSClient implements java.i
taskId = conf.get("mapreduce.task.attempt.id", "NONMAPREDUCE");
socketCacheCapacity = conf.getInt(DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY,
DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT);
+ socketCacheExpiry = conf.getLong(DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY,
+ DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT);
prefetchSize = conf.getLong(DFS_CLIENT_READ_PREFETCH_SIZE_KEY,
10 * defaultBlockSize);
timeWindow = conf
@@ -427,7 +432,7 @@ public class DFSClient implements java.i
Joiner.on(',').join(localInterfaceAddrs) + "]");
}
- this.socketCache = new SocketCache(dfsClientConf.socketCacheCapacity);
+ this.socketCache = SocketCache.getInstance(
+ dfsClientConf.socketCacheCapacity, dfsClientConf.socketCacheExpiry);
}
/**
@@ -641,7 +646,6 @@ public class DFSClient implements java.i
void abort() {
clientRunning = false;
closeAllFilesBeingWritten(true);
- socketCache.clear();
try {
// remove reference to this client and stop the renewer,
@@ -688,7 +692,6 @@ public class DFSClient implements java.i
public synchronized void close() throws IOException {
if(clientRunning) {
closeAllFilesBeingWritten(false);
- socketCache.clear();
clientRunning = false;
getLeaseRenewer().closeClient(this);
// close connections to the namenode
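Note that, with the cache now a single static instance shared by every DFSClient in the JVM, abort() and close() no longer clear it: closing one client must not close sockets that another client may still reuse, so idle sockets are instead reclaimed by the new expiry daemon. A minimal sketch of the resulting singleton contract, with hypothetical values (not part of this commit):

    // The first getInstance() call in the JVM fixes capacity and expiry.
    SocketCache first = SocketCache.getInstance(16, 2 * 60 * 1000L);
    // A later call with different values only logs a message and returns
    // the same shared instance.
    SocketCache second = SocketCache.getInstance(8, 30 * 1000L);
    assert first == second;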
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1390466&r1=1390465&r2=1390466&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Wed Sep 26 13:23:21 2012
@@ -74,6 +74,8 @@ public class DFSConfigKeys extends Commo
public static final String DFS_CLIENT_FAILOVER_CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_KEY = "dfs.client.failover.connection.retries.on.timeouts";
public static final int DFS_CLIENT_FAILOVER_CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_DEFAULT = 0;
+ public static final String DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY = "dfs.client.socketcache.expiryMsec";
+ public static final long DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT = 2 * 60 * 1000;
public static final String DFS_NAMENODE_BACKUP_ADDRESS_KEY = "dfs.namenode.backup.address";
public static final String DFS_NAMENODE_BACKUP_ADDRESS_DEFAULT = "localhost:50100";
public static final String DFS_NAMENODE_BACKUP_HTTP_ADDRESS_KEY = "dfs.namenode.backup.http-address";
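The new expiry key is read in DFSClient next to the existing capacity key. A usage sketch with illustrative values only (assuming the standard Configuration setters):

    Configuration conf = new HdfsConfiguration();
    // Cache up to 16 idle sockets; expire entries after one minute.
    conf.setInt(DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY, 16);
    conf.setLong(DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY, 60 * 1000L);
    // A zero expiry combined with a non-zero capacity makes
    // SocketCache.getInstance() throw IllegalStateException.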
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/SocketCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/SocketCache.java?rev=1390466&r1=1390465&r2=1390466&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/SocketCache.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/SocketCache.java Wed Sep 26 13:23:21 2012
@@ -26,51 +26,131 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
+import java.io.IOException;
import com.google.common.base.Preconditions;
import com.google.common.collect.LinkedListMultimap;
import org.apache.commons.logging.Log;
+import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.util.StringUtils;
/**
- * A cache of sockets.
+ * A cache of input stream sockets to DataNodes.
*/
class SocketCache {
- static final Log LOG = LogFactory.getLog(SocketCache.class);
+ private static final Log LOG = LogFactory.getLog(SocketCache.class);
- private final LinkedListMultimap<SocketAddress, SocketAndStreams> multimap;
- private final int capacity;
+ @InterfaceAudience.Private
+ static class SocketAndStreams implements Closeable {
+ public final Socket sock;
+ public final IOStreamPair ioStreams;
+ long createTime;
+
+ public SocketAndStreams(Socket s, IOStreamPair ioStreams) {
+ this.sock = s;
+ this.ioStreams = ioStreams;
+ this.createTime = System.currentTimeMillis();
+ }
+
+ @Override
+ public void close() {
+ if (ioStreams != null) {
+ IOUtils.closeStream(ioStreams.in);
+ IOUtils.closeStream(ioStreams.out);
+ }
+ IOUtils.closeSocket(sock);
+ }
- /**
- * Create a SocketCache with the given capacity.
- * @param capacity Max cache size.
- */
- public SocketCache(int capacity) {
- multimap = LinkedListMultimap.create();
- this.capacity = capacity;
- if (capacity <= 0) {
- LOG.debug("SocketCache disabled in configuration.");
+ public long getCreateTime() {
+ return this.createTime;
}
}
+ private Daemon daemon;
+ /** A multimap from remote DataNode address to cached sockets and streams. */
+ private static LinkedListMultimap<SocketAddress, SocketAndStreams> multimap =
+ LinkedListMultimap.create();
+ private static int capacity;
+ private static long expiryPeriod;
+ private static SocketCache scInstance = new SocketCache();
+ private static boolean isInitedOnce = false;
+
+ public static synchronized SocketCache getInstance(int c, long e) {
+ // capacity is only initialized once
+ if (!isInitedOnce) {
+ capacity = c;
+ expiryPeriod = e;
+
+ if (capacity == 0) {
+ LOG.info("SocketCache disabled.");
+ } else if (expiryPeriod == 0) {
+ throw new IllegalStateException("Cannot initialize expiryPeriod to " +
+ expiryPeriod + " when cache is enabled.");
+ }
+ isInitedOnce = true;
+ } else { //already initialized once
+ if (capacity != c || expiryPeriod != e) {
+ LOG.info("capacity and expiry periods already set to " + capacity +
+ " and " + expiryPeriod + " respectively. Cannot set it to " + c +
+ " and " + e);
+ }
+ }
+
+ return scInstance;
+ }
+
+ private boolean isDaemonStarted() {
+ return daemon != null;
+ }
+
+ private synchronized void startExpiryDaemon() {
+ // start daemon only if not already started
+ if (isDaemonStarted()) {
+ return;
+ }
+
+ daemon = new Daemon(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ SocketCache.this.run();
+ } catch(InterruptedException e) {
+ //noop
+ } finally {
+ SocketCache.this.clear();
+ }
+ }
+
+ @Override
+ public String toString() {
+ return String.valueOf(SocketCache.this);
+ }
+ });
+ daemon.start();
+ }
+
/**
* Get a cached socket to the given address.
* @param remote Remote address the socket is connected to.
* @return A socket with unknown state, possibly closed underneath. Or null.
*/
public synchronized SocketAndStreams get(SocketAddress remote) {
+
if (capacity <= 0) { // disabled
return null;
}
-
- List<SocketAndStreams> socklist = multimap.get(remote);
- if (socklist == null) {
+
+ List<SocketAndStreams> sockStreamList = multimap.get(remote);
+ if (sockStreamList == null) {
return null;
}
- Iterator<SocketAndStreams> iter = socklist.iterator();
+ Iterator<SocketAndStreams> iter = sockStreamList.iterator();
while (iter.hasNext()) {
SocketAndStreams candidate = iter.next();
iter.remove();
@@ -86,14 +166,16 @@ class SocketCache {
* @param sock socket not used by anyone.
*/
public synchronized void put(Socket sock, IOStreamPair ioStreams) {
+
+ Preconditions.checkNotNull(sock);
SocketAndStreams s = new SocketAndStreams(sock, ioStreams);
if (capacity <= 0) {
// Cache disabled.
s.close();
return;
}
-
- Preconditions.checkNotNull(sock);
+
+ startExpiryDaemon();
SocketAddress remoteAddr = sock.getRemoteSocketAddress();
if (remoteAddr == null) {
@@ -106,7 +188,7 @@ class SocketCache {
if (capacity == multimap.size()) {
evictOldest();
}
- multimap.put(remoteAddr, new SocketAndStreams(sock, ioStreams));
+ multimap.put(remoteAddr, s);
}
public synchronized int size() {
@@ -114,13 +196,34 @@ class SocketCache {
}
/**
+ * Evict and close sockets older than expiry period from the cache.
+ */
+ private synchronized void evictExpired(long expiryPeriod) {
+ while (multimap.size() != 0) {
+ Iterator<Entry<SocketAddress, SocketAndStreams>> iter =
+ multimap.entries().iterator();
+ Entry<SocketAddress, SocketAndStreams> entry = iter.next();
+ // if oldest socket expired, remove it
+ if (entry == null ||
+ System.currentTimeMillis() - entry.getValue().getCreateTime() <
+ expiryPeriod) {
+ break;
+ }
+ iter.remove();
+ SocketAndStreams s = entry.getValue();
+ s.close();
+ }
+ }
+
+ /**
* Evict the oldest entry in the cache.
*/
private synchronized void evictOldest() {
Iterator<Entry<SocketAddress, SocketAndStreams>> iter =
multimap.entries().iterator();
if (!iter.hasNext()) {
- throw new IllegalStateException("Cannot evict from empty cache!");
+ throw new IllegalStateException("Cannot evict from empty cache! " +
+ "capacity: " + capacity);
}
Entry<SocketAddress, SocketAndStreams> entry = iter.next();
iter.remove();
@@ -129,38 +232,31 @@ class SocketCache {
}
/**
- * Empty the cache, and close all sockets.
+ * Periodically check the cache and expire entries
+ * older than expiryPeriod milliseconds.
*/
- public synchronized void clear() {
- for (SocketAndStreams s : multimap.values()) {
- s.close();
+ private void run() throws InterruptedException {
+ for (long lastExpiryTime = System.currentTimeMillis();
+ !Thread.interrupted();
+ Thread.sleep(expiryPeriod)) {
+ final long elapsed = System.currentTimeMillis() - lastExpiryTime;
+ if (elapsed >= expiryPeriod) {
+ evictExpired(expiryPeriod);
+ lastExpiryTime = System.currentTimeMillis();
+ }
}
- multimap.clear();
- }
-
- @Override
- protected void finalize() {
clear();
+ throw new InterruptedException("Daemon interrupted");
}
-
- @InterfaceAudience.Private
- static class SocketAndStreams implements Closeable {
- public final Socket sock;
- public final IOStreamPair ioStreams;
-
- public SocketAndStreams(Socket s, IOStreamPair ioStreams) {
- this.sock = s;
- this.ioStreams = ioStreams;
- }
-
- @Override
- public void close() {
- if (ioStreams != null) {
- IOUtils.closeStream(ioStreams.in);
- IOUtils.closeStream(ioStreams.out);
- }
- IOUtils.closeSocket(sock);
+
+ /**
+ * Empty the cache, and close all sockets.
+ */
+ private synchronized void clear() {
+ for (SocketAndStreams sockAndStream : multimap.values()) {
+ sockAndStream.close();
}
+ multimap.clear();
}
}
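Both evictOldest() and evictExpired() rely on LinkedListMultimap preserving insertion order: the first entry returned by the iterator is always the oldest socket, so the expiry scan can stop at the first entry that is still young. A standalone sketch of that oldest-first pattern, with hypothetical names (not HDFS code):

    import java.util.ArrayDeque;
    import java.util.Deque;

    class OldestFirstExpiry {
      private final Deque<Long> createTimes = new ArrayDeque<Long>();
      private final long expiryMs;

      OldestFirstExpiry(long expiryMs) {
        this.expiryMs = expiryMs;
      }

      /** Entries are appended, so the head is always the oldest. */
      synchronized void put() {
        createTimes.addLast(System.currentTimeMillis());
      }

      /** Evict from the head; stop at the first unexpired entry. */
      synchronized void evictExpired() {
        while (!createTimes.isEmpty()
            && System.currentTimeMillis() - createTimes.peekFirst() >= expiryMs) {
          createTimes.removeFirst();
        }
      }
    }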
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java?rev=1390466&r1=1390465&r2=1390466&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java Wed Sep 26 13:23:21 2012
@@ -25,6 +25,7 @@ import static org.mockito.Mockito.spy;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
+import java.security.PrivilegedExceptionAction;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -54,10 +55,12 @@ public class TestConnCache {
static final int BLOCK_SIZE = 4096;
static final int FILE_SIZE = 3 * BLOCK_SIZE;
-
+ final static int CACHE_SIZE = 4;
+ final static long CACHE_EXPIRY_MS = 200;
static Configuration conf = null;
static MiniDFSCluster cluster = null;
static FileSystem fs = null;
+ static SocketCache cache;
static final Path testFile = new Path("/testConnCache.dat");
static byte authenticData[] = null;
@@ -93,6 +96,9 @@ public class TestConnCache {
public static void setupCluster() throws Exception {
final int REPLICATION_FACTOR = 1;
+ /* Create the socket cache. There is only one socket cache per JVM. */
+ cache = SocketCache.getInstance(CACHE_SIZE, CACHE_EXPIRY_MS);
+
util = new BlockReaderTestUtil(REPLICATION_FACTOR);
cluster = util.getCluster();
conf = util.getConf();
@@ -142,10 +148,7 @@ public class TestConnCache {
* Test the SocketCache itself.
*/
@Test
- public void testSocketCache() throws IOException {
- final int CACHE_SIZE = 4;
- SocketCache cache = new SocketCache(CACHE_SIZE);
-
+ public void testSocketCache() throws Exception {
// Make a client
InetSocketAddress nnAddr =
new InetSocketAddress("localhost", cluster.getNameNodePort());
@@ -159,6 +162,7 @@ public class TestConnCache {
DataNode dn = util.getDataNode(block);
InetSocketAddress dnAddr = dn.getXferAddress();
+
// Make some sockets to the DN
Socket[] dnSockets = new Socket[CACHE_SIZE];
for (int i = 0; i < dnSockets.length; ++i) {
@@ -166,6 +170,7 @@ public class TestConnCache {
dnAddr.getAddress(), dnAddr.getPort());
}
+
// Insert a socket to the NN
Socket nnSock = new Socket(nnAddr.getAddress(), nnAddr.getPort());
cache.put(nnSock, null);
@@ -179,7 +184,7 @@ public class TestConnCache {
assertEquals("NN socket evicted", null, cache.get(nnAddr));
assertTrue("Evicted socket closed", nnSock.isClosed());
-
+
// Lookup the DN socks
for (Socket dnSock : dnSockets) {
assertEquals("Retrieve cached sockets", dnSock, cache.get(dnAddr).sock);
@@ -189,6 +194,51 @@ public class TestConnCache {
assertEquals("Cache is empty", 0, cache.size());
}
+
+ /**
+ * Test the SocketCache expiry.
+ * Verify that socket cache entries expire after the set
+ * expiry time.
+ */
+ @Test
+ public void testSocketCacheExpiry() throws Exception {
+ // Make a client
+ InetSocketAddress nnAddr =
+ new InetSocketAddress("localhost", cluster.getNameNodePort());
+ DFSClient client = new DFSClient(nnAddr, conf);
+
+ // Find out the DN addr
+ LocatedBlock block =
+ client.getNamenode().getBlockLocations(
+ testFile.toString(), 0, FILE_SIZE)
+ .getLocatedBlocks().get(0);
+ DataNode dn = util.getDataNode(block);
+ InetSocketAddress dnAddr = dn.getXferAddress();
+
+
+ // Make some sockets to the DN and put in cache
+ Socket[] dnSockets = new Socket[CACHE_SIZE];
+ for (int i = 0; i < dnSockets.length; ++i) {
+ dnSockets[i] = client.socketFactory.createSocket(
+ dnAddr.getAddress(), dnAddr.getPort());
+ cache.put(dnSockets[i], null);
+ }
+
+ // Client side still has the sockets cached
+ assertEquals(CACHE_SIZE, client.socketCache.size());
+
+ // Sleep past the expiry period, with a full second of margin since
+ // the daemon only wakes about once per expiry period.
+ Thread.sleep(CACHE_EXPIRY_MS + 1000);
+
+ // Client side has no sockets cached
+ assertEquals(0, client.socketCache.size());
+
+ // Sleep for another period to check that the daemon thread
+ // runs fine on an empty cache.
+ Thread.sleep(CACHE_EXPIRY_MS + 1000);
+ }
+
+
/**
* Read a file served entirely from one DN. Seek around and read from
* different offsets. And verify that they all use the same socket.
@@ -229,33 +279,6 @@ public class TestConnCache {
in.close();
}
-
- /**
- * Test that the socket cache can be disabled by setting the capacity to
- * 0. Regression test for HDFS-3365.
- */
- @Test
- public void testDisableCache() throws IOException {
- LOG.info("Starting testDisableCache()");
-
- // Reading with the normally configured filesystem should
- // cache a socket.
- DFSTestUtil.readFile(fs, testFile);
- assertEquals(1, ((DistributedFileSystem)fs).dfs.socketCache.size());
-
- // Configure a new instance with no caching, ensure that it doesn't
- // cache anything
- Configuration confWithoutCache = new Configuration(fs.getConf());
- confWithoutCache.setInt(
- DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY, 0);
- FileSystem fsWithoutCache = FileSystem.newInstance(confWithoutCache);
- try {
- DFSTestUtil.readFile(fsWithoutCache, testFile);
- assertEquals(0, ((DistributedFileSystem)fsWithoutCache).dfs.socketCache.size());
- } finally {
- fsWithoutCache.close();
- }
- }
@AfterClass
public static void teardownCluster() throws Exception {
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java?rev=1390466&r1=1390465&r2=1390466&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java Wed Sep 26 13:23:21 2012
@@ -120,12 +120,9 @@ public class TestDistributedFileSystem {
DFSTestUtil.readFile(fileSys, p);
DFSClient client = ((DistributedFileSystem)fileSys).dfs;
- SocketCache cache = client.socketCache;
- assertEquals(1, cache.size());
fileSys.close();
- assertEquals(0, cache.size());
} finally {
if (cluster != null) {cluster.shutdown();}
}
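The two size() assertions are dropped here because the cache is now shared JVM-wide: other clients can hold cached sockets, and close() no longer clears the shared cache, so expecting exactly one cached socket after the read and zero after close() is no longer valid. Entries now leave the cache only through reuse, capacity eviction, or expiry.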
Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestSocketCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestSocketCache.java?rev=1390466&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestSocketCache.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestSocketCache.java Wed Sep 26 13:23:21 2012
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.spy;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.security.PrivilegedExceptionAction;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.security.token.Token;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Matchers;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+/**
+ * This class tests the client connection caching in a single node
+ * mini-cluster.
+ */
+public class TestSocketCache {
+ static final Log LOG = LogFactory.getLog(TestSocketCache.class);
+
+ static final int BLOCK_SIZE = 4096;
+ static final int FILE_SIZE = 3 * BLOCK_SIZE;
+ final static int CACHE_SIZE = 4;
+ final static long CACHE_EXPIRY_MS = 200;
+ static Configuration conf = null;
+ static MiniDFSCluster cluster = null;
+ static FileSystem fs = null;
+ static SocketCache cache;
+
+ static final Path testFile = new Path("/testConnCache.dat");
+ static byte authenticData[] = null;
+
+ static BlockReaderTestUtil util = null;
+
+
+ /**
+ * A mock Answer to remember the BlockReader used.
+ *
+ * It verifies that all invocations of DFSInputStream.getBlockReader()
+ * use the same socket.
+ */
+ private class MockGetBlockReader implements Answer<RemoteBlockReader2> {
+ public RemoteBlockReader2 reader = null;
+ private Socket sock = null;
+
+ @Override
+ public RemoteBlockReader2 answer(InvocationOnMock invocation) throws Throwable {
+ RemoteBlockReader2 prevReader = reader;
+ reader = (RemoteBlockReader2) invocation.callRealMethod();
+ if (sock == null) {
+ sock = reader.dnSock;
+ } else if (prevReader != null) {
+ assertSame("DFSInputStream should use the same socket",
+ sock, reader.dnSock);
+ }
+ return reader;
+ }
+ }
+
+ @BeforeClass
+ public static void setupCluster() throws Exception {
+ final int REPLICATION_FACTOR = 1;
+
+ HdfsConfiguration confWithoutCache = new HdfsConfiguration();
+ confWithoutCache.setInt(
+ DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY, 0);
+ util = new BlockReaderTestUtil(REPLICATION_FACTOR, confWithoutCache);
+ cluster = util.getCluster();
+ conf = util.getConf();
+
+ authenticData = util.writeFile(testFile, FILE_SIZE / 1024);
+ }
+
+
+ /**
+ * (Optionally) seek to position, read and verify data.
+ *
+ * Seek to specified position if pos is non-negative.
+ */
+ private void pread(DFSInputStream in,
+ long pos,
+ byte[] buffer,
+ int offset,
+ int length)
+ throws IOException {
+ assertTrue("Test buffer too small", buffer.length >= offset + length);
+
+ if (pos >= 0)
+ in.seek(pos);
+
+ LOG.info("Reading from file of size " + in.getFileLength() +
+ " at offset " + in.getPos());
+
+ final int origOffset = offset;
+ final int origLength = length;
+ while (length > 0) {
+ int cnt = in.read(buffer, offset, length);
+ assertTrue("Error in read", cnt > 0);
+ offset += cnt;
+ length -= cnt;
+ }
+
+ // Verify against the saved offset/length: the read loop above
+ // has consumed both counters.
+ for (int i = 0; i < origLength; ++i) {
+ byte actual = buffer[origOffset + i];
+ byte expect = authenticData[(int)pos + i];
+ assertEquals("Read data mismatch at file offset " + (pos + i) +
+ ". Expects " + expect + "; got " + actual,
+ expect, actual);
+ }
+ }
+
+
+ /**
+ * Test that the socket cache can be disabled by setting the capacity to
+ * 0. Regression test for HDFS-3365.
+ */
+ @Test
+ public void testDisableCache() throws IOException {
+ LOG.info("Starting testDisableCache()");
+
+ // Configure a new instance with no caching, ensure that it doesn't
+ // cache anything
+
+ FileSystem fsWithoutCache = FileSystem.newInstance(conf);
+ try {
+ DFSTestUtil.readFile(fsWithoutCache, testFile);
+ assertEquals(0, ((DistributedFileSystem)fsWithoutCache).dfs.socketCache.size());
+ } finally {
+ fsWithoutCache.close();
+ }
+ }
+
+ @AfterClass
+ public static void teardownCluster() throws Exception {
+ util.shutdown();
+ }
+}