Posted to common-commits@hadoop.apache.org by he...@apache.org on 2021/09/14 06:19:34 UTC

[hadoop] 03/06: HDFS-15160. ReplicaMap, Disk Balancer, Directory Scanner and various FsDatasetImpl methods should use datanode readlock. Contributed by Stephen O'Donnell.

This is an automated email from the ASF dual-hosted git repository.

hexiaoqiao pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit ad55d5d1d2566220d6b3a9967c504d2bd0ab2fa6
Author: Stephen O'Donnell <so...@apache.org>
AuthorDate: Tue Jun 30 07:09:26 2020 -0700

    HDFS-15160. ReplicaMap, Disk Balancer, Directory Scanner and various FsDatasetImpl methods should use datanode readlock. Contributed by Stephen O'Donnell.
    
    Signed-off-by: Wei-Chiu Chuang <we...@apache.org>
    (cherry picked from commit 2a67e2b1a0e3a5f91056f5b977ef9c4c07ba6718)
---
 .../java/org/apache/hadoop/hdfs/DFSConfigKeys.java |   4 +
 .../hadoop/hdfs/server/datanode/BlockSender.java   |   2 +-
 .../hadoop/hdfs/server/datanode/DataNode.java      |   2 +-
 .../hadoop/hdfs/server/datanode/DiskBalancer.java  |   2 +-
 .../server/datanode/fsdataset/FsDatasetSpi.java    |   8 +-
 .../datanode/fsdataset/impl/FsDatasetImpl.java     |  67 ++++++++------
 .../server/datanode/fsdataset/impl/ReplicaMap.java |  31 +++++--
 .../src/main/resources/hdfs-default.xml            |  13 +++
 .../datanode/fsdataset/impl/TestFsDatasetImpl.java | 102 ++++++++++++++++++++-
 9 files changed, 187 insertions(+), 44 deletions(-)
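
The core idiom of the change, as a minimal sketch (the wrapper class below is
hypothetical; the FsDatasetSpi calls are the ones this patch touches):
read-only call sites move from the exclusive acquireDatasetLock() to the
shared acquireDatasetReadLock(), both used via try-with-resources on
AutoCloseableLock.

    import java.io.IOException;

    import org.apache.hadoop.hdfs.protocol.Block;
    import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
    import org.apache.hadoop.util.AutoCloseableLock;

    /** Sketch of the read-lock idiom applied throughout this patch. */
    class ReadLockIdiomSketch {
      /** Read-only lookup: readers holding the shared lock do not block
       *  each other, while the volume map structure stays protected. */
      static Block lookupBlock(FsDatasetSpi<?> dataset, String bpid,
          long blockId) throws IOException {
        try (AutoCloseableLock lock = dataset.acquireDatasetReadLock()) {
          return dataset.getStoredBlock(bpid, blockId);
        }
      }
    }

Write paths that add or remove replicas keep calling acquireDatasetLock(),
which still resolves to the exclusive write lock.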

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 76784e7..b02b5f4 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -552,6 +552,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String DFS_DATANODE_LOCK_FAIR_KEY =
       "dfs.datanode.lock.fair";
   public static final boolean DFS_DATANODE_LOCK_FAIR_DEFAULT = true;
+  public static final String DFS_DATANODE_LOCK_READ_WRITE_ENABLED_KEY =
+      "dfs.datanode.lock.read.write.enabled";
+  public static final Boolean DFS_DATANODE_LOCK_READ_WRITE_ENABLED_DEFAULT =
+      true;
   public static final String  DFS_DATANODE_LOCK_REPORTING_THRESHOLD_MS_KEY =
       "dfs.datanode.lock-reporting-threshold-ms";
   public static final long
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
index ad9be88..ee7cdb1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
@@ -254,7 +254,7 @@ class BlockSender implements java.io.Closeable {
       // the append write.
       ChunkChecksum chunkChecksum = null;
       final long replicaVisibleLength;
-      try(AutoCloseableLock lock = datanode.data.acquireDatasetLock()) {
+      try(AutoCloseableLock lock = datanode.data.acquireDatasetReadLock()) {
         replica = getReplica(block, datanode);
         replicaVisibleLength = replica.getVisibleLength();
       }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index bc9fb13..cda8096 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -3010,7 +3010,7 @@ public class DataNode extends ReconfigurableBase
     final BlockConstructionStage stage;
 
     //get replica information
-    try(AutoCloseableLock lock = data.acquireDatasetLock()) {
+    try(AutoCloseableLock lock = data.acquireDatasetReadLock()) {
       Block storedBlock = data.getStoredBlock(b.getBlockPoolId(),
           b.getBlockId());
       if (null == storedBlock) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
index 63f20e8..b99ca3b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
@@ -504,7 +504,7 @@ public class DiskBalancer {
     Map<String, String> storageIDToVolBasePathMap = new HashMap<>();
     FsDatasetSpi.FsVolumeReferences references;
     try {
-      try(AutoCloseableLock lock = this.dataset.acquireDatasetLock()) {
+      try(AutoCloseableLock lock = this.dataset.acquireDatasetReadLock()) {
         references = this.dataset.getFsVolumeReferences();
         for (int ndx = 0; ndx < references.size(); ndx++) {
           FsVolumeSpi vol = references.get(ndx);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
index 4de9169..499901d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
@@ -657,12 +657,16 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
       FsVolumeSpi destination) throws IOException;
 
   /**
-   * Acquire the lock of the data set.
+   * Acquire the lock of the data set. This prevents other threads from
+   * modifying the volume map structure inside the datanode, but other changes
+   * are still possible. For example, modifying the genStamp of a block instance.
    */
   AutoCloseableLock acquireDatasetLock();
 
   /***
-   * Acquire the read lock of the data set.
+   * Acquire the read lock of the data set. This prevents other threads from
+   * modifying the volume map structure inside the datanode, but other changes
+   * are still possible. For example, modifying the genStamp of a block instance.
    * @return The AutoClosable read lock instance.
    */
   AutoCloseableLock acquireDatasetReadLock();
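
The javadoc above is explicit that neither lock protects per-replica fields
such as the generation stamp. A hedged sketch of the resulting caller
pattern, mirroring the getBlockLocalPathInfo change further down in this
patch (class and method names here are illustrative only): look the replica
up under the read lock, then synchronize on the replica itself before
comparing or updating its genStamp.

    import java.io.IOException;

    import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
    import org.apache.hadoop.hdfs.server.datanode.Replica;
    import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
    import org.apache.hadoop.util.AutoCloseableLock;

    /** Sketch: dataset locks guard the volume map, not per-replica state. */
    class GenStampSyncSketch {
      static void reconcileGenStamp(FsDatasetSpi<?> dataset,
          ExtendedBlock block, Replica replica) throws IOException {
        try (AutoCloseableLock lock = dataset.acquireDatasetReadLock()) {
          // The read lock keeps the volume map stable, but replica fields
          // can still change, so the replica's own monitor guards them.
          synchronized (replica) {
            if (replica.getGenerationStamp() < block.getGenerationStamp()) {
              throw new IOException("Replica genStamp < block genStamp, block="
                  + block + ", replica=" + replica);
            } else if (replica.getGenerationStamp() > block.getGenerationStamp()) {
              block.setGenerationStamp(replica.getGenerationStamp());
            }
          }
        }
      }
    }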
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 32e58f4..7adcd8e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -41,7 +41,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import javax.management.NotCompliantMBeanException;
 import javax.management.ObjectName;
@@ -179,7 +178,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
   @Override
   public FsVolumeImpl getVolume(final ExtendedBlock b) {
-    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+    try (AutoCloseableLock lock = datasetReadLock.acquire()) {
       final ReplicaInfo r =
           volumeMap.get(b.getBlockPoolId(), b.getLocalBlock());
       return r != null ? (FsVolumeImpl) r.getVolume() : null;
@@ -189,7 +188,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   @Override // FsDatasetSpi
   public Block getStoredBlock(String bpid, long blkid)
       throws IOException {
-    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+    try (AutoCloseableLock lock = datasetReadLock.acquire()) {
       ReplicaInfo r = volumeMap.get(bpid, blkid);
       if (r == null) {
         return null;
@@ -206,7 +205,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   public Set<? extends Replica> deepCopyReplica(String bpid)
       throws IOException {
     Set<? extends Replica> replicas = null;
-    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+    try (AutoCloseableLock lock = datasetReadLock.acquire()) {
       replicas =
           new HashSet<>(volumeMap.replicas(bpid) == null ? Collections.EMPTY_SET
               : volumeMap.replicas(bpid));
@@ -302,7 +301,20 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
             DFSConfigKeys.DFS_DATANODE_LOCK_REPORTING_THRESHOLD_MS_DEFAULT,
             TimeUnit.MILLISECONDS));
     this.datasetWriteLock = new AutoCloseableLock(datasetRWLock.writeLock());
-    this.datasetReadLock = new AutoCloseableLock(datasetRWLock.readLock());
+    boolean enableRL = conf.getBoolean(
+        DFSConfigKeys.DFS_DATANODE_LOCK_READ_WRITE_ENABLED_KEY,
+        DFSConfigKeys.DFS_DATANODE_LOCK_READ_WRITE_ENABLED_DEFAULT);
+    // The read lock can be disabled by the above config key. If it is disabled
+    // then we simply make both the read and write lock variables hold
+    // the write lock. All accesses to the lock are via these variables, so that
+    // effectively disables the read lock.
+    if (enableRL) {
+      LOG.info("The datanode lock is a read write lock");
+      this.datasetReadLock = new AutoCloseableLock(datasetRWLock.readLock());
+    } else {
+      LOG.info("The datanode lock is an exclusive write lock");
+      this.datasetReadLock = this.datasetWriteLock;
+    }
     this.datasetWriteLockCondition = datasetWriteLock.newCondition();
 
     // The number of volumes required for operation is the total number
@@ -342,7 +354,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     }
 
     storageMap = new ConcurrentHashMap<String, DatanodeStorage>();
-    volumeMap = new ReplicaMap(datasetRWLock);
+    volumeMap = new ReplicaMap(datasetReadLock, datasetWriteLock);
     ramDiskReplicaTracker = RamDiskReplicaTracker.getInstance(conf, this);
 
     @SuppressWarnings("unchecked")
@@ -475,7 +487,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
                               .setConf(this.conf)
                               .build();
     FsVolumeReference ref = fsVolume.obtainReference();
-    ReplicaMap tempVolumeMap = new ReplicaMap(datasetRWLock);
+    ReplicaMap tempVolumeMap =
+        new ReplicaMap(datasetReadLock, datasetWriteLock);
     fsVolume.getVolumeMap(tempVolumeMap, ramDiskReplicaTracker);
 
     activateVolume(tempVolumeMap, sd, storageLocation.getStorageType(), ref);
@@ -515,7 +528,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     final FsVolumeImpl fsVolume =
         createFsVolume(sd.getStorageUuid(), sd, location);
     final ReplicaMap tempVolumeMap =
-        new ReplicaMap(new ReentrantReadWriteLock());
+        new ReplicaMap(datasetReadLock, datasetWriteLock);
     ArrayList<IOException> exceptions = Lists.newArrayList();
 
     for (final NamespaceInfo nsInfo : nsInfos) {
@@ -810,7 +823,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       long seekOffset) throws IOException {
 
     ReplicaInfo info;
-    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+    try (AutoCloseableLock lock = datasetReadLock.acquire()) {
       info = volumeMap.get(b.getBlockPoolId(), b.getLocalBlock());
     }
 
@@ -898,7 +911,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   @Override // FsDatasetSpi
   public ReplicaInputStreams getTmpInputStreams(ExtendedBlock b,
       long blkOffset, long metaOffset) throws IOException {
-    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+    try (AutoCloseableLock lock = datasetReadLock.acquire()) {
       ReplicaInfo info = getReplicaInfo(b);
       FsVolumeReference ref = info.getVolume().obtainReference();
       try {
@@ -1023,7 +1036,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     }
 
     FsVolumeReference volumeRef = null;
-    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+    try (AutoCloseableLock lock = datasetReadLock.acquire()) {
       volumeRef = volumes.getNextVolume(targetStorageType, targetStorageId,
           block.getNumBytes());
     }
@@ -1137,7 +1150,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
     FsVolumeReference volumeRef = null;
 
-    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+    try (AutoCloseableLock lock = datasetReadLock.acquire()) {
       volumeRef = destination.obtainReference();
     }
 
@@ -1891,7 +1904,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         new HashMap<String, BlockListAsLongs.Builder>();
 
     List<FsVolumeImpl> curVolumes = null;
-    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+    try (AutoCloseableLock lock = datasetReadLock.acquire()) {
       curVolumes = volumes.getVolumes();
       for (FsVolumeSpi v : curVolumes) {
         builders.put(v.getStorageID(), BlockListAsLongs.builder(maxDataLength));
@@ -1954,7 +1967,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
    */
   @Override
   public List<ReplicaInfo> getFinalizedBlocks(String bpid) {
-    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+    try (AutoCloseableLock lock = datasetReadLock.acquire()) {
       final List<ReplicaInfo> finalized = new ArrayList<ReplicaInfo>(
           volumeMap.size(bpid));
       for (ReplicaInfo b : volumeMap.replicas(bpid)) {
@@ -2047,9 +2060,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   ReplicaInfo validateBlockFile(String bpid, long blockId) {
     //Should we check for metadata file too?
     final ReplicaInfo r;
-    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
-      r = volumeMap.get(bpid, blockId);
-    }
+    r = volumeMap.get(bpid, blockId);
     if (r != null) {
       if (r.blockDataExists()) {
         return r;
@@ -2292,7 +2303,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
   @Override // FsDatasetSpi
   public boolean contains(final ExtendedBlock block) {
-    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+    try (AutoCloseableLock lock = datasetReadLock.acquire()) {
       final long blockId = block.getLocalBlock().getBlockId();
       final String bpid = block.getBlockPoolId();
       final ReplicaInfo r = volumeMap.get(bpid, blockId);
@@ -2613,7 +2624,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
   @Override 
   public String getReplicaString(String bpid, long blockId) {
-    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+    try (AutoCloseableLock lock = datasetReadLock.acquire()) {
       final Replica r = volumeMap.get(bpid, blockId);
       return r == null ? "null" : r.toString();
     }
@@ -2833,7 +2844,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   @Override // FsDatasetSpi
   public long getReplicaVisibleLength(final ExtendedBlock block)
   throws IOException {
-    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+    try (AutoCloseableLock lock = datasetReadLock.acquire()) {
       final Replica replica = getReplicaInfo(block.getBlockPoolId(),
           block.getBlockId());
       if (replica.getGenerationStamp() < block.getGenerationStamp()) {
@@ -2983,18 +2994,20 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   @Override // FsDatasetSpi
   public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block)
       throws IOException {
-    try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+    try (AutoCloseableLock lock = datasetReadLock.acquire()) {
       final Replica replica = volumeMap.get(block.getBlockPoolId(),
           block.getBlockId());
       if (replica == null) {
         throw new ReplicaNotFoundException(block);
       }
-      if (replica.getGenerationStamp() < block.getGenerationStamp()) {
-        throw new IOException(
-            "Replica generation stamp < block generation stamp, block="
-            + block + ", replica=" + replica);
-      } else if (replica.getGenerationStamp() > block.getGenerationStamp()) {
-        block.setGenerationStamp(replica.getGenerationStamp());
+      synchronized(replica) {
+        if (replica.getGenerationStamp() < block.getGenerationStamp()) {
+          throw new IOException(
+              "Replica generation stamp < block generation stamp, block="
+                  + block + ", replica=" + replica);
+        } else if (replica.getGenerationStamp() > block.getGenerationStamp()) {
+          block.setGenerationStamp(replica.getGenerationStamp());
+        }
       }
     }
 
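One detail of the FsDatasetImpl constructor change above is easy to miss:
when dfs.datanode.lock.read.write.enabled is false, datasetReadLock is
assigned the same object as datasetWriteLock, so acquireDatasetReadLock()
really takes the exclusive lock and behaviour reverts to the single-lock
model this patch replaces. A self-contained sketch of that wiring
(hypothetical class, not FsDatasetImpl itself):

    import java.util.concurrent.locks.ReentrantReadWriteLock;

    import org.apache.hadoop.util.AutoCloseableLock;

    /** Hypothetical illustration of the read-lock enable/disable wiring. */
    class LockWiringSketch {
      private final ReentrantReadWriteLock rwLock =
          new ReentrantReadWriteLock(true);
      private final AutoCloseableLock writeLock =
          new AutoCloseableLock(rwLock.writeLock());
      private final AutoCloseableLock readLock;

      LockWiringSketch(boolean readWriteLockEnabled) {
        // Enabled: readers share rwLock.readLock(). Disabled: both fields
        // hold the same exclusive lock, so every acquisition is exclusive.
        this.readLock = readWriteLockEnabled
            ? new AutoCloseableLock(rwLock.readLock())
            : this.writeLock;
      }
    }
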
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java
index 25141b8..13c55d3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java
@@ -32,7 +32,6 @@ import org.apache.hadoop.util.AutoCloseableLock;
  * Maintains the replica map. 
  */
 class ReplicaMap {
-  private final ReadWriteLock rwLock;
   // Lock object to synchronize this instance.
   private final AutoCloseableLock readLock;
   private final AutoCloseableLock writeLock;
@@ -41,18 +40,22 @@ class ReplicaMap {
   private final Map<String, LightWeightResizableGSet<Block, ReplicaInfo>> map =
       new HashMap<>();
 
-  ReplicaMap(ReadWriteLock lock) {
-    if (lock == null) {
+  ReplicaMap(AutoCloseableLock rLock, AutoCloseableLock wLock) {
+    if (rLock == null || wLock == null) {
       throw new HadoopIllegalArgumentException(
           "Lock to synchronize on cannot be null");
     }
-    this.rwLock = lock;
-    this.readLock = new AutoCloseableLock(rwLock.readLock());
-    this.writeLock = new AutoCloseableLock(rwLock.writeLock());
+    this.readLock = rLock;
+    this.writeLock = wLock;
+  }
+
+  ReplicaMap(ReadWriteLock lock) {
+    this(new AutoCloseableLock(lock.readLock()),
+        new AutoCloseableLock(lock.writeLock()));
   }
   
   String[] getBlockPoolList() {
-    try (AutoCloseableLock l = writeLock.acquire()) {
+    try (AutoCloseableLock l = readLock.acquire()) {
       return map.keySet().toArray(new String[map.keySet().size()]);   
     }
   }
@@ -97,7 +100,7 @@ class ReplicaMap {
    */
   ReplicaInfo get(String bpid, long blockId) {
     checkBlockPool(bpid);
-    try (AutoCloseableLock l = writeLock.acquire()) {
+    try (AutoCloseableLock l = readLock.acquire()) {
       LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
       return m != null ? m.get(new Block(blockId)) : null;
     }
@@ -218,7 +221,7 @@ class ReplicaMap {
    * @return the number of replicas in the map
    */
   int size(String bpid) {
-    try (AutoCloseableLock l = writeLock.acquire()) {
+    try (AutoCloseableLock l = readLock.acquire()) {
       LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
       return m != null ? m.size() : 0;
     }
@@ -266,4 +269,14 @@ class ReplicaMap {
   AutoCloseableLock getLock() {
     return writeLock;
   }
+
+  /**
+   * Get the lock object used for synchronizing the ReplicasMap for read only
+   * operations.
+   * @return The read lock object
+   */
+  AutoCloseableLock getReadLock() {
+    return readLock;
+  }
+
 }
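
ReplicaMap now has two constructors: the new one shares externally supplied
AutoCloseableLock instances (FsDatasetImpl passes its dataset read and write
locks), while the retained ReadWriteLock constructor gives the map a private
lock. A brief sketch of both, assuming it sits in the same fsdataset.impl
package since ReplicaMap is package-private:

    import java.util.concurrent.locks.ReentrantReadWriteLock;

    import org.apache.hadoop.util.AutoCloseableLock;

    /** Sketch: the two ways a ReplicaMap can be wired after this change. */
    class ReplicaMapWiringSketch {
      /** Shares the dataset's locks, as FsDatasetImpl.volumeMap now does. */
      static ReplicaMap sharedWithDataset(AutoCloseableLock readLock,
          AutoCloseableLock writeLock) {
        return new ReplicaMap(readLock, writeLock);
      }

      /** Standalone map guarded by its own private ReadWriteLock. */
      static ReplicaMap standalone() {
        return new ReplicaMap(new ReentrantReadWriteLock());
      }
    }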
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 2e1dbf6..c36b333 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -3010,6 +3010,19 @@
 </property>
 
 <property>
+  <name>dfs.datanode.lock.read.write.enabled</name>
+  <value>true</value>
+  <description>If this is true, the FsDataset lock will be a read write lock. If
+    it is false, all locks will be a write lock.
+    Enabling this should give better datanode throughput, as many read only
+    functions can run concurrently under the read lock, when they would
+    previously have required the exclusive write lock. As the feature is
+    experimental, this switch can be used to disable the shared read lock, and
+    cause all lock acquisitions to use the exclusive write lock.
+  </description>
+</property>
+
+<property>
   <name>dfs.datanode.lock-reporting-threshold-ms</name>
   <value>300</value>
   <description>When thread waits to obtain a lock, or a thread holds a lock for
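
For operators, the new key is an ordinary hdfs-site.xml override on the
DataNodes; a minimal, illustrative example that opts out of the shared read
lock (the shipped default above is true):

    <property>
      <name>dfs.datanode.lock.read.write.enabled</name>
      <value>false</value>
    </property>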
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
index 13ffb96..d4c4b14 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 import java.util.function.Supplier;
 import com.google.common.collect.Lists;
 
-import java.io.OutputStream;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import org.apache.commons.io.FileUtils;
@@ -65,6 +64,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.io.MultipleIOException;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.test.LambdaTestUtils;
+import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.util.FakeTimer;
 import org.apache.hadoop.util.StringUtils;
 import org.junit.Assert;
@@ -85,6 +85,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DN_CACHED_DFSUSED_CHECK_INTERVAL_MS;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY;
@@ -197,6 +198,101 @@ public class TestFsDatasetImpl {
   }
 
   @Test
+  public void testReadLockEnabledByDefault()
+      throws IOException, InterruptedException {
+    final FsDatasetSpi ds = dataset;
+    AtomicBoolean accessed = new AtomicBoolean(false);
+    CountDownLatch latch = new CountDownLatch(1);
+    CountDownLatch waiterLatch = new CountDownLatch(1);
+
+    Thread holder = new Thread() {
+      public void run() {
+        try (AutoCloseableLock l = ds.acquireDatasetReadLock()) {
+          latch.countDown();
+          sleep(10000);
+        } catch (Exception e) {
+        }
+      }
+    };
+
+    Thread waiter = new Thread() {
+      public void run() {
+        try (AutoCloseableLock l = ds.acquireDatasetReadLock()) {
+          waiterLatch.countDown();
+          accessed.getAndSet(true);
+        } catch (Exception e) {
+        }
+      }
+    };
+
+    holder.start();
+    latch.await();
+    waiter.start();
+    waiterLatch.await();
+    // The holder thread is still holding the lock, but the waiter can still
+    // run as the lock is a shared read lock.
+    assertEquals(true, accessed.get());
+    holder.interrupt();
+    holder.join();
+    waiter.join();
+  }
+
+  @Test(timeout=10000)
+  public void testReadLockCanBeDisabledByConfig()
+      throws IOException, InterruptedException {
+    HdfsConfiguration conf = new HdfsConfiguration();
+    conf.setBoolean(
+        DFSConfigKeys.DFS_DATANODE_LOCK_READ_WRITE_ENABLED_KEY, false);
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(1).build();
+    try {
+      cluster.waitActive();
+      DataNode dn = cluster.getDataNodes().get(0);
+      final FsDatasetSpi<?> ds = DataNodeTestUtils.getFSDataset(dn);
+
+      CountDownLatch latch = new CountDownLatch(1);
+      CountDownLatch waiterLatch = new CountDownLatch(1);
+      AtomicBoolean accessed = new AtomicBoolean(false);
+
+      Thread holder = new Thread() {
+        public void run() {
+          try (AutoCloseableLock l = ds.acquireDatasetReadLock()) {
+            latch.countDown();
+            sleep(10000);
+          } catch (Exception e) {
+          }
+        }
+      };
+
+      Thread waiter = new Thread() {
+        public void run() {
+          try (AutoCloseableLock l = ds.acquireDatasetReadLock()) {
+            accessed.getAndSet(true);
+            waiterLatch.countDown();
+          } catch (Exception e) {
+          }
+        }
+      };
+
+      holder.start();
+      latch.await();
+      waiter.start();
+      Thread.sleep(200);
+      // Waiting thread should not have been able to update the variable
+      // as the read lock is disabled and hence an exclusive lock.
+      assertEquals(false, accessed.get());
+      holder.interrupt();
+      holder.join();
+      waiterLatch.await();
+      // After the holder thread exits, the variable is updated.
+      assertEquals(true, accessed.get());
+      waiter.join();
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
   public void testAddVolumes() throws IOException {
     final int numNewVolumes = 3;
     final int numExistingVolumes = getNumVolumes();
@@ -242,8 +338,8 @@ public class TestFsDatasetImpl {
 
   @Test
   public void testAddVolumeWithSameStorageUuid() throws IOException {
-    HdfsConfiguration conf = new HdfsConfiguration();
-    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+    HdfsConfiguration config = new HdfsConfiguration();
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(config)
         .numDataNodes(1).build();
     try {
       cluster.waitActive();
