Posted to common-commits@hadoop.apache.org by vi...@apache.org on 2015/09/02 23:49:49 UTC

[3/3] hadoop git commit: HADOOP-11802. DomainSocketWatcher thread terminates sometimes after there is an I/O error during requestShortCircuitShm (cmccabe)

HADOOP-11802. DomainSocketWatcher thread terminates sometimes after there is an I/O error during requestShortCircuitShm (cmccabe)

(cherry picked from commit a0e0a63209b5eb17dca5cc503be36aa52defeabd)
(cherry picked from commit 788b76761d5dfadf688406d50169e95401fe5d33)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/8770f1f0
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/8770f1f0
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/8770f1f0

Branch: refs/heads/branch-2.6.1
Commit: 8770f1f03952661f49aea65ae6498be676677812
Parents: 181281c
Author: Colin Patrick Mccabe <cm...@cloudera.com>
Authored: Thu Apr 23 18:59:52 2015 -0700
Committer: Vinod Kumar Vavilapalli <vi...@apache.org>
Committed: Wed Sep 2 14:38:26 2015 -0700

----------------------------------------------------------------------
 hadoop-common-project/hadoop-common/CHANGES.txt |  3 +
 .../hadoop/net/unix/DomainSocketWatcher.java    |  4 +-
 .../hadoop/net/unix/DomainSocketWatcher.c       | 10 ++-
 .../server/datanode/DataNodeFaultInjector.java  |  6 +-
 .../hdfs/server/datanode/DataXceiver.java       | 18 ++++-
 .../hdfs/shortcircuit/DfsClientShmManager.java  |  3 +-
 .../hdfs/shortcircuit/DomainSocketFactory.java  |  6 ++
 .../shortcircuit/TestShortCircuitCache.java     | 83 ++++++++++++++++++--
 8 files changed, 114 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/8770f1f0/hadoop-common-project/hadoop-common/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index fb6153f..800cb2a 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -57,6 +57,9 @@ Release 2.6.1 - UNRELEASED
     HADOOP-11730. Regression: s3n read failure recovery broken.
     (Takenori Sato via stevel)
 
+    HADOOP-11802: DomainSocketWatcher thread terminates sometimes after there
+    is an I/O error during requestShortCircuitShm (cmccabe)
+
 Release 2.6.0 - 2014-11-18
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8770f1f0/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocketWatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocketWatcher.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocketWatcher.java
index 8c617dc..dfd76ea 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocketWatcher.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocketWatcher.java
@@ -510,8 +510,8 @@ public final class DomainSocketWatcher implements Closeable {
         }
       } catch (InterruptedException e) {
         LOG.info(toString() + " terminating on InterruptedException");
-      } catch (IOException e) {
-        LOG.error(toString() + " terminating on IOException", e);
+      } catch (Throwable e) {
+        LOG.error(toString() + " terminating on exception", e);
       } finally {
         lock.lock();
         try {
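
The Java-side change above is the core of the fix: the watcher loop previously caught only IOException, so any unchecked exception thrown during an iteration escaped run() and silently killed the watcher thread. A minimal standalone sketch of that hazard (doPollIteration() is a hypothetical stand-in for one loop iteration, not a Hadoop method):

  import java.io.IOException;

  public class WatcherThreadSketch {
    // Hypothetical stand-in for one iteration of the watcher loop.
    static void doPollIteration() throws IOException {
      throw new IllegalStateException("unchecked failure inside the loop");
    }

    public static void main(String[] args) {
      Thread watcher = new Thread(new Runnable() {
        @Override
        public void run() {
          try {
            while (true) {
              doPollIteration();
            }
          } catch (IOException e) {
            // Only checked I/O errors land here.  The IllegalStateException
            // thrown above bypasses this handler and terminates the thread,
            // which is why the patch widens the catch to Throwable.
          }
        }
      });
      watcher.start();
    }
  }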

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8770f1f0/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/net/unix/DomainSocketWatcher.c
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/net/unix/DomainSocketWatcher.c b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/net/unix/DomainSocketWatcher.c
index dbaa4fe..596601b 100644
--- a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/net/unix/DomainSocketWatcher.c
+++ b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/net/unix/DomainSocketWatcher.c
@@ -111,7 +111,7 @@ JNIEnv *env, jobject obj, jint fd)
   pollfd = &sd->pollfd[sd->used_size];
   sd->used_size++;
   pollfd->fd = fd;
-  pollfd->events = POLLIN;
+  pollfd->events = POLLIN | POLLHUP;
   pollfd->revents = 0;
 }
 
@@ -162,7 +162,10 @@ JNIEnv *env, jobject obj)
       GetLongField(env, obj, fd_set_data_fid);
   used_size = sd->used_size;
   for (i = 0; i < used_size; i++) {
-    if (sd->pollfd[i].revents & POLLIN) {
+    // We check for both POLLIN and POLLHUP, because on some OSes, when a socket
+    // is shutdown(), it sends POLLHUP rather than POLLIN.
+    if ((sd->pollfd[i].revents & POLLIN) ||
+        (sd->pollfd[i].revents & POLLHUP)) {
       num_readable++;
     } else {
       sd->pollfd[i].revents = 0;
@@ -177,7 +180,8 @@ JNIEnv *env, jobject obj)
     }
     j = 0;
     for (i = 0; ((i < used_size) && (j < num_readable)); i++) {
-      if (sd->pollfd[i].revents & POLLIN) {
+      if ((sd->pollfd[i].revents & POLLIN) ||
+          (sd->pollfd[i].revents & POLLHUP)) {
         carr[j] = sd->pollfd[i].fd;
         j++;
         sd->pollfd[i].revents = 0;
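
At the Java level, the effect of the native change above is that shutting down one end of a domain socket reliably fires the watcher callback even on OSes that report the shutdown as POLLHUP rather than POLLIN. A rough sketch of that contract, modeled on Hadoop's own TestDomainSocketWatcher (the constructor argument and the Handler signature here are assumptions and may differ by branch):

  import java.util.concurrent.CountDownLatch;
  import org.apache.hadoop.net.unix.DomainSocket;
  import org.apache.hadoop.net.unix.DomainSocketWatcher;

  public class ShutdownWakeupSketch {
    public static void main(String[] args) throws Exception {
      final CountDownLatch fired = new CountDownLatch(1);
      DomainSocketWatcher watcher = new DomainSocketWatcher(10000);
      DomainSocket[] pair = DomainSocket.socketpair();
      watcher.add(pair[1], new DomainSocketWatcher.Handler() {
        @Override
        public boolean handle(DomainSocket sock) {
          fired.countDown();
          return true;  // tell the watcher to close the socket
        }
      });
      pair[0].shutdown();  // may be reported as POLLHUP rather than POLLIN
      fired.await();       // with the fix, the callback fires either way
    }
  }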

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8770f1f0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
index 31ac80b..a2d127f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
@@ -17,10 +17,12 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
-import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * Used for injecting faults in DFSClient and DFSOutputStream tests.
  * Calls into this are a no-op in production code. 
@@ -35,4 +37,6 @@ public class DataNodeFaultInjector {
   }
 
   public void getHdfsBlocksMetadata() {}
+
+  public void sendShortCircuitShmResponse() throws IOException {}
 }
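
DataNodeFaultInjector follows the usual fault-injection pattern: production code calls a no-op hook on a swappable singleton, and a test replaces the instance to force an error at a precise point. A condensed sketch of the pattern (names simplified; the real class is above and its use is in the test at the end):

  import java.io.IOException;

  public class FaultInjectorSketch {
    // Swappable singleton: tests overwrite this field, production never does.
    static FaultInjectorSketch instance = new FaultInjectorSketch();

    public static FaultInjectorSketch get() { return instance; }

    // No-op in production; a test can mock or subclass this to throw.
    public void sendShortCircuitShmResponse() throws IOException {}
  }

The hook call added to sendShmSuccessResponse() in the next section costs one virtual call in production but gives the new regression test a seam to inject an IOException at exactly the moment the shm response is written.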

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8770f1f0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
index 7485697..f7efff0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
@@ -398,6 +398,7 @@ class DataXceiver extends Receiver implements Runnable {
 
   private void sendShmSuccessResponse(DomainSocket sock, NewShmInfo shmInfo)
       throws IOException {
+    DataNodeFaultInjector.get().sendShortCircuitShmResponse();
     ShortCircuitShmResponseProto.newBuilder().setStatus(SUCCESS).
         setId(PBHelper.convert(shmInfo.shmId)).build().
         writeDelimitedTo(socketOut);
@@ -456,10 +457,19 @@ class DataXceiver extends Receiver implements Runnable {
         }
       }
       if ((!success) && (peer == null)) {
-        // If we failed to pass the shared memory segment to the client,
-        // close the UNIX domain socket now.  This will trigger the 
-        // DomainSocketWatcher callback, cleaning up the segment.
-        IOUtils.cleanup(null, sock);
+        // The socket is now managed by the DomainSocketWatcher.  However,
+        // we failed to pass it to the client.  We call shutdown() on the
+        // UNIX domain socket now.  This will trigger the DomainSocketWatcher
+        // callback.  The callback will close the domain socket.
+        // We don't want to close the socket here, since that might lead to
+        // bad behavior inside the poll() call.  See HADOOP-11802 for details.
+        try {
+          LOG.warn("Failed to send success response back to the client.  " +
+              "Shutting down socket for " + shmInfo.shmId + ".");
+          sock.shutdown();
+        } catch (IOException e) {
+          LOG.warn("Failed to shut down socket in error handler", e);
+        }
       }
       IOUtils.cleanup(null, shmInfo);
     }
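
The error path above is the behavioral heart of the patch: once the socket has been handed to the DomainSocketWatcher, closing it from this thread could recycle the file descriptor while the native poll() loop still references it, so the code only shuts the socket down and lets the watcher's callback perform the close. The same logic, distilled into a hypothetical helper (not part of the patch):

  // Hypothetical refactoring of the error path; sock already belongs to
  // the DomainSocketWatcher at this point.
  private void abortSharedSocket(DomainSocket sock, ShmId shmId) {
    try {
      LOG.warn("Failed to send success response back to the client.  " +
          "Shutting down socket for " + shmId + ".");
      sock.shutdown();  // wakes the watcher; its callback closes the socket
    } catch (IOException e) {
      LOG.warn("Failed to shut down socket in error handler", e);
    }
    // Deliberately no sock.close() here; see HADOOP-11802.
  }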

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8770f1f0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShmManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShmManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShmManager.java
index 6dbaf84..2544042 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShmManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShmManager.java
@@ -216,10 +216,11 @@ public class DfsClientShmManager implements Closeable {
      * Must be called with the EndpointShmManager lock held.
      *
      * @param peer          The peer to use to talk to the DataNode.
-     * @param clientName    The client name.
      * @param usedPeer      (out param) Will be set to true if we used the peer.
      *                        When a peer is used
      *
+     * @param clientName    The client name.
+     * @param blockId       The block ID to use.
      * @return              null if the DataNode does not support shared memory
      *                        segments, or experienced an error creating the
      *                        shm.  The shared memory segment itself on success.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8770f1f0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DomainSocketFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DomainSocketFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DomainSocketFactory.java
index 5fd31a9..60adb02 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DomainSocketFactory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DomainSocketFactory.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -185,4 +186,9 @@ public class DomainSocketFactory {
   public void disableDomainSocketPath(String path) {
     pathMap.put(path, PathState.UNUSABLE);
   }
+
+  @VisibleForTesting
+  public void clearPathMap() {
+    pathMap.invalidateAll();
+  }
 }
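
clearPathMap() exists for the new test: after the injected failure, the client marks the datanode's domain socket path UNUSABLE in pathMap, so a later read would quietly fall back to TCP unless that cached verdict is wiped. A sketch of the mechanics, assuming pathMap is a Guava Cache (which is what put() plus invalidateAll() suggest):

  Cache<String, PathState> pathMap = CacheBuilder.newBuilder()
      .expireAfterWrite(10, TimeUnit.MINUTES).build();

  pathMap.put("/var/run/hdfs/dn.sock", PathState.UNUSABLE);  // disableDomainSocketPath()
  pathMap.invalidateAll();  // clearPathMap(): forget the verdict so the path is retried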

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8770f1f0/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java
index ad2f176..0558301 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java
@@ -55,6 +55,7 @@ import org.apache.hadoop.hdfs.net.DomainPeer;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
 import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry;
 import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry.RegisteredShm;
 import org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager.PerDatanodeVisitorInfo;
@@ -73,6 +74,9 @@ import org.apache.hadoop.util.Time;
 import org.junit.Assert;
 import org.junit.Assume;
 import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Supplier;
@@ -621,6 +625,18 @@ public class TestShortCircuitCache {
     sockDir.close();
   }
 
+  static private void checkNumberOfSegmentsAndSlots(final int expectedSegments,
+        final int expectedSlots, ShortCircuitRegistry registry) {
+    registry.visit(new ShortCircuitRegistry.Visitor() {
+      @Override
+      public void accept(HashMap<ShmId, RegisteredShm> segments,
+                         HashMultimap<ExtendedBlockId, Slot> slots) {
+        Assert.assertEquals(expectedSegments, segments.size());
+        Assert.assertEquals(expectedSlots, slots.size());
+      }
+    });
+  }
+
   public static class TestCleanupFailureInjector
         extends BlockReaderFactory.FailureInjector {
     @Override
@@ -664,16 +680,67 @@ public class TestShortCircuitCache {
       GenericTestUtils.assertExceptionContains("TCP reads were disabled for " +
           "testing, but we failed to do a non-TCP read.", t);
     }
-    ShortCircuitRegistry registry =
-      cluster.getDataNodes().get(0).getShortCircuitRegistry();
-    registry.visit(new ShortCircuitRegistry.Visitor() {
+    checkNumberOfSegmentsAndSlots(1, 1,
+        cluster.getDataNodes().get(0).getShortCircuitRegistry());
+    cluster.shutdown();
+    sockDir.close();
+  }
+
+  // Regression test for HADOOP-11802
+  @Test(timeout=60000)
+  public void testDataXceiverHandlesRequestShortCircuitShmFailure()
+      throws Exception {
+    BlockReaderTestUtil.enableShortCircuitShmTracing();
+    TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
+    Configuration conf = createShortCircuitConf(
+        "testDataXceiverHandlesRequestShortCircuitShmFailure", sockDir);
+    conf.setLong(DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_KEY,
+        1000000000L);
+    MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    cluster.waitActive();
+    DistributedFileSystem fs = cluster.getFileSystem();
+    final Path TEST_PATH1 = new Path("/test_file1");
+    DFSTestUtil.createFile(fs, TEST_PATH1, 4096,
+        (short)1, 0xFADE1);
+    LOG.info("Setting failure injector and performing a read which " +
+        "should fail...");
+    DataNodeFaultInjector failureInjector = Mockito.mock(DataNodeFaultInjector.class);
+    Mockito.doAnswer(new Answer<Void>() {
       @Override
-      public void accept(HashMap<ShmId, RegisteredShm> segments,
-                         HashMultimap<ExtendedBlockId, Slot> slots) {
-        Assert.assertEquals(1, segments.size());
-        Assert.assertEquals(1, slots.size());
+      public Void answer(InvocationOnMock invocation) throws Throwable {
+        throw new IOException("injected error into sendShmResponse");
       }
-    });
+    }).when(failureInjector).sendShortCircuitShmResponse();
+    DataNodeFaultInjector prevInjector = DataNodeFaultInjector.instance;
+    DataNodeFaultInjector.instance = failureInjector;
+
+    try {
+      // The first read will try to allocate a shared memory segment and slot.
+      // The shared memory segment allocation will fail because of the failure
+      // injector.
+      DFSTestUtil.readFileBuffer(fs, TEST_PATH1);
+      Assert.fail("expected readFileBuffer to fail, but it succeeded.");
+    } catch (Throwable t) {
+      GenericTestUtils.assertExceptionContains("TCP reads were disabled for " +
+          "testing, but we failed to do a non-TCP read.", t);
+    }
+
+    checkNumberOfSegmentsAndSlots(0, 0,
+        cluster.getDataNodes().get(0).getShortCircuitRegistry());
+
+    LOG.info("Clearing failure injector and performing another read...");
+    DataNodeFaultInjector.instance = prevInjector;
+
+    fs.getClient().getClientContext().getDomainSocketFactory().clearPathMap();
+
+    // The second read should succeed.
+    DFSTestUtil.readFileBuffer(fs, TEST_PATH1);
+
+    // We should have added a new short-circuit shared memory segment and slot.
+    checkNumberOfSegmentsAndSlots(1, 1,
+        cluster.getDataNodes().get(0).getShortCircuitRegistry());
+
     cluster.shutdown();
     sockDir.close();
   }
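
One note on the injector swap in the test above: prevInjector is restored inline, so an assertion failure between the swap and the restore (for example in the segment-count check) would leak the mock into later tests. A try/finally, sketched below, is the more defensive shape:

  DataNodeFaultInjector prev = DataNodeFaultInjector.instance;
  DataNodeFaultInjector.instance = failureInjector;
  try {
    // ... exercise the code path that should hit the injected fault ...
  } finally {
    DataNodeFaultInjector.instance = prev;  // never leak the mock
  }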