You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by he...@apache.org on 2022/03/31 06:01:13 UTC
[hadoop] branch trunk updated: HDFS-16511. Improve lock type for ReplicaMap under fine-grain lock mode. (#4085). Contributed by limingxiang.
This is an automated email from the ASF dual-hosted git repository.
hexiaoqiao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 2bf78e2 HDFS-16511. Improve lock type for ReplicaMap under fine-grain lock mode. (#4085). Contributed by limingxiang.
2bf78e2 is described below
commit 2bf78e2416875c8f82f485b457420fafd42aa977
Author: He Xiaoqiao <he...@apache.org>
AuthorDate: Thu Mar 31 14:00:38 2022 +0800
HDFS-16511. Improve lock type for ReplicaMap under fine-grain lock mode. (#4085). Contributed by limingxiang.
Signed-off-by: He Xiaoqiao <he...@apache.org>
---
.../server/datanode/fsdataset/impl/ReplicaMap.java | 16 +++----
.../datanode/fsdataset/impl/TestFsDatasetImpl.java | 51 ++++++++++++++++++++++
2 files changed, 59 insertions(+), 8 deletions(-)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java
index 25d302b..6ecc48a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java
@@ -120,12 +120,12 @@ class ReplicaMap {
ReplicaInfo add(String bpid, ReplicaInfo replicaInfo) {
checkBlockPool(bpid);
checkBlock(replicaInfo);
- try (AutoCloseDataSetLock l = lockManager.writeLock(LockLevel.BLOCK_POOl, bpid)) {
+ try (AutoCloseDataSetLock l = lockManager.readLock(LockLevel.BLOCK_POOl, bpid)) {
LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
if (m == null) {
// Add an entry for block pool if it does not exist already
- m = new LightWeightResizableGSet<Block, ReplicaInfo>();
- map.put(bpid, m);
+ map.putIfAbsent(bpid, new LightWeightResizableGSet<Block, ReplicaInfo>());
+ m = map.get(bpid);
}
return m.put(replicaInfo);
}
@@ -138,12 +138,12 @@ class ReplicaMap {
ReplicaInfo addAndGet(String bpid, ReplicaInfo replicaInfo) {
checkBlockPool(bpid);
checkBlock(replicaInfo);
- try (AutoCloseDataSetLock l = lockManager.writeLock(LockLevel.BLOCK_POOl, bpid)) {
+ try (AutoCloseDataSetLock l = lockManager.readLock(LockLevel.BLOCK_POOl, bpid)) {
LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
if (m == null) {
// Add an entry for block pool if it does not exist already
- m = new LightWeightResizableGSet<Block, ReplicaInfo>();
- map.put(bpid, m);
+ map.putIfAbsent(bpid, new LightWeightResizableGSet<Block, ReplicaInfo>());
+ m = map.get(bpid);
}
ReplicaInfo oldReplicaInfo = m.get(replicaInfo);
if (oldReplicaInfo != null) {
@@ -202,7 +202,7 @@ class ReplicaMap {
ReplicaInfo remove(String bpid, Block block) {
checkBlockPool(bpid);
checkBlock(block);
- try (AutoCloseDataSetLock l = lockManager.writeLock(LockLevel.BLOCK_POOl, bpid)) {
+ try (AutoCloseDataSetLock l = lockManager.readLock(LockLevel.BLOCK_POOl, bpid)) {
LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
if (m != null) {
ReplicaInfo replicaInfo = m.get(block);
@@ -224,7 +224,7 @@ class ReplicaMap {
*/
ReplicaInfo remove(String bpid, long blockId) {
checkBlockPool(bpid);
- try (AutoCloseDataSetLock l = lockManager.writeLock(LockLevel.BLOCK_POOl, bpid)) {
+ try (AutoCloseDataSetLock l = lockManager.readLock(LockLevel.BLOCK_POOl, bpid)) {
LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
if (m != null) {
return m.remove(new Block(blockId));
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
index 23a72f9..f250eea 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
@@ -21,6 +21,10 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.function.Supplier;
import org.apache.hadoop.fs.DF;
@@ -602,6 +606,53 @@ public class TestFsDatasetImpl {
+ "volumeMap.", 0, totalNumReplicas);
}
+ @Test(timeout = 30000)
+ public void testConcurrentWriteAndDeleteBlock() throws Exception {
+ // Feed FsDataset with block metadata.
+ final int numBlocks = 1000;
+ final int threadCount = 10;
+ // Generate data blocks.
+ ExecutorService pool = Executors.newFixedThreadPool(threadCount);
+ List<Future<?>> futureList = new ArrayList<>();
+ Random random = new Random();
+ // Random write block and delete half of them.
+ for (int i = 0; i < threadCount; i++) {
+ Thread thread = new Thread() {
+ @Override
+ public void run() {
+ try {
+ String bpid = BLOCK_POOL_IDS[random.nextInt(BLOCK_POOL_IDS.length)];
+ for (int blockId = 0; blockId < numBlocks; blockId++) {
+ ExtendedBlock eb = new ExtendedBlock(bpid, blockId);
+ ReplicaHandler replica = null;
+ try {
+ replica = dataset.createRbw(StorageType.DEFAULT, null, eb,
+ false);
+ if (blockId % 2 > 0) {
+ dataset.invalidate(bpid, new Block[]{eb.getLocalBlock()});
+ }
+ } finally {
+ if (replica != null) {
+ replica.close();
+ }
+ }
+ }
+ // Just keep final consistency no need to care exception.
+ } catch (Exception ignore) {}
+ }
+ };
+ thread.setName("AddBlock" + i);
+ futureList.add(pool.submit(thread));
+ }
+ // Wait for data generation
+ for (Future<?> f : futureList) {
+ f.get();
+ }
+ for (String bpid : dataset.volumeMap.getBlockPoolList()) {
+ assertEquals(numBlocks / 2, dataset.volumeMap.size(bpid));
+ }
+ }
+
@Test(timeout = 5000)
public void testRemoveNewlyAddedVolume() throws IOException {
final int numExistingVolumes = getNumVolumes();
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org