You are viewing a plain-text version of this content. The canonical link to it (a hyperlink in the original archive page) is not preserved in this plain-text version.
Posted to common-commits@hadoop.apache.org by we...@apache.org on 2022/04/18 02:32:39 UTC

[hadoop] branch trunk updated: HDFS-16535. SlotReleaser should reuse the domain socket based on socket paths (#4158)

This is an automated email from the ASF dual-hosted git repository.

weichiu pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 35d4c02bccd HDFS-16535. SlotReleaser should reuse the domain socket based on socket paths (#4158)
35d4c02bccd is described below

commit 35d4c02bccd303b5a2608a952fa1f8487001087b
Author: Quanlong Huang <hu...@126.com>
AuthorDate: Mon Apr 18 10:32:29 2022 +0800

    HDFS-16535. SlotReleaser should reuse the domain socket based on socket paths (#4158)
    
    Reviewed-by: Lisheng Sun <su...@apache.org>
---
 .../hdfs/shortcircuit/ShortCircuitCache.java       | 25 ++++---
 .../hdfs/shortcircuit/TestShortCircuitCache.java   | 78 ++++++++++++++++++++++
 2 files changed, 95 insertions(+), 8 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java
index a950388a312..df2a92c75c9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java
@@ -189,6 +189,7 @@ public class ShortCircuitCache implements Closeable {
       final DfsClientShm shm = (DfsClientShm)slot.getShm();
       final DomainSocket shmSock = shm.getPeer().getDomainSocket();
       final String path = shmSock.getPath();
+      DomainSocket domainSocket = pathToDomainSocket.get(path);
       DataOutputStream out = null;
       boolean success = false;
       int retries = 2;
@@ -196,9 +197,10 @@ public class ShortCircuitCache implements Closeable {
         while (retries > 0) {
           try {
             if (domainSocket == null || !domainSocket.isOpen()) {
-              // we are running in single thread mode, no protection needed for
-              // domainSocket
               domainSocket = DomainSocket.connect(path);
+              // we are running in single thread mode, no protection needed for
+              // pathToDomainSocket
+              pathToDomainSocket.put(path, domainSocket);
             }
 
             out = new DataOutputStream(
@@ -221,13 +223,16 @@ public class ShortCircuitCache implements Closeable {
           } catch (SocketException se) {
             // the domain socket on datanode may be timed out, we retry once
             retries--;
-            domainSocket.close();
-            domainSocket = null;
+            if (domainSocket != null) {
+              domainSocket.close();
+              domainSocket = null;
+              pathToDomainSocket.remove(path);
+            }
             if (retries == 0) {
               throw new SocketException("Create domain socket failed");
             }
           }
-        }
+        } // end of while block
       } catch (IOException e) {
         LOG.warn(ShortCircuitCache.this + ": failed to release "
             + "short-circuit shared memory slot " + slot + " by sending "
@@ -240,10 +245,10 @@ public class ShortCircuitCache implements Closeable {
         } else {
           shm.getEndpointShmManager().shutdown(shm);
           IOUtilsClient.cleanupWithLogger(LOG, domainSocket, out);
-          domainSocket = null;
+          pathToDomainSocket.remove(path);
         }
       }
-    }
+    } // end of run()
   }
 
   public interface ShortCircuitReplicaCreator {
@@ -354,7 +359,11 @@ public class ShortCircuitCache implements Closeable {
    */
   private final DfsClientShmManager shmManager;
 
-  private DomainSocket domainSocket = null;
+  /**
+   * A map contains all DomainSockets used in SlotReleaser. Keys are the domain socket
+   * paths of short-circuit shared memory segments.
+   */
+  private Map<String, DomainSocket> pathToDomainSocket = new HashMap<>();
 
   public static ShortCircuitCache fromConf(ShortCircuitConf conf) {
     return new ShortCircuitCache(
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java
index 9754da33483..965ac0ac98b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java
@@ -63,6 +63,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo.DatanodeInfoBuilder;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
 import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry;
 import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry.RegisteredShm;
@@ -957,6 +958,83 @@ public class TestShortCircuitCache {
     }
   }
 
+  // Regression test for HDFS-16535
+  @Test(timeout = 60000)
+  public void testDomainSocketClosedByMultipleDNs() throws Exception {
+    TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
+    String testName = "testDomainSocketClosedByMultipleDNs";
+    Configuration conf = createShortCircuitConf(testName, sockDir);
+    conf.set(DFS_DOMAIN_SOCKET_PATH_KEY, new File(sockDir.getDir(),
+        testName + "._PORT").getAbsolutePath());
+    MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
+
+    try {
+      cluster.waitActive();
+      DistributedFileSystem fs = cluster.getFileSystem();
+      final ShortCircuitCache cache =
+          fs.getClient().getClientContext().getShortCircuitCache();
+
+      ExtendedBlockId blockId0 = new ExtendedBlockId(123, "xyz");
+      ExtendedBlockId blockId1 = new ExtendedBlockId(456, "xyz");
+
+      DataNode dn0 = cluster.getDataNodes().get(0);
+      DataNode dn1 = cluster.getDataNodes().get(1);
+
+      DomainPeer peer0 = new DomainPeer(DomainSocket.connect(new File(
+          sockDir.getDir(), testName + "." + dn0.getXferPort()).getAbsolutePath()));
+      DomainPeer peer1 = new DomainPeer(DomainSocket.connect(new File(
+          sockDir.getDir(), testName + "." + dn1.getXferPort()).getAbsolutePath()));
+
+      final DatanodeInfo dnInfo0 = new DatanodeInfo.DatanodeInfoBuilder()
+          .setNodeID(dn0.getDatanodeId()).build();
+      final DatanodeInfo dnInfo1 = new DatanodeInfo.DatanodeInfoBuilder()
+          .setNodeID(dn1.getDatanodeId()).build();
+
+      // Allocate 2 shm slots from DataNode-0
+      MutableBoolean usedPeer = new MutableBoolean(false);
+      Slot slot1 = cache.allocShmSlot(dnInfo0, peer0, usedPeer, blockId0,
+          "testDomainSocketClosedByMultipleDNs_client");
+      dn0.getShortCircuitRegistry()
+          .registerSlot(blockId0, slot1.getSlotId(), false);
+
+      Slot slot2 = cache.allocShmSlot(dnInfo0, peer0, usedPeer, blockId0,
+          "testDomainSocketClosedByMultipleDNs_client");
+      dn0.getShortCircuitRegistry()
+          .registerSlot(blockId0, slot2.getSlotId(), false);
+
+      // Allocate 1 shm slot from DataNode-1
+      Slot slot3 = cache.allocShmSlot(dnInfo1, peer1, usedPeer, blockId1,
+          "testDomainSocketClosedByMultipleDNs_client");
+      dn1.getShortCircuitRegistry()
+          .registerSlot(blockId1, slot3.getSlotId(), false);
+
+      Assert.assertEquals(2, cache.getDfsClientShmManager().getShmNum());
+      Assert.assertEquals(1, dn0.getShortCircuitRegistry().getShmNum());
+      Assert.assertEquals(1, dn1.getShortCircuitRegistry().getShmNum());
+
+      // Release the slot of DataNode-1 first.
+      cache.scheduleSlotReleaser(slot3);
+      Thread.sleep(2000);
+      Assert.assertEquals(1, cache.getDfsClientShmManager().getShmNum());
+
+      // Release the slots of DataNode-0.
+      cache.scheduleSlotReleaser(slot1);
+      Thread.sleep(2000);
+      Assert.assertEquals("0 ShmNum means the shm of DataNode-0 is shutdown" +
+              " due to slot release failures.",
+          1, cache.getDfsClientShmManager().getShmNum());
+      cache.scheduleSlotReleaser(slot2);
+      Thread.sleep(2000);
+
+      Assert.assertEquals(0, dn0.getShortCircuitRegistry().getShmNum());
+      Assert.assertEquals(0, dn1.getShortCircuitRegistry().getShmNum());
+      Assert.assertEquals(0, cache.getDfsClientShmManager().getShmNum());
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
   @Test(timeout = 60000)
   public void testDNRestart() throws Exception {
     TemporarySocketDirectory sockDir = new TemporarySocketDirectory();


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org