You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by ji...@apache.org on 2014/05/27 19:21:42 UTC
svn commit: r1597829 - in
/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: CHANGES.txt
src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java
src/test/java/org/apache/hadoop/hdfs/TestBlockReaderFactory.java
Author: jing9
Date: Tue May 27 17:21:41 2014
New Revision: 1597829
URL: http://svn.apache.org/r1597829
Log:
HDFS-6227. ShortCircuitCache#unref should purge ShortCircuitReplicas whose streams have been closed by Java interrupts. Contributed by Colin Patrick McCabe.
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderFactory.java
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1597829&r1=1597828&r2=1597829&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Tue May 27 17:21:41 2014
@@ -592,6 +592,9 @@ Release 2.5.0 - UNRELEASED
HDFS-6443. Fix MiniQJMHACluster related test failures. (Zesheng Wu via
Arpit Agarwal)
+ HDFS-6227. ShortCircuitCache#unref should purge ShortCircuitReplicas whose
+ streams have been closed by java interrupts. (Colin Patrick McCabe via jing9)
+
Release 2.4.1 - UNRELEASED
INCOMPATIBLE CHANGES
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java?rev=1597829&r1=1597828&r2=1597829&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java Tue May 27 17:21:41 2014
@@ -437,11 +437,22 @@ public class ShortCircuitCache implement
void unref(ShortCircuitReplica replica) {
lock.lock();
try {
- // If the replica is stale, but we haven't purged it yet, let's do that.
- // It would be a shame to evict a non-stale replica so that we could put
- // a stale one into the cache.
- if ((!replica.purged) && replica.isStale()) {
- purge(replica);
+ // If the replica is stale or unusable, but we haven't purged it yet,
+ // let's do that. It would be a shame to evict a non-stale replica so
+ // that we could put a stale or unusable one into the cache.
+ if (!replica.purged) {
+ String purgeReason = null;
+ if (!replica.getDataStream().getChannel().isOpen()) {
+ purgeReason = "purging replica because its data channel is closed.";
+ } else if (!replica.getMetaStream().getChannel().isOpen()) {
+ purgeReason = "purging replica because its meta channel is closed.";
+ } else if (replica.isStale()) {
+ purgeReason = "purging replica because it is stale.";
+ }
+ if (purgeReason != null) {
+ LOG.debug(this + ": " + purgeReason);
+ purge(replica);
+ }
}
String addedString = "";
boolean shouldTrimEvictionMaps = false;
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderFactory.java?rev=1597829&r1=1597828&r2=1597829&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderFactory.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderFactory.java Tue May 27 17:21:41 2014
@@ -28,12 +28,15 @@ import static org.hamcrest.CoreMatchers.
import java.io.File;
import java.io.IOException;
+import java.nio.channels.ClosedByInterruptException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -409,4 +412,121 @@ public class TestBlockReaderFactory {
getDomainSocketWatcher().isClosed());
cluster.shutdown();
}
+
+ /**
+ * When an InterruptedException is sent to a thread calling
+ * FileChannel#read, the FileChannel is immediately closed and the
+ * thread gets an exception. This effectively means that we might have
+ * someone asynchronously calling close() on the file descriptors we use
+ * in BlockReaderLocal. So when unreferencing a ShortCircuitReplica in
+ * ShortCircuitCache#unref, we should check if the FileChannel objects
+ * are still open. If not, we should purge the replica to avoid giving
+ * it out to any future readers.
+ *
+ * This is a regression test for HDFS-6227: Short circuit read failed
+ * due to ClosedChannelException.
+ *
+ * Note that you may still get ClosedChannelException errors if two threads
+ * are reading from the same replica and an InterruptedException is delivered
+ * to one of them.
+ */
+ @Test(timeout=120000)
+ public void testPurgingClosedReplicas() throws Exception {
+ BlockReaderTestUtil.enableBlockReaderFactoryTracing();
+ final AtomicInteger replicasCreated = new AtomicInteger(0);
+ final AtomicBoolean testFailed = new AtomicBoolean(false);
+ DFSInputStream.tcpReadsDisabledForTesting = true;
+ BlockReaderFactory.createShortCircuitReplicaInfoCallback =
+ new ShortCircuitCache.ShortCircuitReplicaCreator() {
+ @Override
+ public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
+ replicasCreated.incrementAndGet();
+ return null;
+ }
+ };
+ TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
+ Configuration conf = createShortCircuitConf(
+ "testPurgingClosedReplicas", sockDir);
+ final MiniDFSCluster cluster =
+ new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+ cluster.waitActive();
+ final DistributedFileSystem dfs = cluster.getFileSystem();
+ final String TEST_FILE = "/test_file";
+ final int TEST_FILE_LEN = 4095;
+ final int SEED = 0xFADE0;
+ final DistributedFileSystem fs =
+ (DistributedFileSystem)FileSystem.get(cluster.getURI(0), conf);
+ DFSTestUtil.createFile(fs, new Path(TEST_FILE), TEST_FILE_LEN,
+ (short)1, SEED);
+
+ final Semaphore sem = new Semaphore(0);
+ final List<LocatedBlock> locatedBlocks =
+ cluster.getNameNode().getRpcServer().getBlockLocations(
+ TEST_FILE, 0, TEST_FILE_LEN).getLocatedBlocks();
+ final LocatedBlock lblock = locatedBlocks.get(0); // first block
+ final byte[] buf = new byte[TEST_FILE_LEN];
+ Runnable readerRunnable = new Runnable() {
+ @Override
+ public void run() {
+ try {
+ while (true) {
+ BlockReader blockReader = null;
+ try {
+ blockReader = BlockReaderTestUtil.
+ getBlockReader(cluster, lblock, 0, TEST_FILE_LEN);
+ sem.release();
+ try {
+ blockReader.readAll(buf, 0, TEST_FILE_LEN);
+ } finally {
+ sem.acquireUninterruptibly();
+ }
+ } catch (ClosedByInterruptException e) {
+ LOG.info("got the expected ClosedByInterruptException", e);
+ sem.release();
+ break;
+ } finally {
+ if (blockReader != null) blockReader.close();
+ }
+ LOG.info("read another " + TEST_FILE_LEN + " bytes.");
+ }
+ } catch (Throwable t) {
+ LOG.error("getBlockReader failure", t);
+ testFailed.set(true);
+ sem.release();
+ }
+ }
+ };
+ Thread thread = new Thread(readerRunnable);
+ thread.start();
+
+ // While the thread is reading, send it interrupts.
+ // These should trigger a ClosedChannelException.
+ while (thread.isAlive()) {
+ sem.acquireUninterruptibly();
+ thread.interrupt();
+ sem.release();
+ }
+ Assert.assertFalse(testFailed.get());
+
+ // We should be able to read from the file without
+ // getting a ClosedChannelException.
+ BlockReader blockReader = null;
+ try {
+ blockReader = BlockReaderTestUtil.
+ getBlockReader(cluster, lblock, 0, TEST_FILE_LEN);
+ blockReader.readFully(buf, 0, TEST_FILE_LEN);
+ } finally {
+ if (blockReader != null) blockReader.close();
+ }
+ byte expected[] = DFSTestUtil.
+ calculateFileContentsFromSeed(SEED, TEST_FILE_LEN);
+ Assert.assertTrue(Arrays.equals(buf, expected));
+
+ // Another ShortCircuitReplica object should have been created.
+ Assert.assertEquals(2, replicasCreated.get());
+
+ dfs.close();
+ cluster.shutdown();
+ sockDir.close();
+ }
}