Posted to common-commits@hadoop.apache.org by he...@apache.org on 2022/03/31 06:01:13 UTC

[hadoop] branch trunk updated: HDFS-16511. Improve lock type for ReplicaMap under fine-grain lock mode. (#4085). Contributed by limingxiang.

This is an automated email from the ASF dual-hosted git repository.

hexiaoqiao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2bf78e2  HDFS-16511. Improve lock type for ReplicaMap under fine-grain lock mode. (#4085). Contributed by limingxiang.
2bf78e2 is described below

commit 2bf78e2416875c8f82f485b457420fafd42aa977
Author: He Xiaoqiao <he...@apache.org>
AuthorDate: Thu Mar 31 14:00:38 2022 +0800

    HDFS-16511. Improve lock type for ReplicaMap under fine-grain lock mode. (#4085). Contributed by limingxiang.
    
    Signed-off-by: He Xiaoqiao <he...@apache.org>
---
 .../server/datanode/fsdataset/impl/ReplicaMap.java | 16 +++----
 .../datanode/fsdataset/impl/TestFsDatasetImpl.java | 51 ++++++++++++++++++++++
 2 files changed, 59 insertions(+), 8 deletions(-)
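
A note on why the write-to-read downgrade in the add()/addAndGet() hunks
below is safe: the outer map keyed by block pool id supports an atomic
putIfAbsent, so two threads that both observe m == null race harmlessly;
exactly one entry wins and both then read it back. This presumes the outer
map is a ConcurrentHashMap, which the use of putIfAbsent under a read lock
implies but the diff itself does not show. A minimal, self-contained sketch
of the pattern (LazyBucketMap and getOrCreate are illustrative names, not
Hadoop classes):

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;
    import java.util.concurrent.locks.ReadWriteLock;
    import java.util.concurrent.locks.ReentrantReadWriteLock;
    import java.util.function.Supplier;

    // Illustrative stand-in for the ReplicaMap pattern: the read lock only
    // excludes coarse operations taken under the write lock elsewhere;
    // per-key mutation stays safe because the map itself is concurrent and
    // putIfAbsent resolves the race between two creators of a missing key.
    class LazyBucketMap<V> {
      private final ConcurrentMap<String, V> map = new ConcurrentHashMap<>();
      private final ReadWriteLock lock = new ReentrantReadWriteLock();

      V getOrCreate(String key, Supplier<V> factory) {
        lock.readLock().lock();
        try {
          V v = map.get(key);
          if (v == null) {
            // Two threads can both reach this point; only one value survives.
            map.putIfAbsent(key, factory.get());
            v = map.get(key);
          }
          return v;
        } finally {
          lock.readLock().unlock();
        }
      }
    }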

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java
index 25d302b..6ecc48a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java
@@ -120,12 +120,12 @@ class ReplicaMap {
   ReplicaInfo add(String bpid, ReplicaInfo replicaInfo) {
     checkBlockPool(bpid);
     checkBlock(replicaInfo);
-    try (AutoCloseDataSetLock l = lockManager.writeLock(LockLevel.BLOCK_POOl, bpid)) {
+    try (AutoCloseDataSetLock l = lockManager.readLock(LockLevel.BLOCK_POOl, bpid)) {
       LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
       if (m == null) {
         // Add an entry for block pool if it does not exist already
-        m = new LightWeightResizableGSet<Block, ReplicaInfo>();
-        map.put(bpid, m);
+        map.putIfAbsent(bpid, new LightWeightResizableGSet<Block, ReplicaInfo>());
+        m = map.get(bpid);
       }
       return  m.put(replicaInfo);
     }
@@ -138,12 +138,12 @@ class ReplicaMap {
   ReplicaInfo addAndGet(String bpid, ReplicaInfo replicaInfo) {
     checkBlockPool(bpid);
     checkBlock(replicaInfo);
-    try (AutoCloseDataSetLock l = lockManager.writeLock(LockLevel.BLOCK_POOl, bpid)) {
+    try (AutoCloseDataSetLock l = lockManager.readLock(LockLevel.BLOCK_POOl, bpid)) {
       LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
       if (m == null) {
         // Add an entry for block pool if it does not exist already
-        m = new LightWeightResizableGSet<Block, ReplicaInfo>();
-        map.put(bpid, m);
+        map.putIfAbsent(bpid, new LightWeightResizableGSet<Block, ReplicaInfo>());
+        m = map.get(bpid);
       }
       ReplicaInfo oldReplicaInfo = m.get(replicaInfo);
       if (oldReplicaInfo != null) {
@@ -202,7 +202,7 @@ class ReplicaMap {
   ReplicaInfo remove(String bpid, Block block) {
     checkBlockPool(bpid);
     checkBlock(block);
-    try (AutoCloseDataSetLock l = lockManager.writeLock(LockLevel.BLOCK_POOl, bpid)) {
+    try (AutoCloseDataSetLock l = lockManager.readLock(LockLevel.BLOCK_POOl, bpid)) {
       LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
       if (m != null) {
         ReplicaInfo replicaInfo = m.get(block);
@@ -224,7 +224,7 @@ class ReplicaMap {
    */
   ReplicaInfo remove(String bpid, long blockId) {
     checkBlockPool(bpid);
-    try (AutoCloseDataSetLock l = lockManager.writeLock(LockLevel.BLOCK_POOl, bpid)) {
+    try (AutoCloseDataSetLock l = lockManager.readLock(LockLevel.BLOCK_POOl, bpid)) {
       LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
       if (m != null) {
         return m.remove(new Block(blockId));
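
The remove() methods above also move to the read lock even though they
mutate the per-pool set. That only holds together if the inner
LightWeightResizableGSet serializes its own mutators; put/get/remove being
synchronized on that class is assumed here, since this diff does not show
it. A compact sketch of the resulting division of labor (SynchronizedBucket
is an illustrative stand-in for the GSet, not a Hadoop class):

    import java.util.HashMap;
    import java.util.Map;

    // Illustrative stand-in for the inner per-block-pool set: each mutator
    // synchronizes on the set's own monitor, so concurrent add/remove within
    // one block pool serialize here, not on the block-pool-level lock.
    class SynchronizedBucket<V> {
      private final Map<Long, V> inner = new HashMap<>();

      synchronized V put(long blockId, V value) { return inner.put(blockId, value); }
      synchronized V remove(long blockId) { return inner.remove(blockId); }
      synchronized V get(long blockId) { return inner.get(blockId); }
      synchronized int size() { return inner.size(); }
    }

The block-pool-level write lock then presumably remains reserved for
operations elsewhere in ReplicaMap that swap or drop a whole pool's set at
once.
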
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
index 23a72f9..f250eea 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
@@ -21,6 +21,10 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.file.Files;
 import java.nio.file.Paths;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.function.Supplier;
 
 import org.apache.hadoop.fs.DF;
@@ -602,6 +606,53 @@ public class TestFsDatasetImpl {
         + "volumeMap.", 0, totalNumReplicas);
   }
 
+  @Test(timeout = 30000)
+  public void testConcurrentWriteAndDeleteBlock() throws Exception {
+    // Feed FsDataset with block metadata.
+    final int numBlocks = 1000;
+    final int threadCount = 10;
+    // Generate data blocks.
+    ExecutorService pool = Executors.newFixedThreadPool(threadCount);
+    List<Future<?>> futureList = new ArrayList<>();
+    Random random = new Random();
+    // Randomly write blocks and delete half of them.
+    for (int i = 0; i < threadCount; i++) {
+      Thread thread = new Thread() {
+        @Override
+        public void run() {
+          try {
+            String bpid = BLOCK_POOL_IDS[random.nextInt(BLOCK_POOL_IDS.length)];
+            for (int blockId = 0; blockId < numBlocks; blockId++) {
+              ExtendedBlock eb = new ExtendedBlock(bpid, blockId);
+              ReplicaHandler replica = null;
+              try {
+                replica = dataset.createRbw(StorageType.DEFAULT, null, eb,
+                    false);
+                if (blockId % 2 > 0) {
+                  dataset.invalidate(bpid, new Block[]{eb.getLocalBlock()});
+                }
+              } finally {
+                if (replica != null) {
+                  replica.close();
+                }
+              }
+            }
+            // Only final consistency matters; exceptions can be ignored.
+          } catch (Exception ignore) {}
+        }
+      };
+      thread.setName("AddBlock" + i);
+      futureList.add(pool.submit(thread));
+    }
+    // Wait for data generation
+    for (Future<?> f : futureList) {
+      f.get();
+    }
+    for (String bpid : dataset.volumeMap.getBlockPoolList()) {
+      assertEquals(numBlocks / 2, dataset.volumeMap.size(bpid));
+    }
+  }
+
   @Test(timeout = 5000)
   public void testRemoveNewlyAddedVolume() throws IOException {
     final int numExistingVolumes = getNumVolumes();

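The new test drives createRbw/invalidate from ten threads; odd block ids are
invalidated, so each block pool is expected to settle at numBlocks / 2
replicas once the futures are joined. One note for anyone adapting it: the
ExecutorService is never shut down, so its worker threads linger until test
teardown. A hedged cleanup sketch, not part of the commit (TestPools and
shutdownQuietly are hypothetical names):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.TimeUnit;

    // Hypothetical helper, not in the patch: drain and stop a test pool so
    // worker threads do not outlive the test, even if an assertion fails.
    final class TestPools {
      static void shutdownQuietly(ExecutorService pool) throws InterruptedException {
        pool.shutdown();                      // stop accepting new tasks
        if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
          pool.shutdownNow();                 // interrupt any stragglers
        }
      }
    }

In the test this would sit in a finally block wrapping the Future.get() loop.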