Posted to common-commits@hadoop.apache.org by he...@apache.org on 2021/09/14 06:19:31 UTC

[hadoop] branch branch-3.2 updated (318bc5f -> a3920e8)

This is an automated email from the ASF dual-hosted git repository.

hexiaoqiao pushed a change to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/hadoop.git.


    from 318bc5f   HDFS-15160. ReplicaMap, Disk Balancer, Directory Scanner and various FsDatasetImpl methods should use datanode readlock. (#3200)
     new ca182b6  Revert " HDFS-15160. ReplicaMap, Disk Balancer, Directory Scanner and various FsDatasetImpl methods should use datanode readlock. (#3200)"
     new 7039433  HDFS-15150. Introduce read write lock to Datanode. Contributed Stephen O'Donnell.
     new ad55d5d  HDFS-15160. ReplicaMap, Disk Balancer, Directory Scanner and various FsDatasetImpl methods should use datanode readlock. Contributed by Stephen O'Donnell.
     new 33c60ba  HDFS-15457. TestFsDatasetImpl fails intermittently (#2407)
     new 3c03ade  HDFS-15818. Fix TestFsDatasetImpl.testReadLockCanBeDisabledByConfig. Contributed by Leon Gao (#2679)
     new a3920e8  HDFS-15160. amend to fix javac error supressing unchecked warning

The 6 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
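
The commits above revolve around HDFS-15150/HDFS-15160, which replace the DataNode's single exclusive FsDataset lock with an InstrumentedReadWriteLock so that read-only operations (ReplicaMap lookups, Disk Balancer, Directory Scanner) can run concurrently under a shared read lock. The diffs below show the real acquireDatasetReadLock()/acquireDatasetLock() API; the following is only a minimal, self-contained sketch of the acquire-in-try-with-resources pattern, where the small AutoCloseableLock stand-in is an assumption and not the org.apache.hadoop.util.AutoCloseableLock class itself.

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class DatasetLockSketch {

  /** Tiny stand-in for org.apache.hadoop.util.AutoCloseableLock (assumption, not the real class). */
  static final class AutoCloseableLock implements AutoCloseable {
    private final Lock lock;
    AutoCloseableLock(Lock lock) { this.lock = lock; }
    AutoCloseableLock acquire() { lock.lock(); return this; }
    @Override public void close() { lock.unlock(); }
  }

  // Fair read/write lock, mirroring the dfs.datanode.lock.fair default in the diffs.
  private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(true);
  private final AutoCloseableLock readLock = new AutoCloseableLock(rwLock.readLock());
  private final AutoCloseableLock writeLock = new AutoCloseableLock(rwLock.writeLock());

  // Read-only lookups take the shared lock, so many can run concurrently.
  String lookup(long blockId) {
    try (AutoCloseableLock l = readLock.acquire()) {
      return "replica-" + blockId;   // stands in for volumeMap.get(bpid, blockId)
    }
  }

  // Mutations take the exclusive lock, blocking readers and other writers.
  void mutate(long blockId) {
    try (AutoCloseableLock l = writeLock.acquire()) {
      // stands in for volumeMap.add(bpid, replicaInfo) in the real code
    }
  }
}

If the read lock is disabled via dfs.datanode.lock.read.write.enabled, the diffs show that both variables simply hold the write lock, so every acquisition becomes exclusive again.
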


Summary of changes:

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[hadoop] 05/06: HDFS-15818. Fix TestFsDatasetImpl.testReadLockCanBeDisabledByConfig. Contributed by Leon Gao (#2679)

Posted by he...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hexiaoqiao pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit 3c03ade285dd68cef10ed979cb2378ae40d200c7
Author: LeonGao <li...@uber.com>
AuthorDate: Tue Feb 9 02:49:28 2021 -0800

    HDFS-15818. Fix TestFsDatasetImpl.testReadLockCanBeDisabledByConfig. Contributed by Leon Gao (#2679)
    
    (cherry picked from commit 9434c1eccc255a25ea5e11f6d8c9e1f83996d6b4)
---
 .../datanode/fsdataset/impl/TestFsDatasetImpl.java | 50 +++++++++++-----------
 1 file changed, 25 insertions(+), 25 deletions(-)
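
The fix in the diff below replaces the old ordering check on a synchronized list with a deadlock probe: the waiter thread can only set an AtomicBoolean after it acquires the dataset lock, so with the read lock disabled the test expects GenericTestUtils.waitFor to time out, then releases the latch and asserts the waiter eventually ran. The following is a standalone sketch of that expected-timeout idiom under plain JDK; the local waitFor helper is an assumption and not the Hadoop GenericTestUtils utility.

import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

public class WaitForSketch {

  // Poll a condition until it holds or the deadline passes; a timeout means it never held.
  static void waitFor(Supplier<Boolean> check, long intervalMs, long timeoutMs)
      throws TimeoutException, InterruptedException {
    long deadline = System.currentTimeMillis() + timeoutMs;
    while (!check.get()) {
      if (System.currentTimeMillis() > deadline) {
        throw new TimeoutException("Condition not met within " + timeoutMs + " ms");
      }
      Thread.sleep(intervalMs);
    }
  }

  public static void main(String[] args) throws InterruptedException {
    // Never set: simulates the waiter thread staying blocked behind an exclusive lock.
    AtomicBoolean accessed = new AtomicBoolean(false);
    try {
      waitFor(accessed::get, 100, 1000);
      throw new AssertionError("Waiter should not have acquired the lock");
    } catch (TimeoutException expected) {
      // Timing out is the success path: the second thread stayed blocked as intended.
    }
  }
}
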

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
index 5055c66..7809a2d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
@@ -17,8 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
-import java.util.Arrays;
-import java.util.Collections;
+import java.util.concurrent.TimeoutException;
 import java.util.function.Supplier;
 import com.google.common.collect.Lists;
 
@@ -240,11 +239,12 @@ public class TestFsDatasetImpl {
     waiter.join();
     // The holder thread is still holding the lock, but the waiter can still
     // run as the lock is a shared read lock.
+    // Otherwise test will timeout with deadlock.
     assertEquals(true, accessed.get());
     holder.interrupt();
   }
 
-  @Test(timeout=10000)
+  @Test(timeout=20000)
   public void testReadLockCanBeDisabledByConfig()
       throws Exception {
     HdfsConfiguration conf = new HdfsConfiguration();
@@ -253,29 +253,20 @@ public class TestFsDatasetImpl {
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
         .numDataNodes(1).build();
     try {
+      AtomicBoolean accessed = new AtomicBoolean(false);
       cluster.waitActive();
       DataNode dn = cluster.getDataNodes().get(0);
       final FsDatasetSpi<?> ds = DataNodeTestUtils.getFSDataset(dn);
 
       CountDownLatch latch = new CountDownLatch(1);
       CountDownLatch waiterLatch = new CountDownLatch(1);
-      // create a synchronized list and verify the order of elements.
-      List<Integer> syncList =
-          Collections.synchronizedList(new ArrayList<>());
-
-
       Thread holder = new Thread() {
         public void run() {
-          latch.countDown();
           try (AutoCloseableLock l = ds.acquireDatasetReadLock()) {
-            syncList.add(0);
-          } catch (Exception e) {
-            return;
-          }
-          try {
+            latch.countDown();
+            // wait for the waiter thread to access the lock.
             waiterLatch.await();
-            syncList.add(2);
-          } catch (InterruptedException e) {
+          } catch (Exception e) {
           }
         }
       };
@@ -283,13 +274,15 @@ public class TestFsDatasetImpl {
       Thread waiter = new Thread() {
         public void run() {
           try {
-            // wait for holder to get into the critical section.
+            // Wait for holder to get ds read lock.
             latch.await();
           } catch (InterruptedException e) {
             waiterLatch.countDown();
+            return;
           }
           try (AutoCloseableLock l = ds.acquireDatasetReadLock()) {
-            syncList.add(1);
+            accessed.getAndSet(true);
+            // signal the holder thread.
             waiterLatch.countDown();
           } catch (Exception e) {
           }
@@ -297,14 +290,21 @@ public class TestFsDatasetImpl {
       };
       waiter.start();
       holder.start();
-
-      waiter.join();
+      // Wait for some time to make sure we are in deadlock.
+      try {
+        GenericTestUtils.waitFor(() ->
+                accessed.get(),
+            100, 10000);
+        fail("Waiter thread should not execute.");
+      } catch (TimeoutException e) {
+      }
+      // Release waiterLatch to exit deadlock.
+      waiterLatch.countDown();
       holder.join();
-
-      // verify that the synchronized list has the correct sequence.
-      assertEquals(
-          "The sequence of checkpoints does not correspond to shared lock",
-          syncList, Arrays.asList(0, 1, 2));
+      waiter.join();
+      // After releasing waiterLatch, the waiter
+      // thread will be able to execute.
+      assertTrue(accessed.get());
     } finally {
       cluster.shutdown();
     }

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[hadoop] 01/06: Revert " HDFS-15160. ReplicaMap, Disk Balancer, Directory Scanner and various FsDatasetImpl methods should use datanode readlock. (#3200)"

Posted by he...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hexiaoqiao pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit ca182b6e5f17a3c11619821d9865797624457eab
Author: He Xiaoqiao <he...@apache.org>
AuthorDate: Tue Sep 14 12:27:42 2021 +0800

    Revert " HDFS-15160. ReplicaMap, Disk Balancer, Directory Scanner and various FsDatasetImpl methods should use datanode readlock. (#3200)"
    
    This reverts commit 318bc5ff69bc578db1ce95198aa7fa7bc7199320.
---
 .../hadoop/util/InstrumentedReadWriteLock.java     |   2 +-
 .../java/org/apache/hadoop/hdfs/DFSConfigKeys.java |  11 --
 .../hadoop/hdfs/server/datanode/BlockSender.java   |   2 +-
 .../hadoop/hdfs/server/datanode/DataNode.java      |   2 +-
 .../hadoop/hdfs/server/datanode/DiskBalancer.java  |   2 +-
 .../server/datanode/fsdataset/FsDatasetSpi.java    |  12 +-
 .../datanode/fsdataset/impl/BlockPoolSlice.java    |   4 +-
 .../datanode/fsdataset/impl/FsDatasetImpl.java     | 172 +++++++++------------
 .../fsdataset/impl/ProvidedVolumeImpl.java         |  10 +-
 .../server/datanode/fsdataset/impl/ReplicaMap.java |  46 ++----
 .../src/main/resources/hdfs-default.xml            |  34 ----
 .../hdfs/server/datanode/SimulatedFSDataset.java   |   6 -
 .../datanode/extdataset/ExternalDatasetImpl.java   |   5 -
 .../fsdataset/impl/FsDatasetImplTestUtils.java     |   2 +-
 .../datanode/fsdataset/impl/TestFsDatasetImpl.java | 120 +-------------
 .../datanode/fsdataset/impl/TestFsVolumeList.java  |   4 +-
 .../fsdataset/impl/TestInterDatanodeProtocol.java  |   4 +-
 .../datanode/fsdataset/impl/TestProvidedImpl.java  |   8 +-
 .../datanode/fsdataset/impl/TestReplicaMap.java    |   9 +-
 .../fsdataset/impl/TestWriteToReplica.java         |   4 +-
 20 files changed, 114 insertions(+), 345 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/InstrumentedReadWriteLock.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/InstrumentedReadWriteLock.java
index 758f1ff..a410524 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/InstrumentedReadWriteLock.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/InstrumentedReadWriteLock.java
@@ -37,7 +37,7 @@ public class InstrumentedReadWriteLock implements ReadWriteLock {
   private final Lock readLock;
   private final Lock writeLock;
 
-  public InstrumentedReadWriteLock(boolean fair, String name, Logger logger,
+  InstrumentedReadWriteLock(boolean fair, String name, Logger logger,
       long minLoggingGapMs, long lockWarningThresholdMs) {
     ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(fair);
     readLock = new InstrumentedReadLock(name, logger, readWriteLock,
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index b02b5f4..563b64c 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -549,17 +549,6 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
       "dfs.lock.suppress.warning.interval";
   public static final long DFS_LOCK_SUPPRESS_WARNING_INTERVAL_DEFAULT =
       10000; //ms
-  public static final String DFS_DATANODE_LOCK_FAIR_KEY =
-      "dfs.datanode.lock.fair";
-  public static final boolean DFS_DATANODE_LOCK_FAIR_DEFAULT = true;
-  public static final String DFS_DATANODE_LOCK_READ_WRITE_ENABLED_KEY =
-      "dfs.datanode.lock.read.write.enabled";
-  public static final Boolean DFS_DATANODE_LOCK_READ_WRITE_ENABLED_DEFAULT =
-      true;
-  public static final String  DFS_DATANODE_LOCK_REPORTING_THRESHOLD_MS_KEY =
-      "dfs.datanode.lock-reporting-threshold-ms";
-  public static final long
-      DFS_DATANODE_LOCK_REPORTING_THRESHOLD_MS_DEFAULT = 300L;
 
   public static final String  DFS_UPGRADE_DOMAIN_FACTOR = "dfs.namenode.upgrade.domain.factor";
   public static final int DFS_UPGRADE_DOMAIN_FACTOR_DEFAULT = DFS_REPLICATION_DEFAULT;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
index ee7cdb1..ad9be88 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
@@ -254,7 +254,7 @@ class BlockSender implements java.io.Closeable {
       // the append write.
       ChunkChecksum chunkChecksum = null;
       final long replicaVisibleLength;
-      try(AutoCloseableLock lock = datanode.data.acquireDatasetReadLock()) {
+      try(AutoCloseableLock lock = datanode.data.acquireDatasetLock()) {
         replica = getReplica(block, datanode);
         replicaVisibleLength = replica.getVisibleLength();
       }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index cda8096..bc9fb13 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -3010,7 +3010,7 @@ public class DataNode extends ReconfigurableBase
     final BlockConstructionStage stage;
 
     //get replica information
-    try(AutoCloseableLock lock = data.acquireDatasetReadLock()) {
+    try(AutoCloseableLock lock = data.acquireDatasetLock()) {
       Block storedBlock = data.getStoredBlock(b.getBlockPoolId(),
           b.getBlockId());
       if (null == storedBlock) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
index b99ca3b..63f20e8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
@@ -504,7 +504,7 @@ public class DiskBalancer {
     Map<String, String> storageIDToVolBasePathMap = new HashMap<>();
     FsDatasetSpi.FsVolumeReferences references;
     try {
-      try(AutoCloseableLock lock = this.dataset.acquireDatasetReadLock()) {
+      try(AutoCloseableLock lock = this.dataset.acquireDatasetLock()) {
         references = this.dataset.getFsVolumeReferences();
         for (int ndx = 0; ndx < references.size(); ndx++) {
           FsVolumeSpi vol = references.get(ndx);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
index 499901d..78a5cfc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
@@ -657,19 +657,9 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
       FsVolumeSpi destination) throws IOException;
 
   /**
-   * Acquire the lock of the data set. This prevents other threads from
-   * modifying the volume map structure inside the datanode, but other changes
-   * are still possible. For example modifying the genStamp of a block instance.
+   * Acquire the lock of the data set.
    */
   AutoCloseableLock acquireDatasetLock();
 
-  /***
-   * Acquire the read lock of the data set. This prevents other threads from
-   * modifying the volume map structure inside the datanode, but other changes
-   * are still possible. For example modifying the genStamp of a block instance.
-   * @return The AutoClosable read lock instance.
-   */
-  AutoCloseableLock acquireDatasetReadLock();
-
   Set<? extends Replica> deepCopyReplica(String bpid) throws IOException;
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
index ea77505..9656b9d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
@@ -42,7 +42,6 @@ import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.ForkJoinTask;
 import java.util.concurrent.RecursiveAction;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.hadoop.hdfs.server.datanode.FSCachingGetSpaceUsed;
 import org.slf4j.Logger;
@@ -67,6 +66,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaTrack
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.MultipleIOException;
+import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.DataChecksum.Type;
 import org.apache.hadoop.util.DiskChecker;
@@ -874,7 +874,7 @@ class BlockPoolSlice {
 
   private boolean readReplicasFromCache(ReplicaMap volumeMap,
       final RamDiskReplicaTracker lazyWriteReplicaMap) {
-    ReplicaMap tmpReplicaMap = new ReplicaMap(new ReentrantReadWriteLock());
+    ReplicaMap tmpReplicaMap = new ReplicaMap(new AutoCloseableLock());
     File replicaFile = new File(currentDir, REPLICA_CACHE_FILE);
     // Check whether the file exists or not.
     if (!replicaFile.exists()) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 045c8cd..3576671 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -40,6 +40,7 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
 import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.TimeUnit;
 
 import javax.management.NotCompliantMBeanException;
@@ -111,7 +112,7 @@ import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
-import org.apache.hadoop.util.InstrumentedReadWriteLock;
+import org.apache.hadoop.util.InstrumentedLock;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.util.Timer;
@@ -178,7 +179,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
   @Override
   public FsVolumeImpl getVolume(final ExtendedBlock b) {
-    try (AutoCloseableLock lock = datasetReadLock.acquire()) {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
       final ReplicaInfo r =
           volumeMap.get(b.getBlockPoolId(), b.getLocalBlock());
       return r != null ? (FsVolumeImpl) r.getVolume() : null;
@@ -188,7 +189,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   @Override // FsDatasetSpi
   public Block getStoredBlock(String bpid, long blkid)
       throws IOException {
-    try (AutoCloseableLock lock = datasetReadLock.acquire()) {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
       ReplicaInfo r = volumeMap.get(bpid, blkid);
       if (r == null) {
         return null;
@@ -201,16 +202,12 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
    * The deepCopyReplica call doesn't use the datasetock since it will lead the
    * potential deadlock with the {@link FsVolumeList#addBlockPool} call.
    */
-  @SuppressWarnings("unchecked")
   @Override
   public Set<? extends Replica> deepCopyReplica(String bpid)
       throws IOException {
-    Set<? extends Replica> replicas;
-    try (AutoCloseableLock lock = datasetReadLock.acquire()) {
-      replicas =
-          new HashSet<>(volumeMap.replicas(bpid) == null ? Collections.EMPTY_SET
-              : volumeMap.replicas(bpid));
-    }
+    Set<? extends Replica> replicas =
+        new HashSet<>(volumeMap.replicas(bpid) == null ? Collections.EMPTY_SET
+            : volumeMap.replicas(bpid));
     return Collections.unmodifiableSet(replicas);
   }
 
@@ -271,12 +268,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   private final int maxDataLength;
 
   @VisibleForTesting
-  final AutoCloseableLock datasetWriteLock;
-  @VisibleForTesting
-  final AutoCloseableLock datasetReadLock;
-  @VisibleForTesting
-  final InstrumentedReadWriteLock datasetRWLock;
-  private final Condition datasetWriteLockCondition;
+  final AutoCloseableLock datasetLock;
+  private final Condition datasetLockCondition;
   private static String blockPoolId = "";
   
   /**
@@ -289,33 +282,15 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     this.dataStorage = storage;
     this.conf = conf;
     this.smallBufferSize = DFSUtilClient.getSmallBufferSize(conf);
-    this.datasetRWLock = new InstrumentedReadWriteLock(
-        conf.getBoolean(DFSConfigKeys.DFS_DATANODE_LOCK_FAIR_KEY,
-            DFSConfigKeys.DFS_DATANODE_LOCK_FAIR_DEFAULT),
-        "FsDatasetRWLock", LOG, conf.getTimeDuration(
-        DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_KEY,
-        DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_DEFAULT,
-        TimeUnit.MILLISECONDS),
-        conf.getTimeDuration(
-            DFSConfigKeys.DFS_DATANODE_LOCK_REPORTING_THRESHOLD_MS_KEY,
-            DFSConfigKeys.DFS_DATANODE_LOCK_REPORTING_THRESHOLD_MS_DEFAULT,
-            TimeUnit.MILLISECONDS));
-    this.datasetWriteLock = new AutoCloseableLock(datasetRWLock.writeLock());
-    boolean enableRL = conf.getBoolean(
-        DFSConfigKeys.DFS_DATANODE_LOCK_READ_WRITE_ENABLED_KEY,
-        DFSConfigKeys.DFS_DATANODE_LOCK_READ_WRITE_ENABLED_DEFAULT);
-    // The read lock can be disabled by the above config key. If it is disabled
-    // then we simply make the both the read and write lock variables hold
-    // the write lock. All accesses to the lock are via these variables, so that
-    // effectively disables the read lock.
-    if (enableRL) {
-      LOG.info("The datanode lock is a read write lock");
-      this.datasetReadLock = new AutoCloseableLock(datasetRWLock.readLock());
-    } else {
-      LOG.info("The datanode lock is an exclusive write lock");
-      this.datasetReadLock = this.datasetWriteLock;
-    }
-    this.datasetWriteLockCondition = datasetWriteLock.newCondition();
+    this.datasetLock = new AutoCloseableLock(
+        new InstrumentedLock(getClass().getName(), LOG,
+          new ReentrantLock(true),
+          conf.getTimeDuration(
+            DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_KEY,
+            DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_DEFAULT,
+            TimeUnit.MILLISECONDS),
+          300));
+    this.datasetLockCondition = datasetLock.newCondition();
 
     // The number of volumes required for operation is the total number
     // of volumes minus the number of failed volumes we can tolerate.
@@ -354,7 +329,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     }
 
     storageMap = new ConcurrentHashMap<String, DatanodeStorage>();
-    volumeMap = new ReplicaMap(datasetReadLock, datasetWriteLock);
+    volumeMap = new ReplicaMap(datasetLock);
     ramDiskReplicaTracker = RamDiskReplicaTracker.getInstance(conf, this);
 
     @SuppressWarnings("unchecked")
@@ -408,12 +383,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
   @Override
   public AutoCloseableLock acquireDatasetLock() {
-    return datasetWriteLock.acquire();
-  }
-
-  @Override
-  public AutoCloseableLock acquireDatasetReadLock() {
-    return datasetReadLock.acquire();
+    return datasetLock.acquire();
   }
 
   /**
@@ -454,7 +424,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       ReplicaMap replicaMap,
       Storage.StorageDirectory sd, StorageType storageType,
       FsVolumeReference ref) throws IOException {
-    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
       DatanodeStorage dnStorage = storageMap.get(sd.getStorageUuid());
       if (dnStorage != null) {
         final String errorMsg = String.format(
@@ -487,8 +457,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
                               .setConf(this.conf)
                               .build();
     FsVolumeReference ref = fsVolume.obtainReference();
-    ReplicaMap tempVolumeMap =
-        new ReplicaMap(datasetReadLock, datasetWriteLock);
+    ReplicaMap tempVolumeMap = new ReplicaMap(datasetLock);
     fsVolume.getVolumeMap(tempVolumeMap, ramDiskReplicaTracker);
 
     activateVolume(tempVolumeMap, sd, storageLocation.getStorageType(), ref);
@@ -527,8 +496,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     StorageType storageType = location.getStorageType();
     final FsVolumeImpl fsVolume =
         createFsVolume(sd.getStorageUuid(), sd, location);
-    final ReplicaMap tempVolumeMap =
-        new ReplicaMap(datasetReadLock, datasetWriteLock);
+    final ReplicaMap tempVolumeMap = new ReplicaMap(new AutoCloseableLock());
     ArrayList<IOException> exceptions = Lists.newArrayList();
 
     for (final NamespaceInfo nsInfo : nsInfos) {
@@ -573,7 +541,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         new ArrayList<>(storageLocsToRemove);
     Map<String, List<ReplicaInfo>> blkToInvalidate = new HashMap<>();
     List<String> storageToRemove = new ArrayList<>();
-    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
       for (int idx = 0; idx < dataStorage.getNumStorageDirs(); idx++) {
         Storage.StorageDirectory sd = dataStorage.getStorageDir(idx);
         final StorageLocation sdLocation = sd.getStorageLocation();
@@ -585,7 +553,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
           // Disable the volume from the service.
           asyncDiskService.removeVolume(sd.getStorageUuid());
           volumes.removeVolume(sdLocation, clearFailure);
-          volumes.waitVolumeRemoved(5000, datasetWriteLockCondition);
+          volumes.waitVolumeRemoved(5000, datasetLockCondition);
 
           // Removed all replica information for the blocks on the volume.
           // Unlike updating the volumeMap in addVolume(), this operation does
@@ -632,7 +600,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       }
     }
 
-    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
       for(String storageUuid : storageToRemove) {
         storageMap.remove(storageUuid);
       }
@@ -823,7 +791,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       long seekOffset) throws IOException {
 
     ReplicaInfo info;
-    try (AutoCloseableLock lock = datasetReadLock.acquire()) {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
       info = volumeMap.get(b.getBlockPoolId(), b.getLocalBlock());
     }
 
@@ -911,7 +879,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   @Override // FsDatasetSpi
   public ReplicaInputStreams getTmpInputStreams(ExtendedBlock b,
       long blkOffset, long metaOffset) throws IOException {
-    try (AutoCloseableLock lock = datasetReadLock.acquire()) {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
       ReplicaInfo info = getReplicaInfo(b);
       FsVolumeReference ref = info.getVolume().obtainReference();
       try {
@@ -1036,7 +1004,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     }
 
     FsVolumeReference volumeRef = null;
-    try (AutoCloseableLock lock = datasetReadLock.acquire()) {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
       volumeRef = volumes.getNextVolume(targetStorageType, targetStorageId,
           block.getNumBytes());
     }
@@ -1150,7 +1118,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
     FsVolumeReference volumeRef = null;
 
-    try (AutoCloseableLock lock = datasetReadLock.acquire()) {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
       volumeRef = destination.obtainReference();
     }
 
@@ -1238,7 +1206,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   @Override  // FsDatasetSpi
   public ReplicaHandler append(ExtendedBlock b,
       long newGS, long expectedBlockLen) throws IOException {
-    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
       // If the block was successfully finalized because all packets
       // were successfully processed at the Datanode but the ack for
       // some of the packets were not received by the client. The client
@@ -1290,7 +1258,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   private ReplicaInPipeline append(String bpid,
       ReplicaInfo replicaInfo, long newGS, long estimateBlockLen)
       throws IOException {
-    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
       // If the block is cached, start uncaching it.
       if (replicaInfo.getState() != ReplicaState.FINALIZED) {
         throw new IOException("Only a Finalized replica can be appended to; "
@@ -1386,7 +1354,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
     while (true) {
       try {
-        try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+        try (AutoCloseableLock lock = datasetLock.acquire()) {
           ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen);
           FsVolumeReference ref = replicaInfo.getVolume().obtainReference();
           ReplicaInPipeline replica;
@@ -1418,7 +1386,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     LOG.info("Recover failed close " + b);
     while (true) {
       try {
-        try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+        try (AutoCloseableLock lock = datasetLock.acquire()) {
           // check replica's state
           ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen);
           // bump the replica's GS
@@ -1440,7 +1408,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   public ReplicaHandler createRbw(
       StorageType storageType, String storageId, ExtendedBlock b,
       boolean allowLazyPersist) throws IOException {
-    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
       ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
           b.getBlockId());
       if (replicaInfo != null) {
@@ -1511,7 +1479,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
     while (true) {
       try {
-        try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+        try (AutoCloseableLock lock = datasetLock.acquire()) {
           ReplicaInfo replicaInfo =
               getReplicaInfo(b.getBlockPoolId(), b.getBlockId());
           // check the replica's state
@@ -1536,7 +1504,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   private ReplicaHandler recoverRbwImpl(ReplicaInPipeline rbw,
       ExtendedBlock b, long newGS, long minBytesRcvd, long maxBytesRcvd)
       throws IOException {
-    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
       // check generation stamp
       long replicaGenerationStamp = rbw.getGenerationStamp();
       if (replicaGenerationStamp < b.getGenerationStamp() ||
@@ -1597,7 +1565,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   public ReplicaInPipeline convertTemporaryToRbw(
       final ExtendedBlock b) throws IOException {
 
-    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
       final long blockId = b.getBlockId();
       final long expectedGs = b.getGenerationStamp();
       final long visible = b.getNumBytes();
@@ -1671,7 +1639,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     ReplicaInfo lastFoundReplicaInfo = null;
     boolean isInPipeline = false;
     do {
-      try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+      try (AutoCloseableLock lock = datasetLock.acquire()) {
         ReplicaInfo currentReplicaInfo =
             volumeMap.get(b.getBlockPoolId(), b.getBlockId());
         if (currentReplicaInfo == lastFoundReplicaInfo) {
@@ -1724,7 +1692,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       invalidate(b.getBlockPoolId(), new Block[] { lastFoundReplicaInfo },
           false);
     }
-    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
       FsVolumeReference ref = volumes.getNextVolume(storageType, storageId, b
           .getNumBytes());
       FsVolumeImpl v = (FsVolumeImpl) ref.getVolume();
@@ -1775,7 +1743,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       throws IOException {
     ReplicaInfo replicaInfo = null;
     ReplicaInfo finalizedReplicaInfo = null;
-    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
       if (Thread.interrupted()) {
         // Don't allow data modifications from interrupted threads
         throw new IOException("Cannot finalize block from Interrupted Thread");
@@ -1806,7 +1774,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
   private ReplicaInfo finalizeReplica(String bpid, ReplicaInfo replicaInfo)
       throws IOException {
-    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
       // Compare generation stamp of old and new replica before finalizing
       if (volumeMap.get(bpid, replicaInfo.getBlockId()).getGenerationStamp()
           > replicaInfo.getGenerationStamp()) {
@@ -1851,7 +1819,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
    */
   @Override // FsDatasetSpi
   public void unfinalizeBlock(ExtendedBlock b) throws IOException {
-    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
       ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
           b.getLocalBlock());
       if (replicaInfo != null &&
@@ -1904,7 +1872,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         new HashMap<String, BlockListAsLongs.Builder>();
 
     List<FsVolumeImpl> curVolumes = null;
-    try (AutoCloseableLock lock = datasetReadLock.acquire()) {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
       curVolumes = volumes.getVolumes();
       for (FsVolumeSpi v : curVolumes) {
         builders.put(v.getStorageID(), BlockListAsLongs.builder(maxDataLength));
@@ -1959,7 +1927,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
    * Gets a list of references to the finalized blocks for the given block pool.
    * <p>
    * Callers of this function should call
-   * {@link FsDatasetSpi#acquireDatasetLock()} to avoid blocks' status being
+   * {@link FsDatasetSpi#acquireDatasetLock} to avoid blocks' status being
    * changed during list iteration.
    * </p>
    * @return a list of references to the finalized blocks for the given block
@@ -1967,7 +1935,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
    */
   @Override
   public List<ReplicaInfo> getFinalizedBlocks(String bpid) {
-    try (AutoCloseableLock lock = datasetReadLock.acquire()) {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
       final List<ReplicaInfo> finalized = new ArrayList<ReplicaInfo>(
           volumeMap.size(bpid));
       for (ReplicaInfo b : volumeMap.replicas(bpid)) {
@@ -2060,7 +2028,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   ReplicaInfo validateBlockFile(String bpid, long blockId) {
     //Should we check for metadata file too?
     final ReplicaInfo r;
-    r = volumeMap.get(bpid, blockId);
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
+      r = volumeMap.get(bpid, blockId);
+    }
     if (r != null) {
       if (r.blockDataExists()) {
         return r;
@@ -2109,7 +2079,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     for (int i = 0; i < invalidBlks.length; i++) {
       final ReplicaInfo removing;
       final FsVolumeImpl v;
-      try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+      try (AutoCloseableLock lock = datasetLock.acquire()) {
         final ReplicaInfo info = volumeMap.get(bpid, invalidBlks[i]);
         if (info == null) {
           ReplicaInfo infoByBlockId =
@@ -2235,7 +2205,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     long length, genstamp;
     Executor volumeExecutor;
 
-    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
       ReplicaInfo info = volumeMap.get(bpid, blockId);
       boolean success = false;
       try {
@@ -2303,7 +2273,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
   @Override // FsDatasetSpi
   public boolean contains(final ExtendedBlock block) {
-    try (AutoCloseableLock lock = datasetReadLock.acquire()) {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
       final long blockId = block.getLocalBlock().getBlockId();
       final String bpid = block.getBlockPoolId();
       final ReplicaInfo r = volumeMap.get(bpid, blockId);
@@ -2423,7 +2393,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
     Block corruptBlock = null;
     ReplicaInfo memBlockInfo;
-    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
       memBlockInfo = volumeMap.get(bpid, blockId);
       if (memBlockInfo != null &&
           memBlockInfo.getState() != ReplicaState.FINALIZED) {
@@ -2624,7 +2594,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
   @Override 
   public String getReplicaString(String bpid, long blockId) {
-    try (AutoCloseableLock lock = datasetReadLock.acquire()) {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
       final Replica r = volumeMap.get(bpid, blockId);
       return r == null ? "null" : r.toString();
     }
@@ -2731,7 +2701,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
                                     final long recoveryId,
                                     final long newBlockId,
                                     final long newlength) throws IOException {
-    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
       //get replica
       final String bpid = oldBlock.getBlockPoolId();
       final ReplicaInfo replica = volumeMap.get(bpid, oldBlock.getBlockId());
@@ -2844,7 +2814,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   @Override // FsDatasetSpi
   public long getReplicaVisibleLength(final ExtendedBlock block)
   throws IOException {
-    try (AutoCloseableLock lock = datasetReadLock.acquire()) {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
       final Replica replica = getReplicaInfo(block.getBlockPoolId(),
           block.getBlockId());
       if (replica.getGenerationStamp() < block.getGenerationStamp()) {
@@ -2861,7 +2831,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       throws IOException {
     LOG.info("Adding block pool " + bpid);
     AddBlockPoolException volumeExceptions = new AddBlockPoolException();
-    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
       try {
         volumes.addBlockPool(bpid, conf);
       } catch (AddBlockPoolException e) {
@@ -2891,7 +2861,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
   @Override
   public void shutdownBlockPool(String bpid) {
-    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
       LOG.info("Removing block pool " + bpid);
       Map<DatanodeStorage, BlockListAsLongs> blocksPerVolume
           = getBlockReports(bpid);
@@ -2965,7 +2935,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   @Override //FsDatasetSpi
   public void deleteBlockPool(String bpid, boolean force)
       throws IOException {
-    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
       List<FsVolumeImpl> curVolumes = volumes.getVolumes();
       if (!force) {
         for (FsVolumeImpl volume : curVolumes) {
@@ -2994,20 +2964,18 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   @Override // FsDatasetSpi
   public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block)
       throws IOException {
-    try (AutoCloseableLock lock = datasetReadLock.acquire()) {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
       final Replica replica = volumeMap.get(block.getBlockPoolId(),
           block.getBlockId());
       if (replica == null) {
         throw new ReplicaNotFoundException(block);
       }
-      synchronized(replica) {
-        if (replica.getGenerationStamp() < block.getGenerationStamp()) {
-          throw new IOException(
-              "Replica generation stamp < block generation stamp, block="
-                  + block + ", replica=" + replica);
-        } else if (replica.getGenerationStamp() > block.getGenerationStamp()) {
-          block.setGenerationStamp(replica.getGenerationStamp());
-        }
+      if (replica.getGenerationStamp() < block.getGenerationStamp()) {
+        throw new IOException(
+            "Replica generation stamp < block generation stamp, block="
+            + block + ", replica=" + replica);
+      } else if (replica.getGenerationStamp() > block.getGenerationStamp()) {
+        block.setGenerationStamp(replica.getGenerationStamp());
       }
     }
 
@@ -3048,7 +3016,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   @Override
   public void onCompleteLazyPersist(String bpId, long blockId,
       long creationTime, File[] savedFiles, FsVolumeImpl targetVolume) {
-    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
       ramDiskReplicaTracker.recordEndLazyPersist(bpId, blockId, savedFiles);
 
       targetVolume.incDfsUsedAndNumBlocks(bpId, savedFiles[0].length()
@@ -3182,7 +3150,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       try {
         block = ramDiskReplicaTracker.dequeueNextReplicaToPersist();
         if (block != null) {
-          try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+          try (AutoCloseableLock lock = datasetLock.acquire()) {
             replicaInfo = volumeMap.get(block.getBlockPoolId(), block.getBlockId());
 
             // If replicaInfo is null, the block was either deleted before
@@ -3249,7 +3217,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         ReplicaInfo replicaInfo, newReplicaInfo;
         final String bpid = replicaState.getBlockPoolId();
 
-        try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+        try (AutoCloseableLock lock = datasetLock.acquire()) {
           replicaInfo = getReplicaInfo(replicaState.getBlockPoolId(),
                                        replicaState.getBlockId());
           Preconditions.checkState(replicaInfo.getVolume().isTransientStorage());
@@ -3422,7 +3390,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   }
 
   void stopAllDataxceiverThreads(FsVolumeImpl volume) {
-    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
       for (String bpid : volumeMap.getBlockPoolList()) {
         Collection<ReplicaInfo> replicas = volumeMap.replicas(bpid);
         for (ReplicaInfo replicaInfo : replicas) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ProvidedVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ProvidedVolumeImpl.java
index 341a2f0..e2d8681 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ProvidedVolumeImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ProvidedVolumeImpl.java
@@ -28,7 +28,6 @@ import java.util.Set;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
@@ -54,10 +53,9 @@ import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.datanode.FileIoProvider;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaBuilder;
-import org.apache.hadoop.util.DiskChecker.DiskErrorException;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hadoop.util.Time;
 import org.apache.hadoop.util.Timer;
+import org.apache.hadoop.util.DiskChecker.DiskErrorException;
+import org.apache.hadoop.util.AutoCloseableLock;
 import org.codehaus.jackson.annotate.JsonProperty;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.codehaus.jackson.map.ObjectReader;
@@ -65,6 +63,8 @@ import org.codehaus.jackson.map.ObjectWriter;
 
 import com.google.common.annotations.VisibleForTesting;
 
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.Time;
 
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PROVIDED_ALIASMAP_LOAD_RETRIES;
 
@@ -135,7 +135,7 @@ class ProvidedVolumeImpl extends FsVolumeImpl {
     ProvidedBlockPoolSlice(String bpid, ProvidedVolumeImpl volume,
         Configuration conf) {
       this.providedVolume = volume;
-      bpVolumeMap = new ReplicaMap(new ReentrantReadWriteLock());
+      bpVolumeMap = new ReplicaMap(new AutoCloseableLock());
       Class<? extends BlockAliasMap> fmt =
           conf.getClass(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_CLASS,
               TextFileRegionAliasMap.class, BlockAliasMap.class);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java
index 13c55d3..53a238c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.locks.ReadWriteLock;
 
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.hdfs.protocol.Block;
@@ -33,29 +32,22 @@ import org.apache.hadoop.util.AutoCloseableLock;
  */
 class ReplicaMap {
   // Lock object to synchronize this instance.
-  private final AutoCloseableLock readLock;
-  private final AutoCloseableLock writeLock;
+  private final AutoCloseableLock lock;
   
   // Map of block pool Id to another map of block Id to ReplicaInfo.
   private final Map<String, LightWeightResizableGSet<Block, ReplicaInfo>> map =
       new HashMap<>();
 
-  ReplicaMap(AutoCloseableLock rLock, AutoCloseableLock wLock) {
-    if (rLock == null || wLock == null) {
+  ReplicaMap(AutoCloseableLock lock) {
+    if (lock == null) {
       throw new HadoopIllegalArgumentException(
           "Lock to synchronize on cannot be null");
     }
-    this.readLock = rLock;
-    this.writeLock = wLock;
-  }
-
-  ReplicaMap(ReadWriteLock lock) {
-    this(new AutoCloseableLock(lock.readLock()),
-        new AutoCloseableLock(lock.writeLock()));
+    this.lock = lock;
   }
   
   String[] getBlockPoolList() {
-    try (AutoCloseableLock l = readLock.acquire()) {
+    try (AutoCloseableLock l = lock.acquire()) {
       return map.keySet().toArray(new String[map.keySet().size()]);   
     }
   }
@@ -100,7 +92,7 @@ class ReplicaMap {
    */
   ReplicaInfo get(String bpid, long blockId) {
     checkBlockPool(bpid);
-    try (AutoCloseableLock l = readLock.acquire()) {
+    try (AutoCloseableLock l = lock.acquire()) {
       LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
       return m != null ? m.get(new Block(blockId)) : null;
     }
@@ -117,7 +109,7 @@ class ReplicaMap {
   ReplicaInfo add(String bpid, ReplicaInfo replicaInfo) {
     checkBlockPool(bpid);
     checkBlock(replicaInfo);
-    try (AutoCloseableLock l = writeLock.acquire()) {
+    try (AutoCloseableLock l = lock.acquire()) {
       LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
       if (m == null) {
         // Add an entry for block pool if it does not exist already
@@ -135,7 +127,7 @@ class ReplicaMap {
   ReplicaInfo addAndGet(String bpid, ReplicaInfo replicaInfo) {
     checkBlockPool(bpid);
     checkBlock(replicaInfo);
-    try (AutoCloseableLock l = writeLock.acquire()) {
+    try (AutoCloseableLock l = lock.acquire()) {
       LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
       if (m == null) {
         // Add an entry for block pool if it does not exist already
@@ -184,7 +176,7 @@ class ReplicaMap {
   ReplicaInfo remove(String bpid, Block block) {
     checkBlockPool(bpid);
     checkBlock(block);
-    try (AutoCloseableLock l = writeLock.acquire()) {
+    try (AutoCloseableLock l = lock.acquire()) {
       LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
       if (m != null) {
         ReplicaInfo replicaInfo = m.get(block);
@@ -206,7 +198,7 @@ class ReplicaMap {
    */
   ReplicaInfo remove(String bpid, long blockId) {
     checkBlockPool(bpid);
-    try (AutoCloseableLock l = writeLock.acquire()) {
+    try (AutoCloseableLock l = lock.acquire()) {
       LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
       if (m != null) {
         return m.remove(new Block(blockId));
@@ -221,7 +213,7 @@ class ReplicaMap {
    * @return the number of replicas in the map
    */
   int size(String bpid) {
-    try (AutoCloseableLock l = readLock.acquire()) {
+    try (AutoCloseableLock l = lock.acquire()) {
       LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
       return m != null ? m.size() : 0;
     }
@@ -245,7 +237,7 @@ class ReplicaMap {
 
   void initBlockPool(String bpid) {
     checkBlockPool(bpid);
-    try (AutoCloseableLock l = writeLock.acquire()) {
+    try (AutoCloseableLock l = lock.acquire()) {
       LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
       if (m == null) {
         // Add an entry for block pool if it does not exist already
@@ -257,7 +249,7 @@ class ReplicaMap {
   
   void cleanUpBlockPool(String bpid) {
     checkBlockPool(bpid);
-    try (AutoCloseableLock l = writeLock.acquire()) {
+    try (AutoCloseableLock l = lock.acquire()) {
       map.remove(bpid);
     }
   }
@@ -267,16 +259,6 @@ class ReplicaMap {
    * @return lock object
    */
   AutoCloseableLock getLock() {
-    return writeLock;
+    return lock;
   }
-
-  /**
-   * Get the lock object used for synchronizing the ReplicasMap for read only
-   * operations.
-   * @return The read lock object
-   */
-  AutoCloseableLock getReadLock() {
-    return readLock;
-  }
-
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index c36b333..3391740 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -3000,40 +3000,6 @@
 </property>
 
 <property>
-  <name>dfs.datanode.lock.fair</name>
-  <value>true</value>
-  <description>If this is true, the Datanode FsDataset lock will be used in Fair
-    mode, which will help to prevent writer threads from being starved, but can
-    lower lock throughput. See java.util.concurrent.locks.ReentrantReadWriteLock
-    for more information on fair/non-fair locks.
-  </description>
-</property>
-
-<property>
-  <name>dfs.datanode.lock.read.write.enabled</name>
-  <value>true</value>
-  <description>If this is true, the FsDataset lock will be a read write lock. If
-    it is false, all locks will be a write lock.
-    Enabling this should give better datanode throughput, as many read only
-    functions can run concurrently under the read lock, when they would
-    previously have required the exclusive write lock. As the feature is
-    experimental, this switch can be used to disable the shared read lock, and
-    cause all lock acquisitions to use the exclusive write lock.
-  </description>
-</property>
-
-<property>
-  <name>dfs.datanode.lock-reporting-threshold-ms</name>
-  <value>300</value>
-  <description>When thread waits to obtain a lock, or a thread holds a lock for
-    more than the threshold, a log message will be written. Note that
-    dfs.lock.suppress.warning.interval ensures a single log message is
-    emitted per interval for waiting threads and a single message for holding
-    threads to avoid excessive logging.
-  </description>
-</property>
-
-<property>
   <name>dfs.namenode.startup.delay.block.deletion.sec</name>
   <value>0</value>
   <description>The delay in seconds at which we will pause the blocks deletion
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index cb7bbdb..8f4e29b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -1573,12 +1573,6 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
   }
 
   @Override
-  public AutoCloseableLock acquireDatasetReadLock() {
-    // No RW lock implementation in simulated dataset currently.
-    return datasetLock.acquire();
-  }
-
-  @Override
   public Set<? extends Replica> deepCopyReplica(String bpid)
       throws IOException {
     Set<BInfo> replicas = new HashSet<>();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
index caaa89c..8fe515f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
@@ -456,11 +456,6 @@ public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> {
   }
 
   @Override
-  public AutoCloseableLock acquireDatasetReadLock() {
-    return null;
-  }
-
-  @Override
   public Set<? extends Replica> deepCopyReplica(String bpid)
       throws IOException {
     return Collections.EMPTY_SET;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImplTestUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImplTestUtils.java
index e383e07..2d939fa 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImplTestUtils.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImplTestUtils.java
@@ -434,7 +434,7 @@ public class FsDatasetImplTestUtils implements FsDatasetTestUtils {
   @Override
   public Iterator<Replica> getStoredReplicas(String bpid) throws IOException {
     // Reload replicas from the disk.
-    ReplicaMap replicaMap = new ReplicaMap(dataset.datasetRWLock);
+    ReplicaMap replicaMap = new ReplicaMap(dataset.datasetLock);
     try (FsVolumeReferences refs = dataset.getFsVolumeReferences()) {
       for (FsVolumeSpi vol : refs) {
         FsVolumeImpl volume = (FsVolumeImpl) vol;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
index 7809a2d..13ffb96 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
@@ -17,10 +17,10 @@
  */
 package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
-import java.util.concurrent.TimeoutException;
 import java.util.function.Supplier;
 import com.google.common.collect.Lists;
 
+import java.io.OutputStream;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import org.apache.commons.io.FileUtils;
@@ -65,7 +65,6 @@ import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.io.MultipleIOException;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.test.LambdaTestUtils;
-import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.util.FakeTimer;
 import org.apache.hadoop.util.StringUtils;
 import org.junit.Assert;
@@ -86,7 +85,6 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DN_CACHED_DFSUSED_CHECK_INTERVAL_MS;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY;
@@ -198,118 +196,6 @@ public class TestFsDatasetImpl {
     assertEquals(0, dataset.getNumFailedVolumes());
   }
 
-  @Test(timeout=10000)
-  public void testReadLockEnabledByDefault()
-      throws Exception {
-    final FsDatasetSpi ds = dataset;
-    AtomicBoolean accessed = new AtomicBoolean(false);
-    CountDownLatch latch = new CountDownLatch(1);
-    CountDownLatch waiterLatch = new CountDownLatch(1);
-
-    Thread holder = new Thread() {
-      public void run() {
-        try (AutoCloseableLock l = ds.acquireDatasetReadLock()) {
-          latch.countDown();
-          // wait for the waiter thread to access the lock.
-          waiterLatch.await();
-        } catch (Exception e) {
-        }
-      }
-    };
-
-    Thread waiter = new Thread() {
-      public void run() {
-        try {
-          latch.await();
-        } catch (InterruptedException e) {
-          waiterLatch.countDown();
-          return;
-        }
-        try (AutoCloseableLock l = ds.acquireDatasetReadLock()) {
-          accessed.getAndSet(true);
-          // signal the holder thread.
-          waiterLatch.countDown();
-        } catch (Exception e) {
-        }
-      }
-    };
-    waiter.start();
-    holder.start();
-    holder.join();
-    waiter.join();
-    // The holder thread is still holding the lock, but the waiter can still
-    // run as the lock is a shared read lock.
-    // Otherwise test will timeout with deadlock.
-    assertEquals(true, accessed.get());
-    holder.interrupt();
-  }
-
-  @Test(timeout=20000)
-  public void testReadLockCanBeDisabledByConfig()
-      throws Exception {
-    HdfsConfiguration conf = new HdfsConfiguration();
-    conf.setBoolean(
-        DFSConfigKeys.DFS_DATANODE_LOCK_READ_WRITE_ENABLED_KEY, false);
-    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
-        .numDataNodes(1).build();
-    try {
-      AtomicBoolean accessed = new AtomicBoolean(false);
-      cluster.waitActive();
-      DataNode dn = cluster.getDataNodes().get(0);
-      final FsDatasetSpi<?> ds = DataNodeTestUtils.getFSDataset(dn);
-
-      CountDownLatch latch = new CountDownLatch(1);
-      CountDownLatch waiterLatch = new CountDownLatch(1);
-      Thread holder = new Thread() {
-        public void run() {
-          try (AutoCloseableLock l = ds.acquireDatasetReadLock()) {
-            latch.countDown();
-            // wait for the waiter thread to access the lock.
-            waiterLatch.await();
-          } catch (Exception e) {
-          }
-        }
-      };
-
-      Thread waiter = new Thread() {
-        public void run() {
-          try {
-            // Wait for holder to get ds read lock.
-            latch.await();
-          } catch (InterruptedException e) {
-            waiterLatch.countDown();
-            return;
-          }
-          try (AutoCloseableLock l = ds.acquireDatasetReadLock()) {
-            accessed.getAndSet(true);
-            // signal the holder thread.
-            waiterLatch.countDown();
-          } catch (Exception e) {
-          }
-        }
-      };
-      waiter.start();
-      holder.start();
-      // Wait for some time to make sure we are in deadlock.
-      try {
-        GenericTestUtils.waitFor(() ->
-                accessed.get(),
-            100, 10000);
-        fail("Waiter thread should not execute.");
-      } catch (TimeoutException e) {
-      }
-      // Release waiterLatch to exit deadlock.
-      waiterLatch.countDown();
-      holder.join();
-      waiter.join();
-      // After releasing waiterLatch, the waiter
-      // thread will be able to execute.
-      assertTrue(accessed.get());
-    } finally {
-      cluster.shutdown();
-    }
-  }
-
   @Test
   public void testAddVolumes() throws IOException {
     final int numNewVolumes = 3;
@@ -356,8 +242,8 @@ public class TestFsDatasetImpl {
 
   @Test
   public void testAddVolumeWithSameStorageUuid() throws IOException {
-    HdfsConfiguration config = new HdfsConfiguration();
-    MiniDFSCluster cluster = new MiniDFSCluster.Builder(config)
+    HdfsConfiguration conf = new HdfsConfiguration();
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
         .numDataNodes(1).build();
     try {
       cluster.waitActive();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java
index 9371a51..9db9c0c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.util.StringUtils;
 import org.junit.Before;
 import org.junit.Test;
@@ -52,7 +53,6 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeoutException;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DU_RESERVED_PERCENTAGE_KEY;
 import static org.junit.Assert.assertEquals;
@@ -368,7 +368,7 @@ public class TestFsVolumeList {
     fs.close();
     FsDatasetImpl fsDataset = (FsDatasetImpl) cluster.getDataNodes().get(0)
         .getFSDataset();
-    ReplicaMap volumeMap = new ReplicaMap(new ReentrantReadWriteLock());
+    ReplicaMap volumeMap = new ReplicaMap(new AutoCloseableLock());
     RamDiskReplicaTracker ramDiskReplicaMap = RamDiskReplicaTracker
         .getInstance(conf, fsDataset);
     FsVolumeImpl vol = (FsVolumeImpl) fsDataset.getFsVolumeReferences().get(0);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestInterDatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestInterDatanodeProtocol.java
index b72b1cd..86e9f90 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestInterDatanodeProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestInterDatanodeProtocol.java
@@ -25,7 +25,6 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.SocketTimeoutException;
 import java.util.List;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -60,6 +59,7 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.util.AutoCloseableLock;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -236,7 +236,7 @@ public class TestInterDatanodeProtocol {
     final long firstblockid = 10000L;
     final long gs = 7777L;
     final long length = 22L;
-    final ReplicaMap map = new ReplicaMap(new ReentrantReadWriteLock());
+    final ReplicaMap map = new ReplicaMap(new AutoCloseableLock());
     String bpid = "BP-TEST";
     final Block[] blocks = new Block[5];
     for(int i = 0; i < blocks.length; i++) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestProvidedImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestProvidedImpl.java
index ef0a119..d7935b5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestProvidedImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestProvidedImpl.java
@@ -43,7 +43,6 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Random;
 import java.util.Set;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -78,6 +77,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.BlockIterator;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi.FsVolumeReferences;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.util.StringUtils;
 import org.junit.Before;
 import org.junit.Test;
@@ -399,7 +399,7 @@ public class TestProvidedImpl {
   public void testBlockLoad() throws IOException {
     for (int i = 0; i < providedVolumes.size(); i++) {
       FsVolumeImpl vol = providedVolumes.get(i);
-      ReplicaMap volumeMap = new ReplicaMap(new ReentrantReadWriteLock());
+      ReplicaMap volumeMap = new ReplicaMap(new AutoCloseableLock());
       vol.getVolumeMap(volumeMap, null);
 
       assertEquals(vol.getBlockPoolList().length, BLOCK_POOL_IDS.length);
@@ -475,7 +475,7 @@ public class TestProvidedImpl {
       vol.setFileRegionProvider(BLOCK_POOL_IDS[CHOSEN_BP_ID],
           new TestFileRegionBlockAliasMap(fileRegionIterator, minBlockId,
               numBlocks));
-      ReplicaMap volumeMap = new ReplicaMap(new ReentrantReadWriteLock());
+      ReplicaMap volumeMap = new ReplicaMap(new AutoCloseableLock());
       vol.getVolumeMap(BLOCK_POOL_IDS[CHOSEN_BP_ID], volumeMap, null);
       totalBlocks += volumeMap.size(BLOCK_POOL_IDS[CHOSEN_BP_ID]);
     }
@@ -585,7 +585,7 @@ public class TestProvidedImpl {
   public void testProvidedReplicaPrefix() throws Exception {
     for (int i = 0; i < providedVolumes.size(); i++) {
       FsVolumeImpl vol = providedVolumes.get(i);
-      ReplicaMap volumeMap = new ReplicaMap(new ReentrantReadWriteLock());
+      ReplicaMap volumeMap = new ReplicaMap(new AutoCloseableLock());
       vol.getVolumeMap(volumeMap, null);
 
       Path expectedPrefix = new Path(
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestReplicaMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestReplicaMap.java
index 59203bb..1059c08 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestReplicaMap.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestReplicaMap.java
@@ -23,16 +23,15 @@ import static org.junit.Assert.fail;
 
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
+import org.apache.hadoop.util.AutoCloseableLock;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
 /**
  * Unit test for ReplicasMap class
  */
 public class TestReplicaMap {
-  private final ReplicaMap map = new ReplicaMap(new ReentrantReadWriteLock());
+  private final ReplicaMap map = new ReplicaMap(new AutoCloseableLock());
   private final String bpid = "BP-TEST";
   private final  Block block = new Block(1234, 1234, 1234);
   
@@ -112,7 +111,7 @@ public class TestReplicaMap {
 
   @Test
   public void testMergeAll() {
-    ReplicaMap temReplicaMap = new ReplicaMap(new ReentrantReadWriteLock());
+    ReplicaMap temReplicaMap = new ReplicaMap(new AutoCloseableLock());
     Block tmpBlock = new Block(5678, 5678, 5678);
     temReplicaMap.add(bpid, new FinalizedReplica(tmpBlock, null, null));
 
@@ -123,7 +122,7 @@ public class TestReplicaMap {
 
   @Test
   public void testAddAll() {
-    ReplicaMap temReplicaMap = new ReplicaMap(new ReentrantReadWriteLock());
+    ReplicaMap temReplicaMap = new ReplicaMap(new AutoCloseableLock());
     Block tmpBlock = new Block(5678, 5678, 5678);
     temReplicaMap.add(bpid, new FinalizedReplica(tmpBlock, null, null));
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java
index e939389..2c5df28 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java
@@ -27,7 +27,6 @@ import java.io.RandomAccessFile;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.StorageType;
@@ -48,6 +47,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
 import org.junit.Assert;
 import org.junit.Test;
@@ -550,7 +550,7 @@ public class TestWriteToReplica {
           bpList.size() == 2);
       
       createReplicas(bpList, volumes, cluster.getFsDatasetTestUtils(dn));
-      ReplicaMap oldReplicaMap = new ReplicaMap(new ReentrantReadWriteLock());
+      ReplicaMap oldReplicaMap = new ReplicaMap(new AutoCloseableLock());
       oldReplicaMap.addAll(dataSet.volumeMap);
 
       cluster.restartDataNode(0);

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org
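
The tests removed in the diff above exercise the key property of the new dataset
lock: the read lock can be held by several threads at once. The following is a
minimal, self-contained sketch of that behaviour; it is illustrative only (the
class name, latch names and printed message are invented here) and is not code
from any of the commits in this series.

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.locks.ReentrantReadWriteLock;
    import org.apache.hadoop.util.AutoCloseableLock;

    public class SharedReadLockSketch {
      public static void main(String[] args) throws InterruptedException {
        ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(true);
        AutoCloseableLock readLock = new AutoCloseableLock(rwLock.readLock());
        CountDownLatch holderHasLock = new CountDownLatch(1);
        CountDownLatch waiterDone = new CountDownLatch(1);

        Thread holder = new Thread(() -> {
          try (AutoCloseableLock l = readLock.acquire()) {
            holderHasLock.countDown();
            waiterDone.await();   // keep holding the read lock while the waiter runs
          } catch (InterruptedException ignored) {
          }
        });

        Thread waiter = new Thread(() -> {
          try {
            holderHasLock.await();
          } catch (InterruptedException e) {
            return;
          }
          // Succeeds while the holder still has the read lock because the lock
          // is shared; with an exclusive lock this second acquire would block.
          try (AutoCloseableLock l = readLock.acquire()) {
            System.out.println("read lock acquired concurrently");
          }
          waiterDone.countDown();
        });

        holder.start();
        waiter.start();
        holder.join();
        waiter.join();
      }
    }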


[hadoop] 02/06: HDFS-15150. Introduce read write lock to Datanode. Contributed by Stephen O'Donnell.

Posted by he...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hexiaoqiao pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit 7039433146d307c8e56587c69d33f989626da57f
Author: Stephen O'Donnell <so...@cloudera.com>
AuthorDate: Tue Feb 11 07:59:34 2020 -0800

    HDFS-15150. Introduce read write lock to Datanode. Contributed by Stephen O'Donnell.
    
    Signed-off-by: Wei-Chiu Chuang <we...@apache.org>
    (cherry picked from commit d7c136b9ed6d99e1b03f5b89723b3a20df359ba8)
---
 .../hadoop/util/InstrumentedReadWriteLock.java     |   2 +-
 .../java/org/apache/hadoop/hdfs/DFSConfigKeys.java |   7 +
 .../server/datanode/fsdataset/FsDatasetSpi.java    |   6 +
 .../datanode/fsdataset/impl/BlockPoolSlice.java    |   4 +-
 .../datanode/fsdataset/impl/FsDatasetImpl.java     | 143 ++++++++++++---------
 .../fsdataset/impl/ProvidedVolumeImpl.java         |  10 +-
 .../server/datanode/fsdataset/impl/ReplicaMap.java |  31 +++--
 .../src/main/resources/hdfs-default.xml            |  21 +++
 .../hdfs/server/datanode/SimulatedFSDataset.java   |   6 +
 .../datanode/extdataset/ExternalDatasetImpl.java   |   5 +
 .../fsdataset/impl/FsDatasetImplTestUtils.java     |   2 +-
 .../datanode/fsdataset/impl/TestFsVolumeList.java  |   4 +-
 .../fsdataset/impl/TestInterDatanodeProtocol.java  |   4 +-
 .../datanode/fsdataset/impl/TestProvidedImpl.java  |   8 +-
 .../datanode/fsdataset/impl/TestReplicaMap.java    |   9 +-
 .../fsdataset/impl/TestWriteToReplica.java         |   4 +-
 16 files changed, 168 insertions(+), 98 deletions(-)
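
The core of this commit, shown in the FsDatasetImpl and ReplicaMap hunks below,
is replacing the single exclusive dataset lock with an InstrumentedReadWriteLock
whose read and write sides are wrapped in AutoCloseableLock handles and acquired
with try-with-resources. A rough sketch of that pattern follows; the class and
field names, the slf4j logger and the hard-coded thresholds are illustrative
assumptions, not the actual FsDatasetImpl code.

    import java.util.concurrent.TimeUnit;
    import org.apache.hadoop.util.AutoCloseableLock;
    import org.apache.hadoop.util.InstrumentedReadWriteLock;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class DatasetLockSketch {
      private static final Logger LOG =
          LoggerFactory.getLogger(DatasetLockSketch.class);

      // fair=true mirrors the dfs.datanode.lock.fair default; the 10s warning
      // suppression interval and 300ms reporting threshold mirror the defaults
      // introduced by this commit.
      private final InstrumentedReadWriteLock rwLock =
          new InstrumentedReadWriteLock(true, "FsDatasetRWLock", LOG,
              TimeUnit.SECONDS.toMillis(10), 300L);
      private final AutoCloseableLock readLock =
          new AutoCloseableLock(rwLock.readLock());
      private final AutoCloseableLock writeLock =
          new AutoCloseableLock(rwLock.writeLock());

      int countReplicas() {
        // Read-only operations can run concurrently under the shared read lock.
        try (AutoCloseableLock l = readLock.acquire()) {
          return 0; // e.g. iterate a replica map here
        }
      }

      void addReplica() {
        // Mutating operations still take the exclusive write lock.
        try (AutoCloseableLock l = writeLock.acquire()) {
          // e.g. update a replica map here
        }
      }
    }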

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/InstrumentedReadWriteLock.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/InstrumentedReadWriteLock.java
index a410524..758f1ff 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/InstrumentedReadWriteLock.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/InstrumentedReadWriteLock.java
@@ -37,7 +37,7 @@ public class InstrumentedReadWriteLock implements ReadWriteLock {
   private final Lock readLock;
   private final Lock writeLock;
 
-  InstrumentedReadWriteLock(boolean fair, String name, Logger logger,
+  public InstrumentedReadWriteLock(boolean fair, String name, Logger logger,
       long minLoggingGapMs, long lockWarningThresholdMs) {
     ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(fair);
     readLock = new InstrumentedReadLock(name, logger, readWriteLock,
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 563b64c..76784e7 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -549,6 +549,13 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
       "dfs.lock.suppress.warning.interval";
   public static final long DFS_LOCK_SUPPRESS_WARNING_INTERVAL_DEFAULT =
       10000; //ms
+  public static final String DFS_DATANODE_LOCK_FAIR_KEY =
+      "dfs.datanode.lock.fair";
+  public static final boolean DFS_DATANODE_LOCK_FAIR_DEFAULT = true;
+  public static final String  DFS_DATANODE_LOCK_REPORTING_THRESHOLD_MS_KEY =
+      "dfs.datanode.lock-reporting-threshold-ms";
+  public static final long
+      DFS_DATANODE_LOCK_REPORTING_THRESHOLD_MS_DEFAULT = 300L;
 
   public static final String  DFS_UPGRADE_DOMAIN_FACTOR = "dfs.namenode.upgrade.domain.factor";
   public static final int DFS_UPGRADE_DOMAIN_FACTOR_DEFAULT = DFS_REPLICATION_DEFAULT;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
index 78a5cfc..4de9169 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
@@ -661,5 +661,11 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
    */
   AutoCloseableLock acquireDatasetLock();
 
+  /***
+   * Acquire the read lock of the data set.
+   * @return The AutoClosable read lock instance.
+   */
+  AutoCloseableLock acquireDatasetReadLock();
+
   Set<? extends Replica> deepCopyReplica(String bpid) throws IOException;
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
index 9656b9d..ea77505 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
@@ -42,6 +42,7 @@ import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.ForkJoinTask;
 import java.util.concurrent.RecursiveAction;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.hadoop.hdfs.server.datanode.FSCachingGetSpaceUsed;
 import org.slf4j.Logger;
@@ -66,7 +67,6 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaTrack
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.MultipleIOException;
-import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.DataChecksum.Type;
 import org.apache.hadoop.util.DiskChecker;
@@ -874,7 +874,7 @@ class BlockPoolSlice {
 
   private boolean readReplicasFromCache(ReplicaMap volumeMap,
       final RamDiskReplicaTracker lazyWriteReplicaMap) {
-    ReplicaMap tmpReplicaMap = new ReplicaMap(new AutoCloseableLock());
+    ReplicaMap tmpReplicaMap = new ReplicaMap(new ReentrantReadWriteLock());
     File replicaFile = new File(currentDir, REPLICA_CACHE_FILE);
     // Check whether the file exists or not.
     if (!replicaFile.exists()) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 3576671..32e58f4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -40,8 +40,8 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
 import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import javax.management.NotCompliantMBeanException;
 import javax.management.ObjectName;
@@ -112,7 +112,7 @@ import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
-import org.apache.hadoop.util.InstrumentedLock;
+import org.apache.hadoop.util.InstrumentedReadWriteLock;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.util.Timer;
@@ -179,7 +179,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
   @Override
   public FsVolumeImpl getVolume(final ExtendedBlock b) {
-    try (AutoCloseableLock lock = datasetLock.acquire()) {
+    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
       final ReplicaInfo r =
           volumeMap.get(b.getBlockPoolId(), b.getLocalBlock());
       return r != null ? (FsVolumeImpl) r.getVolume() : null;
@@ -189,7 +189,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   @Override // FsDatasetSpi
   public Block getStoredBlock(String bpid, long blkid)
       throws IOException {
-    try (AutoCloseableLock lock = datasetLock.acquire()) {
+    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
       ReplicaInfo r = volumeMap.get(bpid, blkid);
       if (r == null) {
         return null;
@@ -205,9 +205,13 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   @Override
   public Set<? extends Replica> deepCopyReplica(String bpid)
       throws IOException {
-    Set<? extends Replica> replicas =
-        new HashSet<>(volumeMap.replicas(bpid) == null ? Collections.EMPTY_SET
-            : volumeMap.replicas(bpid));
+    Set<? extends Replica> replicas = null;
+    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+      replicas =
+          new HashSet<>(volumeMap.replicas(bpid) == null ? Collections.EMPTY_SET
+              : volumeMap.replicas(bpid));
+    }
+
     return Collections.unmodifiableSet(replicas);
   }
 
@@ -268,8 +272,12 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   private final int maxDataLength;
 
   @VisibleForTesting
-  final AutoCloseableLock datasetLock;
-  private final Condition datasetLockCondition;
+  final AutoCloseableLock datasetWriteLock;
+  @VisibleForTesting
+  final AutoCloseableLock datasetReadLock;
+  @VisibleForTesting
+  final InstrumentedReadWriteLock datasetRWLock;
+  private final Condition datasetWriteLockCondition;
   private static String blockPoolId = "";
   
   /**
@@ -282,15 +290,20 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     this.dataStorage = storage;
     this.conf = conf;
     this.smallBufferSize = DFSUtilClient.getSmallBufferSize(conf);
-    this.datasetLock = new AutoCloseableLock(
-        new InstrumentedLock(getClass().getName(), LOG,
-          new ReentrantLock(true),
-          conf.getTimeDuration(
-            DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_KEY,
-            DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_DEFAULT,
-            TimeUnit.MILLISECONDS),
-          300));
-    this.datasetLockCondition = datasetLock.newCondition();
+    this.datasetRWLock = new InstrumentedReadWriteLock(
+        conf.getBoolean(DFSConfigKeys.DFS_DATANODE_LOCK_FAIR_KEY,
+            DFSConfigKeys.DFS_DATANODE_LOCK_FAIR_DEFAULT),
+        "FsDatasetRWLock", LOG, conf.getTimeDuration(
+        DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_KEY,
+        DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_DEFAULT,
+        TimeUnit.MILLISECONDS),
+        conf.getTimeDuration(
+            DFSConfigKeys.DFS_DATANODE_LOCK_REPORTING_THRESHOLD_MS_KEY,
+            DFSConfigKeys.DFS_DATANODE_LOCK_REPORTING_THRESHOLD_MS_DEFAULT,
+            TimeUnit.MILLISECONDS));
+    this.datasetWriteLock = new AutoCloseableLock(datasetRWLock.writeLock());
+    this.datasetReadLock = new AutoCloseableLock(datasetRWLock.readLock());
+    this.datasetWriteLockCondition = datasetWriteLock.newCondition();
 
     // The number of volumes required for operation is the total number
     // of volumes minus the number of failed volumes we can tolerate.
@@ -329,7 +342,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     }
 
     storageMap = new ConcurrentHashMap<String, DatanodeStorage>();
-    volumeMap = new ReplicaMap(datasetLock);
+    volumeMap = new ReplicaMap(datasetRWLock);
     ramDiskReplicaTracker = RamDiskReplicaTracker.getInstance(conf, this);
 
     @SuppressWarnings("unchecked")
@@ -383,7 +396,12 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
   @Override
   public AutoCloseableLock acquireDatasetLock() {
-    return datasetLock.acquire();
+    return datasetWriteLock.acquire();
+  }
+
+  @Override
+  public AutoCloseableLock acquireDatasetReadLock() {
+    return datasetReadLock.acquire();
   }
 
   /**
@@ -424,7 +442,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       ReplicaMap replicaMap,
       Storage.StorageDirectory sd, StorageType storageType,
       FsVolumeReference ref) throws IOException {
-    try (AutoCloseableLock lock = datasetLock.acquire()) {
+    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
       DatanodeStorage dnStorage = storageMap.get(sd.getStorageUuid());
       if (dnStorage != null) {
         final String errorMsg = String.format(
@@ -457,7 +475,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
                               .setConf(this.conf)
                               .build();
     FsVolumeReference ref = fsVolume.obtainReference();
-    ReplicaMap tempVolumeMap = new ReplicaMap(datasetLock);
+    ReplicaMap tempVolumeMap = new ReplicaMap(datasetRWLock);
     fsVolume.getVolumeMap(tempVolumeMap, ramDiskReplicaTracker);
 
     activateVolume(tempVolumeMap, sd, storageLocation.getStorageType(), ref);
@@ -496,7 +514,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     StorageType storageType = location.getStorageType();
     final FsVolumeImpl fsVolume =
         createFsVolume(sd.getStorageUuid(), sd, location);
-    final ReplicaMap tempVolumeMap = new ReplicaMap(new AutoCloseableLock());
+    final ReplicaMap tempVolumeMap =
+        new ReplicaMap(new ReentrantReadWriteLock());
     ArrayList<IOException> exceptions = Lists.newArrayList();
 
     for (final NamespaceInfo nsInfo : nsInfos) {
@@ -541,7 +560,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         new ArrayList<>(storageLocsToRemove);
     Map<String, List<ReplicaInfo>> blkToInvalidate = new HashMap<>();
     List<String> storageToRemove = new ArrayList<>();
-    try (AutoCloseableLock lock = datasetLock.acquire()) {
+    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
       for (int idx = 0; idx < dataStorage.getNumStorageDirs(); idx++) {
         Storage.StorageDirectory sd = dataStorage.getStorageDir(idx);
         final StorageLocation sdLocation = sd.getStorageLocation();
@@ -553,7 +572,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
           // Disable the volume from the service.
           asyncDiskService.removeVolume(sd.getStorageUuid());
           volumes.removeVolume(sdLocation, clearFailure);
-          volumes.waitVolumeRemoved(5000, datasetLockCondition);
+          volumes.waitVolumeRemoved(5000, datasetWriteLockCondition);
 
           // Removed all replica information for the blocks on the volume.
           // Unlike updating the volumeMap in addVolume(), this operation does
@@ -600,7 +619,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       }
     }
 
-    try (AutoCloseableLock lock = datasetLock.acquire()) {
+    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
       for(String storageUuid : storageToRemove) {
         storageMap.remove(storageUuid);
       }
@@ -791,7 +810,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       long seekOffset) throws IOException {
 
     ReplicaInfo info;
-    try (AutoCloseableLock lock = datasetLock.acquire()) {
+    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
       info = volumeMap.get(b.getBlockPoolId(), b.getLocalBlock());
     }
 
@@ -879,7 +898,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   @Override // FsDatasetSpi
   public ReplicaInputStreams getTmpInputStreams(ExtendedBlock b,
       long blkOffset, long metaOffset) throws IOException {
-    try (AutoCloseableLock lock = datasetLock.acquire()) {
+    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
       ReplicaInfo info = getReplicaInfo(b);
       FsVolumeReference ref = info.getVolume().obtainReference();
       try {
@@ -1004,7 +1023,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     }
 
     FsVolumeReference volumeRef = null;
-    try (AutoCloseableLock lock = datasetLock.acquire()) {
+    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
       volumeRef = volumes.getNextVolume(targetStorageType, targetStorageId,
           block.getNumBytes());
     }
@@ -1118,7 +1137,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
     FsVolumeReference volumeRef = null;
 
-    try (AutoCloseableLock lock = datasetLock.acquire()) {
+    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
       volumeRef = destination.obtainReference();
     }
 
@@ -1206,7 +1225,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   @Override  // FsDatasetSpi
   public ReplicaHandler append(ExtendedBlock b,
       long newGS, long expectedBlockLen) throws IOException {
-    try (AutoCloseableLock lock = datasetLock.acquire()) {
+    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
       // If the block was successfully finalized because all packets
       // were successfully processed at the Datanode but the ack for
       // some of the packets were not received by the client. The client
@@ -1258,7 +1277,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   private ReplicaInPipeline append(String bpid,
       ReplicaInfo replicaInfo, long newGS, long estimateBlockLen)
       throws IOException {
-    try (AutoCloseableLock lock = datasetLock.acquire()) {
+    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
       // If the block is cached, start uncaching it.
       if (replicaInfo.getState() != ReplicaState.FINALIZED) {
         throw new IOException("Only a Finalized replica can be appended to; "
@@ -1354,7 +1373,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
     while (true) {
       try {
-        try (AutoCloseableLock lock = datasetLock.acquire()) {
+        try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
           ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen);
           FsVolumeReference ref = replicaInfo.getVolume().obtainReference();
           ReplicaInPipeline replica;
@@ -1386,7 +1405,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     LOG.info("Recover failed close " + b);
     while (true) {
       try {
-        try (AutoCloseableLock lock = datasetLock.acquire()) {
+        try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
           // check replica's state
           ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen);
           // bump the replica's GS
@@ -1408,7 +1427,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   public ReplicaHandler createRbw(
       StorageType storageType, String storageId, ExtendedBlock b,
       boolean allowLazyPersist) throws IOException {
-    try (AutoCloseableLock lock = datasetLock.acquire()) {
+    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
       ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
           b.getBlockId());
       if (replicaInfo != null) {
@@ -1479,7 +1498,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
     while (true) {
       try {
-        try (AutoCloseableLock lock = datasetLock.acquire()) {
+        try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
           ReplicaInfo replicaInfo =
               getReplicaInfo(b.getBlockPoolId(), b.getBlockId());
           // check the replica's state
@@ -1504,7 +1523,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   private ReplicaHandler recoverRbwImpl(ReplicaInPipeline rbw,
       ExtendedBlock b, long newGS, long minBytesRcvd, long maxBytesRcvd)
       throws IOException {
-    try (AutoCloseableLock lock = datasetLock.acquire()) {
+    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
       // check generation stamp
       long replicaGenerationStamp = rbw.getGenerationStamp();
       if (replicaGenerationStamp < b.getGenerationStamp() ||
@@ -1565,7 +1584,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   public ReplicaInPipeline convertTemporaryToRbw(
       final ExtendedBlock b) throws IOException {
 
-    try (AutoCloseableLock lock = datasetLock.acquire()) {
+    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
       final long blockId = b.getBlockId();
       final long expectedGs = b.getGenerationStamp();
       final long visible = b.getNumBytes();
@@ -1639,7 +1658,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     ReplicaInfo lastFoundReplicaInfo = null;
     boolean isInPipeline = false;
     do {
-      try (AutoCloseableLock lock = datasetLock.acquire()) {
+      try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
         ReplicaInfo currentReplicaInfo =
             volumeMap.get(b.getBlockPoolId(), b.getBlockId());
         if (currentReplicaInfo == lastFoundReplicaInfo) {
@@ -1692,7 +1711,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       invalidate(b.getBlockPoolId(), new Block[] { lastFoundReplicaInfo },
           false);
     }
-    try (AutoCloseableLock lock = datasetLock.acquire()) {
+    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
       FsVolumeReference ref = volumes.getNextVolume(storageType, storageId, b
           .getNumBytes());
       FsVolumeImpl v = (FsVolumeImpl) ref.getVolume();
@@ -1743,7 +1762,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       throws IOException {
     ReplicaInfo replicaInfo = null;
     ReplicaInfo finalizedReplicaInfo = null;
-    try (AutoCloseableLock lock = datasetLock.acquire()) {
+    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
       if (Thread.interrupted()) {
         // Don't allow data modifications from interrupted threads
         throw new IOException("Cannot finalize block from Interrupted Thread");
@@ -1774,7 +1793,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
   private ReplicaInfo finalizeReplica(String bpid, ReplicaInfo replicaInfo)
       throws IOException {
-    try (AutoCloseableLock lock = datasetLock.acquire()) {
+    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
       // Compare generation stamp of old and new replica before finalizing
       if (volumeMap.get(bpid, replicaInfo.getBlockId()).getGenerationStamp()
           > replicaInfo.getGenerationStamp()) {
@@ -1819,7 +1838,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
    */
   @Override // FsDatasetSpi
   public void unfinalizeBlock(ExtendedBlock b) throws IOException {
-    try (AutoCloseableLock lock = datasetLock.acquire()) {
+    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
       ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
           b.getLocalBlock());
       if (replicaInfo != null &&
@@ -1872,7 +1891,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         new HashMap<String, BlockListAsLongs.Builder>();
 
     List<FsVolumeImpl> curVolumes = null;
-    try (AutoCloseableLock lock = datasetLock.acquire()) {
+    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
       curVolumes = volumes.getVolumes();
       for (FsVolumeSpi v : curVolumes) {
         builders.put(v.getStorageID(), BlockListAsLongs.builder(maxDataLength));
@@ -1927,7 +1946,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
    * Gets a list of references to the finalized blocks for the given block pool.
    * <p>
    * Callers of this function should call
-   * {@link FsDatasetSpi#acquireDatasetLock} to avoid blocks' status being
+   * {@link FsDatasetSpi#acquireDatasetLock()} to avoid blocks' status being
    * changed during list iteration.
    * </p>
    * @return a list of references to the finalized blocks for the given block
@@ -1935,7 +1954,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
    */
   @Override
   public List<ReplicaInfo> getFinalizedBlocks(String bpid) {
-    try (AutoCloseableLock lock = datasetLock.acquire()) {
+    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
       final List<ReplicaInfo> finalized = new ArrayList<ReplicaInfo>(
           volumeMap.size(bpid));
       for (ReplicaInfo b : volumeMap.replicas(bpid)) {
@@ -2028,7 +2047,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   ReplicaInfo validateBlockFile(String bpid, long blockId) {
     //Should we check for metadata file too?
     final ReplicaInfo r;
-    try (AutoCloseableLock lock = datasetLock.acquire()) {
+    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
       r = volumeMap.get(bpid, blockId);
     }
     if (r != null) {
@@ -2079,7 +2098,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     for (int i = 0; i < invalidBlks.length; i++) {
       final ReplicaInfo removing;
       final FsVolumeImpl v;
-      try (AutoCloseableLock lock = datasetLock.acquire()) {
+      try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
         final ReplicaInfo info = volumeMap.get(bpid, invalidBlks[i]);
         if (info == null) {
           ReplicaInfo infoByBlockId =
@@ -2205,7 +2224,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     long length, genstamp;
     Executor volumeExecutor;
 
-    try (AutoCloseableLock lock = datasetLock.acquire()) {
+    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
       ReplicaInfo info = volumeMap.get(bpid, blockId);
       boolean success = false;
       try {
@@ -2273,7 +2292,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
   @Override // FsDatasetSpi
   public boolean contains(final ExtendedBlock block) {
-    try (AutoCloseableLock lock = datasetLock.acquire()) {
+    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
       final long blockId = block.getLocalBlock().getBlockId();
       final String bpid = block.getBlockPoolId();
       final ReplicaInfo r = volumeMap.get(bpid, blockId);
@@ -2393,7 +2412,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
     Block corruptBlock = null;
     ReplicaInfo memBlockInfo;
-    try (AutoCloseableLock lock = datasetLock.acquire()) {
+    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
       memBlockInfo = volumeMap.get(bpid, blockId);
       if (memBlockInfo != null &&
           memBlockInfo.getState() != ReplicaState.FINALIZED) {
@@ -2594,7 +2613,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
   @Override 
   public String getReplicaString(String bpid, long blockId) {
-    try (AutoCloseableLock lock = datasetLock.acquire()) {
+    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
       final Replica r = volumeMap.get(bpid, blockId);
       return r == null ? "null" : r.toString();
     }
@@ -2701,7 +2720,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
                                     final long recoveryId,
                                     final long newBlockId,
                                     final long newlength) throws IOException {
-    try (AutoCloseableLock lock = datasetLock.acquire()) {
+    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
       //get replica
       final String bpid = oldBlock.getBlockPoolId();
       final ReplicaInfo replica = volumeMap.get(bpid, oldBlock.getBlockId());
@@ -2814,7 +2833,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   @Override // FsDatasetSpi
   public long getReplicaVisibleLength(final ExtendedBlock block)
   throws IOException {
-    try (AutoCloseableLock lock = datasetLock.acquire()) {
+    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
       final Replica replica = getReplicaInfo(block.getBlockPoolId(),
           block.getBlockId());
       if (replica.getGenerationStamp() < block.getGenerationStamp()) {
@@ -2831,7 +2850,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       throws IOException {
     LOG.info("Adding block pool " + bpid);
     AddBlockPoolException volumeExceptions = new AddBlockPoolException();
-    try (AutoCloseableLock lock = datasetLock.acquire()) {
+    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
       try {
         volumes.addBlockPool(bpid, conf);
       } catch (AddBlockPoolException e) {
@@ -2861,7 +2880,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
   @Override
   public void shutdownBlockPool(String bpid) {
-    try (AutoCloseableLock lock = datasetLock.acquire()) {
+    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
       LOG.info("Removing block pool " + bpid);
       Map<DatanodeStorage, BlockListAsLongs> blocksPerVolume
           = getBlockReports(bpid);
@@ -2935,7 +2954,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   @Override //FsDatasetSpi
   public void deleteBlockPool(String bpid, boolean force)
       throws IOException {
-    try (AutoCloseableLock lock = datasetLock.acquire()) {
+    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
       List<FsVolumeImpl> curVolumes = volumes.getVolumes();
       if (!force) {
         for (FsVolumeImpl volume : curVolumes) {
@@ -2964,7 +2983,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   @Override // FsDatasetSpi
   public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block)
       throws IOException {
-    try (AutoCloseableLock lock = datasetLock.acquire()) {
+    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
       final Replica replica = volumeMap.get(block.getBlockPoolId(),
           block.getBlockId());
       if (replica == null) {
@@ -3016,7 +3035,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   @Override
   public void onCompleteLazyPersist(String bpId, long blockId,
       long creationTime, File[] savedFiles, FsVolumeImpl targetVolume) {
-    try (AutoCloseableLock lock = datasetLock.acquire()) {
+    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
       ramDiskReplicaTracker.recordEndLazyPersist(bpId, blockId, savedFiles);
 
       targetVolume.incDfsUsedAndNumBlocks(bpId, savedFiles[0].length()
@@ -3150,7 +3169,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       try {
         block = ramDiskReplicaTracker.dequeueNextReplicaToPersist();
         if (block != null) {
-          try (AutoCloseableLock lock = datasetLock.acquire()) {
+          try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
             replicaInfo = volumeMap.get(block.getBlockPoolId(), block.getBlockId());
 
             // If replicaInfo is null, the block was either deleted before
@@ -3217,7 +3236,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         ReplicaInfo replicaInfo, newReplicaInfo;
         final String bpid = replicaState.getBlockPoolId();
 
-        try (AutoCloseableLock lock = datasetLock.acquire()) {
+        try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
           replicaInfo = getReplicaInfo(replicaState.getBlockPoolId(),
                                        replicaState.getBlockId());
           Preconditions.checkState(replicaInfo.getVolume().isTransientStorage());
@@ -3390,7 +3409,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   }
 
   void stopAllDataxceiverThreads(FsVolumeImpl volume) {
-    try (AutoCloseableLock lock = datasetLock.acquire()) {
+    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
       for (String bpid : volumeMap.getBlockPoolList()) {
         Collection<ReplicaInfo> replicas = volumeMap.replicas(bpid);
         for (ReplicaInfo replicaInfo : replicas) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ProvidedVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ProvidedVolumeImpl.java
index e2d8681..341a2f0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ProvidedVolumeImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ProvidedVolumeImpl.java
@@ -28,6 +28,7 @@ import java.util.Set;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
@@ -53,9 +54,10 @@ import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.datanode.FileIoProvider;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaBuilder;
-import org.apache.hadoop.util.Timer;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
-import org.apache.hadoop.util.AutoCloseableLock;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.util.Timer;
 import org.codehaus.jackson.annotate.JsonProperty;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.codehaus.jackson.map.ObjectReader;
@@ -63,8 +65,6 @@ import org.codehaus.jackson.map.ObjectWriter;
 
 import com.google.common.annotations.VisibleForTesting;
 
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hadoop.util.Time;
 
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PROVIDED_ALIASMAP_LOAD_RETRIES;
 
@@ -135,7 +135,7 @@ class ProvidedVolumeImpl extends FsVolumeImpl {
     ProvidedBlockPoolSlice(String bpid, ProvidedVolumeImpl volume,
         Configuration conf) {
       this.providedVolume = volume;
-      bpVolumeMap = new ReplicaMap(new AutoCloseableLock());
+      bpVolumeMap = new ReplicaMap(new ReentrantReadWriteLock());
       Class<? extends BlockAliasMap> fmt =
           conf.getClass(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_CLASS,
               TextFileRegionAliasMap.class, BlockAliasMap.class);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java
index 53a238c..25141b8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.locks.ReadWriteLock;
 
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.hdfs.protocol.Block;
@@ -31,23 +32,27 @@ import org.apache.hadoop.util.AutoCloseableLock;
  * Maintains the replica map. 
  */
 class ReplicaMap {
+  private final ReadWriteLock rwLock;
   // Lock object to synchronize this instance.
-  private final AutoCloseableLock lock;
+  private final AutoCloseableLock readLock;
+  private final AutoCloseableLock writeLock;
   
   // Map of block pool Id to another map of block Id to ReplicaInfo.
   private final Map<String, LightWeightResizableGSet<Block, ReplicaInfo>> map =
       new HashMap<>();
 
-  ReplicaMap(AutoCloseableLock lock) {
+  ReplicaMap(ReadWriteLock lock) {
     if (lock == null) {
       throw new HadoopIllegalArgumentException(
           "Lock to synchronize on cannot be null");
     }
-    this.lock = lock;
+    this.rwLock = lock;
+    this.readLock = new AutoCloseableLock(rwLock.readLock());
+    this.writeLock = new AutoCloseableLock(rwLock.writeLock());
   }
   
   String[] getBlockPoolList() {
-    try (AutoCloseableLock l = lock.acquire()) {
+    try (AutoCloseableLock l = writeLock.acquire()) {
       return map.keySet().toArray(new String[map.keySet().size()]);   
     }
   }
@@ -92,7 +97,7 @@ class ReplicaMap {
    */
   ReplicaInfo get(String bpid, long blockId) {
     checkBlockPool(bpid);
-    try (AutoCloseableLock l = lock.acquire()) {
+    try (AutoCloseableLock l = writeLock.acquire()) {
       LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
       return m != null ? m.get(new Block(blockId)) : null;
     }
@@ -109,7 +114,7 @@ class ReplicaMap {
   ReplicaInfo add(String bpid, ReplicaInfo replicaInfo) {
     checkBlockPool(bpid);
     checkBlock(replicaInfo);
-    try (AutoCloseableLock l = lock.acquire()) {
+    try (AutoCloseableLock l = writeLock.acquire()) {
       LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
       if (m == null) {
         // Add an entry for block pool if it does not exist already
@@ -127,7 +132,7 @@ class ReplicaMap {
   ReplicaInfo addAndGet(String bpid, ReplicaInfo replicaInfo) {
     checkBlockPool(bpid);
     checkBlock(replicaInfo);
-    try (AutoCloseableLock l = lock.acquire()) {
+    try (AutoCloseableLock l = writeLock.acquire()) {
       LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
       if (m == null) {
         // Add an entry for block pool if it does not exist already
@@ -176,7 +181,7 @@ class ReplicaMap {
   ReplicaInfo remove(String bpid, Block block) {
     checkBlockPool(bpid);
     checkBlock(block);
-    try (AutoCloseableLock l = lock.acquire()) {
+    try (AutoCloseableLock l = writeLock.acquire()) {
       LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
       if (m != null) {
         ReplicaInfo replicaInfo = m.get(block);
@@ -198,7 +203,7 @@ class ReplicaMap {
    */
   ReplicaInfo remove(String bpid, long blockId) {
     checkBlockPool(bpid);
-    try (AutoCloseableLock l = lock.acquire()) {
+    try (AutoCloseableLock l = writeLock.acquire()) {
       LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
       if (m != null) {
         return m.remove(new Block(blockId));
@@ -213,7 +218,7 @@ class ReplicaMap {
    * @return the number of replicas in the map
    */
   int size(String bpid) {
-    try (AutoCloseableLock l = lock.acquire()) {
+    try (AutoCloseableLock l = writeLock.acquire()) {
       LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
       return m != null ? m.size() : 0;
     }
@@ -237,7 +242,7 @@ class ReplicaMap {
 
   void initBlockPool(String bpid) {
     checkBlockPool(bpid);
-    try (AutoCloseableLock l = lock.acquire()) {
+    try (AutoCloseableLock l = writeLock.acquire()) {
       LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
       if (m == null) {
         // Add an entry for block pool if it does not exist already
@@ -249,7 +254,7 @@ class ReplicaMap {
   
   void cleanUpBlockPool(String bpid) {
     checkBlockPool(bpid);
-    try (AutoCloseableLock l = lock.acquire()) {
+    try (AutoCloseableLock l = writeLock.acquire()) {
       map.remove(bpid);
     }
   }
@@ -259,6 +264,6 @@ class ReplicaMap {
    * @return lock object
    */
   AutoCloseableLock getLock() {
-    return lock;
+    return writeLock;
   }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 3391740..2e1dbf6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -3000,6 +3000,27 @@
 </property>
 
 <property>
+  <name>dfs.datanode.lock.fair</name>
+  <value>true</value>
+  <description>If this is true, the Datanode FsDataset lock will be used in Fair
+    mode, which will help to prevent writer threads from being starved, but can
+    lower lock throughput. See java.util.concurrent.locks.ReentrantReadWriteLock
+    for more information on fair/non-fair locks.
+  </description>
+</property>
+
+<property>
+  <name>dfs.datanode.lock-reporting-threshold-ms</name>
+  <value>300</value>
+  <description>When a thread waits to obtain a lock, or a thread holds a lock for
+    more than the threshold, a log message will be written. Note that
+    dfs.lock.suppress.warning.interval ensures a single log message is
+    emitted per interval for waiting threads and a single message for holding
+    threads to avoid excessive logging.
+  </description>
+</property>
+
+<property>
   <name>dfs.namenode.startup.delay.block.deletion.sec</name>
   <value>0</value>
   <description>The delay in seconds at which we will pause the blocks deletion
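
The two new properties added above can also be set programmatically, for example
when building a configuration in a test. A small hedged example using the
DFSConfigKeys constants introduced by this commit (the class name and chosen
values are illustrative only):

    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    public class LockConfigSketch {
      public static HdfsConfiguration newConf() {
        HdfsConfiguration conf = new HdfsConfiguration();
        // Use a non-fair lock to favour throughput over writer fairness.
        conf.setBoolean(DFSConfigKeys.DFS_DATANODE_LOCK_FAIR_KEY, false);
        // Report lock waits/holds longer than 500 ms (value is in milliseconds).
        conf.setLong(
            DFSConfigKeys.DFS_DATANODE_LOCK_REPORTING_THRESHOLD_MS_KEY, 500L);
        return conf;
      }
    }
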
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index 8f4e29b..cb7bbdb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -1573,6 +1573,12 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
   }
 
   @Override
+  public AutoCloseableLock acquireDatasetReadLock() {
+    // No RW lock implementation in simulated dataset currently.
+    return datasetLock.acquire();
+  }
+
+  @Override
   public Set<? extends Replica> deepCopyReplica(String bpid)
       throws IOException {
     Set<BInfo> replicas = new HashSet<>();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
index 8fe515f..caaa89c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
@@ -456,6 +456,11 @@ public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> {
   }
 
   @Override
+  public AutoCloseableLock acquireDatasetReadLock() {
+    return null;
+  }
+
+  @Override
   public Set<? extends Replica> deepCopyReplica(String bpid)
       throws IOException {
     return Collections.EMPTY_SET;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImplTestUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImplTestUtils.java
index 2d939fa..e383e07 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImplTestUtils.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImplTestUtils.java
@@ -434,7 +434,7 @@ public class FsDatasetImplTestUtils implements FsDatasetTestUtils {
   @Override
   public Iterator<Replica> getStoredReplicas(String bpid) throws IOException {
     // Reload replicas from the disk.
-    ReplicaMap replicaMap = new ReplicaMap(dataset.datasetLock);
+    ReplicaMap replicaMap = new ReplicaMap(dataset.datasetRWLock);
     try (FsVolumeReferences refs = dataset.getFsVolumeReferences()) {
       for (FsVolumeSpi vol : refs) {
         FsVolumeImpl volume = (FsVolumeImpl) vol;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java
index 9db9c0c..9371a51 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java
@@ -37,7 +37,6 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy;
 import org.apache.hadoop.test.GenericTestUtils;
-import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.util.StringUtils;
 import org.junit.Before;
 import org.junit.Test;
@@ -53,6 +52,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DU_RESERVED_PERCENTAGE_KEY;
 import static org.junit.Assert.assertEquals;
@@ -368,7 +368,7 @@ public class TestFsVolumeList {
     fs.close();
     FsDatasetImpl fsDataset = (FsDatasetImpl) cluster.getDataNodes().get(0)
         .getFSDataset();
-    ReplicaMap volumeMap = new ReplicaMap(new AutoCloseableLock());
+    ReplicaMap volumeMap = new ReplicaMap(new ReentrantReadWriteLock());
     RamDiskReplicaTracker ramDiskReplicaMap = RamDiskReplicaTracker
         .getInstance(conf, fsDataset);
     FsVolumeImpl vol = (FsVolumeImpl) fsDataset.getFsVolumeReferences().get(0);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestInterDatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestInterDatanodeProtocol.java
index 86e9f90..b72b1cd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestInterDatanodeProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestInterDatanodeProtocol.java
@@ -25,6 +25,7 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.SocketTimeoutException;
 import java.util.List;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -59,7 +60,6 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.util.AutoCloseableLock;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -236,7 +236,7 @@ public class TestInterDatanodeProtocol {
     final long firstblockid = 10000L;
     final long gs = 7777L;
     final long length = 22L;
-    final ReplicaMap map = new ReplicaMap(new AutoCloseableLock());
+    final ReplicaMap map = new ReplicaMap(new ReentrantReadWriteLock());
     String bpid = "BP-TEST";
     final Block[] blocks = new Block[5];
     for(int i = 0; i < blocks.length; i++) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestProvidedImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestProvidedImpl.java
index d7935b5..ef0a119 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestProvidedImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestProvidedImpl.java
@@ -43,6 +43,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Random;
 import java.util.Set;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -77,7 +78,6 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.BlockIterator;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi.FsVolumeReferences;
 import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.util.StringUtils;
 import org.junit.Before;
 import org.junit.Test;
@@ -399,7 +399,7 @@ public class TestProvidedImpl {
   public void testBlockLoad() throws IOException {
     for (int i = 0; i < providedVolumes.size(); i++) {
       FsVolumeImpl vol = providedVolumes.get(i);
-      ReplicaMap volumeMap = new ReplicaMap(new AutoCloseableLock());
+      ReplicaMap volumeMap = new ReplicaMap(new ReentrantReadWriteLock());
       vol.getVolumeMap(volumeMap, null);
 
       assertEquals(vol.getBlockPoolList().length, BLOCK_POOL_IDS.length);
@@ -475,7 +475,7 @@ public class TestProvidedImpl {
       vol.setFileRegionProvider(BLOCK_POOL_IDS[CHOSEN_BP_ID],
           new TestFileRegionBlockAliasMap(fileRegionIterator, minBlockId,
               numBlocks));
-      ReplicaMap volumeMap = new ReplicaMap(new AutoCloseableLock());
+      ReplicaMap volumeMap = new ReplicaMap(new ReentrantReadWriteLock());
       vol.getVolumeMap(BLOCK_POOL_IDS[CHOSEN_BP_ID], volumeMap, null);
       totalBlocks += volumeMap.size(BLOCK_POOL_IDS[CHOSEN_BP_ID]);
     }
@@ -585,7 +585,7 @@ public class TestProvidedImpl {
   public void testProvidedReplicaPrefix() throws Exception {
     for (int i = 0; i < providedVolumes.size(); i++) {
       FsVolumeImpl vol = providedVolumes.get(i);
-      ReplicaMap volumeMap = new ReplicaMap(new AutoCloseableLock());
+      ReplicaMap volumeMap = new ReplicaMap(new ReentrantReadWriteLock());
       vol.getVolumeMap(volumeMap, null);
 
       Path expectedPrefix = new Path(
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestReplicaMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestReplicaMap.java
index 1059c08..59203bb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestReplicaMap.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestReplicaMap.java
@@ -23,15 +23,16 @@ import static org.junit.Assert.fail;
 
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
-import org.apache.hadoop.util.AutoCloseableLock;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
 /**
  * Unit test for ReplicasMap class
  */
 public class TestReplicaMap {
-  private final ReplicaMap map = new ReplicaMap(new AutoCloseableLock());
+  private final ReplicaMap map = new ReplicaMap(new ReentrantReadWriteLock());
   private final String bpid = "BP-TEST";
   private final  Block block = new Block(1234, 1234, 1234);
   
@@ -111,7 +112,7 @@ public class TestReplicaMap {
 
   @Test
   public void testMergeAll() {
-    ReplicaMap temReplicaMap = new ReplicaMap(new AutoCloseableLock());
+    ReplicaMap temReplicaMap = new ReplicaMap(new ReentrantReadWriteLock());
     Block tmpBlock = new Block(5678, 5678, 5678);
     temReplicaMap.add(bpid, new FinalizedReplica(tmpBlock, null, null));
 
@@ -122,7 +123,7 @@ public class TestReplicaMap {
 
   @Test
   public void testAddAll() {
-    ReplicaMap temReplicaMap = new ReplicaMap(new AutoCloseableLock());
+    ReplicaMap temReplicaMap = new ReplicaMap(new ReentrantReadWriteLock());
     Block tmpBlock = new Block(5678, 5678, 5678);
     temReplicaMap.add(bpid, new FinalizedReplica(tmpBlock, null, null));
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java
index 2c5df28..e939389 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java
@@ -27,6 +27,7 @@ import java.io.RandomAccessFile;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.StorageType;
@@ -47,7 +48,6 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.test.GenericTestUtils;
-import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
 import org.junit.Assert;
 import org.junit.Test;
@@ -550,7 +550,7 @@ public class TestWriteToReplica {
           bpList.size() == 2);
       
       createReplicas(bpList, volumes, cluster.getFsDatasetTestUtils(dn));
-      ReplicaMap oldReplicaMap = new ReplicaMap(new AutoCloseableLock());
+      ReplicaMap oldReplicaMap = new ReplicaMap(new ReentrantReadWriteLock());
       oldReplicaMap.addAll(dataSet.volumeMap);
 
       cluster.restartDataNode(0);



[hadoop] 06/06: HDFS-15160. amend to fix javac error supressing unchecked warning

Posted by he...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hexiaoqiao pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit a3920e89de3de56f8bdde84219bc4b51be2d59bd
Author: Ahmed Hussein <a...@ahussein.me>
AuthorDate: Tue Aug 31 15:58:06 2021 -0500

    HDFS-15160. amend to fix javac error supressing unchecked warning
---
 .../hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java     | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 7adcd8e..045c8cd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -201,16 +201,16 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
    * The deepCopyReplica call doesn't use the datasetock since it will lead the
    * potential deadlock with the {@link FsVolumeList#addBlockPool} call.
    */
+  @SuppressWarnings("unchecked")
   @Override
   public Set<? extends Replica> deepCopyReplica(String bpid)
       throws IOException {
-    Set<? extends Replica> replicas = null;
+    Set<? extends Replica> replicas;
     try (AutoCloseableLock lock = datasetReadLock.acquire()) {
       replicas =
           new HashSet<>(volumeMap.replicas(bpid) == null ? Collections.EMPTY_SET
               : volumeMap.replicas(bpid));
     }
-
     return Collections.unmodifiableSet(replicas);
   }
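
For context, not something the patch itself does: the unchecked warning being
suppressed here comes from mixing the raw Collections.EMPTY_SET constant into a
generic expression; the typed Collections.emptySet() factory avoids the warning
altogether. A tiny illustrative sketch:

    import java.util.Collections;
    import java.util.Set;

    class EmptySetWarningSketch {
      static Set<String> typed() {
        return Collections.emptySet();   // typed factory method: no warning
      }

      @SuppressWarnings("unchecked")
      static Set<String> raw() {
        return Collections.EMPTY_SET;    // raw constant: needs the suppression
      }
    }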
 



[hadoop] 03/06: HDFS-15160. ReplicaMap, Disk Balancer, Directory Scanner and various FsDatasetImpl methods should use datanode readlock. Contributed by Stephen O'Donnell.

Posted by he...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hexiaoqiao pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit ad55d5d1d2566220d6b3a9967c504d2bd0ab2fa6
Author: Stephen O'Donnell <so...@apache.org>
AuthorDate: Tue Jun 30 07:09:26 2020 -0700

    HDFS-15160. ReplicaMap, Disk Balancer, Directory Scanner and various FsDatasetImpl methods should use datanode readlock. Contributed by Stephen O'Donnell.
    
    Signed-off-by: Wei-Chiu Chuang <we...@apache.org>
    (cherry picked from commit 2a67e2b1a0e3a5f91056f5b977ef9c4c07ba6718)
---
 .../java/org/apache/hadoop/hdfs/DFSConfigKeys.java |   4 +
 .../hadoop/hdfs/server/datanode/BlockSender.java   |   2 +-
 .../hadoop/hdfs/server/datanode/DataNode.java      |   2 +-
 .../hadoop/hdfs/server/datanode/DiskBalancer.java  |   2 +-
 .../server/datanode/fsdataset/FsDatasetSpi.java    |   8 +-
 .../datanode/fsdataset/impl/FsDatasetImpl.java     |  67 ++++++++------
 .../server/datanode/fsdataset/impl/ReplicaMap.java |  31 +++++--
 .../src/main/resources/hdfs-default.xml            |  13 +++
 .../datanode/fsdataset/impl/TestFsDatasetImpl.java | 102 ++++++++++++++++++++-
 9 files changed, 187 insertions(+), 44 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 76784e7..b02b5f4 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -552,6 +552,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String DFS_DATANODE_LOCK_FAIR_KEY =
       "dfs.datanode.lock.fair";
   public static final boolean DFS_DATANODE_LOCK_FAIR_DEFAULT = true;
+  public static final String DFS_DATANODE_LOCK_READ_WRITE_ENABLED_KEY =
+      "dfs.datanode.lock.read.write.enabled";
+  public static final Boolean DFS_DATANODE_LOCK_READ_WRITE_ENABLED_DEFAULT =
+      true;
   public static final String  DFS_DATANODE_LOCK_REPORTING_THRESHOLD_MS_KEY =
       "dfs.datanode.lock-reporting-threshold-ms";
   public static final long
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
index ad9be88..ee7cdb1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
@@ -254,7 +254,7 @@ class BlockSender implements java.io.Closeable {
       // the append write.
       ChunkChecksum chunkChecksum = null;
       final long replicaVisibleLength;
-      try(AutoCloseableLock lock = datanode.data.acquireDatasetLock()) {
+      try(AutoCloseableLock lock = datanode.data.acquireDatasetReadLock()) {
         replica = getReplica(block, datanode);
         replicaVisibleLength = replica.getVisibleLength();
       }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index bc9fb13..cda8096 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -3010,7 +3010,7 @@ public class DataNode extends ReconfigurableBase
     final BlockConstructionStage stage;
 
     //get replica information
-    try(AutoCloseableLock lock = data.acquireDatasetLock()) {
+    try(AutoCloseableLock lock = data.acquireDatasetReadLock()) {
       Block storedBlock = data.getStoredBlock(b.getBlockPoolId(),
           b.getBlockId());
       if (null == storedBlock) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
index 63f20e8..b99ca3b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
@@ -504,7 +504,7 @@ public class DiskBalancer {
     Map<String, String> storageIDToVolBasePathMap = new HashMap<>();
     FsDatasetSpi.FsVolumeReferences references;
     try {
-      try(AutoCloseableLock lock = this.dataset.acquireDatasetLock()) {
+      try(AutoCloseableLock lock = this.dataset.acquireDatasetReadLock()) {
         references = this.dataset.getFsVolumeReferences();
         for (int ndx = 0; ndx < references.size(); ndx++) {
           FsVolumeSpi vol = references.get(ndx);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
index 4de9169..499901d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
@@ -657,12 +657,16 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
       FsVolumeSpi destination) throws IOException;
 
   /**
-   * Acquire the lock of the data set.
+   * Acquire the lock of the data set. This prevents other threads from
+   * modifying the volume map structure inside the datanode, but other changes
+   * are still possible. For example modifying the genStamp of a block instance.
    */
   AutoCloseableLock acquireDatasetLock();
 
   /***
-   * Acquire the read lock of the data set.
+   * Acquire the read lock of the data set. This prevents other threads from
+   * modifying the volume map structure inside the datanode, but other changes
+   * are still possible. For example modifying the genStamp of a block instance.
    * @return The AutoClosable read lock instance.
    */
   AutoCloseableLock acquireDatasetReadLock();
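
The BlockSender, DataNode and DiskBalancer changes in this commit all use the
new method the same way: take the shared read lock for the lookup, then release
it before doing any slow work. A hedged standalone sketch of that calling
pattern (the class and method names here are illustrative):

    import java.io.IOException;

    import org.apache.hadoop.hdfs.protocol.Block;
    import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
    import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
    import org.apache.hadoop.util.AutoCloseableLock;

    class DatasetReadLockCallerSketch {
      // Consult the dataset under the shared read lock only; release it
      // before doing anything slow such as disk I/O.
      static Block lookup(FsDatasetSpi<?> dataset, ExtendedBlock b)
          throws IOException {
        try (AutoCloseableLock lock = dataset.acquireDatasetReadLock()) {
          return dataset.getStoredBlock(b.getBlockPoolId(), b.getBlockId());
        }
      }
    }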
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 32e58f4..7adcd8e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -41,7 +41,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import javax.management.NotCompliantMBeanException;
 import javax.management.ObjectName;
@@ -179,7 +178,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
   @Override
   public FsVolumeImpl getVolume(final ExtendedBlock b) {
-    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+    try (AutoCloseableLock lock = datasetReadLock.acquire()) {
       final ReplicaInfo r =
           volumeMap.get(b.getBlockPoolId(), b.getLocalBlock());
       return r != null ? (FsVolumeImpl) r.getVolume() : null;
@@ -189,7 +188,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   @Override // FsDatasetSpi
   public Block getStoredBlock(String bpid, long blkid)
       throws IOException {
-    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+    try (AutoCloseableLock lock = datasetReadLock.acquire()) {
       ReplicaInfo r = volumeMap.get(bpid, blkid);
       if (r == null) {
         return null;
@@ -206,7 +205,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   public Set<? extends Replica> deepCopyReplica(String bpid)
       throws IOException {
     Set<? extends Replica> replicas = null;
-    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+    try (AutoCloseableLock lock = datasetReadLock.acquire()) {
       replicas =
           new HashSet<>(volumeMap.replicas(bpid) == null ? Collections.EMPTY_SET
               : volumeMap.replicas(bpid));
@@ -302,7 +301,20 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
             DFSConfigKeys.DFS_DATANODE_LOCK_REPORTING_THRESHOLD_MS_DEFAULT,
             TimeUnit.MILLISECONDS));
     this.datasetWriteLock = new AutoCloseableLock(datasetRWLock.writeLock());
-    this.datasetReadLock = new AutoCloseableLock(datasetRWLock.readLock());
+    boolean enableRL = conf.getBoolean(
+        DFSConfigKeys.DFS_DATANODE_LOCK_READ_WRITE_ENABLED_KEY,
+        DFSConfigKeys.DFS_DATANODE_LOCK_READ_WRITE_ENABLED_DEFAULT);
+    // The read lock can be disabled by the above config key. If it is disabled
+    // then we simply make the both the read and write lock variables hold
+    // the write lock. All accesses to the lock are via these variables, so that
+    // effectively disables the read lock.
+    if (enableRL) {
+      LOG.info("The datanode lock is a read write lock");
+      this.datasetReadLock = new AutoCloseableLock(datasetRWLock.readLock());
+    } else {
+      LOG.info("The datanode lock is an exclusive write lock");
+      this.datasetReadLock = this.datasetWriteLock;
+    }
     this.datasetWriteLockCondition = datasetWriteLock.newCondition();
 
     // The number of volumes required for operation is the total number
@@ -342,7 +354,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     }
 
     storageMap = new ConcurrentHashMap<String, DatanodeStorage>();
-    volumeMap = new ReplicaMap(datasetRWLock);
+    volumeMap = new ReplicaMap(datasetReadLock, datasetWriteLock);
     ramDiskReplicaTracker = RamDiskReplicaTracker.getInstance(conf, this);
 
     @SuppressWarnings("unchecked")
@@ -475,7 +487,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
                               .setConf(this.conf)
                               .build();
     FsVolumeReference ref = fsVolume.obtainReference();
-    ReplicaMap tempVolumeMap = new ReplicaMap(datasetRWLock);
+    ReplicaMap tempVolumeMap =
+        new ReplicaMap(datasetReadLock, datasetWriteLock);
     fsVolume.getVolumeMap(tempVolumeMap, ramDiskReplicaTracker);
 
     activateVolume(tempVolumeMap, sd, storageLocation.getStorageType(), ref);
@@ -515,7 +528,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     final FsVolumeImpl fsVolume =
         createFsVolume(sd.getStorageUuid(), sd, location);
     final ReplicaMap tempVolumeMap =
-        new ReplicaMap(new ReentrantReadWriteLock());
+        new ReplicaMap(datasetReadLock, datasetWriteLock);
     ArrayList<IOException> exceptions = Lists.newArrayList();
 
     for (final NamespaceInfo nsInfo : nsInfos) {
@@ -810,7 +823,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       long seekOffset) throws IOException {
 
     ReplicaInfo info;
-    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+    try (AutoCloseableLock lock = datasetReadLock.acquire()) {
       info = volumeMap.get(b.getBlockPoolId(), b.getLocalBlock());
     }
 
@@ -898,7 +911,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   @Override // FsDatasetSpi
   public ReplicaInputStreams getTmpInputStreams(ExtendedBlock b,
       long blkOffset, long metaOffset) throws IOException {
-    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+    try (AutoCloseableLock lock = datasetReadLock.acquire()) {
       ReplicaInfo info = getReplicaInfo(b);
       FsVolumeReference ref = info.getVolume().obtainReference();
       try {
@@ -1023,7 +1036,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     }
 
     FsVolumeReference volumeRef = null;
-    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+    try (AutoCloseableLock lock = datasetReadLock.acquire()) {
       volumeRef = volumes.getNextVolume(targetStorageType, targetStorageId,
           block.getNumBytes());
     }
@@ -1137,7 +1150,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
     FsVolumeReference volumeRef = null;
 
-    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+    try (AutoCloseableLock lock = datasetReadLock.acquire()) {
       volumeRef = destination.obtainReference();
     }
 
@@ -1891,7 +1904,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         new HashMap<String, BlockListAsLongs.Builder>();
 
     List<FsVolumeImpl> curVolumes = null;
-    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+    try (AutoCloseableLock lock = datasetReadLock.acquire()) {
       curVolumes = volumes.getVolumes();
       for (FsVolumeSpi v : curVolumes) {
         builders.put(v.getStorageID(), BlockListAsLongs.builder(maxDataLength));
@@ -1954,7 +1967,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
    */
   @Override
   public List<ReplicaInfo> getFinalizedBlocks(String bpid) {
-    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+    try (AutoCloseableLock lock = datasetReadLock.acquire()) {
       final List<ReplicaInfo> finalized = new ArrayList<ReplicaInfo>(
           volumeMap.size(bpid));
       for (ReplicaInfo b : volumeMap.replicas(bpid)) {
@@ -2047,9 +2060,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   ReplicaInfo validateBlockFile(String bpid, long blockId) {
     //Should we check for metadata file too?
     final ReplicaInfo r;
-    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
-      r = volumeMap.get(bpid, blockId);
-    }
+    r = volumeMap.get(bpid, blockId);
     if (r != null) {
       if (r.blockDataExists()) {
         return r;
@@ -2292,7 +2303,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
   @Override // FsDatasetSpi
   public boolean contains(final ExtendedBlock block) {
-    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+    try (AutoCloseableLock lock = datasetReadLock.acquire()) {
       final long blockId = block.getLocalBlock().getBlockId();
       final String bpid = block.getBlockPoolId();
       final ReplicaInfo r = volumeMap.get(bpid, blockId);
@@ -2613,7 +2624,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
   @Override 
   public String getReplicaString(String bpid, long blockId) {
-    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+    try (AutoCloseableLock lock = datasetReadLock.acquire()) {
       final Replica r = volumeMap.get(bpid, blockId);
       return r == null ? "null" : r.toString();
     }
@@ -2833,7 +2844,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   @Override // FsDatasetSpi
   public long getReplicaVisibleLength(final ExtendedBlock block)
   throws IOException {
-    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+    try (AutoCloseableLock lock = datasetReadLock.acquire()) {
       final Replica replica = getReplicaInfo(block.getBlockPoolId(),
           block.getBlockId());
       if (replica.getGenerationStamp() < block.getGenerationStamp()) {
@@ -2983,18 +2994,20 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   @Override // FsDatasetSpi
   public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block)
       throws IOException {
-    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+    try (AutoCloseableLock lock = datasetReadLock.acquire()) {
       final Replica replica = volumeMap.get(block.getBlockPoolId(),
           block.getBlockId());
       if (replica == null) {
         throw new ReplicaNotFoundException(block);
       }
-      if (replica.getGenerationStamp() < block.getGenerationStamp()) {
-        throw new IOException(
-            "Replica generation stamp < block generation stamp, block="
-            + block + ", replica=" + replica);
-      } else if (replica.getGenerationStamp() > block.getGenerationStamp()) {
-        block.setGenerationStamp(replica.getGenerationStamp());
+      synchronized(replica) {
+        if (replica.getGenerationStamp() < block.getGenerationStamp()) {
+          throw new IOException(
+              "Replica generation stamp < block generation stamp, block="
+                  + block + ", replica=" + replica);
+        } else if (replica.getGenerationStamp() > block.getGenerationStamp()) {
+          block.setGenerationStamp(replica.getGenerationStamp());
+        }
       }
     }
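
The constructor change above is the heart of the kill switch: when
dfs.datanode.lock.read.write.enabled is false, the read-lock handle simply
aliases the write-lock handle, so every acquisition is exclusive again, as it
was before the read/write split. A reduced sketch of that pattern (class and
method names are illustrative):

    import java.util.concurrent.locks.ReentrantReadWriteLock;

    import org.apache.hadoop.util.AutoCloseableLock;

    class DatasetLockAliasSketch {
      private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(true);
      private final AutoCloseableLock writeLock =
          new AutoCloseableLock(rwLock.writeLock());
      private final AutoCloseableLock readLock;

      DatasetLockAliasSketch(boolean readLockEnabled) {
        // With the switch off, both handles point at the exclusive write lock.
        this.readLock = readLockEnabled
            ? new AutoCloseableLock(rwLock.readLock())
            : this.writeLock;
      }

      void readOnlyOperation() {
        try (AutoCloseableLock l = readLock.acquire()) {
          // read shared state here
        }
      }
    }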
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java
index 25141b8..13c55d3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java
@@ -32,7 +32,6 @@ import org.apache.hadoop.util.AutoCloseableLock;
  * Maintains the replica map. 
  */
 class ReplicaMap {
-  private final ReadWriteLock rwLock;
   // Lock object to synchronize this instance.
   private final AutoCloseableLock readLock;
   private final AutoCloseableLock writeLock;
@@ -41,18 +40,22 @@ class ReplicaMap {
   private final Map<String, LightWeightResizableGSet<Block, ReplicaInfo>> map =
       new HashMap<>();
 
-  ReplicaMap(ReadWriteLock lock) {
-    if (lock == null) {
+  ReplicaMap(AutoCloseableLock rLock, AutoCloseableLock wLock) {
+    if (rLock == null || wLock == null) {
       throw new HadoopIllegalArgumentException(
           "Lock to synchronize on cannot be null");
     }
-    this.rwLock = lock;
-    this.readLock = new AutoCloseableLock(rwLock.readLock());
-    this.writeLock = new AutoCloseableLock(rwLock.writeLock());
+    this.readLock = rLock;
+    this.writeLock = wLock;
+  }
+
+  ReplicaMap(ReadWriteLock lock) {
+    this(new AutoCloseableLock(lock.readLock()),
+        new AutoCloseableLock(lock.writeLock()));
   }
   
   String[] getBlockPoolList() {
-    try (AutoCloseableLock l = writeLock.acquire()) {
+    try (AutoCloseableLock l = readLock.acquire()) {
       return map.keySet().toArray(new String[map.keySet().size()]);   
     }
   }
@@ -97,7 +100,7 @@ class ReplicaMap {
    */
   ReplicaInfo get(String bpid, long blockId) {
     checkBlockPool(bpid);
-    try (AutoCloseableLock l = writeLock.acquire()) {
+    try (AutoCloseableLock l = readLock.acquire()) {
       LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
       return m != null ? m.get(new Block(blockId)) : null;
     }
@@ -218,7 +221,7 @@ class ReplicaMap {
    * @return the number of replicas in the map
    */
   int size(String bpid) {
-    try (AutoCloseableLock l = writeLock.acquire()) {
+    try (AutoCloseableLock l = readLock.acquire()) {
       LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
       return m != null ? m.size() : 0;
     }
@@ -266,4 +269,14 @@ class ReplicaMap {
   AutoCloseableLock getLock() {
     return writeLock;
   }
+
+  /**
+   * Get the lock object used for synchronizing the ReplicasMap for read only
+   * operations.
+   * @return The read lock object
+   */
+  AutoCloseableLock getReadLock() {
+    return readLock;
+  }
+
 }
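
After this change there are two ways to build a ReplicaMap: FsDatasetImpl hands
in its own read and write AutoCloseableLock handles so the map shares the
dataset lock, while tests keep using the ReadWriteLock convenience constructor.
A minimal sketch of both (it would have to live in the fsdataset.impl package,
since ReplicaMap is package-private; names are otherwise illustrative):

    import java.util.concurrent.locks.ReentrantReadWriteLock;

    import org.apache.hadoop.util.AutoCloseableLock;

    class ReplicaMapConstructionSketch {
      // Standalone map, e.g. in a unit test: wrap a fresh ReadWriteLock.
      static ReplicaMap standalone() {
        return new ReplicaMap(new ReentrantReadWriteLock());
      }

      // Map that shares the caller's dataset lock, as FsDatasetImpl now does.
      static ReplicaMap sharedWithDataset(ReentrantReadWriteLock datasetRWLock) {
        AutoCloseableLock readLock =
            new AutoCloseableLock(datasetRWLock.readLock());
        AutoCloseableLock writeLock =
            new AutoCloseableLock(datasetRWLock.writeLock());
        return new ReplicaMap(readLock, writeLock);
      }
    }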
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 2e1dbf6..c36b333 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -3010,6 +3010,19 @@
 </property>
 
 <property>
+  <name>dfs.datanode.lock.read.write.enabled</name>
+  <value>true</value>
+  <description>If this is true, the FsDataset lock will be a read write lock. If
+    it is false, all locks will be a write lock.
+    Enabling this should give better datanode throughput, as many read only
+    functions can run concurrently under the read lock, when they would
+    previously have required the exclusive write lock. As the feature is
+    experimental, this switch can be used to disable the shared read lock, and
+    cause all lock acquisitions to use the exclusive write lock.
+  </description>
+</property>
+
+<property>
   <name>dfs.datanode.lock-reporting-threshold-ms</name>
   <value>300</value>
   <description>When thread waits to obtain a lock, or a thread holds a lock for
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
index 13ffb96..d4c4b14 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 import java.util.function.Supplier;
 import com.google.common.collect.Lists;
 
-import java.io.OutputStream;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import org.apache.commons.io.FileUtils;
@@ -65,6 +64,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.io.MultipleIOException;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.test.LambdaTestUtils;
+import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.util.FakeTimer;
 import org.apache.hadoop.util.StringUtils;
 import org.junit.Assert;
@@ -85,6 +85,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DN_CACHED_DFSUSED_CHECK_INTERVAL_MS;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY;
@@ -197,6 +198,101 @@ public class TestFsDatasetImpl {
   }
 
   @Test
+  public void testReadLockEnabledByDefault()
+      throws IOException, InterruptedException {
+    final FsDatasetSpi ds = dataset;
+    AtomicBoolean accessed = new AtomicBoolean(false);
+    CountDownLatch latch = new CountDownLatch(1);
+    CountDownLatch waiterLatch = new CountDownLatch(1);
+
+    Thread holder = new Thread() {
+      public void run() {
+        try (AutoCloseableLock l = ds.acquireDatasetReadLock()) {
+          latch.countDown();
+          sleep(10000);
+        } catch (Exception e) {
+        }
+      }
+    };
+
+    Thread waiter = new Thread() {
+      public void run() {
+        try (AutoCloseableLock l = ds.acquireDatasetReadLock()) {
+          waiterLatch.countDown();
+          accessed.getAndSet(true);
+        } catch (Exception e) {
+        }
+      }
+    };
+
+    holder.start();
+    latch.await();
+    waiter.start();
+    waiterLatch.await();
+    // The holder thread is still holding the lock, but the waiter can still
+    // run as the lock is a shared read lock.
+    assertEquals(true, accessed.get());
+    holder.interrupt();
+    holder.join();
+    waiter.join();
+  }
+
+  @Test(timeout=10000)
+  public void testReadLockCanBeDisabledByConfig()
+      throws IOException, InterruptedException {
+    HdfsConfiguration conf = new HdfsConfiguration();
+    conf.setBoolean(
+        DFSConfigKeys.DFS_DATANODE_LOCK_READ_WRITE_ENABLED_KEY, false);
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(1).build();
+    try {
+      cluster.waitActive();
+      DataNode dn = cluster.getDataNodes().get(0);
+      final FsDatasetSpi<?> ds = DataNodeTestUtils.getFSDataset(dn);
+
+      CountDownLatch latch = new CountDownLatch(1);
+      CountDownLatch waiterLatch = new CountDownLatch(1);
+      AtomicBoolean accessed = new AtomicBoolean(false);
+
+      Thread holder = new Thread() {
+        public void run() {
+          try (AutoCloseableLock l = ds.acquireDatasetReadLock()) {
+            latch.countDown();
+            sleep(10000);
+          } catch (Exception e) {
+          }
+        }
+      };
+
+      Thread waiter = new Thread() {
+        public void run() {
+          try (AutoCloseableLock l = ds.acquireDatasetReadLock()) {
+            accessed.getAndSet(true);
+            waiterLatch.countDown();
+          } catch (Exception e) {
+          }
+        }
+      };
+
+      holder.start();
+      latch.await();
+      waiter.start();
+      Thread.sleep(200);
+      // Waiting thread should not have been able to update the variable
+      // as the read lock is disabled and hence an exclusive lock.
+      assertEquals(false, accessed.get());
+      holder.interrupt();
+      holder.join();
+      waiterLatch.await();
+      // After the holder thread exits, the variable is updated.
+      assertEquals(true, accessed.get());
+      waiter.join();
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
   public void testAddVolumes() throws IOException {
     final int numNewVolumes = 3;
     final int numExistingVolumes = getNumVolumes();
@@ -242,8 +338,8 @@ public class TestFsDatasetImpl {
 
   @Test
   public void testAddVolumeWithSameStorageUuid() throws IOException {
-    HdfsConfiguration conf = new HdfsConfiguration();
-    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+    HdfsConfiguration config = new HdfsConfiguration();
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(config)
         .numDataNodes(1).build();
     try {
       cluster.waitActive();
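
The two read-lock tests added above hinge on one property of
ReentrantReadWriteLock: a second reader can enter while another thread already
holds the read lock, whereas an exclusive lock forces it to wait. Distilled to
plain java.util.concurrent, as a standalone sketch that is not part of the
patch:

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    public class SharedReadLockSketch {
      public static void main(String[] args) throws InterruptedException {
        ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(true);
        CountDownLatch holderHasLock = new CountDownLatch(1);
        CountDownLatch waiterDone = new CountDownLatch(1);
        AtomicBoolean waiterRanConcurrently = new AtomicBoolean(false);

        Thread holder = new Thread(() -> {
          rwLock.readLock().lock();
          try {
            holderHasLock.countDown();
            waiterDone.await();        // keep the read lock until the waiter is done
          } catch (InterruptedException ignored) {
          } finally {
            rwLock.readLock().unlock();
          }
        });

        Thread waiter = new Thread(() -> {
          try {
            holderHasLock.await();     // make sure the holder really has the lock
          } catch (InterruptedException e) {
            return;
          }
          rwLock.readLock().lock();    // succeeds immediately: read locks are shared
          try {
            waiterRanConcurrently.set(true);
          } finally {
            rwLock.readLock().unlock();
            waiterDone.countDown();
          }
        });

        holder.start();
        waiter.start();
        holder.join();
        waiter.join();
        System.out.println("second reader entered while the first held the lock: "
            + waiterRanConcurrently.get());
      }
    }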



[hadoop] 04/06: HDFS-15457. TestFsDatasetImpl fails intermittently (#2407)

Posted by he...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hexiaoqiao pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit 33c60ba0607b216af9b0f202fa3eeef45f5955d2
Author: Ahmed Hussein <50...@users.noreply.github.com>
AuthorDate: Tue Oct 27 20:52:56 2020 -0500

    HDFS-15457. TestFsDatasetImpl fails intermittently (#2407)
    
    (cherry picked from commit 98097b8f19789605b9697f6a959da57261e0fe19)
---
 .../datanode/fsdataset/impl/TestFsDatasetImpl.java | 72 ++++++++++++++--------
 1 file changed, 45 insertions(+), 27 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
index d4c4b14..5055c66 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.function.Supplier;
 import com.google.common.collect.Lists;
 
@@ -197,9 +199,9 @@ public class TestFsDatasetImpl {
     assertEquals(0, dataset.getNumFailedVolumes());
   }
 
-  @Test
+  @Test(timeout=10000)
   public void testReadLockEnabledByDefault()
-      throws IOException, InterruptedException {
+      throws Exception {
     final FsDatasetSpi ds = dataset;
     AtomicBoolean accessed = new AtomicBoolean(false);
     CountDownLatch latch = new CountDownLatch(1);
@@ -209,7 +211,8 @@ public class TestFsDatasetImpl {
       public void run() {
         try (AutoCloseableLock l = ds.acquireDatasetReadLock()) {
           latch.countDown();
-          sleep(10000);
+          // wait for the waiter thread to access the lock.
+          waiterLatch.await();
         } catch (Exception e) {
         }
       }
@@ -217,29 +220,33 @@ public class TestFsDatasetImpl {
 
     Thread waiter = new Thread() {
       public void run() {
-        try (AutoCloseableLock l = ds.acquireDatasetReadLock()) {
+        try {
+          latch.await();
+        } catch (InterruptedException e) {
           waiterLatch.countDown();
+          return;
+        }
+        try (AutoCloseableLock l = ds.acquireDatasetReadLock()) {
           accessed.getAndSet(true);
+          // signal the holder thread.
+          waiterLatch.countDown();
         } catch (Exception e) {
         }
       }
     };
-
-    holder.start();
-    latch.await();
     waiter.start();
-    waiterLatch.await();
+    holder.start();
+    holder.join();
+    waiter.join();
     // The holder thread is still holding the lock, but the waiter can still
     // run as the lock is a shared read lock.
     assertEquals(true, accessed.get());
     holder.interrupt();
-    holder.join();
-    waiter.join();
   }
 
   @Test(timeout=10000)
   public void testReadLockCanBeDisabledByConfig()
-      throws IOException, InterruptedException {
+      throws Exception {
     HdfsConfiguration conf = new HdfsConfiguration();
     conf.setBoolean(
         DFSConfigKeys.DFS_DATANODE_LOCK_READ_WRITE_ENABLED_KEY, false);
@@ -252,41 +259,52 @@ public class TestFsDatasetImpl {
 
       CountDownLatch latch = new CountDownLatch(1);
       CountDownLatch waiterLatch = new CountDownLatch(1);
-      AtomicBoolean accessed = new AtomicBoolean(false);
+      // create a synchronized list and verify the order of elements.
+      List<Integer> syncList =
+          Collections.synchronizedList(new ArrayList<>());
+
 
       Thread holder = new Thread() {
         public void run() {
+          latch.countDown();
           try (AutoCloseableLock l = ds.acquireDatasetReadLock()) {
-            latch.countDown();
-            sleep(10000);
+            syncList.add(0);
           } catch (Exception e) {
+            return;
+          }
+          try {
+            waiterLatch.await();
+            syncList.add(2);
+          } catch (InterruptedException e) {
           }
         }
       };
 
       Thread waiter = new Thread() {
         public void run() {
+          try {
+            // wait for holder to get into the critical section.
+            latch.await();
+          } catch (InterruptedException e) {
+            waiterLatch.countDown();
+          }
           try (AutoCloseableLock l = ds.acquireDatasetReadLock()) {
-            accessed.getAndSet(true);
+            syncList.add(1);
             waiterLatch.countDown();
           } catch (Exception e) {
           }
         }
       };
-
-      holder.start();
-      latch.await();
       waiter.start();
-      Thread.sleep(200);
-      // Waiting thread should not have been able to update the variable
-      // as the read lock is disabled and hence an exclusive lock.
-      assertEquals(false, accessed.get());
-      holder.interrupt();
-      holder.join();
-      waiterLatch.await();
-      // After the holder thread exits, the variable is updated.
-      assertEquals(true, accessed.get());
+      holder.start();
+
       waiter.join();
+      holder.join();
+
+      // verify that the synchronized list has the correct sequence.
+      assertEquals(
+          "The sequence of checkpoints does not correspond to shared lock",
+          syncList, Arrays.asList(0, 1, 2));
     } finally {
       cluster.shutdown();
     }
