Posted to common-commits@hadoop.apache.org by ar...@apache.org on 2016/09/11 01:11:28 UTC
[1/3] hadoop git commit: HDFS-10682. Replace FsDatasetImpl object lock with a separate lock object. (Contributed by Chen Liang)
Repository: hadoop
Updated Branches:
refs/heads/branch-2 bb6d86620 -> 012b266e5
refs/heads/branch-2.8 d5ea508ca -> 04f620c4d
HDFS-10682. Replace FsDatasetImpl object lock with a separate lock object. (Contributed by Chen Liang)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ad0ac6cc
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ad0ac6cc
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ad0ac6cc
Branch: refs/heads/branch-2
Commit: ad0ac6cced127c5303ea8497fdd856b9cd0acb38
Parents: bb6d866
Author: Arpit Agarwal <ar...@apache.org>
Authored: Wed Aug 17 16:22:00 2016 -0700
Committer: Arpit Agarwal <ar...@apache.org>
Committed: Sat Sep 10 17:48:27 2016 -0700
----------------------------------------------------------------------
.../hdfs/server/datanode/BlockSender.java | 3 +-
.../hadoop/hdfs/server/datanode/DataNode.java | 3 +-
.../hdfs/server/datanode/DirectoryScanner.java | 4 +-
.../server/datanode/fsdataset/FsDatasetSpi.java | 6 +
.../datanode/fsdataset/impl/FsDatasetImpl.java | 926 ++++++++++---------
.../datanode/fsdataset/impl/FsVolumeImpl.java | 9 +-
.../server/datanode/SimulatedFSDataset.java | 9 +
.../hdfs/server/datanode/TestBlockRecovery.java | 3 +-
.../server/datanode/TestDirectoryScanner.java | 9 +-
.../extdataset/ExternalDatasetImpl.java | 6 +
10 files changed, 533 insertions(+), 445 deletions(-)
----------------------------------------------------------------------
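The change is mechanical throughout: each synchronized(this) / synchronized(dataset) critical section becomes a try-with-resources acquisition of a dedicated lock object, exposed to callers through the new FsDatasetSpi#acquireDatasetLock(). Below is a minimal, self-contained sketch of the pattern; the AutoLock class is an illustrative stand-in for org.apache.hadoop.util.AutoCloseableLock, assumed here to wrap a ReentrantLock so that acquire() blocks and returns the lock and close() releases it.

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;

public class DatasetLockPattern {
  // Stand-in for org.apache.hadoop.util.AutoCloseableLock (an assumption
  // about its internals): acquire() takes the lock and returns this;
  // close() releases it, so try-with-resources mirrors synchronized.
  static final class AutoLock implements AutoCloseable {
    private final ReentrantLock lock = new ReentrantLock();
    AutoLock acquire() { lock.lock(); return this; }
    @Override public void close() { lock.unlock(); }
  }

  private final AutoLock datasetLock = new AutoLock();
  private final Map<Long, String> volumeMap = new HashMap<>();

  // Before: public synchronized String getVolume(long blockId) { ... }
  // After: the same critical section, guarded by the separate lock object.
  public String getVolume(long blockId) {
    try (AutoLock l = datasetLock.acquire()) {
      return volumeMap.get(blockId);
    } // released on every exit path, including exceptions
  }

  public static void main(String[] args) {
    DatasetLockPattern d = new DatasetLockPattern();
    System.out.println(d.getVolume(42L)); // prints "null": map is empty
  }
}

Decoupling mutual exclusion from the object's monitor keeps the release-on-exception guarantee of synchronized while leaving room to substitute a more granular locking scheme later without touching call sites.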
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ad0ac6cc/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
index 0b4fac1..e060cab 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.ReadaheadPool.ReadaheadRequest;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.net.SocketOutputStream;
+import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.util.DataChecksum;
import org.apache.htrace.core.Sampler;
import org.apache.htrace.core.TraceScope;
@@ -242,7 +243,7 @@ class BlockSender implements java.io.Closeable {
}
final long replicaVisibleLength;
- synchronized(datanode.data) {
+ try(AutoCloseableLock lock = datanode.data.acquireDatasetLock()) {
replica = getReplica(block, datanode);
replicaVisibleLength = replica.getVisibleLength();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ad0ac6cc/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 11b3e6a..b31c43b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -201,6 +201,7 @@ import org.apache.hadoop.tracing.TraceAdminProtocolPB;
import org.apache.hadoop.tracing.TraceAdminProtocolServerSideTranslatorPB;
import org.apache.hadoop.tracing.TraceUtils;
import org.apache.hadoop.tracing.TracerConfigurationManager;
+import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
@@ -2831,7 +2832,7 @@ public class DataNode extends ReconfigurableBase
final BlockConstructionStage stage;
//get replica information
- synchronized(data) {
+ try(AutoCloseableLock lock = data.acquireDatasetLock()) {
Block storedBlock = data.getStoredBlock(b.getBlockPoolId(),
b.getBlockId());
if (null == storedBlock) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ad0ac6cc/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
index 1db445e..421d067 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
@@ -44,12 +44,14 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.StopWatch;
import org.apache.hadoop.util.Time;
@@ -583,7 +585,7 @@ public class DirectoryScanner implements Runnable {
Map<String, ScanInfo[]> diskReport = getDiskReport();
// Hold FSDataset lock to prevent further changes to the block map
- synchronized(dataset) {
+ try(AutoCloseableLock lock = dataset.acquireDatasetLock()) {
for (Entry<String, ScanInfo[]> entry : diskReport.entrySet()) {
String bpid = entry.getKey();
ScanInfo[] blockpoolReport = entry.getValue();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ad0ac6cc/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
index 788b75b..ac3c5b4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
@@ -60,6 +60,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
+import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.util.ReflectionUtils;
/**
@@ -641,4 +642,9 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
* Confirm whether the block is deleting
*/
boolean isDeletingBlock(String bpid, long blockId);
+
+ /**
+ * Acquire the lock of the dataset.
+ */
+ AutoCloseableLock acquireDatasetLock();
}
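Returning the lock as an AutoCloseable is what makes call sites like the BlockSender and DataNode hunks above safe: close() runs on every exit path, so an exception inside the critical section cannot leak the lock. A small runnable demonstration of that guarantee, using the same stand-in lock as above rather than the Hadoop class itself:

import java.util.concurrent.locks.ReentrantLock;

public class ReleaseOnThrowDemo {
  static final class AutoLock implements AutoCloseable {
    private final ReentrantLock lock = new ReentrantLock();
    AutoLock acquire() { lock.lock(); return this; }
    @Override public void close() { lock.unlock(); }
    boolean held() { return lock.isHeldByCurrentThread(); }
  }

  public static void main(String[] args) {
    AutoLock datasetLock = new AutoLock();
    try (AutoLock l = datasetLock.acquire()) {
      throw new RuntimeException("failure inside the critical section");
    } catch (RuntimeException e) {
      // close() already ran before the exception propagated.
      System.out.println("still held? " + datasetLock.held()); // false
    }
  }
}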
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ad0ac6cc/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 04f887f..a1c2a46 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -63,6 +63,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.ExtendedBlockId;
import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
@@ -181,21 +182,26 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
}
@Override
- public synchronized FsVolumeImpl getVolume(final ExtendedBlock b) {
- final ReplicaInfo r = volumeMap.get(b.getBlockPoolId(), b.getLocalBlock());
- return r != null? (FsVolumeImpl)r.getVolume(): null;
+ public FsVolumeImpl getVolume(final ExtendedBlock b) {
+ try(AutoCloseableLock lock = datasetLock.acquire()) {
+ final ReplicaInfo r =
+ volumeMap.get(b.getBlockPoolId(), b.getLocalBlock());
+ return r != null ? (FsVolumeImpl) r.getVolume() : null;
+ }
}
@Override // FsDatasetSpi
- public synchronized Block getStoredBlock(String bpid, long blkid)
+ public Block getStoredBlock(String bpid, long blkid)
throws IOException {
- File blockfile = getFile(bpid, blkid, false);
- if (blockfile == null) {
- return null;
+ try(AutoCloseableLock lock = datasetLock.acquire()) {
+ File blockfile = getFile(bpid, blkid, false);
+ if (blockfile == null) {
+ return null;
+ }
+ final File metafile = FsDatasetUtil.findMetaFile(blockfile);
+ final long gs = FsDatasetUtil.parseGenerationStamp(blockfile, metafile);
+ return new Block(blkid, blockfile.length(), gs);
}
- final File metafile = FsDatasetUtil.findMetaFile(blockfile);
- final long gs = FsDatasetUtil.parseGenerationStamp(blockfile, metafile);
- return new Block(blkid, blockfile.length(), gs);
}
@@ -264,7 +270,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
private boolean blockPinningEnabled;
private final int maxDataLength;
-
+
+ private final AutoCloseableLock datasetLock;
/**
* An FSDataset has a directory where it loads its data files.
*/
@@ -275,6 +282,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
this.dataStorage = storage;
this.conf = conf;
this.smallBufferSize = DFSUtilClient.getSmallBufferSize(conf);
+ this.datasetLock = new AutoCloseableLock();
// The number of volumes required for operation is the total number
// of volumes minus the number of failed volumes we can tolerate.
volFailuresTolerated = datanode.getDnConf().getVolFailuresTolerated();
@@ -381,25 +389,27 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
* Activate a volume to serve requests.
* @throws IOException if the storage UUID already exists.
*/
- private synchronized void activateVolume(
+ private void activateVolume(
ReplicaMap replicaMap,
Storage.StorageDirectory sd, StorageType storageType,
FsVolumeReference ref) throws IOException {
- DatanodeStorage dnStorage = storageMap.get(sd.getStorageUuid());
- if (dnStorage != null) {
- final String errorMsg = String.format(
- "Found duplicated storage UUID: %s in %s.",
- sd.getStorageUuid(), sd.getVersionFile());
- LOG.error(errorMsg);
- throw new IOException(errorMsg);
- }
- volumeMap.addAll(replicaMap);
- storageMap.put(sd.getStorageUuid(),
- new DatanodeStorage(sd.getStorageUuid(),
- DatanodeStorage.State.NORMAL,
- storageType));
- asyncDiskService.addVolume(sd.getCurrentDir());
- volumes.addVolume(ref);
+ try(AutoCloseableLock lock = datasetLock.acquire()) {
+ DatanodeStorage dnStorage = storageMap.get(sd.getStorageUuid());
+ if (dnStorage != null) {
+ final String errorMsg = String.format(
+ "Found duplicated storage UUID: %s in %s.",
+ sd.getStorageUuid(), sd.getVersionFile());
+ LOG.error(errorMsg);
+ throw new IOException(errorMsg);
+ }
+ volumeMap.addAll(replicaMap);
+ storageMap.put(sd.getStorageUuid(),
+ new DatanodeStorage(sd.getStorageUuid(),
+ DatanodeStorage.State.NORMAL,
+ storageType));
+ asyncDiskService.addVolume(sd.getCurrentDir());
+ volumes.addVolume(ref);
+ }
}
private void addVolume(Collection<StorageLocation> dataLocations,
@@ -494,7 +504,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
Map<String, List<ReplicaInfo>> blkToInvalidate = new HashMap<>();
List<String> storageToRemove = new ArrayList<>();
- synchronized (this) {
+ try(AutoCloseableLock lock = datasetLock.acquire()) {
for (int idx = 0; idx < dataStorage.getNumStorageDirs(); idx++) {
Storage.StorageDirectory sd = dataStorage.getStorageDir(idx);
final File absRoot = sd.getRoot().getAbsoluteFile();
@@ -540,7 +550,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
}
}
- synchronized (this) {
+ try(AutoCloseableLock lock = datasetLock.acquire()) {
for(String storageUuid : storageToRemove) {
storageMap.remove(storageUuid);
}
@@ -749,7 +759,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
boolean touch)
throws IOException {
final File f;
- synchronized(this) {
+ try(AutoCloseableLock lock = datasetLock.acquire()) {
f = getFile(b.getBlockPoolId(), b.getLocalBlock().getBlockId(), touch);
}
if (f == null) {
@@ -815,22 +825,25 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
* Returns handles to the block file and its metadata file
*/
@Override // FsDatasetSpi
- public synchronized ReplicaInputStreams getTmpInputStreams(ExtendedBlock b,
+ public ReplicaInputStreams getTmpInputStreams(ExtendedBlock b,
long blkOffset, long metaOffset) throws IOException {
- ReplicaInfo info = getReplicaInfo(b);
- FsVolumeReference ref = info.getVolume().obtainReference();
- try {
- InputStream blockInStream = openAndSeek(info.getBlockFile(), blkOffset);
+ try(AutoCloseableLock lock = datasetLock.acquire()) {
+ ReplicaInfo info = getReplicaInfo(b);
+ FsVolumeReference ref = info.getVolume().obtainReference();
try {
- InputStream metaInStream = openAndSeek(info.getMetaFile(), metaOffset);
- return new ReplicaInputStreams(blockInStream, metaInStream, ref);
+ InputStream blockInStream = openAndSeek(info.getBlockFile(), blkOffset);
+ try {
+ InputStream metaInStream =
+ openAndSeek(info.getMetaFile(), metaOffset);
+ return new ReplicaInputStreams(blockInStream, metaInStream, ref);
+ } catch (IOException e) {
+ IOUtils.cleanup(null, blockInStream);
+ throw e;
+ }
} catch (IOException e) {
- IOUtils.cleanup(null, blockInStream);
+ IOUtils.cleanup(null, ref);
throw e;
}
- } catch (IOException e) {
- IOUtils.cleanup(null, ref);
- throw e;
}
}
@@ -949,7 +962,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
}
FsVolumeReference volumeRef = null;
- synchronized (this) {
+ try(AutoCloseableLock lock = datasetLock.acquire()) {
volumeRef = volumes.getNextVolume(targetStorageType, block.getNumBytes());
}
try {
@@ -968,7 +981,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
newReplicaInfo.setNumBytes(blockFiles[1].length());
// Finalize the copied files
newReplicaInfo = finalizeReplica(block.getBlockPoolId(), newReplicaInfo);
- synchronized (this) {
+ try(AutoCloseableLock lock = datasetLock.acquire()) {
// Increment numBlocks here as this block moved without knowing to BPS
FsVolumeImpl volume = (FsVolumeImpl) newReplicaInfo.getVolume();
volume.getBlockPoolSlice(block.getBlockPoolId()).incrNumBlocks();
@@ -1100,41 +1113,43 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
@Override // FsDatasetSpi
- public synchronized ReplicaHandler append(ExtendedBlock b,
+ public ReplicaHandler append(ExtendedBlock b,
long newGS, long expectedBlockLen) throws IOException {
- // If the block was successfully finalized because all packets
- // were successfully processed at the Datanode but the ack for
- // some of the packets were not received by the client. The client
- // re-opens the connection and retries sending those packets.
- // The other reason is that an "append" is occurring to this block.
-
- // check the validity of the parameter
- if (newGS < b.getGenerationStamp()) {
- throw new IOException("The new generation stamp " + newGS +
- " should be greater than the replica " + b + "'s generation stamp");
- }
- ReplicaInfo replicaInfo = getReplicaInfo(b);
- LOG.info("Appending to " + replicaInfo);
- if (replicaInfo.getState() != ReplicaState.FINALIZED) {
- throw new ReplicaNotFoundException(
- ReplicaNotFoundException.UNFINALIZED_REPLICA + b);
- }
- if (replicaInfo.getNumBytes() != expectedBlockLen) {
- throw new IOException("Corrupted replica " + replicaInfo +
- " with a length of " + replicaInfo.getNumBytes() +
- " expected length is " + expectedBlockLen);
- }
+ try(AutoCloseableLock lock = datasetLock.acquire()) {
+ // If the block was successfully finalized because all packets
+ // were successfully processed at the Datanode but the ack for
+ // some of the packets were not received by the client. The client
+ // re-opens the connection and retries sending those packets.
+ // The other reason is that an "append" is occurring to this block.
+
+ // check the validity of the parameter
+ if (newGS < b.getGenerationStamp()) {
+ throw new IOException("The new generation stamp " + newGS +
+ " should be greater than the replica " + b + "'s generation stamp");
+ }
+ ReplicaInfo replicaInfo = getReplicaInfo(b);
+ LOG.info("Appending to " + replicaInfo);
+ if (replicaInfo.getState() != ReplicaState.FINALIZED) {
+ throw new ReplicaNotFoundException(
+ ReplicaNotFoundException.UNFINALIZED_REPLICA + b);
+ }
+ if (replicaInfo.getNumBytes() != expectedBlockLen) {
+ throw new IOException("Corrupted replica " + replicaInfo +
+ " with a length of " + replicaInfo.getNumBytes() +
+ " expected length is " + expectedBlockLen);
+ }
- FsVolumeReference ref = replicaInfo.getVolume().obtainReference();
- ReplicaBeingWritten replica = null;
- try {
- replica = append(b.getBlockPoolId(), (FinalizedReplica)replicaInfo, newGS,
- b.getNumBytes());
- } catch (IOException e) {
- IOUtils.cleanup(null, ref);
- throw e;
+ FsVolumeReference ref = replicaInfo.getVolume().obtainReference();
+ ReplicaBeingWritten replica = null;
+ try {
+ replica = append(b.getBlockPoolId(), (FinalizedReplica) replicaInfo,
+ newGS, b.getNumBytes());
+ } catch (IOException e) {
+ IOUtils.cleanup(null, ref);
+ throw e;
+ }
+ return new ReplicaHandler(replica, ref);
}
- return new ReplicaHandler(replica, ref);
}
/** Append to a finalized replica
@@ -1149,66 +1164,68 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
* @throws IOException if moving the replica from finalized directory
* to rbw directory fails
*/
- private synchronized ReplicaBeingWritten append(String bpid,
+ private ReplicaBeingWritten append(String bpid,
FinalizedReplica replicaInfo, long newGS, long estimateBlockLen)
throws IOException {
- // If the block is cached, start uncaching it.
- cacheManager.uncacheBlock(bpid, replicaInfo.getBlockId());
-
- // If there are any hardlinks to the block, break them. This ensures we are
- // not appending to a file that is part of a previous/ directory.
- replicaInfo.breakHardLinksIfNeeded();
-
- // construct a RBW replica with the new GS
- File blkfile = replicaInfo.getBlockFile();
- FsVolumeImpl v = (FsVolumeImpl)replicaInfo.getVolume();
- long bytesReserved = estimateBlockLen - replicaInfo.getNumBytes();
- if (v.getAvailable() < bytesReserved) {
- throw new DiskOutOfSpaceException("Insufficient space for appending to "
- + replicaInfo);
- }
- File newBlkFile = new File(v.getRbwDir(bpid), replicaInfo.getBlockName());
- File oldmeta = replicaInfo.getMetaFile();
- ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(
- replicaInfo.getBlockId(), replicaInfo.getNumBytes(), newGS,
- v, newBlkFile.getParentFile(), Thread.currentThread(), bytesReserved);
- File newmeta = newReplicaInfo.getMetaFile();
-
- // rename meta file to rbw directory
- if (LOG.isDebugEnabled()) {
- LOG.debug("Renaming " + oldmeta + " to " + newmeta);
- }
- try {
- NativeIO.renameTo(oldmeta, newmeta);
- } catch (IOException e) {
- throw new IOException("Block " + replicaInfo + " reopen failed. " +
- " Unable to move meta file " + oldmeta +
- " to rbw dir " + newmeta, e);
- }
+ try(AutoCloseableLock lock = datasetLock.acquire()) {
+ // If the block is cached, start uncaching it.
+ cacheManager.uncacheBlock(bpid, replicaInfo.getBlockId());
+
+ // If there are any hardlinks to the block, break them. This ensures we
+ // are not appending to a file that is part of a previous/ directory.
+ replicaInfo.breakHardLinksIfNeeded();
+
+ // construct a RBW replica with the new GS
+ File blkfile = replicaInfo.getBlockFile();
+ FsVolumeImpl v = (FsVolumeImpl) replicaInfo.getVolume();
+ long bytesReserved = estimateBlockLen - replicaInfo.getNumBytes();
+ if (v.getAvailable() < bytesReserved) {
+ throw new DiskOutOfSpaceException("Insufficient space for appending to "
+ + replicaInfo);
+ }
+ File newBlkFile = new File(v.getRbwDir(bpid), replicaInfo.getBlockName());
+ File oldmeta = replicaInfo.getMetaFile();
+ ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(
+ replicaInfo.getBlockId(), replicaInfo.getNumBytes(), newGS,
+ v, newBlkFile.getParentFile(), Thread.currentThread(), bytesReserved);
+ File newmeta = newReplicaInfo.getMetaFile();
+
+ // rename meta file to rbw directory
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Renaming " + oldmeta + " to " + newmeta);
+ }
+ try {
+ NativeIO.renameTo(oldmeta, newmeta);
+ } catch (IOException e) {
+ throw new IOException("Block " + replicaInfo + " reopen failed. " +
+ " Unable to move meta file " + oldmeta +
+ " to rbw dir " + newmeta, e);
+ }
- // rename block file to rbw directory
- if (LOG.isDebugEnabled()) {
- LOG.debug("Renaming " + blkfile + " to " + newBlkFile
- + ", file length=" + blkfile.length());
- }
- try {
- NativeIO.renameTo(blkfile, newBlkFile);
- } catch (IOException e) {
+ // rename block file to rbw directory
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Renaming " + blkfile + " to " + newBlkFile
+ + ", file length=" + blkfile.length());
+ }
try {
- NativeIO.renameTo(newmeta, oldmeta);
- } catch (IOException ex) {
- LOG.warn("Cannot move meta file " + newmeta +
- "back to the finalized directory " + oldmeta, ex);
+ NativeIO.renameTo(blkfile, newBlkFile);
+ } catch (IOException e) {
+ try {
+ NativeIO.renameTo(newmeta, oldmeta);
+ } catch (IOException ex) {
+ LOG.warn("Cannot move meta file " + newmeta +
+ "back to the finalized directory " + oldmeta, ex);
+ }
+ throw new IOException("Block " + replicaInfo + " reopen failed. " +
+ " Unable to move block file " + blkfile +
+ " to rbw dir " + newBlkFile, e);
}
- throw new IOException("Block " + replicaInfo + " reopen failed. " +
- " Unable to move block file " + blkfile +
- " to rbw dir " + newBlkFile, e);
+
+ // Replace finalized replica by a RBW replica in replicas map
+ volumeMap.add(bpid, newReplicaInfo);
+ v.reserveSpaceForReplica(bytesReserved);
+ return newReplicaInfo;
}
-
- // Replace finalized replica by a RBW replica in replicas map
- volumeMap.add(bpid, newReplicaInfo);
- v.reserveSpaceForReplica(bytesReserved);
- return newReplicaInfo;
}
private static class MustStopExistingWriter extends Exception {
@@ -1278,7 +1295,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
while (true) {
try {
- synchronized (this) {
+ try(AutoCloseableLock lock = datasetLock.acquire()) {
ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen);
FsVolumeReference ref = replicaInfo.getVolume().obtainReference();
@@ -1310,7 +1327,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
LOG.info("Recover failed close " + b);
while (true) {
try {
- synchronized (this) {
+ try(AutoCloseableLock lock = datasetLock.acquire()) {
// check replica's state
ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen);
// bump the replica's GS
@@ -1357,62 +1374,65 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
}
@Override // FsDatasetSpi
- public synchronized ReplicaHandler createRbw(
+ public ReplicaHandler createRbw(
StorageType storageType, ExtendedBlock b, boolean allowLazyPersist)
throws IOException {
- ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
- b.getBlockId());
- if (replicaInfo != null) {
- throw new ReplicaAlreadyExistsException("Block " + b +
- " already exists in state " + replicaInfo.getState() +
- " and thus cannot be created.");
- }
- // create a new block
- FsVolumeReference ref = null;
-
- // Use ramdisk only if block size is a multiple of OS page size.
- // This simplifies reservation for partially used replicas
- // significantly.
- if (allowLazyPersist &&
- lazyWriter != null &&
- b.getNumBytes() % cacheManager.getOsPageSize() == 0 &&
- reserveLockedMemory(b.getNumBytes())) {
- try {
- // First try to place the block on a transient volume.
- ref = volumes.getNextTransientVolume(b.getNumBytes());
- datanode.getMetrics().incrRamDiskBlocksWrite();
- } catch(DiskOutOfSpaceException de) {
- // Ignore the exception since we just fall back to persistent storage.
- } finally {
- if (ref == null) {
- cacheManager.release(b.getNumBytes());
+ try(AutoCloseableLock lock = datasetLock.acquire()) {
+ ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
+ b.getBlockId());
+ if (replicaInfo != null) {
+ throw new ReplicaAlreadyExistsException("Block " + b +
+ " already exists in state " + replicaInfo.getState() +
+ " and thus cannot be created.");
+ }
+ // create a new block
+ FsVolumeReference ref = null;
+
+ // Use ramdisk only if block size is a multiple of OS page size.
+ // This simplifies reservation for partially used replicas
+ // significantly.
+ if (allowLazyPersist &&
+ lazyWriter != null &&
+ b.getNumBytes() % cacheManager.getOsPageSize() == 0 &&
+ reserveLockedMemory(b.getNumBytes())) {
+ try {
+ // First try to place the block on a transient volume.
+ ref = volumes.getNextTransientVolume(b.getNumBytes());
+ datanode.getMetrics().incrRamDiskBlocksWrite();
+ } catch (DiskOutOfSpaceException de) {
+ // Ignore the exception since we just fall back to persistent storage.
+ } finally {
+ if (ref == null) {
+ cacheManager.release(b.getNumBytes());
+ }
}
}
- }
- if (ref == null) {
- ref = volumes.getNextVolume(storageType, b.getNumBytes());
- }
+ if (ref == null) {
+ ref = volumes.getNextVolume(storageType, b.getNumBytes());
+ }
- FsVolumeImpl v = (FsVolumeImpl) ref.getVolume();
- // create an rbw file to hold block in the designated volume
+ FsVolumeImpl v = (FsVolumeImpl) ref.getVolume();
+ // create an rbw file to hold block in the designated volume
- if (allowLazyPersist && !v.isTransientStorage()) {
- datanode.getMetrics().incrRamDiskBlocksWriteFallback();
- }
+ if (allowLazyPersist && !v.isTransientStorage()) {
+ datanode.getMetrics().incrRamDiskBlocksWriteFallback();
+ }
- File f;
- try {
- f = v.createRbwFile(b.getBlockPoolId(), b.getLocalBlock());
- } catch (IOException e) {
- IOUtils.cleanup(null, ref);
- throw e;
- }
+ File f;
+ try {
+ f = v.createRbwFile(b.getBlockPoolId(), b.getLocalBlock());
+ } catch (IOException e) {
+ IOUtils.cleanup(null, ref);
+ throw e;
+ }
- ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(b.getBlockId(),
- b.getGenerationStamp(), v, f.getParentFile(), b.getNumBytes());
- volumeMap.add(b.getBlockPoolId(), newReplicaInfo);
- return new ReplicaHandler(newReplicaInfo, ref);
+ ReplicaBeingWritten newReplicaInfo =
+ new ReplicaBeingWritten(b.getBlockId(),
+ b.getGenerationStamp(), v, f.getParentFile(), b.getNumBytes());
+ volumeMap.add(b.getBlockPoolId(), newReplicaInfo);
+ return new ReplicaHandler(newReplicaInfo, ref);
+ }
}
@Override // FsDatasetSpi
@@ -1423,7 +1443,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
while (true) {
try {
- synchronized (this) {
+ try(AutoCloseableLock lock = datasetLock.acquire()) {
ReplicaInfo replicaInfo = getReplicaInfo(b.getBlockPoolId(), b.getBlockId());
// check the replica's state
@@ -1444,61 +1464,64 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
}
}
- private synchronized ReplicaHandler recoverRbwImpl(ReplicaBeingWritten rbw,
+ private ReplicaHandler recoverRbwImpl(ReplicaBeingWritten rbw,
ExtendedBlock b, long newGS, long minBytesRcvd, long maxBytesRcvd)
throws IOException {
- // check generation stamp
- long replicaGenerationStamp = rbw.getGenerationStamp();
- if (replicaGenerationStamp < b.getGenerationStamp() ||
- replicaGenerationStamp > newGS) {
- throw new ReplicaNotFoundException(
- ReplicaNotFoundException.UNEXPECTED_GS_REPLICA + b +
- ". Expected GS range is [" + b.getGenerationStamp() + ", " +
- newGS + "].");
- }
-
- // check replica length
- long bytesAcked = rbw.getBytesAcked();
- long numBytes = rbw.getNumBytes();
- if (bytesAcked < minBytesRcvd || numBytes > maxBytesRcvd){
- throw new ReplicaNotFoundException("Unmatched length replica " +
- rbw + ": BytesAcked = " + bytesAcked +
- " BytesRcvd = " + numBytes + " are not in the range of [" +
- minBytesRcvd + ", " + maxBytesRcvd + "].");
- }
+ try(AutoCloseableLock lock = datasetLock.acquire()) {
+ // check generation stamp
+ long replicaGenerationStamp = rbw.getGenerationStamp();
+ if (replicaGenerationStamp < b.getGenerationStamp() ||
+ replicaGenerationStamp > newGS) {
+ throw new ReplicaNotFoundException(
+ ReplicaNotFoundException.UNEXPECTED_GS_REPLICA + b +
+ ". Expected GS range is [" + b.getGenerationStamp() + ", " +
+ newGS + "].");
+ }
- FsVolumeReference ref = rbw.getVolume().obtainReference();
- try {
- // Truncate the potentially corrupt portion.
- // If the source was client and the last node in the pipeline was lost,
- // any corrupt data written after the acked length can go unnoticed.
- if (numBytes > bytesAcked) {
- final File replicafile = rbw.getBlockFile();
- truncateBlock(replicafile, rbw.getMetaFile(), numBytes, bytesAcked);
- rbw.setNumBytes(bytesAcked);
- rbw.setLastChecksumAndDataLen(bytesAcked, null);
- }
-
- // bump the replica's generation stamp to newGS
- bumpReplicaGS(rbw, newGS);
- } catch (IOException e) {
- IOUtils.cleanup(null, ref);
- throw e;
+ // check replica length
+ long bytesAcked = rbw.getBytesAcked();
+ long numBytes = rbw.getNumBytes();
+ if (bytesAcked < minBytesRcvd || numBytes > maxBytesRcvd) {
+ throw new ReplicaNotFoundException("Unmatched length replica " +
+ rbw + ": BytesAcked = " + bytesAcked +
+ " BytesRcvd = " + numBytes + " are not in the range of [" +
+ minBytesRcvd + ", " + maxBytesRcvd + "].");
+ }
+
+ FsVolumeReference ref = rbw.getVolume().obtainReference();
+ try {
+ // Truncate the potentially corrupt portion.
+ // If the source was client and the last node in the pipeline was lost,
+ // any corrupt data written after the acked length can go unnoticed.
+ if (numBytes > bytesAcked) {
+ final File replicafile = rbw.getBlockFile();
+ truncateBlock(replicafile, rbw.getMetaFile(), numBytes, bytesAcked);
+ rbw.setNumBytes(bytesAcked);
+ rbw.setLastChecksumAndDataLen(bytesAcked, null);
+ }
+
+ // bump the replica's generation stamp to newGS
+ bumpReplicaGS(rbw, newGS);
+ } catch (IOException e) {
+ IOUtils.cleanup(null, ref);
+ throw e;
+ }
+ return new ReplicaHandler(rbw, ref);
}
- return new ReplicaHandler(rbw, ref);
}
@Override // FsDatasetSpi
- public synchronized ReplicaInPipeline convertTemporaryToRbw(
+ public ReplicaInPipeline convertTemporaryToRbw(
final ExtendedBlock b) throws IOException {
- final long blockId = b.getBlockId();
- final long expectedGs = b.getGenerationStamp();
- final long visible = b.getNumBytes();
- LOG.info("Convert " + b + " from Temporary to RBW, visible length="
- + visible);
-
- final ReplicaInPipeline temp;
- {
+ try(AutoCloseableLock lock = datasetLock.acquire()) {
+ final long blockId = b.getBlockId();
+ final long expectedGs = b.getGenerationStamp();
+ final long visible = b.getNumBytes();
+ LOG.info("Convert " + b + " from Temporary to RBW, visible length="
+ + visible);
+
+ final ReplicaInPipeline temp;
+
// get replica
final ReplicaInfo r = volumeMap.get(b.getBlockPoolId(), blockId);
if (r == null) {
@@ -1510,43 +1533,44 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
throw new ReplicaAlreadyExistsException(
"r.getState() != ReplicaState.TEMPORARY, r=" + r);
}
- temp = (ReplicaInPipeline)r;
- }
- // check generation stamp
- if (temp.getGenerationStamp() != expectedGs) {
- throw new ReplicaAlreadyExistsException(
- "temp.getGenerationStamp() != expectedGs = " + expectedGs
- + ", temp=" + temp);
- }
+ temp = (ReplicaInPipeline) r;
+
+ // check generation stamp
+ if (temp.getGenerationStamp() != expectedGs) {
+ throw new ReplicaAlreadyExistsException(
+ "temp.getGenerationStamp() != expectedGs = " + expectedGs
+ + ", temp=" + temp);
+ }
- // TODO: check writer?
- // set writer to the current thread
- // temp.setWriter(Thread.currentThread());
+ // TODO: check writer?
+ // set writer to the current thread
+ // temp.setWriter(Thread.currentThread());
- // check length
- final long numBytes = temp.getNumBytes();
- if (numBytes < visible) {
- throw new IOException(numBytes + " = numBytes < visible = "
- + visible + ", temp=" + temp);
- }
- // check volume
- final FsVolumeImpl v = (FsVolumeImpl)temp.getVolume();
- if (v == null) {
- throw new IOException("r.getVolume() = null, temp=" + temp);
+ // check length
+ final long numBytes = temp.getNumBytes();
+ if (numBytes < visible) {
+ throw new IOException(numBytes + " = numBytes < visible = "
+ + visible + ", temp=" + temp);
+ }
+ // check volume
+ final FsVolumeImpl v = (FsVolumeImpl) temp.getVolume();
+ if (v == null) {
+ throw new IOException("r.getVolume() = null, temp=" + temp);
+ }
+
+ // move block files to the rbw directory
+ BlockPoolSlice bpslice = v.getBlockPoolSlice(b.getBlockPoolId());
+ final File dest = moveBlockFiles(b.getLocalBlock(), temp.getBlockFile(),
+ bpslice.getRbwDir());
+ // create RBW
+ final ReplicaBeingWritten rbw = new ReplicaBeingWritten(
+ blockId, numBytes, expectedGs,
+ v, dest.getParentFile(), Thread.currentThread(), 0);
+ rbw.setBytesAcked(visible);
+ // overwrite the RBW in the volume map
+ volumeMap.add(b.getBlockPoolId(), rbw);
+ return rbw;
}
-
- // move block files to the rbw directory
- BlockPoolSlice bpslice = v.getBlockPoolSlice(b.getBlockPoolId());
- final File dest = moveBlockFiles(b.getLocalBlock(), temp.getBlockFile(),
- bpslice.getRbwDir());
- // create RBW
- final ReplicaBeingWritten rbw = new ReplicaBeingWritten(
- blockId, numBytes, expectedGs,
- v, dest.getParentFile(), Thread.currentThread(), 0);
- rbw.setBytesAcked(visible);
- // overwrite the RBW in the volume map
- volumeMap.add(b.getBlockPoolId(), rbw);
- return rbw;
}
@Override // FsDatasetSpi
@@ -1556,7 +1580,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
long writerStopTimeoutMs = datanode.getDnConf().getXceiverStopTimeout();
ReplicaInfo lastFoundReplicaInfo = null;
do {
- synchronized (this) {
+ try(AutoCloseableLock lock = datasetLock.acquire()) {
ReplicaInfo currentReplicaInfo =
volumeMap.get(b.getBlockPoolId(), b.getBlockId());
if (currentReplicaInfo == lastFoundReplicaInfo) {
@@ -1635,72 +1659,82 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
* Complete the block write!
*/
@Override // FsDatasetSpi
- public synchronized void finalizeBlock(ExtendedBlock b) throws IOException {
- if (Thread.interrupted()) {
- // Don't allow data modifications from interrupted threads
- throw new IOException("Cannot finalize block from Interrupted Thread");
- }
- ReplicaInfo replicaInfo = getReplicaInfo(b);
- if (replicaInfo.getState() == ReplicaState.FINALIZED) {
- // this is legal, when recovery happens on a file that has
- // been opened for append but never modified
- return;
+ public void finalizeBlock(ExtendedBlock b) throws IOException {
+ try(AutoCloseableLock lock = datasetLock.acquire()) {
+ if (Thread.interrupted()) {
+ // Don't allow data modifications from interrupted threads
+ throw new IOException("Cannot finalize block from Interrupted Thread");
+ }
+ ReplicaInfo replicaInfo = getReplicaInfo(b);
+ if (replicaInfo.getState() == ReplicaState.FINALIZED) {
+ // this is legal, when recovery happens on a file that has
+ // been opened for append but never modified
+ return;
+ }
+ finalizeReplica(b.getBlockPoolId(), replicaInfo);
}
- finalizeReplica(b.getBlockPoolId(), replicaInfo);
}
- private synchronized FinalizedReplica finalizeReplica(String bpid,
+ private FinalizedReplica finalizeReplica(String bpid,
ReplicaInfo replicaInfo) throws IOException {
- FinalizedReplica newReplicaInfo = null;
- if (replicaInfo.getState() == ReplicaState.RUR &&
- ((ReplicaUnderRecovery)replicaInfo).getOriginalReplica().getState() ==
- ReplicaState.FINALIZED) {
- newReplicaInfo = (FinalizedReplica)
- ((ReplicaUnderRecovery)replicaInfo).getOriginalReplica();
- } else {
- FsVolumeImpl v = (FsVolumeImpl)replicaInfo.getVolume();
- File f = replicaInfo.getBlockFile();
- if (v == null) {
- throw new IOException("No volume for temporary file " + f +
- " for block " + replicaInfo);
- }
+ try(AutoCloseableLock lock = datasetLock.acquire()) {
+ FinalizedReplica newReplicaInfo = null;
+ if (replicaInfo.getState() == ReplicaState.RUR &&
+ ((ReplicaUnderRecovery) replicaInfo).getOriginalReplica().getState()
+ == ReplicaState.FINALIZED) {
+ newReplicaInfo = (FinalizedReplica)
+ ((ReplicaUnderRecovery) replicaInfo).getOriginalReplica();
+ } else {
+ FsVolumeImpl v = (FsVolumeImpl) replicaInfo.getVolume();
+ File f = replicaInfo.getBlockFile();
+ if (v == null) {
+ throw new IOException("No volume for temporary file " + f +
+ " for block " + replicaInfo);
+ }
- File dest = v.addFinalizedBlock(
- bpid, replicaInfo, f, replicaInfo.getBytesReserved());
- newReplicaInfo = new FinalizedReplica(replicaInfo, v, dest.getParentFile());
+ File dest = v.addFinalizedBlock(
+ bpid, replicaInfo, f, replicaInfo.getBytesReserved());
+ newReplicaInfo =
+ new FinalizedReplica(replicaInfo, v, dest.getParentFile());
- if (v.isTransientStorage()) {
- releaseLockedMemory(
- replicaInfo.getOriginalBytesReserved() - replicaInfo.getNumBytes(),
- false);
- ramDiskReplicaTracker.addReplica(
- bpid, replicaInfo.getBlockId(), v, replicaInfo.getNumBytes());
- datanode.getMetrics().addRamDiskBytesWrite(replicaInfo.getNumBytes());
+ if (v.isTransientStorage()) {
+ releaseLockedMemory(
+ replicaInfo.getOriginalBytesReserved()
+ - replicaInfo.getNumBytes(),
+ false);
+ ramDiskReplicaTracker.addReplica(
+ bpid, replicaInfo.getBlockId(), v, replicaInfo.getNumBytes());
+ datanode.getMetrics().addRamDiskBytesWrite(replicaInfo.getNumBytes());
+ }
}
- }
- volumeMap.add(bpid, newReplicaInfo);
+ volumeMap.add(bpid, newReplicaInfo);
- return newReplicaInfo;
+ return newReplicaInfo;
+ }
}
/**
* Remove the temporary block file (if any)
*/
@Override // FsDatasetSpi
- public synchronized void unfinalizeBlock(ExtendedBlock b) throws IOException {
- ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
- b.getLocalBlock());
- if (replicaInfo != null && replicaInfo.getState() == ReplicaState.TEMPORARY) {
- // remove from volumeMap
- volumeMap.remove(b.getBlockPoolId(), b.getLocalBlock());
-
- // delete the on-disk temp file
- if (delBlockFromDisk(replicaInfo.getBlockFile(),
- replicaInfo.getMetaFile(), b.getLocalBlock())) {
- LOG.warn("Block " + b + " unfinalized and removed. " );
- }
- if (replicaInfo.getVolume().isTransientStorage()) {
- ramDiskReplicaTracker.discardReplica(b.getBlockPoolId(), b.getBlockId(), true);
+ public void unfinalizeBlock(ExtendedBlock b) throws IOException {
+ try(AutoCloseableLock lock = datasetLock.acquire()) {
+ ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
+ b.getLocalBlock());
+ if (replicaInfo != null
+ && replicaInfo.getState() == ReplicaState.TEMPORARY) {
+ // remove from volumeMap
+ volumeMap.remove(b.getBlockPoolId(), b.getLocalBlock());
+
+ // delete the on-disk temp file
+ if (delBlockFromDisk(replicaInfo.getBlockFile(),
+ replicaInfo.getMetaFile(), b.getLocalBlock())) {
+ LOG.warn("Block " + b + " unfinalized and removed. ");
+ }
+ if (replicaInfo.getVolume().isTransientStorage()) {
+ ramDiskReplicaTracker.discardReplica(b.getBlockPoolId(),
+ b.getBlockId(), true);
+ }
}
}
}
@@ -1739,7 +1773,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
new HashMap<String, BlockListAsLongs.Builder>();
List<FsVolumeImpl> curVolumes = null;
- synchronized(this) {
+ try(AutoCloseableLock lock = datasetLock.acquire()) {
curVolumes = volumes.getVolumes();
for (FsVolumeSpi v : curVolumes) {
builders.put(v.getStorageID(), BlockListAsLongs.builder(maxDataLength));
@@ -1792,31 +1826,36 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
* Get the list of finalized blocks from in-memory blockmap for a block pool.
*/
@Override
- public synchronized List<FinalizedReplica> getFinalizedBlocks(String bpid) {
- ArrayList<FinalizedReplica> finalized =
- new ArrayList<FinalizedReplica>(volumeMap.size(bpid));
- for (ReplicaInfo b : volumeMap.replicas(bpid)) {
- if(b.getState() == ReplicaState.FINALIZED) {
- finalized.add(new FinalizedReplica((FinalizedReplica)b));
+ public List<FinalizedReplica> getFinalizedBlocks(String bpid) {
+ try(AutoCloseableLock lock = datasetLock.acquire()) {
+ ArrayList<FinalizedReplica> finalized =
+ new ArrayList<FinalizedReplica>(volumeMap.size(bpid));
+ for (ReplicaInfo b : volumeMap.replicas(bpid)) {
+ if (b.getState() == ReplicaState.FINALIZED) {
+ finalized.add(new FinalizedReplica((FinalizedReplica) b));
+ }
}
+ return finalized;
}
- return finalized;
}
/**
* Get the list of finalized blocks from in-memory blockmap for a block pool.
*/
@Override
- public synchronized List<FinalizedReplica> getFinalizedBlocksOnPersistentStorage(String bpid) {
- ArrayList<FinalizedReplica> finalized =
- new ArrayList<FinalizedReplica>(volumeMap.size(bpid));
- for (ReplicaInfo b : volumeMap.replicas(bpid)) {
- if(!b.getVolume().isTransientStorage() &&
- b.getState() == ReplicaState.FINALIZED) {
- finalized.add(new FinalizedReplica((FinalizedReplica)b));
+ public List<FinalizedReplica>
+ getFinalizedBlocksOnPersistentStorage(String bpid) {
+ try(AutoCloseableLock lock = datasetLock.acquire()) {
+ ArrayList<FinalizedReplica> finalized =
+ new ArrayList<FinalizedReplica>(volumeMap.size(bpid));
+ for (ReplicaInfo b : volumeMap.replicas(bpid)) {
+ if (!b.getVolume().isTransientStorage() &&
+ b.getState() == ReplicaState.FINALIZED) {
+ finalized.add(new FinalizedReplica((FinalizedReplica) b));
+ }
}
+ return finalized;
}
- return finalized;
}
/**
@@ -1892,7 +1931,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
File validateBlockFile(String bpid, long blockId) {
//Should we check for metadata file too?
final File f;
- synchronized(this) {
+ try(AutoCloseableLock lock = datasetLock.acquire()) {
f = getFile(bpid, blockId, false);
}
@@ -1941,7 +1980,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
for (int i = 0; i < invalidBlks.length; i++) {
final File f;
final FsVolumeImpl v;
- synchronized (this) {
+ try(AutoCloseableLock lock = datasetLock.acquire()) {
final ReplicaInfo info = volumeMap.get(bpid, invalidBlks[i]);
if (info == null) {
// It is okay if the block is not found -- it may be deleted earlier.
@@ -2052,7 +2091,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
long length, genstamp;
Executor volumeExecutor;
- synchronized (this) {
+ try(AutoCloseableLock lock = datasetLock.acquire()) {
ReplicaInfo info = volumeMap.get(bpid, blockId);
boolean success = false;
try {
@@ -2119,9 +2158,11 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
}
@Override // FsDatasetSpi
- public synchronized boolean contains(final ExtendedBlock block) {
- final long blockId = block.getLocalBlock().getBlockId();
- return getFile(block.getBlockPoolId(), blockId, false) != null;
+ public boolean contains(final ExtendedBlock block) {
+ try(AutoCloseableLock lock = datasetLock.acquire()) {
+ final long blockId = block.getLocalBlock().getBlockId();
+ return getFile(block.getBlockPoolId(), blockId, false) != null;
+ }
}
/**
@@ -2247,7 +2288,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
File diskMetaFile, FsVolumeSpi vol) throws IOException {
Block corruptBlock = null;
ReplicaInfo memBlockInfo;
- synchronized (this) {
+ try(AutoCloseableLock lock = datasetLock.acquire()) {
memBlockInfo = volumeMap.get(bpid, blockId);
if (memBlockInfo != null && memBlockInfo.getState() != ReplicaState.FINALIZED) {
// Block is not finalized - ignore the difference
@@ -2403,9 +2444,11 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
}
@Override
- public synchronized String getReplicaString(String bpid, long blockId) {
- final Replica r = volumeMap.get(bpid, blockId);
- return r == null? "null": r.toString();
+ public String getReplicaString(String bpid, long blockId) {
+ try(AutoCloseableLock lock = datasetLock.acquire()) {
+ final Replica r = volumeMap.get(bpid, blockId);
+ return r == null ? "null" : r.toString();
+ }
}
@Override // FsDatasetSpi
@@ -2498,67 +2541,69 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
}
@Override // FsDatasetSpi
- public synchronized Replica updateReplicaUnderRecovery(
+ public Replica updateReplicaUnderRecovery(
final ExtendedBlock oldBlock,
final long recoveryId,
final long newBlockId,
final long newlength) throws IOException {
- //get replica
- final String bpid = oldBlock.getBlockPoolId();
- final ReplicaInfo replica = volumeMap.get(bpid, oldBlock.getBlockId());
- LOG.info("updateReplica: " + oldBlock
- + ", recoveryId=" + recoveryId
- + ", length=" + newlength
- + ", replica=" + replica);
+ try(AutoCloseableLock lock = datasetLock.acquire()) {
+ //get replica
+ final String bpid = oldBlock.getBlockPoolId();
+ final ReplicaInfo replica = volumeMap.get(bpid, oldBlock.getBlockId());
+ LOG.info("updateReplica: " + oldBlock
+ + ", recoveryId=" + recoveryId
+ + ", length=" + newlength
+ + ", replica=" + replica);
- //check replica
- if (replica == null) {
- throw new ReplicaNotFoundException(oldBlock);
- }
+ //check replica
+ if (replica == null) {
+ throw new ReplicaNotFoundException(oldBlock);
+ }
- //check replica state
- if (replica.getState() != ReplicaState.RUR) {
- throw new IOException("replica.getState() != " + ReplicaState.RUR
- + ", replica=" + replica);
- }
+ //check replica state
+ if (replica.getState() != ReplicaState.RUR) {
+ throw new IOException("replica.getState() != " + ReplicaState.RUR
+ + ", replica=" + replica);
+ }
- //check replica's byte on disk
- if (replica.getBytesOnDisk() != oldBlock.getNumBytes()) {
- throw new IOException("THIS IS NOT SUPPOSED TO HAPPEN:"
- + " replica.getBytesOnDisk() != block.getNumBytes(), block="
- + oldBlock + ", replica=" + replica);
- }
-
- //check replica files before update
- checkReplicaFiles(replica);
-
- //update replica
- final FinalizedReplica finalized = updateReplicaUnderRecovery(oldBlock
- .getBlockPoolId(), (ReplicaUnderRecovery) replica, recoveryId,
- newBlockId, newlength);
-
- boolean copyTruncate = newBlockId != oldBlock.getBlockId();
- if(!copyTruncate) {
- assert finalized.getBlockId() == oldBlock.getBlockId()
- && finalized.getGenerationStamp() == recoveryId
- && finalized.getNumBytes() == newlength
- : "Replica information mismatched: oldBlock=" + oldBlock
- + ", recoveryId=" + recoveryId + ", newlength=" + newlength
- + ", newBlockId=" + newBlockId + ", finalized=" + finalized;
- } else {
- assert finalized.getBlockId() == oldBlock.getBlockId()
- && finalized.getGenerationStamp() == oldBlock.getGenerationStamp()
- && finalized.getNumBytes() == oldBlock.getNumBytes()
- : "Finalized and old information mismatched: oldBlock=" + oldBlock
- + ", genStamp=" + oldBlock.getGenerationStamp()
- + ", len=" + oldBlock.getNumBytes()
- + ", finalized=" + finalized;
- }
+ //check replica's byte on disk
+ if (replica.getBytesOnDisk() != oldBlock.getNumBytes()) {
+ throw new IOException("THIS IS NOT SUPPOSED TO HAPPEN:"
+ + " replica.getBytesOnDisk() != block.getNumBytes(), block="
+ + oldBlock + ", replica=" + replica);
+ }
+
+ //check replica files before update
+ checkReplicaFiles(replica);
+
+ //update replica
+ final FinalizedReplica finalized = updateReplicaUnderRecovery(oldBlock
+ .getBlockPoolId(), (ReplicaUnderRecovery) replica, recoveryId,
+ newBlockId, newlength);
+
+ boolean copyTruncate = newBlockId != oldBlock.getBlockId();
+ if (!copyTruncate) {
+ assert finalized.getBlockId() == oldBlock.getBlockId()
+ && finalized.getGenerationStamp() == recoveryId
+ && finalized.getNumBytes() == newlength
+ : "Replica information mismatched: oldBlock=" + oldBlock
+ + ", recoveryId=" + recoveryId + ", newlength=" + newlength
+ + ", newBlockId=" + newBlockId + ", finalized=" + finalized;
+ } else {
+ assert finalized.getBlockId() == oldBlock.getBlockId()
+ && finalized.getGenerationStamp() == oldBlock.getGenerationStamp()
+ && finalized.getNumBytes() == oldBlock.getNumBytes()
+ : "Finalized and old information mismatched: oldBlock=" + oldBlock
+ + ", genStamp=" + oldBlock.getGenerationStamp()
+ + ", len=" + oldBlock.getNumBytes()
+ + ", finalized=" + finalized;
+ }
- //check replica files after update
- checkReplicaFiles(finalized);
+ //check replica files after update
+ checkReplicaFiles(finalized);
- return finalized;
+ return finalized;
+ }
}
private FinalizedReplica updateReplicaUnderRecovery(
@@ -2636,23 +2681,25 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
}
@Override // FsDatasetSpi
- public synchronized long getReplicaVisibleLength(final ExtendedBlock block)
+ public long getReplicaVisibleLength(final ExtendedBlock block)
throws IOException {
- final Replica replica = getReplicaInfo(block.getBlockPoolId(),
- block.getBlockId());
- if (replica.getGenerationStamp() < block.getGenerationStamp()) {
- throw new IOException(
- "replica.getGenerationStamp() < block.getGenerationStamp(), block="
- + block + ", replica=" + replica);
+ try(AutoCloseableLock lock = datasetLock.acquire()) {
+ final Replica replica = getReplicaInfo(block.getBlockPoolId(),
+ block.getBlockId());
+ if (replica.getGenerationStamp() < block.getGenerationStamp()) {
+ throw new IOException(
+ "replica.getGenerationStamp() < block.getGenerationStamp(), block="
+ + block + ", replica=" + replica);
+ }
+ return replica.getVisibleLength();
}
- return replica.getVisibleLength();
}
@Override
public void addBlockPool(String bpid, Configuration conf)
throws IOException {
LOG.info("Adding block pool " + bpid);
- synchronized(this) {
+ try(AutoCloseableLock lock = datasetLock.acquire()) {
volumes.addBlockPool(bpid, conf);
volumeMap.initBlockPool(bpid);
}
@@ -2660,11 +2707,14 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
}
@Override
- public synchronized void shutdownBlockPool(String bpid) {
- LOG.info("Removing block pool " + bpid);
- Map<DatanodeStorage, BlockListAsLongs> blocksPerVolume = getBlockReports(bpid);
- volumeMap.cleanUpBlockPool(bpid);
- volumes.removeBlockPool(bpid, blocksPerVolume);
+ public void shutdownBlockPool(String bpid) {
+ try(AutoCloseableLock lock = datasetLock.acquire()) {
+ LOG.info("Removing block pool " + bpid);
+ Map<DatanodeStorage, BlockListAsLongs> blocksPerVolume =
+ getBlockReports(bpid);
+ volumeMap.cleanUpBlockPool(bpid);
+ volumes.removeBlockPool(bpid, blocksPerVolume);
+ }
}
/**
@@ -2727,35 +2777,38 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
}
@Override //FsDatasetSpi
- public synchronized void deleteBlockPool(String bpid, boolean force)
+ public void deleteBlockPool(String bpid, boolean force)
throws IOException {
- List<FsVolumeImpl> curVolumes = volumes.getVolumes();
- if (!force) {
+ try(AutoCloseableLock lock = datasetLock.acquire()) {
+ List<FsVolumeImpl> curVolumes = volumes.getVolumes();
+ if (!force) {
+ for (FsVolumeImpl volume : curVolumes) {
+ try (FsVolumeReference ref = volume.obtainReference()) {
+ if (!volume.isBPDirEmpty(bpid)) {
+ LOG.warn(bpid
+ + " has some block files, cannot delete unless forced");
+ throw new IOException("Cannot delete block pool, "
+ + "it contains some block files");
+ }
+ } catch (ClosedChannelException e) {
+ // ignore.
+ }
+ }
+ }
for (FsVolumeImpl volume : curVolumes) {
try (FsVolumeReference ref = volume.obtainReference()) {
- if (!volume.isBPDirEmpty(bpid)) {
- LOG.warn(bpid + " has some block files, cannot delete unless forced");
- throw new IOException("Cannot delete block pool, "
- + "it contains some block files");
- }
+ volume.deleteBPDirectories(bpid, force);
} catch (ClosedChannelException e) {
// ignore.
}
}
}
- for (FsVolumeImpl volume : curVolumes) {
- try (FsVolumeReference ref = volume.obtainReference()) {
- volume.deleteBPDirectories(bpid, force);
- } catch (ClosedChannelException e) {
- // ignore.
- }
- }
}
@Override // FsDatasetSpi
public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block)
throws IOException {
- synchronized(this) {
+ try(AutoCloseableLock lock = datasetLock.acquire()) {
final Replica replica = volumeMap.get(block.getBlockPoolId(),
block.getBlockId());
if (replica == null) {
@@ -2848,7 +2901,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
@Override
public void onCompleteLazyPersist(String bpId, long blockId,
long creationTime, File[] savedFiles, FsVolumeImpl targetVolume) {
- synchronized (FsDatasetImpl.this) {
+ try(AutoCloseableLock lock = datasetLock.acquire()) {
ramDiskReplicaTracker.recordEndLazyPersist(bpId, blockId, savedFiles);
targetVolume.incDfsUsedAndNumBlocks(bpId, savedFiles[0].length()
@@ -2982,7 +3035,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
try {
block = ramDiskReplicaTracker.dequeueNextReplicaToPersist();
if (block != null) {
- synchronized (FsDatasetImpl.this) {
+ try(AutoCloseableLock lock = datasetLock.acquire()) {
replicaInfo = volumeMap.get(block.getBlockPoolId(), block.getBlockId());
// If replicaInfo is null, the block was either deleted before
@@ -3052,7 +3105,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
long blockFileUsed, metaFileUsed;
final String bpid = replicaState.getBlockPoolId();
- synchronized (FsDatasetImpl.this) {
+ try(AutoCloseableLock lock = datasetLock.acquire()) {
replicaInfo = getReplicaInfo(replicaState.getBlockPoolId(),
replicaState.getBlockId());
Preconditions.checkState(replicaInfo.getVolume().isTransientStorage());
@@ -3161,7 +3214,12 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
return s != null ? s.contains(blockId) : false;
}
}
-
+
+ @Override
+ public AutoCloseableLock acquireDatasetLock() {
+ return datasetLock.acquire();
+ }
+
public void removeDeletedBlocks(String bpid, Set<Long> blockIds) {
synchronized (deletingBlock) {
Set<Long> s = deletingBlock.get(bpid);
@@ -3229,14 +3287,16 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
this.timer = newTimer;
}
- synchronized void stopAllDataxceiverThreads(FsVolumeImpl volume) {
- for (String blockPoolId : volumeMap.getBlockPoolList()) {
- Collection<ReplicaInfo> replicas = volumeMap.replicas(blockPoolId);
- for (ReplicaInfo replicaInfo : replicas) {
- if (replicaInfo instanceof ReplicaInPipeline
- && replicaInfo.getVolume().equals(volume)) {
- ReplicaInPipeline replicaInPipeline = (ReplicaInPipeline) replicaInfo;
- replicaInPipeline.interruptThread();
+ void stopAllDataxceiverThreads(FsVolumeImpl volume) {
+ try(AutoCloseableLock lock = datasetLock.acquire()) {
+ for (String blockPoolId : volumeMap.getBlockPoolList()) {
+ Collection<ReplicaInfo> replicas = volumeMap.replicas(blockPoolId);
+ for (ReplicaInfo replicaInfo : replicas) {
+ if (replicaInfo instanceof ReplicaInPipeline
+ && replicaInfo.getVolume().equals(volume)) {
+ ReplicaInPipeline replicaInPipeline = (ReplicaInPipeline) replicaInfo;
+ replicaInPipeline.interruptThread();
+ }
}
}
}
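One property the FsDatasetImpl changes above depend on: the lock must be reentrant. The public append() acquires datasetLock and then calls the private append(), which acquires it again on the same thread, exactly as the nested synchronized methods did before. A sketch of that nesting, again assuming a ReentrantLock-backed lock (a non-reentrant lock would deadlock here):

import java.util.concurrent.locks.ReentrantLock;

public class ReentrancyDemo {
  static final class AutoLock implements AutoCloseable {
    private final ReentrantLock lock = new ReentrantLock();
    AutoLock acquire() { lock.lock(); return this; }
    @Override public void close() { lock.unlock(); }
    int holdCount() { return lock.getHoldCount(); }
  }

  private static final AutoLock DATASET_LOCK = new AutoLock();

  static void publicAppend() {            // analogue of the public append()
    try (AutoLock outer = DATASET_LOCK.acquire()) {
      privateAppend();                    // nested call under the lock
    }
  }

  static void privateAppend() {           // analogue of the private append()
    try (AutoLock inner = DATASET_LOCK.acquire()) {
      System.out.println("hold count: " + DATASET_LOCK.holdCount()); // 2
    }
  }

  public static void main(String[] args) {
    publicAppend();
  }
}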
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ad0ac6cc/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
index 10bae5d..67e7192 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
@@ -56,6 +56,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.util.CloseableReferenceCount;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.util.StringUtils;
@@ -304,7 +305,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
private void decDfsUsedAndNumBlocks(String bpid, long value,
boolean blockFileDeleted) {
- synchronized(dataset) {
+ try(AutoCloseableLock lock = dataset.acquireDatasetLock()) {
BlockPoolSlice bp = bpSlices.get(bpid);
if (bp != null) {
bp.decDfsUsed(value);
@@ -316,7 +317,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
}
void incDfsUsedAndNumBlocks(String bpid, long value) {
- synchronized (dataset) {
+ try(AutoCloseableLock lock = dataset.acquireDatasetLock()) {
BlockPoolSlice bp = bpSlices.get(bpid);
if (bp != null) {
bp.incDfsUsed(value);
@@ -326,7 +327,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
}
void incDfsUsed(String bpid, long value) {
- synchronized(dataset) {
+ try(AutoCloseableLock lock = dataset.acquireDatasetLock()) {
BlockPoolSlice bp = bpSlices.get(bpid);
if (bp != null) {
bp.incDfsUsed(value);
@@ -337,7 +338,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
@VisibleForTesting
public long getDfsUsed() throws IOException {
long dfsUsed = 0;
- synchronized(dataset) {
+ try(AutoCloseableLock lock = dataset.acquireDatasetLock()) {
for(BlockPoolSlice s : bpSlices.values()) {
dfsUsed += s.getDfsUsed();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ad0ac6cc/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index 227f4f8..cbc9690 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -62,6 +62,7 @@ import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.metrics2.MetricsCollector;
import org.apache.hadoop.metrics2.util.MBeans;
+import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.util.DataChecksum;
/**
@@ -113,6 +114,8 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
"dfs.datanode.simulateddatastorage.state";
private static final DatanodeStorage.State DEFAULT_STATE =
DatanodeStorage.State.NORMAL;
+
+ private final AutoCloseableLock datasetLock;
static final byte[] nullCrcFileData;
static {
@@ -550,6 +553,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
conf.getLong(CONFIG_PROPERTY_CAPACITY, DEFAULT_CAPACITY),
conf.getEnum(CONFIG_PROPERTY_STATE, DEFAULT_STATE));
this.volume = new SimulatedVolume(this.storage);
+ this.datasetLock = new AutoCloseableLock();
}
public synchronized void injectBlocks(String bpid,
@@ -1365,5 +1369,10 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
public boolean isDeletingBlock(String bpid, long blockId) {
throw new UnsupportedOperationException();
}
+
+ @Override
+ public AutoCloseableLock acquireDatasetLock() {
+ return datasetLock.acquire();
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ad0ac6cc/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
index 513e9a8..a11cfb9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
@@ -85,6 +85,7 @@ import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.Time;
import org.apache.log4j.Level;
@@ -693,7 +694,7 @@ public class TestBlockRecovery {
final RecoveringBlock recoveringBlock = new RecoveringBlock(
block.getBlock(), locations, block.getBlock()
.getGenerationStamp() + 1);
- synchronized (dataNode.data) {
+ try(AutoCloseableLock lock = dataNode.data.acquireDatasetLock()) {
Thread.sleep(2000);
dataNode.initReplicaRecovery(recoveringBlock);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ad0ac6cc/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
index bd99b07..82bbdc4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
@@ -63,6 +63,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.LazyPersistTestCase;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.util.Time;
import org.junit.Before;
import org.junit.Test;
@@ -113,7 +114,7 @@ public class TestDirectoryScanner {
/** Truncate a block file */
private long truncateBlockFile() throws IOException {
- synchronized (fds) {
+ try(AutoCloseableLock lock = fds.acquireDatasetLock()) {
for (ReplicaInfo b : FsDatasetTestUtil.getReplicas(fds, bpid)) {
File f = b.getBlockFile();
File mf = b.getMetaFile();
@@ -138,7 +139,7 @@ public class TestDirectoryScanner {
/** Delete a block file */
private long deleteBlockFile() {
- synchronized(fds) {
+ try(AutoCloseableLock lock = fds.acquireDatasetLock()) {
for (ReplicaInfo b : FsDatasetTestUtil.getReplicas(fds, bpid)) {
File f = b.getBlockFile();
File mf = b.getMetaFile();
@@ -154,7 +155,7 @@ public class TestDirectoryScanner {
/** Delete block meta file */
private long deleteMetaFile() {
- synchronized(fds) {
+ try(AutoCloseableLock lock = fds.acquireDatasetLock()) {
for (ReplicaInfo b : FsDatasetTestUtil.getReplicas(fds, bpid)) {
File file = b.getMetaFile();
// Delete a metadata file
@@ -173,7 +174,7 @@ public class TestDirectoryScanner {
* @throws IOException
*/
private void duplicateBlock(long blockId) throws IOException {
- synchronized (fds) {
+ try(AutoCloseableLock lock = fds.acquireDatasetLock()) {
ReplicaInfo b = FsDatasetTestUtil.fetchReplicaInfo(fds, bpid, blockId);
try (FsDatasetSpi.FsVolumeReferences volumes =
fds.getFsVolumeReferences()) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ad0ac6cc/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
index 8997f73..738f338 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
import org.apache.hadoop.metrics2.MetricsCollector;
+import org.apache.hadoop.util.AutoCloseableLock;
public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> {
@@ -448,4 +449,9 @@ public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> {
public boolean isDeletingBlock(String bpid, long blockId) {
return false;
}
+
+ @Override
+ public AutoCloseableLock acquireDatasetLock() {
+ return null;
+ }
}
[3/3] hadoop git commit: HDFS-10742. Measure lock time in FsDatasetImpl. Contributed by Chen Liang.
Posted by ar...@apache.org.
HDFS-10742. Measure lock time in FsDatasetImpl. Contributed by Chen Liang.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/012b266e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/012b266e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/012b266e
Branch: refs/heads/branch-2
Commit: 012b266e5eb4a371955d5d4e128483ed4e75631e
Parents: ad0ac6c
Author: Arpit Agarwal <ar...@apache.org>
Authored: Sat Sep 10 18:04:00 2016 -0700
Committer: Arpit Agarwal <ar...@apache.org>
Committed: Sat Sep 10 18:04:00 2016 -0700
----------------------------------------------------------------------
.../apache/hadoop/util/AutoCloseableLock.java | 28 ++-
.../org/apache/hadoop/hdfs/DFSConfigKeys.java | 5 +
.../apache/hadoop/hdfs/InstrumentedLock.java | 185 +++++++++++++++++++
.../datanode/fsdataset/impl/FsDatasetImpl.java | 11 +-
.../src/main/resources/hdfs-default.xml | 7 +
.../hadoop/hdfs/TestInstrumentedLock.java | 166 +++++++++++++++++
6 files changed, 394 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/012b266e/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/AutoCloseableLock.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/AutoCloseableLock.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/AutoCloseableLock.java
index 2aa8578..d920bc6 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/AutoCloseableLock.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/AutoCloseableLock.java
@@ -17,22 +17,33 @@
*/
package org.apache.hadoop.util;
+import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import com.google.common.annotations.VisibleForTesting;
+
/**
* This is a wrapper class for a ReentrantLock. It implements the
* AutoCloseable interface so that users can use try-with-resources syntax.
*/
public class AutoCloseableLock implements AutoCloseable {
- private final ReentrantLock lock;
+ private final Lock lock;
/**
* Creates an instance of {@code AutoCloseableLock}, initializes
- * the underlying {@code ReentrantLock} object.
+ * the underlying lock instance with a new {@code ReentrantLock}.
*/
public AutoCloseableLock() {
- this.lock = new ReentrantLock();
+ this(new ReentrantLock());
+ }
+
+ /**
+ * Wrap the provided Lock instance.
+ * @param lock Lock instance to wrap in AutoCloseable API.
+ */
+ public AutoCloseableLock(Lock lock) {
+ this.lock = lock;
}
/**
@@ -86,7 +97,7 @@ public class AutoCloseableLock implements AutoCloseable {
/**
* A wrapper method that makes a call to {@code tryLock()} of
- * the underlying {@code ReentrantLock} object.
+ * the underlying {@code Lock} object.
*
* If the lock is not held by another thread, acquires the lock, sets the
* hold count to one, and returns {@code true}.
@@ -116,7 +127,12 @@ public class AutoCloseableLock implements AutoCloseable {
* @return {@code true} if any thread holds this lock and
* {@code false} otherwise
*/
- public boolean isLocked() {
- return lock.isLocked();
+ @VisibleForTesting
+ boolean isLocked() {
+ if (lock instanceof ReentrantLock) {
+ return ((ReentrantLock)lock).isLocked();
+ }
+ throw new UnsupportedOperationException();
}
+
}
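As a quick illustration of the new wrapping constructor, a minimal standalone sketch (class name hypothetical):

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.util.AutoCloseableLock;

class WrapLockExample {
  public static void main(String[] args) {
    // Any java.util.concurrent.locks.Lock can now be wrapped, not just
    // the internally created ReentrantLock. acquire() locks and returns
    // the wrapper; close() unlocks it.
    Lock inner = new ReentrantLock();
    AutoCloseableLock acl = new AutoCloseableLock(inner);
    try (AutoCloseableLock l = acl.acquire()) {
      // critical section guarded by inner
    }
  }
}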
http://git-wip-us.apache.org/repos/asf/hadoop/blob/012b266e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 0457741..4ed9fc3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -392,6 +392,11 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_KEY =
"dfs.namenode.read-lock-reporting-threshold-ms";
public static final long DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_DEFAULT = 5000L;
+ // Minimum interval between two consecutive lock warning log messages
+ public static final String DFS_LOCK_SUPPRESS_WARNING_INTERVAL_KEY =
+ "dfs.lock.suppress.warning.interval";
+ public static final long DFS_LOCK_SUPPRESS_WARNING_INTERVAL_DEFAULT =
+ 10000; //ms
public static final String DFS_UPGRADE_DOMAIN_FACTOR = "dfs.namenode.upgrade.domain.factor";
public static final int DFS_UPGRADE_DOMAIN_FACTOR_DEFAULT = DFS_REPLICATION_DEFAULT;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/012b266e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/InstrumentedLock.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/InstrumentedLock.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/InstrumentedLock.java
new file mode 100644
index 0000000..6279e95
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/InstrumentedLock.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Timer;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * This is a debugging class that can be used by callers to track
+ * whether a specifc lock is being held for too long and periodically
+ * log a warning and stack trace, if so.
+ *
+ * The logged warnings are throttled so that logs are not spammed.
+ *
+ * A new instance of InstrumentedLock can be created for each object
+ * that needs to be instrumented.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class InstrumentedLock implements Lock {
+
+ private final Lock lock;
+ private final Log logger;
+ private final String name;
+ private final Timer clock;
+
+ /** Minimum gap between two lock warnings. */
+ private final long minLoggingGap;
+ /** Threshold for detecting long lock held time. */
+ private final long lockWarningThreshold;
+
+ // Tracking counters for lock statistics.
+ private volatile long lockAcquireTimestamp;
+ private final AtomicLong lastLogTimestamp;
+ private final AtomicLong warningsSuppressed = new AtomicLong(0);
+
+ /**
+ * Create an instrumented lock instance which logs a warning message
+ * when the lock held time is above the given threshold.
+ *
+ * @param name the identifier of the lock object
+ * @param logger this class does not have its own logger and will log
+ * to the given logger instead
+ * @param minLoggingGapMs the minimum time gap between two log messages,
+ * used to avoid spamming the logs
+ * @param lockWarningThresholdMs the time threshold above which the lock
+ * held time is considered "too long"
+ */
+ public InstrumentedLock(String name, Log logger, long minLoggingGapMs,
+ long lockWarningThresholdMs) {
+ this(name, logger, new ReentrantLock(),
+ minLoggingGapMs, lockWarningThresholdMs);
+ }
+
+ public InstrumentedLock(String name, Log logger, Lock lock,
+ long minLoggingGapMs, long lockWarningThresholdMs) {
+ this(name, logger, lock,
+ minLoggingGapMs, lockWarningThresholdMs, new Timer());
+ }
+
+ @VisibleForTesting
+ InstrumentedLock(String name, Log logger, Lock lock,
+ long minLoggingGapMs, long lockWarningThresholdMs, Timer clock) {
+ this.name = name;
+ this.lock = lock;
+ this.clock = clock;
+ this.logger = logger;
+ minLoggingGap = minLoggingGapMs;
+ lockWarningThreshold = lockWarningThresholdMs;
+ lastLogTimestamp = new AtomicLong(
+ clock.monotonicNow() - Math.max(minLoggingGap, lockWarningThreshold));
+ }
+
+ @Override
+ public void lock() {
+ lock.lock();
+ lockAcquireTimestamp = clock.monotonicNow();
+ }
+
+ @Override
+ public void lockInterruptibly() throws InterruptedException {
+ lock.lockInterruptibly();
+ lockAcquireTimestamp = clock.monotonicNow();
+ }
+
+ @Override
+ public boolean tryLock() {
+ if (lock.tryLock()) {
+ lockAcquireTimestamp = clock.monotonicNow();
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
+ if (lock.tryLock(time, unit)) {
+ lockAcquireTimestamp = clock.monotonicNow();
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public void unlock() {
+ long localLockReleaseTime = clock.monotonicNow();
+ long localLockAcquireTime = lockAcquireTimestamp;
+ lock.unlock();
+ check(localLockAcquireTime, localLockReleaseTime);
+ }
+
+ @Override
+ public Condition newCondition() {
+ return lock.newCondition();
+ }
+
+ @VisibleForTesting
+ void logWarning(long lockHeldTime, long suppressed) {
+ logger.warn(String.format("Lock held time above threshold: " +
+ "lock identifier: %s " +
+ "lockHeldTimeMs=%d ms. Suppressed %d lock warnings. " +
+ "The stack trace is: %s" ,
+ name, lockHeldTime, suppressed,
+ StringUtils.getStackTrace(Thread.currentThread())));
+ }
+
+ /**
+ * Log a warning if the lock was held for too long.
+ *
+ * Should be invoked by the caller immediately AFTER releasing the lock.
+ *
+ * @param acquireTime - timestamp just after acquiring the lock.
+ * @param releaseTime - timestamp just before releasing the lock.
+ */
+ private void check(long acquireTime, long releaseTime) {
+ if (!logger.isWarnEnabled()) {
+ return;
+ }
+
+ final long lockHeldTime = releaseTime - acquireTime;
+ if (lockWarningThreshold - lockHeldTime < 0) {
+ long now;
+ long localLastLogTs;
+ do {
+ now = clock.monotonicNow();
+ localLastLogTs = lastLogTimestamp.get();
+ long deltaSinceLastLog = now - localLastLogTs;
+ // check should print log or not
+ if (deltaSinceLastLog - minLoggingGap < 0) {
+ warningsSuppressed.incrementAndGet();
+ return;
+ }
+ } while (!lastLogTimestamp.compareAndSet(localLastLogTs, now));
+ long suppressed = warningsSuppressed.getAndSet(0);
+ logWarning(lockHeldTime, suppressed);
+ }
+ }
+
+}
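Putting the two pieces together mirrors the FsDatasetImpl wiring shown below; a minimal sketch with hypothetical names:

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hdfs.InstrumentedLock;
import org.apache.hadoop.util.AutoCloseableLock;

class InstrumentedGuardExample {
  private static final Log LOG =
      LogFactory.getLog(InstrumentedGuardExample.class);

  // Warn when the lock is held longer than 300 ms, but log at most one
  // warning every 10 seconds; intervening warnings are counted and
  // reported with the next logged message.
  private final AutoCloseableLock guard = new AutoCloseableLock(
      new InstrumentedLock("exampleLock", LOG, 10000, 300));

  void doGuardedWork() {
    try (AutoCloseableLock l = guard.acquire()) {
      // a long stay here produces a throttled WARN with a stack trace
    }
  }
}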
http://git-wip-us.apache.org/repos/asf/hadoop/blob/012b266e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index a1c2a46..5b3ebce 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -42,6 +42,7 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.*;
import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
import javax.management.NotCompliantMBeanException;
import javax.management.ObjectName;
@@ -62,7 +63,7 @@ import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.ExtendedBlockId;
-import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.InstrumentedLock;
import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
@@ -282,7 +283,13 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
this.dataStorage = storage;
this.conf = conf;
this.smallBufferSize = DFSUtilClient.getSmallBufferSize(conf);
- this.datasetLock = new AutoCloseableLock();
+ this.datasetLock = new AutoCloseableLock(
+ new InstrumentedLock(getClass().getName(), LOG,
+ conf.getTimeDuration(
+ DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_KEY,
+ DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_DEFAULT,
+ TimeUnit.MILLISECONDS),
+ 300));
// The number of volumes required for operation is the total number
// of volumes minus the number of failed volumes we can tolerate.
volFailuresTolerated = datanode.getDnConf().getVolFailuresTolerated();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/012b266e/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 11a2539..44cde11 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -4094,4 +4094,11 @@
Truststore password for HTTPS SSL configuration
</description>
</property>
+
+<property>
+ <name>dfs.lock.suppress.warning.interval</name>
+ <value>10s</value>
+ <description>Instrumentation reporting long critical sections will suppress
+ consecutive warnings within this interval.</description>
+</property>
</configuration>
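For reference, the time-suffixed value above is parsed by Configuration#getTimeDuration, the same call the FsDatasetImpl constructor uses; a minimal sketch (class name hypothetical):

import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;

class SuppressIntervalExample {
  static long readIntervalMs(Configuration conf) {
    // "10s" in hdfs-site.xml parses to 10000 ms; the 10000 ms default
    // applies when the key is unset.
    return conf.getTimeDuration(
        DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_KEY,
        DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_DEFAULT,
        TimeUnit.MILLISECONDS);
  }
}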
http://git-wip-us.apache.org/repos/asf/hadoop/blob/012b266e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestInstrumentedLock.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestInstrumentedLock.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestInstrumentedLock.java
new file mode 100644
index 0000000..1d1a42b
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestInstrumentedLock.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+
+import org.apache.hadoop.util.AutoCloseableLock;
+import org.apache.hadoop.util.Timer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import static org.mockito.Mockito.*;
+import static org.junit.Assert.*;
+
+/**
+ * A test class for InstrumentedLock.
+ */
+public class TestInstrumentedLock {
+
+ static final Log LOG = LogFactory.getLog(TestInstrumentedLock.class);
+
+ @Rule public TestName name = new TestName();
+
+ /**
+ * Test exclusive access of the lock.
+ * @throws Exception
+ */
+ @Test(timeout=10000)
+ public void testMultipleThread() throws Exception {
+ String testname = name.getMethodName();
+ final InstrumentedLock lock = new InstrumentedLock(testname, LOG, 0, 300);
+ lock.lock();
+ try {
+ Thread competingThread = new Thread() {
+ @Override
+ public void run() {
+ assertFalse(lock.tryLock());
+ }
+ };
+ competingThread.start();
+ competingThread.join();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Test the correctness with try-with-resources syntax.
+ * @throws Exception
+ */
+ @Test(timeout=10000)
+ public void testTryWithResourceSyntax() throws Exception {
+ String testname = name.getMethodName();
+ final AtomicReference<Thread> lockThread = new AtomicReference<>(null);
+ final Lock lock = new InstrumentedLock(testname, LOG, 0, 300) {
+ @Override
+ public void lock() {
+ super.lock();
+ lockThread.set(Thread.currentThread());
+ }
+ @Override
+ public void unlock() {
+ super.unlock();
+ lockThread.set(null);
+ }
+ };
+ AutoCloseableLock acl = new AutoCloseableLock(lock);
+ try (AutoCloseable localLock = acl.acquire()) {
+ assertEquals(acl, localLock);
+ Thread competingThread = new Thread() {
+ @Override
+ public void run() {
+ assertNotEquals(Thread.currentThread(), lockThread.get());
+ assertFalse(lock.tryLock());
+ }
+ };
+ competingThread.start();
+ competingThread.join();
+ assertEquals(Thread.currentThread(), lockThread.get());
+ }
+ assertNull(lockThread.get());
+ }
+
+ /**
+ * Test that the lock logs a warning when the lock held time is greater
+ * than the threshold, and does not log a warning otherwise.
+ * @throws Exception
+ */
+ @Test(timeout=10000)
+ public void testLockLongHoldingReport() throws Exception {
+ String testname = name.getMethodName();
+ final AtomicLong time = new AtomicLong(0);
+ Timer mclock = new Timer() {
+ @Override
+ public long monotonicNow() {
+ return time.get();
+ }
+ };
+ Lock mlock = mock(Lock.class);
+
+ final AtomicLong wlogged = new AtomicLong(0);
+ final AtomicLong wsuppresed = new AtomicLong(0);
+ InstrumentedLock lock = new InstrumentedLock(
+ testname, LOG, mlock, 2000, 300, mclock) {
+ @Override
+ void logWarning(long lockHeldTime, long suppressed) {
+ wlogged.incrementAndGet();
+ wsuppresed.set(suppressed);
+ }
+ };
+
+ // do not log warning when the lock held time is short
+ lock.lock(); // t = 0
+ time.set(200);
+ lock.unlock(); // t = 200
+ assertEquals(0, wlogged.get());
+ assertEquals(0, wsuppresed.get());
+
+ lock.lock(); // t = 200
+ time.set(700);
+ lock.unlock(); // t = 700
+ assertEquals(1, wlogged.get());
+ assertEquals(0, wsuppresed.get());
+
+ // although the lock held time is greater than the threshold, the
+ // warning is suppressed due to the logging gap
+ // (not recorded in wsuppressed until the next logged message)
+ lock.lock(); // t = 700
+ time.set(1100);
+ lock.unlock(); // t = 1100
+ assertEquals(1, wlogged.get());
+ assertEquals(0, wsuppresed.get());
+
+ // log a warning message when the lock held time is greater than the
+ // threshold and the logging time gap is satisfied. Also should display
+ // suppressed previous warnings.
+ time.set(2400);
+ lock.lock(); // t = 2400
+ time.set(2800);
+ lock.unlock(); // t = 2800
+ assertEquals(2, wlogged.get());
+ assertEquals(1, wsuppresed.get());
+ }
+
+}
[2/3] hadoop git commit: HDFS-10742. Measure lock time in FsDatasetImpl. Contributed by Chen Liang.
Posted by ar...@apache.org.
HDFS-10742. Measure lock time in FsDatasetImpl. Contributed by Chen Liang.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/04f620c4
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/04f620c4
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/04f620c4
Branch: refs/heads/branch-2.8
Commit: 04f620c4d01c3954c780465087a327bba76fe6df
Parents: d5ea508
Author: Arpit Agarwal <ar...@apache.org>
Authored: Sat Sep 10 18:01:37 2016 -0700
Committer: Arpit Agarwal <ar...@apache.org>
Committed: Sat Sep 10 18:01:37 2016 -0700
----------------------------------------------------------------------
.../apache/hadoop/util/AutoCloseableLock.java | 28 ++-
.../org/apache/hadoop/hdfs/DFSConfigKeys.java | 5 +
.../apache/hadoop/hdfs/InstrumentedLock.java | 185 +++++++++++++++++++
.../datanode/fsdataset/impl/FsDatasetImpl.java | 11 +-
.../src/main/resources/hdfs-default.xml | 7 +
.../hadoop/hdfs/TestInstrumentedLock.java | 166 +++++++++++++++++
6 files changed, 394 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/04f620c4/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/AutoCloseableLock.java
----------------------------------------------------------------------
(The AutoCloseableLock.java diff is identical to the branch-2 commit above.)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/04f620c4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 9eee1ac..382404c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -383,6 +383,11 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_KEY =
"dfs.namenode.read-lock-reporting-threshold-ms";
public static final long DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_DEFAULT = 5000L;
+ // Minimum interval between two consecutive lock warning log messages
+ public static final String DFS_LOCK_SUPPRESS_WARNING_INTERVAL_KEY =
+ "dfs.lock.suppress.warning.interval";
+ public static final long DFS_LOCK_SUPPRESS_WARNING_INTERVAL_DEFAULT =
+ 10000; //ms
public static final String DFS_UPGRADE_DOMAIN_FACTOR = "dfs.namenode.upgrade.domain.factor";
public static final int DFS_UPGRADE_DOMAIN_FACTOR_DEFAULT = DFS_REPLICATION_DEFAULT;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/04f620c4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/InstrumentedLock.java
----------------------------------------------------------------------
(The new InstrumentedLock.java file is identical to the branch-2 commit above.)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/04f620c4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 596ab75..1f0d975 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -42,6 +42,7 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.*;
import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
import javax.management.NotCompliantMBeanException;
import javax.management.ObjectName;
@@ -62,7 +63,7 @@ import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.ExtendedBlockId;
-import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.InstrumentedLock;
import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
@@ -280,7 +281,13 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
this.dataStorage = storage;
this.conf = conf;
this.smallBufferSize = DFSUtilClient.getSmallBufferSize(conf);
- this.datasetLock = new AutoCloseableLock();
+ this.datasetLock = new AutoCloseableLock(
+ new InstrumentedLock(getClass().getName(), LOG,
+ conf.getTimeDuration(
+ DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_KEY,
+ DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_DEFAULT,
+ TimeUnit.MILLISECONDS),
+ 300));
// The number of volumes required for operation is the total number
// of volumes minus the number of failed volumes we can tolerate.
volFailuresTolerated = datanode.getDnConf().getVolFailuresTolerated();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/04f620c4/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 19ae973..6984f6f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -3046,4 +3046,11 @@
Keytab based login can be enabled with dfs.balancer.keytab.enabled.
</description>
</property>
+
+<property>
+ <name>dfs.lock.suppress.warning.interval</name>
+ <value>10s</value>
+ <description>Instrumentation reporting long critical sections will suppress
+ consecutive warnings within this interval.</description>
+</property>
</configuration>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/04f620c4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestInstrumentedLock.java
----------------------------------------------------------------------
(The new TestInstrumentedLock.java file is identical to the branch-2 commit above.)