You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by he...@apache.org on 2021/09/14 06:19:33 UTC
[hadoop] 02/06: HDFS-15150. Introduce read write lock to Datanode.
Contributed Stephen O'Donnell.
This is an automated email from the ASF dual-hosted git repository.
hexiaoqiao pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/hadoop.git
commit 7039433146d307c8e56587c69d33f989626da57f
Author: Stephen O'Donnell <so...@cloudera.com>
AuthorDate: Tue Feb 11 07:59:34 2020 -0800
HDFS-15150. Introduce read write lock to Datanode. Contributed Stephen O'Donnell.
Signed-off-by: Wei-Chiu Chuang <we...@apache.org>
(cherry picked from commit d7c136b9ed6d99e1b03f5b89723b3a20df359ba8)
---
.../hadoop/util/InstrumentedReadWriteLock.java | 2 +-
.../java/org/apache/hadoop/hdfs/DFSConfigKeys.java | 7 +
.../server/datanode/fsdataset/FsDatasetSpi.java | 6 +
.../datanode/fsdataset/impl/BlockPoolSlice.java | 4 +-
.../datanode/fsdataset/impl/FsDatasetImpl.java | 143 ++++++++++++---------
.../fsdataset/impl/ProvidedVolumeImpl.java | 10 +-
.../server/datanode/fsdataset/impl/ReplicaMap.java | 31 +++--
.../src/main/resources/hdfs-default.xml | 21 +++
.../hdfs/server/datanode/SimulatedFSDataset.java | 6 +
.../datanode/extdataset/ExternalDatasetImpl.java | 5 +
.../fsdataset/impl/FsDatasetImplTestUtils.java | 2 +-
.../datanode/fsdataset/impl/TestFsVolumeList.java | 4 +-
.../fsdataset/impl/TestInterDatanodeProtocol.java | 4 +-
.../datanode/fsdataset/impl/TestProvidedImpl.java | 8 +-
.../datanode/fsdataset/impl/TestReplicaMap.java | 9 +-
.../fsdataset/impl/TestWriteToReplica.java | 4 +-
16 files changed, 168 insertions(+), 98 deletions(-)
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/InstrumentedReadWriteLock.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/InstrumentedReadWriteLock.java
index a410524..758f1ff 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/InstrumentedReadWriteLock.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/InstrumentedReadWriteLock.java
@@ -37,7 +37,7 @@ public class InstrumentedReadWriteLock implements ReadWriteLock {
private final Lock readLock;
private final Lock writeLock;
- InstrumentedReadWriteLock(boolean fair, String name, Logger logger,
+ public InstrumentedReadWriteLock(boolean fair, String name, Logger logger,
long minLoggingGapMs, long lockWarningThresholdMs) {
ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(fair);
readLock = new InstrumentedReadLock(name, logger, readWriteLock,
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 563b64c..76784e7 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -549,6 +549,13 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
"dfs.lock.suppress.warning.interval";
public static final long DFS_LOCK_SUPPRESS_WARNING_INTERVAL_DEFAULT =
10000; //ms
+ public static final String DFS_DATANODE_LOCK_FAIR_KEY =
+ "dfs.datanode.lock.fair";
+ public static final boolean DFS_DATANODE_LOCK_FAIR_DEFAULT = true;
+ public static final String DFS_DATANODE_LOCK_REPORTING_THRESHOLD_MS_KEY =
+ "dfs.datanode.lock-reporting-threshold-ms";
+ public static final long
+ DFS_DATANODE_LOCK_REPORTING_THRESHOLD_MS_DEFAULT = 300L;
public static final String DFS_UPGRADE_DOMAIN_FACTOR = "dfs.namenode.upgrade.domain.factor";
public static final int DFS_UPGRADE_DOMAIN_FACTOR_DEFAULT = DFS_REPLICATION_DEFAULT;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
index 78a5cfc..4de9169 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
@@ -661,5 +661,11 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
*/
AutoCloseableLock acquireDatasetLock();
+ /***
+ * Acquire the read lock of the data set.
+ * @return The AutoClosable read lock instance.
+ */
+ AutoCloseableLock acquireDatasetReadLock();
+
Set<? extends Replica> deepCopyReplica(String bpid) throws IOException;
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
index 9656b9d..ea77505 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
@@ -42,6 +42,7 @@ import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.hdfs.server.datanode.FSCachingGetSpaceUsed;
import org.slf4j.Logger;
@@ -66,7 +67,6 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaTrack
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.MultipleIOException;
-import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.DataChecksum.Type;
import org.apache.hadoop.util.DiskChecker;
@@ -874,7 +874,7 @@ class BlockPoolSlice {
private boolean readReplicasFromCache(ReplicaMap volumeMap,
final RamDiskReplicaTracker lazyWriteReplicaMap) {
- ReplicaMap tmpReplicaMap = new ReplicaMap(new AutoCloseableLock());
+ ReplicaMap tmpReplicaMap = new ReplicaMap(new ReentrantReadWriteLock());
File replicaFile = new File(currentDir, REPLICA_CACHE_FILE);
// Check whether the file exists or not.
if (!replicaFile.exists()) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 3576671..32e58f4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -40,8 +40,8 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.management.NotCompliantMBeanException;
import javax.management.ObjectName;
@@ -112,7 +112,7 @@ import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
-import org.apache.hadoop.util.InstrumentedLock;
+import org.apache.hadoop.util.InstrumentedReadWriteLock;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.Timer;
@@ -179,7 +179,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
@Override
public FsVolumeImpl getVolume(final ExtendedBlock b) {
- try (AutoCloseableLock lock = datasetLock.acquire()) {
+ try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
final ReplicaInfo r =
volumeMap.get(b.getBlockPoolId(), b.getLocalBlock());
return r != null ? (FsVolumeImpl) r.getVolume() : null;
@@ -189,7 +189,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
@Override // FsDatasetSpi
public Block getStoredBlock(String bpid, long blkid)
throws IOException {
- try (AutoCloseableLock lock = datasetLock.acquire()) {
+ try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
ReplicaInfo r = volumeMap.get(bpid, blkid);
if (r == null) {
return null;
@@ -205,9 +205,13 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
@Override
public Set<? extends Replica> deepCopyReplica(String bpid)
throws IOException {
- Set<? extends Replica> replicas =
- new HashSet<>(volumeMap.replicas(bpid) == null ? Collections.EMPTY_SET
- : volumeMap.replicas(bpid));
+ Set<? extends Replica> replicas = null;
+ try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
+ replicas =
+ new HashSet<>(volumeMap.replicas(bpid) == null ? Collections.EMPTY_SET
+ : volumeMap.replicas(bpid));
+ }
+
return Collections.unmodifiableSet(replicas);
}
@@ -268,8 +272,12 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
private final int maxDataLength;
@VisibleForTesting
- final AutoCloseableLock datasetLock;
- private final Condition datasetLockCondition;
+ final AutoCloseableLock datasetWriteLock;
+ @VisibleForTesting
+ final AutoCloseableLock datasetReadLock;
+ @VisibleForTesting
+ final InstrumentedReadWriteLock datasetRWLock;
+ private final Condition datasetWriteLockCondition;
private static String blockPoolId = "";
/**
@@ -282,15 +290,20 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
this.dataStorage = storage;
this.conf = conf;
this.smallBufferSize = DFSUtilClient.getSmallBufferSize(conf);
- this.datasetLock = new AutoCloseableLock(
- new InstrumentedLock(getClass().getName(), LOG,
- new ReentrantLock(true),
- conf.getTimeDuration(
- DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_KEY,
- DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_DEFAULT,
- TimeUnit.MILLISECONDS),
- 300));
- this.datasetLockCondition = datasetLock.newCondition();
+ this.datasetRWLock = new InstrumentedReadWriteLock(
+ conf.getBoolean(DFSConfigKeys.DFS_DATANODE_LOCK_FAIR_KEY,
+ DFSConfigKeys.DFS_DATANODE_LOCK_FAIR_DEFAULT),
+ "FsDatasetRWLock", LOG, conf.getTimeDuration(
+ DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_KEY,
+ DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_DEFAULT,
+ TimeUnit.MILLISECONDS),
+ conf.getTimeDuration(
+ DFSConfigKeys.DFS_DATANODE_LOCK_REPORTING_THRESHOLD_MS_KEY,
+ DFSConfigKeys.DFS_DATANODE_LOCK_REPORTING_THRESHOLD_MS_DEFAULT,
+ TimeUnit.MILLISECONDS));
+ this.datasetWriteLock = new AutoCloseableLock(datasetRWLock.writeLock());
+ this.datasetReadLock = new AutoCloseableLock(datasetRWLock.readLock());
+ this.datasetWriteLockCondition = datasetWriteLock.newCondition();
// The number of volumes required for operation is the total number
// of volumes minus the number of failed volumes we can tolerate.
@@ -329,7 +342,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
}
storageMap = new ConcurrentHashMap<String, DatanodeStorage>();
- volumeMap = new ReplicaMap(datasetLock);
+ volumeMap = new ReplicaMap(datasetRWLock);
ramDiskReplicaTracker = RamDiskReplicaTracker.getInstance(conf, this);
@SuppressWarnings("unchecked")
@@ -383,7 +396,12 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
@Override
public AutoCloseableLock acquireDatasetLock() {
- return datasetLock.acquire();
+ return datasetWriteLock.acquire();
+ }
+
+ @Override
+ public AutoCloseableLock acquireDatasetReadLock() {
+ return datasetReadLock.acquire();
}
/**
@@ -424,7 +442,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
ReplicaMap replicaMap,
Storage.StorageDirectory sd, StorageType storageType,
FsVolumeReference ref) throws IOException {
- try (AutoCloseableLock lock = datasetLock.acquire()) {
+ try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
DatanodeStorage dnStorage = storageMap.get(sd.getStorageUuid());
if (dnStorage != null) {
final String errorMsg = String.format(
@@ -457,7 +475,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
.setConf(this.conf)
.build();
FsVolumeReference ref = fsVolume.obtainReference();
- ReplicaMap tempVolumeMap = new ReplicaMap(datasetLock);
+ ReplicaMap tempVolumeMap = new ReplicaMap(datasetRWLock);
fsVolume.getVolumeMap(tempVolumeMap, ramDiskReplicaTracker);
activateVolume(tempVolumeMap, sd, storageLocation.getStorageType(), ref);
@@ -496,7 +514,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
StorageType storageType = location.getStorageType();
final FsVolumeImpl fsVolume =
createFsVolume(sd.getStorageUuid(), sd, location);
- final ReplicaMap tempVolumeMap = new ReplicaMap(new AutoCloseableLock());
+ final ReplicaMap tempVolumeMap =
+ new ReplicaMap(new ReentrantReadWriteLock());
ArrayList<IOException> exceptions = Lists.newArrayList();
for (final NamespaceInfo nsInfo : nsInfos) {
@@ -541,7 +560,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
new ArrayList<>(storageLocsToRemove);
Map<String, List<ReplicaInfo>> blkToInvalidate = new HashMap<>();
List<String> storageToRemove = new ArrayList<>();
- try (AutoCloseableLock lock = datasetLock.acquire()) {
+ try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
for (int idx = 0; idx < dataStorage.getNumStorageDirs(); idx++) {
Storage.StorageDirectory sd = dataStorage.getStorageDir(idx);
final StorageLocation sdLocation = sd.getStorageLocation();
@@ -553,7 +572,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
// Disable the volume from the service.
asyncDiskService.removeVolume(sd.getStorageUuid());
volumes.removeVolume(sdLocation, clearFailure);
- volumes.waitVolumeRemoved(5000, datasetLockCondition);
+ volumes.waitVolumeRemoved(5000, datasetWriteLockCondition);
// Removed all replica information for the blocks on the volume.
// Unlike updating the volumeMap in addVolume(), this operation does
@@ -600,7 +619,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
}
}
- try (AutoCloseableLock lock = datasetLock.acquire()) {
+ try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
for(String storageUuid : storageToRemove) {
storageMap.remove(storageUuid);
}
@@ -791,7 +810,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
long seekOffset) throws IOException {
ReplicaInfo info;
- try (AutoCloseableLock lock = datasetLock.acquire()) {
+ try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
info = volumeMap.get(b.getBlockPoolId(), b.getLocalBlock());
}
@@ -879,7 +898,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
@Override // FsDatasetSpi
public ReplicaInputStreams getTmpInputStreams(ExtendedBlock b,
long blkOffset, long metaOffset) throws IOException {
- try (AutoCloseableLock lock = datasetLock.acquire()) {
+ try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
ReplicaInfo info = getReplicaInfo(b);
FsVolumeReference ref = info.getVolume().obtainReference();
try {
@@ -1004,7 +1023,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
}
FsVolumeReference volumeRef = null;
- try (AutoCloseableLock lock = datasetLock.acquire()) {
+ try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
volumeRef = volumes.getNextVolume(targetStorageType, targetStorageId,
block.getNumBytes());
}
@@ -1118,7 +1137,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
FsVolumeReference volumeRef = null;
- try (AutoCloseableLock lock = datasetLock.acquire()) {
+ try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
volumeRef = destination.obtainReference();
}
@@ -1206,7 +1225,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
@Override // FsDatasetSpi
public ReplicaHandler append(ExtendedBlock b,
long newGS, long expectedBlockLen) throws IOException {
- try (AutoCloseableLock lock = datasetLock.acquire()) {
+ try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
// If the block was successfully finalized because all packets
// were successfully processed at the Datanode but the ack for
// some of the packets were not received by the client. The client
@@ -1258,7 +1277,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
private ReplicaInPipeline append(String bpid,
ReplicaInfo replicaInfo, long newGS, long estimateBlockLen)
throws IOException {
- try (AutoCloseableLock lock = datasetLock.acquire()) {
+ try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
// If the block is cached, start uncaching it.
if (replicaInfo.getState() != ReplicaState.FINALIZED) {
throw new IOException("Only a Finalized replica can be appended to; "
@@ -1354,7 +1373,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
while (true) {
try {
- try (AutoCloseableLock lock = datasetLock.acquire()) {
+ try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen);
FsVolumeReference ref = replicaInfo.getVolume().obtainReference();
ReplicaInPipeline replica;
@@ -1386,7 +1405,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
LOG.info("Recover failed close " + b);
while (true) {
try {
- try (AutoCloseableLock lock = datasetLock.acquire()) {
+ try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
// check replica's state
ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen);
// bump the replica's GS
@@ -1408,7 +1427,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
public ReplicaHandler createRbw(
StorageType storageType, String storageId, ExtendedBlock b,
boolean allowLazyPersist) throws IOException {
- try (AutoCloseableLock lock = datasetLock.acquire()) {
+ try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
b.getBlockId());
if (replicaInfo != null) {
@@ -1479,7 +1498,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
while (true) {
try {
- try (AutoCloseableLock lock = datasetLock.acquire()) {
+ try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
ReplicaInfo replicaInfo =
getReplicaInfo(b.getBlockPoolId(), b.getBlockId());
// check the replica's state
@@ -1504,7 +1523,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
private ReplicaHandler recoverRbwImpl(ReplicaInPipeline rbw,
ExtendedBlock b, long newGS, long minBytesRcvd, long maxBytesRcvd)
throws IOException {
- try (AutoCloseableLock lock = datasetLock.acquire()) {
+ try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
// check generation stamp
long replicaGenerationStamp = rbw.getGenerationStamp();
if (replicaGenerationStamp < b.getGenerationStamp() ||
@@ -1565,7 +1584,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
public ReplicaInPipeline convertTemporaryToRbw(
final ExtendedBlock b) throws IOException {
- try (AutoCloseableLock lock = datasetLock.acquire()) {
+ try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
final long blockId = b.getBlockId();
final long expectedGs = b.getGenerationStamp();
final long visible = b.getNumBytes();
@@ -1639,7 +1658,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
ReplicaInfo lastFoundReplicaInfo = null;
boolean isInPipeline = false;
do {
- try (AutoCloseableLock lock = datasetLock.acquire()) {
+ try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
ReplicaInfo currentReplicaInfo =
volumeMap.get(b.getBlockPoolId(), b.getBlockId());
if (currentReplicaInfo == lastFoundReplicaInfo) {
@@ -1692,7 +1711,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
invalidate(b.getBlockPoolId(), new Block[] { lastFoundReplicaInfo },
false);
}
- try (AutoCloseableLock lock = datasetLock.acquire()) {
+ try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
FsVolumeReference ref = volumes.getNextVolume(storageType, storageId, b
.getNumBytes());
FsVolumeImpl v = (FsVolumeImpl) ref.getVolume();
@@ -1743,7 +1762,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
throws IOException {
ReplicaInfo replicaInfo = null;
ReplicaInfo finalizedReplicaInfo = null;
- try (AutoCloseableLock lock = datasetLock.acquire()) {
+ try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
if (Thread.interrupted()) {
// Don't allow data modifications from interrupted threads
throw new IOException("Cannot finalize block from Interrupted Thread");
@@ -1774,7 +1793,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
private ReplicaInfo finalizeReplica(String bpid, ReplicaInfo replicaInfo)
throws IOException {
- try (AutoCloseableLock lock = datasetLock.acquire()) {
+ try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
// Compare generation stamp of old and new replica before finalizing
if (volumeMap.get(bpid, replicaInfo.getBlockId()).getGenerationStamp()
> replicaInfo.getGenerationStamp()) {
@@ -1819,7 +1838,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
*/
@Override // FsDatasetSpi
public void unfinalizeBlock(ExtendedBlock b) throws IOException {
- try (AutoCloseableLock lock = datasetLock.acquire()) {
+ try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
b.getLocalBlock());
if (replicaInfo != null &&
@@ -1872,7 +1891,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
new HashMap<String, BlockListAsLongs.Builder>();
List<FsVolumeImpl> curVolumes = null;
- try (AutoCloseableLock lock = datasetLock.acquire()) {
+ try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
curVolumes = volumes.getVolumes();
for (FsVolumeSpi v : curVolumes) {
builders.put(v.getStorageID(), BlockListAsLongs.builder(maxDataLength));
@@ -1927,7 +1946,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
* Gets a list of references to the finalized blocks for the given block pool.
* <p>
* Callers of this function should call
- * {@link FsDatasetSpi#acquireDatasetLock} to avoid blocks' status being
+ * {@link FsDatasetSpi#acquireDatasetLock()} to avoid blocks' status being
* changed during list iteration.
* </p>
* @return a list of references to the finalized blocks for the given block
@@ -1935,7 +1954,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
*/
@Override
public List<ReplicaInfo> getFinalizedBlocks(String bpid) {
- try (AutoCloseableLock lock = datasetLock.acquire()) {
+ try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
final List<ReplicaInfo> finalized = new ArrayList<ReplicaInfo>(
volumeMap.size(bpid));
for (ReplicaInfo b : volumeMap.replicas(bpid)) {
@@ -2028,7 +2047,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
ReplicaInfo validateBlockFile(String bpid, long blockId) {
//Should we check for metadata file too?
final ReplicaInfo r;
- try (AutoCloseableLock lock = datasetLock.acquire()) {
+ try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
r = volumeMap.get(bpid, blockId);
}
if (r != null) {
@@ -2079,7 +2098,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
for (int i = 0; i < invalidBlks.length; i++) {
final ReplicaInfo removing;
final FsVolumeImpl v;
- try (AutoCloseableLock lock = datasetLock.acquire()) {
+ try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
final ReplicaInfo info = volumeMap.get(bpid, invalidBlks[i]);
if (info == null) {
ReplicaInfo infoByBlockId =
@@ -2205,7 +2224,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
long length, genstamp;
Executor volumeExecutor;
- try (AutoCloseableLock lock = datasetLock.acquire()) {
+ try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
ReplicaInfo info = volumeMap.get(bpid, blockId);
boolean success = false;
try {
@@ -2273,7 +2292,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
@Override // FsDatasetSpi
public boolean contains(final ExtendedBlock block) {
- try (AutoCloseableLock lock = datasetLock.acquire()) {
+ try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
final long blockId = block.getLocalBlock().getBlockId();
final String bpid = block.getBlockPoolId();
final ReplicaInfo r = volumeMap.get(bpid, blockId);
@@ -2393,7 +2412,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
Block corruptBlock = null;
ReplicaInfo memBlockInfo;
- try (AutoCloseableLock lock = datasetLock.acquire()) {
+ try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
memBlockInfo = volumeMap.get(bpid, blockId);
if (memBlockInfo != null &&
memBlockInfo.getState() != ReplicaState.FINALIZED) {
@@ -2594,7 +2613,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
@Override
public String getReplicaString(String bpid, long blockId) {
- try (AutoCloseableLock lock = datasetLock.acquire()) {
+ try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
final Replica r = volumeMap.get(bpid, blockId);
return r == null ? "null" : r.toString();
}
@@ -2701,7 +2720,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
final long recoveryId,
final long newBlockId,
final long newlength) throws IOException {
- try (AutoCloseableLock lock = datasetLock.acquire()) {
+ try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
//get replica
final String bpid = oldBlock.getBlockPoolId();
final ReplicaInfo replica = volumeMap.get(bpid, oldBlock.getBlockId());
@@ -2814,7 +2833,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
@Override // FsDatasetSpi
public long getReplicaVisibleLength(final ExtendedBlock block)
throws IOException {
- try (AutoCloseableLock lock = datasetLock.acquire()) {
+ try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
final Replica replica = getReplicaInfo(block.getBlockPoolId(),
block.getBlockId());
if (replica.getGenerationStamp() < block.getGenerationStamp()) {
@@ -2831,7 +2850,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
throws IOException {
LOG.info("Adding block pool " + bpid);
AddBlockPoolException volumeExceptions = new AddBlockPoolException();
- try (AutoCloseableLock lock = datasetLock.acquire()) {
+ try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
try {
volumes.addBlockPool(bpid, conf);
} catch (AddBlockPoolException e) {
@@ -2861,7 +2880,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
@Override
public void shutdownBlockPool(String bpid) {
- try (AutoCloseableLock lock = datasetLock.acquire()) {
+ try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
LOG.info("Removing block pool " + bpid);
Map<DatanodeStorage, BlockListAsLongs> blocksPerVolume
= getBlockReports(bpid);
@@ -2935,7 +2954,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
@Override //FsDatasetSpi
public void deleteBlockPool(String bpid, boolean force)
throws IOException {
- try (AutoCloseableLock lock = datasetLock.acquire()) {
+ try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
List<FsVolumeImpl> curVolumes = volumes.getVolumes();
if (!force) {
for (FsVolumeImpl volume : curVolumes) {
@@ -2964,7 +2983,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
@Override // FsDatasetSpi
public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block)
throws IOException {
- try (AutoCloseableLock lock = datasetLock.acquire()) {
+ try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
final Replica replica = volumeMap.get(block.getBlockPoolId(),
block.getBlockId());
if (replica == null) {
@@ -3016,7 +3035,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
@Override
public void onCompleteLazyPersist(String bpId, long blockId,
long creationTime, File[] savedFiles, FsVolumeImpl targetVolume) {
- try (AutoCloseableLock lock = datasetLock.acquire()) {
+ try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
ramDiskReplicaTracker.recordEndLazyPersist(bpId, blockId, savedFiles);
targetVolume.incDfsUsedAndNumBlocks(bpId, savedFiles[0].length()
@@ -3150,7 +3169,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
try {
block = ramDiskReplicaTracker.dequeueNextReplicaToPersist();
if (block != null) {
- try (AutoCloseableLock lock = datasetLock.acquire()) {
+ try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
replicaInfo = volumeMap.get(block.getBlockPoolId(), block.getBlockId());
// If replicaInfo is null, the block was either deleted before
@@ -3217,7 +3236,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
ReplicaInfo replicaInfo, newReplicaInfo;
final String bpid = replicaState.getBlockPoolId();
- try (AutoCloseableLock lock = datasetLock.acquire()) {
+ try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
replicaInfo = getReplicaInfo(replicaState.getBlockPoolId(),
replicaState.getBlockId());
Preconditions.checkState(replicaInfo.getVolume().isTransientStorage());
@@ -3390,7 +3409,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
}
void stopAllDataxceiverThreads(FsVolumeImpl volume) {
- try (AutoCloseableLock lock = datasetLock.acquire()) {
+ try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
for (String bpid : volumeMap.getBlockPoolList()) {
Collection<ReplicaInfo> replicas = volumeMap.replicas(bpid);
for (ReplicaInfo replicaInfo : replicas) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ProvidedVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ProvidedVolumeImpl.java
index e2d8681..341a2f0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ProvidedVolumeImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ProvidedVolumeImpl.java
@@ -28,6 +28,7 @@ import java.util.Set;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
@@ -53,9 +54,10 @@ import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.FileIoProvider;
import org.apache.hadoop.hdfs.server.datanode.ReplicaBuilder;
-import org.apache.hadoop.util.Timer;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
-import org.apache.hadoop.util.AutoCloseableLock;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.util.Timer;
import org.codehaus.jackson.annotate.JsonProperty;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.ObjectReader;
@@ -63,8 +65,6 @@ import org.codehaus.jackson.map.ObjectWriter;
import com.google.common.annotations.VisibleForTesting;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hadoop.util.Time;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PROVIDED_ALIASMAP_LOAD_RETRIES;
@@ -135,7 +135,7 @@ class ProvidedVolumeImpl extends FsVolumeImpl {
ProvidedBlockPoolSlice(String bpid, ProvidedVolumeImpl volume,
Configuration conf) {
this.providedVolume = volume;
- bpVolumeMap = new ReplicaMap(new AutoCloseableLock());
+ bpVolumeMap = new ReplicaMap(new ReentrantReadWriteLock());
Class<? extends BlockAliasMap> fmt =
conf.getClass(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_CLASS,
TextFileRegionAliasMap.class, BlockAliasMap.class);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java
index 53a238c..25141b8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.locks.ReadWriteLock;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.hdfs.protocol.Block;
@@ -31,23 +32,27 @@ import org.apache.hadoop.util.AutoCloseableLock;
* Maintains the replica map.
*/
class ReplicaMap {
+ private final ReadWriteLock rwLock;
// Lock object to synchronize this instance.
- private final AutoCloseableLock lock;
+ private final AutoCloseableLock readLock;
+ private final AutoCloseableLock writeLock;
// Map of block pool Id to another map of block Id to ReplicaInfo.
private final Map<String, LightWeightResizableGSet<Block, ReplicaInfo>> map =
new HashMap<>();
- ReplicaMap(AutoCloseableLock lock) {
+ ReplicaMap(ReadWriteLock lock) {
if (lock == null) {
throw new HadoopIllegalArgumentException(
"Lock to synchronize on cannot be null");
}
- this.lock = lock;
+ this.rwLock = lock;
+ this.readLock = new AutoCloseableLock(rwLock.readLock());
+ this.writeLock = new AutoCloseableLock(rwLock.writeLock());
}
String[] getBlockPoolList() {
- try (AutoCloseableLock l = lock.acquire()) {
+ try (AutoCloseableLock l = writeLock.acquire()) {
return map.keySet().toArray(new String[map.keySet().size()]);
}
}
@@ -92,7 +97,7 @@ class ReplicaMap {
*/
ReplicaInfo get(String bpid, long blockId) {
checkBlockPool(bpid);
- try (AutoCloseableLock l = lock.acquire()) {
+ try (AutoCloseableLock l = writeLock.acquire()) {
LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
return m != null ? m.get(new Block(blockId)) : null;
}
@@ -109,7 +114,7 @@ class ReplicaMap {
ReplicaInfo add(String bpid, ReplicaInfo replicaInfo) {
checkBlockPool(bpid);
checkBlock(replicaInfo);
- try (AutoCloseableLock l = lock.acquire()) {
+ try (AutoCloseableLock l = writeLock.acquire()) {
LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
if (m == null) {
// Add an entry for block pool if it does not exist already
@@ -127,7 +132,7 @@ class ReplicaMap {
ReplicaInfo addAndGet(String bpid, ReplicaInfo replicaInfo) {
checkBlockPool(bpid);
checkBlock(replicaInfo);
- try (AutoCloseableLock l = lock.acquire()) {
+ try (AutoCloseableLock l = writeLock.acquire()) {
LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
if (m == null) {
// Add an entry for block pool if it does not exist already
@@ -176,7 +181,7 @@ class ReplicaMap {
ReplicaInfo remove(String bpid, Block block) {
checkBlockPool(bpid);
checkBlock(block);
- try (AutoCloseableLock l = lock.acquire()) {
+ try (AutoCloseableLock l = writeLock.acquire()) {
LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
if (m != null) {
ReplicaInfo replicaInfo = m.get(block);
@@ -198,7 +203,7 @@ class ReplicaMap {
*/
ReplicaInfo remove(String bpid, long blockId) {
checkBlockPool(bpid);
- try (AutoCloseableLock l = lock.acquire()) {
+ try (AutoCloseableLock l = writeLock.acquire()) {
LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
if (m != null) {
return m.remove(new Block(blockId));
@@ -213,7 +218,7 @@ class ReplicaMap {
* @return the number of replicas in the map
*/
int size(String bpid) {
- try (AutoCloseableLock l = lock.acquire()) {
+ try (AutoCloseableLock l = writeLock.acquire()) {
LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
return m != null ? m.size() : 0;
}
@@ -237,7 +242,7 @@ class ReplicaMap {
void initBlockPool(String bpid) {
checkBlockPool(bpid);
- try (AutoCloseableLock l = lock.acquire()) {
+ try (AutoCloseableLock l = writeLock.acquire()) {
LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
if (m == null) {
// Add an entry for block pool if it does not exist already
@@ -249,7 +254,7 @@ class ReplicaMap {
void cleanUpBlockPool(String bpid) {
checkBlockPool(bpid);
- try (AutoCloseableLock l = lock.acquire()) {
+ try (AutoCloseableLock l = writeLock.acquire()) {
map.remove(bpid);
}
}
@@ -259,6 +264,6 @@ class ReplicaMap {
* @return lock object
*/
AutoCloseableLock getLock() {
- return lock;
+ return writeLock;
}
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 3391740..2e1dbf6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -3000,6 +3000,27 @@
</property>
<property>
+ <name>dfs.datanode.lock.fair</name>
+ <value>true</value>
+ <description>If this is true, the Datanode FsDataset lock will be used in Fair
+ mode, which will help to prevent writer threads from being starved, but can
+ lower lock throughput. See java.util.concurrent.locks.ReentrantReadWriteLock
+ for more information on fair/non-fair locks.
+ </description>
+</property>
+
+<property>
+ <name>dfs.datanode.lock-reporting-threshold-ms</name>
+ <value>300</value>
+ <description>When thread waits to obtain a lock, or a thread holds a lock for
+ more than the threshold, a log message will be written. Note that
+ dfs.lock.suppress.warning.interval ensures a single log message is
+ emitted per interval for waiting threads and a single message for holding
+ threads to avoid excessive logging.
+ </description>
+</property>
+
+<property>
<name>dfs.namenode.startup.delay.block.deletion.sec</name>
<value>0</value>
<description>The delay in seconds at which we will pause the blocks deletion
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index 8f4e29b..cb7bbdb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -1573,6 +1573,12 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
}
@Override
+ public AutoCloseableLock acquireDatasetReadLock() {
+ // No RW lock implementation in simulated dataset currently.
+ return datasetLock.acquire();
+ }
+
+ @Override
public Set<? extends Replica> deepCopyReplica(String bpid)
throws IOException {
Set<BInfo> replicas = new HashSet<>();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
index 8fe515f..caaa89c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
@@ -456,6 +456,11 @@ public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> {
}
@Override
+ public AutoCloseableLock acquireDatasetReadLock() {
+ return null;
+ }
+
+ @Override
public Set<? extends Replica> deepCopyReplica(String bpid)
throws IOException {
return Collections.EMPTY_SET;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImplTestUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImplTestUtils.java
index 2d939fa..e383e07 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImplTestUtils.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImplTestUtils.java
@@ -434,7 +434,7 @@ public class FsDatasetImplTestUtils implements FsDatasetTestUtils {
@Override
public Iterator<Replica> getStoredReplicas(String bpid) throws IOException {
// Reload replicas from the disk.
- ReplicaMap replicaMap = new ReplicaMap(dataset.datasetLock);
+ ReplicaMap replicaMap = new ReplicaMap(dataset.datasetRWLock);
try (FsVolumeReferences refs = dataset.getFsVolumeReferences()) {
for (FsVolumeSpi vol : refs) {
FsVolumeImpl volume = (FsVolumeImpl) vol;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java
index 9db9c0c..9371a51 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java
@@ -37,7 +37,6 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy;
import org.apache.hadoop.test.GenericTestUtils;
-import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.util.StringUtils;
import org.junit.Before;
import org.junit.Test;
@@ -53,6 +52,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DU_RESERVED_PERCENTAGE_KEY;
import static org.junit.Assert.assertEquals;
@@ -368,7 +368,7 @@ public class TestFsVolumeList {
fs.close();
FsDatasetImpl fsDataset = (FsDatasetImpl) cluster.getDataNodes().get(0)
.getFSDataset();
- ReplicaMap volumeMap = new ReplicaMap(new AutoCloseableLock());
+ ReplicaMap volumeMap = new ReplicaMap(new ReentrantReadWriteLock());
RamDiskReplicaTracker ramDiskReplicaMap = RamDiskReplicaTracker
.getInstance(conf, fsDataset);
FsVolumeImpl vol = (FsVolumeImpl) fsDataset.getFsVolumeReferences().get(0);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestInterDatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestInterDatanodeProtocol.java
index 86e9f90..b72b1cd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestInterDatanodeProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestInterDatanodeProtocol.java
@@ -25,6 +25,7 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.util.List;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -59,7 +60,6 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.util.AutoCloseableLock;
import org.junit.Assert;
import org.junit.Test;
@@ -236,7 +236,7 @@ public class TestInterDatanodeProtocol {
final long firstblockid = 10000L;
final long gs = 7777L;
final long length = 22L;
- final ReplicaMap map = new ReplicaMap(new AutoCloseableLock());
+ final ReplicaMap map = new ReplicaMap(new ReentrantReadWriteLock());
String bpid = "BP-TEST";
final Block[] blocks = new Block[5];
for(int i = 0; i < blocks.length; i++) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestProvidedImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestProvidedImpl.java
index d7935b5..ef0a119 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestProvidedImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestProvidedImpl.java
@@ -43,6 +43,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
@@ -77,7 +78,6 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.BlockIterator;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi.FsVolumeReferences;
import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.util.StringUtils;
import org.junit.Before;
import org.junit.Test;
@@ -399,7 +399,7 @@ public class TestProvidedImpl {
public void testBlockLoad() throws IOException {
for (int i = 0; i < providedVolumes.size(); i++) {
FsVolumeImpl vol = providedVolumes.get(i);
- ReplicaMap volumeMap = new ReplicaMap(new AutoCloseableLock());
+ ReplicaMap volumeMap = new ReplicaMap(new ReentrantReadWriteLock());
vol.getVolumeMap(volumeMap, null);
assertEquals(vol.getBlockPoolList().length, BLOCK_POOL_IDS.length);
@@ -475,7 +475,7 @@ public class TestProvidedImpl {
vol.setFileRegionProvider(BLOCK_POOL_IDS[CHOSEN_BP_ID],
new TestFileRegionBlockAliasMap(fileRegionIterator, minBlockId,
numBlocks));
- ReplicaMap volumeMap = new ReplicaMap(new AutoCloseableLock());
+ ReplicaMap volumeMap = new ReplicaMap(new ReentrantReadWriteLock());
vol.getVolumeMap(BLOCK_POOL_IDS[CHOSEN_BP_ID], volumeMap, null);
totalBlocks += volumeMap.size(BLOCK_POOL_IDS[CHOSEN_BP_ID]);
}
@@ -585,7 +585,7 @@ public class TestProvidedImpl {
public void testProvidedReplicaPrefix() throws Exception {
for (int i = 0; i < providedVolumes.size(); i++) {
FsVolumeImpl vol = providedVolumes.get(i);
- ReplicaMap volumeMap = new ReplicaMap(new AutoCloseableLock());
+ ReplicaMap volumeMap = new ReplicaMap(new ReentrantReadWriteLock());
vol.getVolumeMap(volumeMap, null);
Path expectedPrefix = new Path(
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestReplicaMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestReplicaMap.java
index 1059c08..59203bb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestReplicaMap.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestReplicaMap.java
@@ -23,15 +23,16 @@ import static org.junit.Assert.fail;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
-import org.apache.hadoop.util.AutoCloseableLock;
import org.junit.Before;
import org.junit.Test;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
/**
* Unit test for ReplicasMap class
*/
public class TestReplicaMap {
- private final ReplicaMap map = new ReplicaMap(new AutoCloseableLock());
+ private final ReplicaMap map = new ReplicaMap(new ReentrantReadWriteLock());
private final String bpid = "BP-TEST";
private final Block block = new Block(1234, 1234, 1234);
@@ -111,7 +112,7 @@ public class TestReplicaMap {
@Test
public void testMergeAll() {
- ReplicaMap temReplicaMap = new ReplicaMap(new AutoCloseableLock());
+ ReplicaMap temReplicaMap = new ReplicaMap(new ReentrantReadWriteLock());
Block tmpBlock = new Block(5678, 5678, 5678);
temReplicaMap.add(bpid, new FinalizedReplica(tmpBlock, null, null));
@@ -122,7 +123,7 @@ public class TestReplicaMap {
@Test
public void testAddAll() {
- ReplicaMap temReplicaMap = new ReplicaMap(new AutoCloseableLock());
+ ReplicaMap temReplicaMap = new ReplicaMap(new ReentrantReadWriteLock());
Block tmpBlock = new Block(5678, 5678, 5678);
temReplicaMap.add(bpid, new FinalizedReplica(tmpBlock, null, null));
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java
index 2c5df28..e939389 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java
@@ -27,6 +27,7 @@ import java.io.RandomAccessFile;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType;
@@ -47,7 +48,6 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.test.GenericTestUtils;
-import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
import org.junit.Assert;
import org.junit.Test;
@@ -550,7 +550,7 @@ public class TestWriteToReplica {
bpList.size() == 2);
createReplicas(bpList, volumes, cluster.getFsDatasetTestUtils(dn));
- ReplicaMap oldReplicaMap = new ReplicaMap(new AutoCloseableLock());
+ ReplicaMap oldReplicaMap = new ReplicaMap(new ReentrantReadWriteLock());
oldReplicaMap.addAll(dataSet.volumeMap);
cluster.restartDataNode(0);
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org