You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by ji...@apache.org on 2014/05/27 19:21:42 UTC

svn commit: r1597829 - in /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: CHANGES.txt src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java src/test/java/org/apache/hadoop/hdfs/TestBlockReaderFactory.java

Author: jing9
Date: Tue May 27 17:21:41 2014
New Revision: 1597829

URL: http://svn.apache.org/r1597829
Log:
HDFS-6227. ShortCircuitCache#unref should purge ShortCircuitReplicas whose streams have been closed by Java interrupts. Contributed by Colin Patrick McCabe.

Modified:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderFactory.java

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1597829&r1=1597828&r2=1597829&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Tue May 27 17:21:41 2014
@@ -592,6 +592,9 @@ Release 2.5.0 - UNRELEASED
     HDFS-6443. Fix MiniQJMHACluster related test failures. (Zesheng Wu via
     Arpit Agarwal)
 
+    HDFS-6227. ShortCircuitCache#unref should purge ShortCircuitReplicas whose
+    streams have been closed by Java interrupts. (Colin Patrick McCabe via jing9)
+
 Release 2.4.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java?rev=1597829&r1=1597828&r2=1597829&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java Tue May 27 17:21:41 2014
@@ -437,11 +437,22 @@ public class ShortCircuitCache implement
   void unref(ShortCircuitReplica replica) {
     lock.lock();
     try {
-      // If the replica is stale, but we haven't purged it yet, let's do that.
-      // It would be a shame to evict a non-stale replica so that we could put
-      // a stale one into the cache.
-      if ((!replica.purged) && replica.isStale()) {
-        purge(replica);
+      // If the replica is stale or unusable, but we haven't purged it yet,
+      // let's do that.  It would be a shame to evict a non-stale replica so
+      // that we could put a stale or unusable one into the cache.
+      if (!replica.purged) {
+        String purgeReason = null;
+        if (!replica.getDataStream().getChannel().isOpen()) {
+          purgeReason = "purging replica because its data channel is closed.";
+        } else if (!replica.getMetaStream().getChannel().isOpen()) {
+          purgeReason = "purging replica because its meta channel is closed.";
+        } else if (replica.isStale()) {
+          purgeReason = "purging replica because it is stale.";
+        }
+        if (purgeReason != null) {
+          LOG.debug(this + ": " + purgeReason);
+          purge(replica);
+        }
       }
       String addedString = "";
       boolean shouldTrimEvictionMaps = false;

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderFactory.java?rev=1597829&r1=1597828&r2=1597829&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderFactory.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderFactory.java Tue May 27 17:21:41 2014
@@ -28,12 +28,15 @@ import static org.hamcrest.CoreMatchers.
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.channels.ClosedByInterruptException;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -409,4 +412,121 @@ public class TestBlockReaderFactory {
         getDomainSocketWatcher().isClosed());
     cluster.shutdown();
   }
+
+  /**
+   * When an InterruptedException is sent to a thread calling
+   * FileChannel#read, the FileChannel is immediately closed and the
+   * thread gets an exception.  This effectively means that we might have
+   * someone asynchronously calling close() on the file descriptors we use
+   * in BlockReaderLocal.  So when unreferencing a ShortCircuitReplica in
+   * ShortCircuitCache#unref, we should check if the FileChannel objects
+   * are still open.  If not, we should purge the replica to avoid giving
+   * it out to any future readers.
+   *
+   * This is a regression test for HDFS-6227: Short circuit read failed
+   * due to ClosedChannelException.
+   *
+   * Note that you may still get ClosedChannelException errors if two threads
+   * are reading from the same replica and an InterruptedException is delivered
+   * to one of them.
+   */
+  @Test(timeout=120000)
+  public void testPurgingClosedReplicas() throws Exception {
+    BlockReaderTestUtil.enableBlockReaderFactoryTracing();
+    final AtomicInteger replicasCreated = new AtomicInteger(0);
+    final AtomicBoolean testFailed = new AtomicBoolean(false);
+    DFSInputStream.tcpReadsDisabledForTesting = true;
+    BlockReaderFactory.createShortCircuitReplicaInfoCallback =
+        new ShortCircuitCache.ShortCircuitReplicaCreator() {
+          @Override
+          public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
+            replicasCreated.incrementAndGet();
+            return null;
+          }
+        };
+    TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
+    Configuration conf = createShortCircuitConf(
+        "testPurgingClosedReplicas", sockDir);
+    final MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    cluster.waitActive();
+    final DistributedFileSystem dfs = cluster.getFileSystem();
+    final String TEST_FILE = "/test_file";
+    final int TEST_FILE_LEN = 4095;
+    final int SEED = 0xFADE0;
+    final DistributedFileSystem fs =
+        (DistributedFileSystem)FileSystem.get(cluster.getURI(0), conf);
+    DFSTestUtil.createFile(fs, new Path(TEST_FILE), TEST_FILE_LEN,
+        (short)1, SEED);
+
+    final Semaphore sem = new Semaphore(0);
+    final List<LocatedBlock> locatedBlocks =
+        cluster.getNameNode().getRpcServer().getBlockLocations(
+            TEST_FILE, 0, TEST_FILE_LEN).getLocatedBlocks();
+    final LocatedBlock lblock = locatedBlocks.get(0); // first block
+    final byte[] buf = new byte[TEST_FILE_LEN];
+    Runnable readerRunnable = new Runnable() {
+      @Override
+      public void run() {
+        try {
+          while (true) {
+            BlockReader blockReader = null;
+            try {
+              blockReader = BlockReaderTestUtil.
+                  getBlockReader(cluster, lblock, 0, TEST_FILE_LEN);
+              sem.release();
+              try {
+                blockReader.readAll(buf, 0, TEST_FILE_LEN);
+              } finally {
+                sem.acquireUninterruptibly();
+              }
+            } catch (ClosedByInterruptException e) {
+              LOG.info("got the expected ClosedByInterruptException", e);
+              sem.release();
+              break;
+            } finally {
+              if (blockReader != null) blockReader.close();
+            }
+            LOG.info("read another " + TEST_FILE_LEN + " bytes.");
+          }
+        } catch (Throwable t) {
+          LOG.error("getBlockReader failure", t);
+          testFailed.set(true);
+          sem.release();
+        }
+      }
+    };
+    Thread thread = new Thread(readerRunnable);
+    thread.start();
+
+    // While the thread is reading, send it interrupts.
+    // These should trigger a ClosedChannelException.
+    while (thread.isAlive()) {
+      sem.acquireUninterruptibly();
+      thread.interrupt();
+      sem.release();
+    }
+    Assert.assertFalse(testFailed.get());
+
+    // We should be able to read from the file without
+    // getting a ClosedChannelException.
+    BlockReader blockReader = null;
+    try {
+      blockReader = BlockReaderTestUtil.
+          getBlockReader(cluster, lblock, 0, TEST_FILE_LEN);
+      blockReader.readFully(buf, 0, TEST_FILE_LEN);
+    } finally {
+      if (blockReader != null) blockReader.close();
+    }
+    byte expected[] = DFSTestUtil.
+        calculateFileContentsFromSeed(SEED, TEST_FILE_LEN);
+    Assert.assertTrue(Arrays.equals(buf, expected));
+
+    // Another ShortCircuitReplica object should have been created.
+    Assert.assertEquals(2, replicasCreated.get());
+
+    dfs.close();
+    cluster.shutdown();
+    sockDir.close();
+  }
 }