You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by vi...@apache.org on 2015/09/02 23:49:49 UTC
[3/3] hadoop git commit: HADOOP-11802. DomainSocketWatcher thread
terminates sometimes after there is an I/O error during
requestShortCircuitShm (cmccabe)
HADOOP-11802. DomainSocketWatcher thread terminates sometimes after there is an I/O error during requestShortCircuitShm (cmccabe)
(cherry picked from commit a0e0a63209b5eb17dca5cc503be36aa52defeabd)
(cherry picked from commit 788b76761d5dfadf688406d50169e95401fe5d33)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/8770f1f0
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/8770f1f0
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/8770f1f0
Branch: refs/heads/branch-2.6.1
Commit: 8770f1f03952661f49aea65ae6498be676677812
Parents: 181281c
Author: Colin Patrick Mccabe <cm...@cloudera.com>
Authored: Thu Apr 23 18:59:52 2015 -0700
Committer: Vinod Kumar Vavilapalli <vi...@apache.org>
Committed: Wed Sep 2 14:38:26 2015 -0700
----------------------------------------------------------------------
hadoop-common-project/hadoop-common/CHANGES.txt | 3 +
.../hadoop/net/unix/DomainSocketWatcher.java | 4 +-
.../hadoop/net/unix/DomainSocketWatcher.c | 10 ++-
.../server/datanode/DataNodeFaultInjector.java | 6 +-
.../hdfs/server/datanode/DataXceiver.java | 18 ++++-
.../hdfs/shortcircuit/DfsClientShmManager.java | 3 +-
.../hdfs/shortcircuit/DomainSocketFactory.java | 6 ++
.../shortcircuit/TestShortCircuitCache.java | 83 ++++++++++++++++++--
8 files changed, 114 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8770f1f0/hadoop-common-project/hadoop-common/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index fb6153f..800cb2a 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -57,6 +57,9 @@ Release 2.6.1 - UNRELEASED
HADOOP-11730. Regression: s3n read failure recovery broken.
(Takenori Sato via stevel)
+ HADOOP-11802. DomainSocketWatcher thread terminates sometimes after there
+ is an I/O error during requestShortCircuitShm (cmccabe)
+
Release 2.6.0 - 2014-11-18
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8770f1f0/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocketWatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocketWatcher.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocketWatcher.java
index 8c617dc..dfd76ea 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocketWatcher.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocketWatcher.java
@@ -510,8 +510,8 @@ public final class DomainSocketWatcher implements Closeable {
}
} catch (InterruptedException e) {
LOG.info(toString() + " terminating on InterruptedException");
- } catch (IOException e) {
- LOG.error(toString() + " terminating on IOException", e);
+ } catch (Throwable e) {
+ LOG.error(toString() + " terminating on exception", e);
} finally {
lock.lock();
try {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8770f1f0/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/net/unix/DomainSocketWatcher.c
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/net/unix/DomainSocketWatcher.c b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/net/unix/DomainSocketWatcher.c
index dbaa4fe..596601b 100644
--- a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/net/unix/DomainSocketWatcher.c
+++ b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/net/unix/DomainSocketWatcher.c
@@ -111,7 +111,7 @@ JNIEnv *env, jobject obj, jint fd)
pollfd = &sd->pollfd[sd->used_size];
sd->used_size++;
pollfd->fd = fd;
- pollfd->events = POLLIN;
+ pollfd->events = POLLIN | POLLHUP;
pollfd->revents = 0;
}
@@ -162,7 +162,10 @@ JNIEnv *env, jobject obj)
GetLongField(env, obj, fd_set_data_fid);
used_size = sd->used_size;
for (i = 0; i < used_size; i++) {
- if (sd->pollfd[i].revents & POLLIN) {
+ // We check for both POLLIN and POLLHUP because, on some OSes, poll() reports
+ // POLLHUP rather than POLLIN for a socket that has been shutdown().
+ if ((sd->pollfd[i].revents & POLLIN) ||
+ (sd->pollfd[i].revents & POLLHUP)) {
num_readable++;
} else {
sd->pollfd[i].revents = 0;
@@ -177,7 +180,8 @@ JNIEnv *env, jobject obj)
}
j = 0;
for (i = 0; ((i < used_size) && (j < num_readable)); i++) {
- if (sd->pollfd[i].revents & POLLIN) {
+ if ((sd->pollfd[i].revents & POLLIN) ||
+ (sd->pollfd[i].revents & POLLHUP)) {
carr[j] = sd->pollfd[i].fd;
j++;
sd->pollfd[i].revents = 0;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8770f1f0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
index 31ac80b..a2d127f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
@@ -17,10 +17,12 @@
*/
package org.apache.hadoop.hdfs.server.datanode;
-import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
+import com.google.common.annotations.VisibleForTesting;
+
/**
* Used for injecting faults in DFSClient and DFSOutputStream tests.
* Calls into this are a no-op in production code.
@@ -35,4 +37,6 @@ public class DataNodeFaultInjector {
}
public void getHdfsBlocksMetadata() {}
+
+ public void sendShortCircuitShmResponse() throws IOException {}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8770f1f0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
index 7485697..f7efff0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
@@ -398,6 +398,7 @@ class DataXceiver extends Receiver implements Runnable {
private void sendShmSuccessResponse(DomainSocket sock, NewShmInfo shmInfo)
throws IOException {
+ DataNodeFaultInjector.get().sendShortCircuitShmResponse();
ShortCircuitShmResponseProto.newBuilder().setStatus(SUCCESS).
setId(PBHelper.convert(shmInfo.shmId)).build().
writeDelimitedTo(socketOut);
@@ -456,10 +457,19 @@ class DataXceiver extends Receiver implements Runnable {
}
}
if ((!success) && (peer == null)) {
- // If we failed to pass the shared memory segment to the client,
- // close the UNIX domain socket now. This will trigger the
- // DomainSocketWatcher callback, cleaning up the segment.
- IOUtils.cleanup(null, sock);
+ // The socket is now managed by the DomainSocketWatcher. However,
+ // we failed to pass the shared memory segment to the client, so we
+ // call shutdown() on the UNIX domain socket now. This triggers the
+ // DomainSocketWatcher callback, which will close the domain socket.
+ // We don't want to close the socket here, since that might lead to
+ // bad behavior inside the poll() call. See HADOOP-11802 for details.
+ try {
+ LOG.warn("Failed to send success response back to the client. " +
+ "Shutting down socket for " + shmInfo.shmId + ".");
+ sock.shutdown();
+ } catch (IOException e) {
+ LOG.warn("Failed to shut down socket in error handler", e);
+ }
}
IOUtils.cleanup(null, shmInfo);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8770f1f0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShmManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShmManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShmManager.java
index 6dbaf84..2544042 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShmManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShmManager.java
@@ -216,10 +216,11 @@ public class DfsClientShmManager implements Closeable {
* Must be called with the EndpointShmManager lock held.
*
* @param peer The peer to use to talk to the DataNode.
- * @param clientName The client name.
* @param usedPeer (out param) Will be set to true if we used the peer.
* When a peer is used
*
+ * @param clientName The client name.
+ * @param blockId The block ID to use.
* @return null if the DataNode does not support shared memory
* segments, or experienced an error creating the
* shm. The shared memory segment itself on success.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8770f1f0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DomainSocketFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DomainSocketFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DomainSocketFactory.java
index 5fd31a9..60adb02 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DomainSocketFactory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DomainSocketFactory.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -185,4 +186,9 @@ public class DomainSocketFactory {
public void disableDomainSocketPath(String path) {
pathMap.put(path, PathState.UNUSABLE);
}
+
+ @VisibleForTesting
+ public void clearPathMap() {
+ pathMap.invalidateAll();
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8770f1f0/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java
index ad2f176..0558301 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java
@@ -55,6 +55,7 @@ import org.apache.hadoop.hdfs.net.DomainPeer;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry;
import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry.RegisteredShm;
import org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager.PerDatanodeVisitorInfo;
@@ -73,6 +74,9 @@ import org.apache.hadoop.util.Time;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
@@ -621,6 +625,18 @@ public class TestShortCircuitCache {
sockDir.close();
}
+ static private void checkNumberOfSegmentsAndSlots(final int expectedSegments,
+ final int expectedSlots, ShortCircuitRegistry registry) {
+ registry.visit(new ShortCircuitRegistry.Visitor() {
+ @Override
+ public void accept(HashMap<ShmId, RegisteredShm> segments,
+ HashMultimap<ExtendedBlockId, Slot> slots) {
+ Assert.assertEquals(expectedSegments, segments.size());
+ Assert.assertEquals(expectedSlots, slots.size());
+ }
+ });
+ }
+
public static class TestCleanupFailureInjector
extends BlockReaderFactory.FailureInjector {
@Override
@@ -664,16 +680,67 @@ public class TestShortCircuitCache {
GenericTestUtils.assertExceptionContains("TCP reads were disabled for " +
"testing, but we failed to do a non-TCP read.", t);
}
- ShortCircuitRegistry registry =
- cluster.getDataNodes().get(0).getShortCircuitRegistry();
- registry.visit(new ShortCircuitRegistry.Visitor() {
+ checkNumberOfSegmentsAndSlots(1, 1,
+ cluster.getDataNodes().get(0).getShortCircuitRegistry());
+ cluster.shutdown();
+ sockDir.close();
+ }
+
+ // Regression test for HADOOP-11802
+ @Test(timeout=60000)
+ public void testDataXceiverHandlesRequestShortCircuitShmFailure()
+ throws Exception {
+ BlockReaderTestUtil.enableShortCircuitShmTracing();
+ TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
+ Configuration conf = createShortCircuitConf(
+ "testDataXceiverHandlesRequestShortCircuitShmFailure", sockDir);
+ conf.setLong(DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_KEY,
+ 1000000000L);
+ MiniDFSCluster cluster =
+ new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+ cluster.waitActive();
+ DistributedFileSystem fs = cluster.getFileSystem();
+ final Path TEST_PATH1 = new Path("/test_file1");
+ DFSTestUtil.createFile(fs, TEST_PATH1, 4096,
+ (short)1, 0xFADE1);
+ LOG.info("Setting failure injector and performing a read which " +
+ "should fail...");
+ DataNodeFaultInjector failureInjector = Mockito.mock(DataNodeFaultInjector.class);
+ Mockito.doAnswer(new Answer<Void>() {
@Override
- public void accept(HashMap<ShmId, RegisteredShm> segments,
- HashMultimap<ExtendedBlockId, Slot> slots) {
- Assert.assertEquals(1, segments.size());
- Assert.assertEquals(1, slots.size());
+ public Void answer(InvocationOnMock invocation) throws Throwable {
+ throw new IOException("injected error into sendShmResponse");
}
- });
+ }).when(failureInjector).sendShortCircuitShmResponse();
+ DataNodeFaultInjector prevInjector = DataNodeFaultInjector.instance;
+ DataNodeFaultInjector.instance = failureInjector;
+
+ try {
+ // The first read will try to allocate a shared memory segment and slot.
+ // The shared memory segment allocation will fail because of the failure
+ // injector.
+ DFSTestUtil.readFileBuffer(fs, TEST_PATH1);
+ Assert.fail("expected readFileBuffer to fail, but it succeeded.");
+ } catch (Throwable t) {
+ GenericTestUtils.assertExceptionContains("TCP reads were disabled for " +
+ "testing, but we failed to do a non-TCP read.", t);
+ }
+
+ checkNumberOfSegmentsAndSlots(0, 0,
+ cluster.getDataNodes().get(0).getShortCircuitRegistry());
+
+ LOG.info("Clearing failure injector and performing another read...");
+ DataNodeFaultInjector.instance = prevInjector;
+
+ fs.getClient().getClientContext().getDomainSocketFactory().clearPathMap();
+
+ // The second read should succeed.
+ DFSTestUtil.readFileBuffer(fs, TEST_PATH1);
+
+ // We should have added a new short-circuit shared memory segment and slot.
+ checkNumberOfSegmentsAndSlots(1, 1,
+ cluster.getDataNodes().get(0).getShortCircuitRegistry());
+
cluster.shutdown();
sockDir.close();
}