Posted to common-commits@hadoop.apache.org by we...@apache.org on 2020/06/30 14:10:36 UTC
[hadoop] branch trunk updated: HDFS-15160. ReplicaMap, Disk Balancer,
Directory Scanner and various FsDatasetImpl methods should use
datanode readlock. Contributed by Stephen O'Donnell.
This is an automated email from the ASF dual-hosted git repository.
weichiu pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 2a67e2b HDFS-15160. ReplicaMap, Disk Balancer, Directory Scanner and various FsDatasetImpl methods should use datanode readlock. Contributed by Stephen O'Donnell.
2a67e2b is described below
commit 2a67e2b1a0e3a5f91056f5b977ef9c4c07ba6718
Author: Stephen O'Donnell <so...@apache.org>
AuthorDate: Tue Jun 30 07:09:26 2020 -0700
HDFS-15160. ReplicaMap, Disk Balancer, Directory Scanner and various FsDatasetImpl methods should use datanode readlock. Contributed by Stephen O'Donnell.
Signed-off-by: Wei-Chiu Chuang <we...@apache.org>
---
.../java/org/apache/hadoop/hdfs/DFSConfigKeys.java | 4 +
.../hadoop/hdfs/server/datanode/BlockSender.java | 2 +-
.../hadoop/hdfs/server/datanode/DataNode.java | 2 +-
.../hdfs/server/datanode/DirectoryScanner.java | 2 +-
.../hadoop/hdfs/server/datanode/DiskBalancer.java | 2 +-
.../server/datanode/fsdataset/FsDatasetSpi.java | 8 +-
.../datanode/fsdataset/impl/FsDatasetImpl.java | 64 ++++++++-----
.../server/datanode/fsdataset/impl/ReplicaMap.java | 31 +++++--
.../src/main/resources/hdfs-default.xml | 13 +++
.../datanode/fsdataset/impl/TestFsDatasetImpl.java | 101 ++++++++++++++++++++-
10 files changed, 187 insertions(+), 42 deletions(-)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 3a0a678..9de33ff 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -606,6 +606,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_DATANODE_LOCK_FAIR_KEY =
"dfs.datanode.lock.fair";
public static final boolean DFS_DATANODE_LOCK_FAIR_DEFAULT = true;
+ public static final String DFS_DATANODE_LOCK_READ_WRITE_ENABLED_KEY =
+ "dfs.datanode.lock.read.write.enabled";
+ public static final boolean DFS_DATANODE_LOCK_READ_WRITE_ENABLED_DEFAULT =
+ true;
public static final String DFS_DATANODE_LOCK_REPORTING_THRESHOLD_MS_KEY =
"dfs.datanode.lock-reporting-threshold-ms";
public static final long
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
index 6102a59..b396bf9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
@@ -255,7 +255,7 @@ class BlockSender implements java.io.Closeable {
// the append write.
ChunkChecksum chunkChecksum = null;
final long replicaVisibleLength;
- try(AutoCloseableLock lock = datanode.data.acquireDatasetLock()) {
+ try(AutoCloseableLock lock = datanode.data.acquireDatasetReadLock()) {
replica = getReplica(block, datanode);
replicaVisibleLength = replica.getVisibleLength();
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 2e498e4..e242cc8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -3060,7 +3060,7 @@ public class DataNode extends ReconfigurableBase
final BlockConstructionStage stage;
//get replica information
- try(AutoCloseableLock lock = data.acquireDatasetLock()) {
+ try(AutoCloseableLock lock = data.acquireDatasetReadLock()) {
Block storedBlock = data.getStoredBlock(b.getBlockPoolId(),
b.getBlockId());
if (null == storedBlock) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
index 35625ce..b2e521c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
@@ -473,7 +473,7 @@ public class DirectoryScanner implements Runnable {
blockPoolReport.sortBlocks();
// Hold FSDataset lock to prevent further changes to the block map
- try (AutoCloseableLock lock = dataset.acquireDatasetLock()) {
+ try (AutoCloseableLock lock = dataset.acquireDatasetReadLock()) {
for (final String bpid : blockPoolReport.getBlockPoolIds()) {
List<ScanInfo> blockpoolReport = blockPoolReport.getScanInfo(bpid);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
index 958c0cfee..ac10e8f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
@@ -504,7 +504,7 @@ public class DiskBalancer {
Map<String, String> storageIDToVolBasePathMap = new HashMap<>();
FsDatasetSpi.FsVolumeReferences references;
try {
- try(AutoCloseableLock lock = this.dataset.acquireDatasetLock()) {
+ try(AutoCloseableLock lock = this.dataset.acquireDatasetReadLock()) {
references = this.dataset.getFsVolumeReferences();
for (int ndx = 0; ndx < references.size(); ndx++) {
FsVolumeSpi vol = references.get(ndx);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
index 2e5135d..177c62e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
@@ -657,12 +657,16 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
FsVolumeSpi destination) throws IOException;
/**
- * Acquire the lock of the data set.
+ * Acquire the lock of the data set. This prevents other threads from
+ * modifying the volume map structure inside the datanode, but other changes
+ * are still possible, for example modifying the genStamp of a block instance.
*/
AutoCloseableLock acquireDatasetLock();
/**
- * Acquire the read lock of the data set.
+ * Acquire the read lock of the data set. This prevents other threads from
+ * modifying the volume map structure inside the datanode, but other changes
+ * are still possible, for example modifying the genStamp of a block instance.
* @return The AutoClosable read lock instance.
*/
AutoCloseableLock acquireDatasetReadLock();
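As a usage sketch of the semantics documented above (mirroring the BlockSender
change earlier in this patch; `datanode`, `block` and `getReplica` are taken
from that context, not introduced here):

    // A read-only query takes the shared read lock instead of the
    // exclusive dataset lock. The volume map cannot be restructured while
    // the lock is held, but per-replica state such as the genStamp can
    // still change underneath it.
    final long replicaVisibleLength;
    try (AutoCloseableLock lock = datanode.data.acquireDatasetReadLock()) {
      Replica replica = getReplica(block, datanode);
      replicaVisibleLength = replica.getVisibleLength();
    }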
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index a083012..de898e9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -183,7 +183,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
@Override
public FsVolumeImpl getVolume(final ExtendedBlock b) {
- try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+ try (AutoCloseableLock lock = datasetReadLock.acquire()) {
final ReplicaInfo r =
volumeMap.get(b.getBlockPoolId(), b.getLocalBlock());
return r != null ? (FsVolumeImpl) r.getVolume() : null;
@@ -193,7 +193,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
@Override // FsDatasetSpi
public Block getStoredBlock(String bpid, long blkid)
throws IOException {
- try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+ try (AutoCloseableLock lock = datasetReadLock.acquire()) {
ReplicaInfo r = volumeMap.get(bpid, blkid);
if (r == null) {
return null;
@@ -206,7 +206,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
public Set<? extends Replica> deepCopyReplica(String bpid)
throws IOException {
Set<? extends Replica> replicas = null;
- try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+ try (AutoCloseableLock lock = datasetReadLock.acquire()) {
replicas = new HashSet<>(volumeMap.replicas(bpid) == null ? Collections.
EMPTY_SET : volumeMap.replicas(bpid));
}
@@ -302,7 +302,20 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
DFSConfigKeys.DFS_DATANODE_LOCK_REPORTING_THRESHOLD_MS_DEFAULT,
TimeUnit.MILLISECONDS));
this.datasetWriteLock = new AutoCloseableLock(datasetRWLock.writeLock());
- this.datasetReadLock = new AutoCloseableLock(datasetRWLock.readLock());
+ boolean enableRL = conf.getBoolean(
+ DFSConfigKeys.DFS_DATANODE_LOCK_READ_WRITE_ENABLED_KEY,
+ DFSConfigKeys.DFS_DATANODE_LOCK_READ_WRITE_ENABLED_DEFAULT);
+ // The read lock can be disabled by the above config key. If it is
+ // disabled, we simply make both the read and write lock variables hold
+ // the write lock. All accesses to the lock are via these variables, so
+ // that effectively disables the read lock.
+ if (enableRL) {
+ LOG.info("The datanode lock is a read write lock");
+ this.datasetReadLock = new AutoCloseableLock(datasetRWLock.readLock());
+ } else {
+ LOG.info("The datanode lock is an exclusive write lock");
+ this.datasetReadLock = this.datasetWriteLock;
+ }
this.datasetWriteLockCondition = datasetWriteLock.newCondition();
// The number of volumes required for operation is the total number
@@ -342,7 +355,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
}
storageMap = new ConcurrentHashMap<String, DatanodeStorage>();
- volumeMap = new ReplicaMap(datasetRWLock);
+ volumeMap = new ReplicaMap(datasetReadLock, datasetWriteLock);
ramDiskReplicaTracker = RamDiskReplicaTracker.getInstance(conf, this);
@SuppressWarnings("unchecked")
@@ -475,7 +488,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
.setConf(this.conf)
.build();
FsVolumeReference ref = fsVolume.obtainReference();
- ReplicaMap tempVolumeMap = new ReplicaMap(datasetRWLock);
+ ReplicaMap tempVolumeMap =
+ new ReplicaMap(datasetReadLock, datasetWriteLock);
fsVolume.getVolumeMap(tempVolumeMap, ramDiskReplicaTracker);
activateVolume(tempVolumeMap, sd, storageLocation.getStorageType(), ref);
@@ -810,7 +824,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
long seekOffset) throws IOException {
ReplicaInfo info;
- try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+ try (AutoCloseableLock lock = datasetReadLock.acquire()) {
info = volumeMap.get(b.getBlockPoolId(), b.getLocalBlock());
}
@@ -898,7 +912,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
@Override // FsDatasetSpi
public ReplicaInputStreams getTmpInputStreams(ExtendedBlock b,
long blkOffset, long metaOffset) throws IOException {
- try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+ try (AutoCloseableLock lock = datasetReadLock.acquire()) {
ReplicaInfo info = getReplicaInfo(b);
FsVolumeReference ref = info.getVolume().obtainReference();
try {
@@ -1023,7 +1037,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
}
FsVolumeReference volumeRef = null;
- try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+ try (AutoCloseableLock lock = datasetReadLock.acquire()) {
volumeRef = volumes.getNextVolume(targetStorageType, targetStorageId,
block.getNumBytes());
}
@@ -1137,7 +1151,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
FsVolumeReference volumeRef = null;
- try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+ try (AutoCloseableLock lock = datasetReadLock.acquire()) {
volumeRef = destination.obtainReference();
}
@@ -1930,7 +1944,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
new HashMap<String, BlockListAsLongs.Builder>();
List<FsVolumeImpl> curVolumes = null;
- try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+ try (AutoCloseableLock lock = datasetReadLock.acquire()) {
curVolumes = volumes.getVolumes();
for (FsVolumeSpi v : curVolumes) {
builders.put(v.getStorageID(), BlockListAsLongs.builder(maxDataLength));
@@ -1989,7 +2003,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
*/
@Override
public List<ReplicaInfo> getFinalizedBlocks(String bpid) {
- try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+ try (AutoCloseableLock lock = datasetReadLock.acquire()) {
final List<ReplicaInfo> finalized = new ArrayList<ReplicaInfo>(
volumeMap.size(bpid));
for (ReplicaInfo b : volumeMap.replicas(bpid)) {
@@ -2082,9 +2096,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
ReplicaInfo validateBlockFile(String bpid, long blockId) {
//Should we check for metadata file too?
final ReplicaInfo r;
- try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
- r = volumeMap.get(bpid, blockId);
- }
+ r = volumeMap.get(bpid, blockId);
if (r != null) {
if (r.blockDataExists()) {
return r;
@@ -2327,7 +2339,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
@Override // FsDatasetSpi
public boolean contains(final ExtendedBlock block) {
- try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+ try (AutoCloseableLock lock = datasetReadLock.acquire()) {
final long blockId = block.getLocalBlock().getBlockId();
final String bpid = block.getBlockPoolId();
final ReplicaInfo r = volumeMap.get(bpid, blockId);
@@ -2655,7 +2667,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
@Override
public String getReplicaString(String bpid, long blockId) {
- try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+ try (AutoCloseableLock lock = datasetReadLock.acquire()) {
final Replica r = volumeMap.get(bpid, blockId);
return r == null ? "null" : r.toString();
}
@@ -2882,7 +2894,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
@Override // FsDatasetSpi
public long getReplicaVisibleLength(final ExtendedBlock block)
throws IOException {
- try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+ try (AutoCloseableLock lock = datasetReadLock.acquire()) {
final Replica replica = getReplicaInfo(block.getBlockPoolId(),
block.getBlockId());
if (replica.getGenerationStamp() < block.getGenerationStamp()) {
@@ -3032,18 +3044,20 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
@Override // FsDatasetSpi
public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block)
throws IOException {
- try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+ try (AutoCloseableLock lock = datasetReadLock.acquire()) {
final Replica replica = volumeMap.get(block.getBlockPoolId(),
block.getBlockId());
if (replica == null) {
throw new ReplicaNotFoundException(block);
}
- if (replica.getGenerationStamp() < block.getGenerationStamp()) {
- throw new IOException(
- "Replica generation stamp < block generation stamp, block="
- + block + ", replica=" + replica);
- } else if (replica.getGenerationStamp() > block.getGenerationStamp()) {
- block.setGenerationStamp(replica.getGenerationStamp());
+ synchronized(replica) {
+ if (replica.getGenerationStamp() < block.getGenerationStamp()) {
+ throw new IOException(
+ "Replica generation stamp < block generation stamp, block="
+ + block + ", replica=" + replica);
+ } else if (replica.getGenerationStamp() > block.getGenerationStamp()) {
+ block.setGenerationStamp(replica.getGenerationStamp());
+ }
}
}
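The lock setup in the constructor hunk above is the core of the change: when
dfs.datanode.lock.read.write.enabled is false, both lock handles alias the
exclusive write lock. A standalone sketch of that pattern, assuming only the
two imports shown and `enableRL` as in the code above:

    import java.util.concurrent.locks.ReentrantReadWriteLock;
    import org.apache.hadoop.util.AutoCloseableLock;

    // With the read lock enabled, readers share the lock and only writers
    // exclude each other. With it disabled, readLock and writeLock are the
    // same object, so acquireDatasetReadLock() serializes like a write lock.
    ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(true); // fair
    AutoCloseableLock writeLock = new AutoCloseableLock(rwLock.writeLock());
    AutoCloseableLock readLock = enableRL
        ? new AutoCloseableLock(rwLock.readLock())
        : writeLock;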
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java
index df14f2a..5dfcc77 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java
@@ -33,7 +33,6 @@ import org.apache.hadoop.util.AutoCloseableLock;
* Maintains the replica map.
*/
class ReplicaMap {
- private final ReadWriteLock rwLock;
// Lock object to synchronize this instance.
private final AutoCloseableLock readLock;
private final AutoCloseableLock writeLock;
@@ -53,18 +52,22 @@ class ReplicaMap {
}
};
- ReplicaMap(ReadWriteLock lock) {
- if (lock == null) {
+ ReplicaMap(AutoCloseableLock readLock, AutoCloseableLock writeLock) {
+ if (readLock == null || writeLock == null) {
throw new HadoopIllegalArgumentException(
"Lock to synchronize on cannot be null");
}
- this.rwLock = lock;
- this.readLock = new AutoCloseableLock(rwLock.readLock());
- this.writeLock = new AutoCloseableLock(rwLock.writeLock());
+ this.readLock = readLock;
+ this.writeLock = writeLock;
+ }
+
+ ReplicaMap(ReadWriteLock lock) {
+ this(new AutoCloseableLock(lock.readLock()),
+ new AutoCloseableLock(lock.writeLock()));
}
String[] getBlockPoolList() {
- try (AutoCloseableLock l = writeLock.acquire()) {
+ try (AutoCloseableLock l = readLock.acquire()) {
return map.keySet().toArray(new String[map.keySet().size()]);
}
}
@@ -109,7 +112,7 @@ class ReplicaMap {
*/
ReplicaInfo get(String bpid, long blockId) {
checkBlockPool(bpid);
- try (AutoCloseableLock l = writeLock.acquire()) {
+ try (AutoCloseableLock l = readLock.acquire()) {
FoldedTreeSet<ReplicaInfo> set = map.get(bpid);
if (set == null) {
return null;
@@ -235,7 +238,7 @@ class ReplicaMap {
* @return the number of replicas in the map
*/
int size(String bpid) {
- try (AutoCloseableLock l = writeLock.acquire()) {
+ try (AutoCloseableLock l = readLock.acquire()) {
FoldedTreeSet<ReplicaInfo> set = map.get(bpid);
return set != null ? set.size() : 0;
}
@@ -281,4 +284,14 @@ class ReplicaMap {
AutoCloseableLock getLock() {
return writeLock;
}
+
+ /**
+ * Get the lock object used for synchronizing the ReplicaMap for read-only
+ * operations.
+ * @return The read lock object
+ */
+ AutoCloseableLock getReadLock() {
+ return readLock;
+ }
+
}
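ReplicaMap now has two constructors: the new one shares caller-supplied lock
handles, so FsDatasetImpl and its volume map are guarded by the same locks,
while the old ReadWriteLock form simply delegates to it. A sketch of both
call sites (datasetReadLock/datasetWriteLock stand for the FsDatasetImpl
fields of the same names):

    // Shares the dataset's own lock handles, as FsDatasetImpl now does.
    ReplicaMap shared = new ReplicaMap(datasetReadLock, datasetWriteLock);
    // Builds private handles from a ReadWriteLock, as callers did before.
    ReplicaMap standalone =
        new ReplicaMap(new java.util.concurrent.locks.ReentrantReadWriteLock());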
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 1e8490ad..689ecfe 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -3251,6 +3251,19 @@
</property>
<property>
+ <name>dfs.datanode.lock.read.write.enabled</name>
+ <value>true</value>
+ <description>If this is true, the FsDataset lock will be a read write lock. If
+ it is false, all locks will be a write lock.
+ Enabling this should give better datanode throughput, as many read only
+ functions can run concurrently under the read lock, when they would
+ previously have required the exclusive write lock. As the feature is
+ experimental, this switch can be used to disable the shared read lock, and
+ cause all lock acquisitions to use the exclusive write lock.
+ </description>
+</property>
+
+<property>
<name>dfs.datanode.lock-reporting-threshold-ms</name>
<value>300</value>
<description>When thread waits to obtain a lock, or a thread holds a lock for
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
index 273feb0..8b445c5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
@@ -65,6 +65,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.io.MultipleIOException;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.LambdaTestUtils;
+import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.util.FakeTimer;
import org.apache.hadoop.util.StringUtils;
import org.junit.Assert;
@@ -84,6 +85,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DN_CACHED_DFSUSED_CHECK_INTERVAL_MS;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY;
@@ -199,6 +201,101 @@ public class TestFsDatasetImpl {
}
@Test
+ public void testReadLockEnabledByDefault()
+ throws IOException, InterruptedException {
+ final FsDatasetSpi ds = dataset;
+ AtomicBoolean accessed = new AtomicBoolean(false);
+ CountDownLatch latch = new CountDownLatch(1);
+ CountDownLatch waiterLatch = new CountDownLatch(1);
+
+ Thread holder = new Thread() {
+ public void run() {
+ try (AutoCloseableLock l = ds.acquireDatasetReadLock()) {
+ latch.countDown();
+ sleep(10000);
+ } catch (Exception e) {
+ // Expected: the main thread interrupts this holder to release the lock.
+ }
+ }
+ };
+
+ Thread waiter = new Thread() {
+ public void run() {
+ try (AutoCloseableLock l = ds.acquireDatasetReadLock()) {
+ waiterLatch.countDown();
+ accessed.getAndSet(true);
+ } catch (Exception e) {
+ }
+ }
+ };
+
+ holder.start();
+ latch.await();
+ waiter.start();
+ waiterLatch.await();
+ // The holder thread is still holding the lock, but the waiter can still
+ // run as the lock is a shared read lock.
+ assertEquals(true, accessed.get());
+ holder.interrupt();
+ holder.join();
+ waiter.join();
+ }
+
+ @Test(timeout=10000)
+ public void testReadLockCanBeDisabledByConfig()
+ throws IOException, InterruptedException {
+ HdfsConfiguration conf = new HdfsConfiguration();
+ conf.setBoolean(
+ DFSConfigKeys.DFS_DATANODE_LOCK_READ_WRITE_ENABLED_KEY, false);
+ MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+ .numDataNodes(1).build();
+ try {
+ cluster.waitActive();
+ DataNode dn = cluster.getDataNodes().get(0);
+ final FsDatasetSpi<?> ds = DataNodeTestUtils.getFSDataset(dn);
+
+ CountDownLatch latch = new CountDownLatch(1);
+ CountDownLatch waiterLatch = new CountDownLatch(1);
+ AtomicBoolean accessed = new AtomicBoolean(false);
+
+ Thread holder = new Thread() {
+ public void run() {
+ try (AutoCloseableLock l = ds.acquireDatasetReadLock()) {
+ latch.countDown();
+ sleep(10000);
+ } catch (Exception e) {
+ // Expected: the main thread interrupts this holder to release the lock.
+ }
+ }
+ };
+
+ Thread waiter = new Thread() {
+ public void run() {
+ try (AutoCloseableLock l = ds.acquireDatasetReadLock()) {
+ accessed.getAndSet(true);
+ waiterLatch.countDown();
+ } catch (Exception e) {
+ }
+ }
+ };
+
+ holder.start();
+ latch.await();
+ waiter.start();
+ Thread.sleep(200);
+ // Waiting thread should not have been able to update the variable
+ // as the read lock is disabled and hence an exclusive lock.
+ assertEquals(false, accessed.get());
+ holder.interrupt();
+ holder.join();
+ waiterLatch.await();
+ // After the holder thread exits, the variable is updated.
+ assertEquals(true, accessed.get());
+ waiter.join();
+ } finally {
+ cluster.shutdown();
+ }
+ }
+
+ @Test
public void testAddVolumes() throws IOException {
final int numNewVolumes = 3;
final int numExistingVolumes = getNumVolumes();
@@ -244,8 +341,8 @@ public class TestFsDatasetImpl {
@Test
public void testAddVolumeWithSameStorageUuid() throws IOException {
- HdfsConfiguration conf = new HdfsConfiguration();
- MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+ HdfsConfiguration config = new HdfsConfiguration();
+ MiniDFSCluster cluster = new MiniDFSCluster.Builder(config)
.numDataNodes(1).build();
try {
cluster.waitActive();