You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by we...@apache.org on 2022/04/18 02:32:39 UTC
[hadoop] branch trunk updated: HDFS-16535. SlotReleaser should reuse the domain socket based on socket paths (#4158)
This is an automated email from the ASF dual-hosted git repository.
weichiu pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 35d4c02bccd HDFS-16535. SlotReleaser should reuse the domain socket based on socket paths (#4158)
35d4c02bccd is described below
commit 35d4c02bccd303b5a2608a952fa1f8487001087b
Author: Quanlong Huang <hu...@126.com>
AuthorDate: Mon Apr 18 10:32:29 2022 +0800
HDFS-16535. SlotReleaser should reuse the domain socket based on socket paths (#4158)
Reviewed-by: Lisheng Sun <su...@apache.org>
---
.../hdfs/shortcircuit/ShortCircuitCache.java | 25 ++++---
.../hdfs/shortcircuit/TestShortCircuitCache.java | 78 ++++++++++++++++++++++
2 files changed, 95 insertions(+), 8 deletions(-)
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java
index a950388a312..df2a92c75c9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java
@@ -189,6 +189,7 @@ public class ShortCircuitCache implements Closeable {
final DfsClientShm shm = (DfsClientShm)slot.getShm();
final DomainSocket shmSock = shm.getPeer().getDomainSocket();
final String path = shmSock.getPath();
+ DomainSocket domainSocket = pathToDomainSocket.get(path);
DataOutputStream out = null;
boolean success = false;
int retries = 2;
@@ -196,9 +197,10 @@ public class ShortCircuitCache implements Closeable {
while (retries > 0) {
try {
if (domainSocket == null || !domainSocket.isOpen()) {
- // we are running in single thread mode, no protection needed for
- // domainSocket
domainSocket = DomainSocket.connect(path);
+ // we are running in single thread mode, no protection needed for
+ // pathToDomainSocket
+ pathToDomainSocket.put(path, domainSocket);
}
out = new DataOutputStream(
@@ -221,13 +223,16 @@ public class ShortCircuitCache implements Closeable {
} catch (SocketException se) {
// the domain socket on datanode may be timed out, we retry once
retries--;
- domainSocket.close();
- domainSocket = null;
+ if (domainSocket != null) {
+ domainSocket.close();
+ domainSocket = null;
+ pathToDomainSocket.remove(path);
+ }
if (retries == 0) {
throw new SocketException("Create domain socket failed");
}
}
- }
+ } // end of while block
} catch (IOException e) {
LOG.warn(ShortCircuitCache.this + ": failed to release "
+ "short-circuit shared memory slot " + slot + " by sending "
@@ -240,10 +245,10 @@ public class ShortCircuitCache implements Closeable {
} else {
shm.getEndpointShmManager().shutdown(shm);
IOUtilsClient.cleanupWithLogger(LOG, domainSocket, out);
- domainSocket = null;
+ pathToDomainSocket.remove(path);
}
}
- }
+ } // end of run()
}
public interface ShortCircuitReplicaCreator {
@@ -354,7 +359,11 @@ public class ShortCircuitCache implements Closeable {
*/
private final DfsClientShmManager shmManager;
- private DomainSocket domainSocket = null;
+ /**
+ * A map containing all DomainSockets used in SlotReleaser. Keys are the domain socket
+ * paths of short-circuit shared memory segments.
+ */
+ private Map<String, DomainSocket> pathToDomainSocket = new HashMap<>();
public static ShortCircuitCache fromConf(ShortCircuitConf conf) {
return new ShortCircuitCache(
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java
index 9754da33483..965ac0ac98b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java
@@ -63,6 +63,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo.DatanodeInfoBuilder;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry;
import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry.RegisteredShm;
@@ -957,6 +958,83 @@ public class TestShortCircuitCache {
}
}
+ // Regression test for HDFS-16535
+ @Test(timeout = 60000)
+ public void testDomainSocketClosedByMultipleDNs() throws Exception {
+ TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
+ String testName = "testDomainSocketClosedByMultipleDNs";
+ Configuration conf = createShortCircuitConf(testName, sockDir);
+ conf.set(DFS_DOMAIN_SOCKET_PATH_KEY, new File(sockDir.getDir(),
+ testName + "._PORT").getAbsolutePath());
+ MiniDFSCluster cluster =
+ new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
+
+ try {
+ cluster.waitActive();
+ DistributedFileSystem fs = cluster.getFileSystem();
+ final ShortCircuitCache cache =
+ fs.getClient().getClientContext().getShortCircuitCache();
+
+ ExtendedBlockId blockId0 = new ExtendedBlockId(123, "xyz");
+ ExtendedBlockId blockId1 = new ExtendedBlockId(456, "xyz");
+
+ DataNode dn0 = cluster.getDataNodes().get(0);
+ DataNode dn1 = cluster.getDataNodes().get(1);
+
+ DomainPeer peer0 = new DomainPeer(DomainSocket.connect(new File(
+ sockDir.getDir(), testName + "." + dn0.getXferPort()).getAbsolutePath()));
+ DomainPeer peer1 = new DomainPeer(DomainSocket.connect(new File(
+ sockDir.getDir(), testName + "." + dn1.getXferPort()).getAbsolutePath()));
+
+ final DatanodeInfo dnInfo0 = new DatanodeInfo.DatanodeInfoBuilder()
+ .setNodeID(dn0.getDatanodeId()).build();
+ final DatanodeInfo dnInfo1 = new DatanodeInfo.DatanodeInfoBuilder()
+ .setNodeID(dn1.getDatanodeId()).build();
+
+ // Allocate 2 shm slots from DataNode-0
+ MutableBoolean usedPeer = new MutableBoolean(false);
+ Slot slot1 = cache.allocShmSlot(dnInfo0, peer0, usedPeer, blockId0,
+ "testDomainSocketClosedByMultipleDNs_client");
+ dn0.getShortCircuitRegistry()
+ .registerSlot(blockId0, slot1.getSlotId(), false);
+
+ Slot slot2 = cache.allocShmSlot(dnInfo0, peer0, usedPeer, blockId0,
+ "testDomainSocketClosedByMultipleDNs_client");
+ dn0.getShortCircuitRegistry()
+ .registerSlot(blockId0, slot2.getSlotId(), false);
+
+ // Allocate 1 shm slot from DataNode-1
+ Slot slot3 = cache.allocShmSlot(dnInfo1, peer1, usedPeer, blockId1,
+ "testDomainSocketClosedByMultipleDNs_client");
+ dn1.getShortCircuitRegistry()
+ .registerSlot(blockId1, slot3.getSlotId(), false);
+
+ Assert.assertEquals(2, cache.getDfsClientShmManager().getShmNum());
+ Assert.assertEquals(1, dn0.getShortCircuitRegistry().getShmNum());
+ Assert.assertEquals(1, dn1.getShortCircuitRegistry().getShmNum());
+
+ // Release the slot of DataNode-1 first.
+ cache.scheduleSlotReleaser(slot3);
+ Thread.sleep(2000);
+ Assert.assertEquals(1, cache.getDfsClientShmManager().getShmNum());
+
+ // Release the slots of DataNode-0.
+ cache.scheduleSlotReleaser(slot1);
+ Thread.sleep(2000);
+ Assert.assertEquals("0 ShmNum means the shm of DataNode-0 is shutdown" +
+ " due to slot release failures.",
+ 1, cache.getDfsClientShmManager().getShmNum());
+ cache.scheduleSlotReleaser(slot2);
+ Thread.sleep(2000);
+
+ Assert.assertEquals(0, dn0.getShortCircuitRegistry().getShmNum());
+ Assert.assertEquals(0, dn1.getShortCircuitRegistry().getShmNum());
+ Assert.assertEquals(0, cache.getDfsClientShmManager().getShmNum());
+ } finally {
+ cluster.shutdown();
+ }
+ }
+
@Test(timeout = 60000)
public void testDNRestart() throws Exception {
TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org