Posted to common-commits@hadoop.apache.org by ar...@apache.org on 2016/09/11 01:11:28 UTC

[1/3] hadoop git commit: HDFS-10682. Replace FsDatasetImpl object lock with a separate lock object. (Contributed by Chen Liang)

Repository: hadoop
Updated Branches:
  refs/heads/branch-2 bb6d86620 -> 012b266e5
  refs/heads/branch-2.8 d5ea508ca -> 04f620c4d


HDFS-10682. Replace FsDatasetImpl object lock with a separate lock object. (Contributed by Chen Liang)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ad0ac6cc
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ad0ac6cc
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ad0ac6cc

Branch: refs/heads/branch-2
Commit: ad0ac6cced127c5303ea8497fdd856b9cd0acb38
Parents: bb6d866
Author: Arpit Agarwal <ar...@apache.org>
Authored: Wed Aug 17 16:22:00 2016 -0700
Committer: Arpit Agarwal <ar...@apache.org>
Committed: Sat Sep 10 17:48:27 2016 -0700

----------------------------------------------------------------------
 .../hdfs/server/datanode/BlockSender.java       |   3 +-
 .../hadoop/hdfs/server/datanode/DataNode.java   |   3 +-
 .../hdfs/server/datanode/DirectoryScanner.java  |   4 +-
 .../server/datanode/fsdataset/FsDatasetSpi.java |   6 +
 .../datanode/fsdataset/impl/FsDatasetImpl.java  | 926 ++++++++++---------
 .../datanode/fsdataset/impl/FsVolumeImpl.java   |   9 +-
 .../server/datanode/SimulatedFSDataset.java     |   9 +
 .../hdfs/server/datanode/TestBlockRecovery.java |   3 +-
 .../server/datanode/TestDirectoryScanner.java   |   9 +-
 .../extdataset/ExternalDatasetImpl.java         |   6 +
 10 files changed, 533 insertions(+), 445 deletions(-)
----------------------------------------------------------------------
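
The change below is mechanical but wide: every method or block that previously synchronized on the FsDatasetImpl instance (or on datanode.data from the outside) now wraps the same critical section in a try-with-resources block on a dedicated AutoCloseableLock, so the dataset object itself is no longer the monitor. A minimal sketch of the before/after pattern, assuming only that AutoCloseableLock.acquire() locks and returns the lock and that close() releases it, as the patch's usage implies (the surrounding class is illustrative, not part of the patch):

  import org.apache.hadoop.util.AutoCloseableLock;

  class DatasetLockingSketch {
    // Dedicated lock object replacing synchronized(this) and
    // synchronized methods on the dataset.
    private final AutoCloseableLock datasetLock = new AutoCloseableLock();

    // Before this patch the method would have been declared
    // "public synchronized String describe(long blockId)".
    public String describe(long blockId) {
      // close() runs on every exit path (return or throw), so the
      // lock is released exactly where the synchronized block ended.
      try (AutoCloseableLock lock = datasetLock.acquire()) {
        return "block " + blockId; // stands in for a volumeMap lookup
      }
    }
  }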


http://git-wip-us.apache.org/repos/asf/hadoop/blob/ad0ac6cc/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
index 0b4fac1..e060cab 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.ReadaheadPool.ReadaheadRequest;
 import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.net.SocketOutputStream;
+import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.htrace.core.Sampler;
 import org.apache.htrace.core.TraceScope;
@@ -242,7 +243,7 @@ class BlockSender implements java.io.Closeable {
       }
       
       final long replicaVisibleLength;
-      synchronized(datanode.data) { 
+      try(AutoCloseableLock lock = datanode.data.acquireDatasetLock()) {
         replica = getReplica(block, datanode);
         replicaVisibleLength = replica.getVisibleLength();
       }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ad0ac6cc/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 11b3e6a..b31c43b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -201,6 +201,7 @@ import org.apache.hadoop.tracing.TraceAdminProtocolPB;
 import org.apache.hadoop.tracing.TraceAdminProtocolServerSideTranslatorPB;
 import org.apache.hadoop.tracing.TraceUtils;
 import org.apache.hadoop.tracing.TracerConfigurationManager;
+import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.DiskChecker;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
@@ -2831,7 +2832,7 @@ public class DataNode extends ReconfigurableBase
     final BlockConstructionStage stage;
 
     //get replica information
-    synchronized(data) {
+    try(AutoCloseableLock lock = data.acquireDatasetLock()) {
       Block storedBlock = data.getStoredBlock(b.getBlockPoolId(),
           b.getBlockId());
       if (null == storedBlock) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ad0ac6cc/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
index 1db445e..421d067 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
@@ -44,12 +44,14 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.StopWatch;
 import org.apache.hadoop.util.Time;
@@ -583,7 +585,7 @@ public class DirectoryScanner implements Runnable {
     Map<String, ScanInfo[]> diskReport = getDiskReport();
 
     // Hold FSDataset lock to prevent further changes to the block map
-    synchronized(dataset) {
+    try(AutoCloseableLock lock = dataset.acquireDatasetLock()) {
       for (Entry<String, ScanInfo[]> entry : diskReport.entrySet()) {
         String bpid = entry.getKey();
         ScanInfo[] blockpoolReport = entry.getValue();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ad0ac6cc/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
index 788b75b..ac3c5b4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
@@ -60,6 +60,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
+import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.util.ReflectionUtils;
 
 /**
@@ -641,4 +642,9 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
    * Confirm whether the block is deleting
    */
   boolean isDeletingBlock(String bpid, long blockId);
+
+  /**
+   * Acquire the lock of the dataset.
+   */
+  AutoCloseableLock acquireDatasetLock();
 }
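
The new interface method works because AutoCloseableLock implements java.lang.AutoCloseable: acquire() takes the underlying lock and returns the object so it can sit in a try-with-resources header, and close() releases it when the block exits. A hypothetical minimal wrapper in that shape, assuming a plain ReentrantLock underneath (the real org.apache.hadoop.util.AutoCloseableLock ships in hadoop-common and may differ in detail):

  import java.util.concurrent.locks.ReentrantLock;

  // Sketch only, for illustration; not the actual Hadoop class.
  class AutoCloseableLockSketch implements AutoCloseable {
    private final ReentrantLock lock = new ReentrantLock();

    // Lock, then return this for use in try-with-resources.
    public AutoCloseableLockSketch acquire() {
      lock.lock();
      return this;
    }

    @Override
    public void close() { // invoked automatically when the try block exits
      lock.unlock();
    }
  }

Callers outside the dataset implementation, such as the BlockSender, DataNode, and DirectoryScanner hunks above, reach the lock through this new FsDatasetSpi.acquireDatasetLock() method rather than synchronizing on the dataset instance.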

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ad0ac6cc/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 04f887f..a1c2a46 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -63,6 +63,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.ExtendedBlockId;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
@@ -181,21 +182,26 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   }
 
   @Override
-  public synchronized FsVolumeImpl getVolume(final ExtendedBlock b) {
-    final ReplicaInfo r =  volumeMap.get(b.getBlockPoolId(), b.getLocalBlock());
-    return r != null? (FsVolumeImpl)r.getVolume(): null;
+  public FsVolumeImpl getVolume(final ExtendedBlock b) {
+    try(AutoCloseableLock lock = datasetLock.acquire()) {
+      final ReplicaInfo r =
+          volumeMap.get(b.getBlockPoolId(), b.getLocalBlock());
+      return r != null ? (FsVolumeImpl) r.getVolume() : null;
+    }
   }
 
   @Override // FsDatasetSpi
-  public synchronized Block getStoredBlock(String bpid, long blkid)
+  public Block getStoredBlock(String bpid, long blkid)
       throws IOException {
-    File blockfile = getFile(bpid, blkid, false);
-    if (blockfile == null) {
-      return null;
+    try(AutoCloseableLock lock = datasetLock.acquire()) {
+      File blockfile = getFile(bpid, blkid, false);
+      if (blockfile == null) {
+        return null;
+      }
+      final File metafile = FsDatasetUtil.findMetaFile(blockfile);
+      final long gs = FsDatasetUtil.parseGenerationStamp(blockfile, metafile);
+      return new Block(blkid, blockfile.length(), gs);
     }
-    final File metafile = FsDatasetUtil.findMetaFile(blockfile);
-    final long gs = FsDatasetUtil.parseGenerationStamp(blockfile, metafile);
-    return new Block(blkid, blockfile.length(), gs);
   }
 
 
@@ -264,7 +270,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
   private boolean blockPinningEnabled;
   private final int maxDataLength;
-  
+
+  private final AutoCloseableLock datasetLock;
   /**
    * An FSDataset has a directory where it loads its data files.
    */
@@ -275,6 +282,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     this.dataStorage = storage;
     this.conf = conf;
     this.smallBufferSize = DFSUtilClient.getSmallBufferSize(conf);
+    this.datasetLock = new AutoCloseableLock();
     // The number of volumes required for operation is the total number
     // of volumes minus the number of failed volumes we can tolerate.
     volFailuresTolerated = datanode.getDnConf().getVolFailuresTolerated();
@@ -381,25 +389,27 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
    * Activate a volume to serve requests.
    * @throws IOException if the storage UUID already exists.
    */
-  private synchronized void activateVolume(
+  private void activateVolume(
       ReplicaMap replicaMap,
       Storage.StorageDirectory sd, StorageType storageType,
       FsVolumeReference ref) throws IOException {
-    DatanodeStorage dnStorage = storageMap.get(sd.getStorageUuid());
-    if (dnStorage != null) {
-      final String errorMsg = String.format(
-          "Found duplicated storage UUID: %s in %s.",
-          sd.getStorageUuid(), sd.getVersionFile());
-      LOG.error(errorMsg);
-      throw new IOException(errorMsg);
-    }
-    volumeMap.addAll(replicaMap);
-    storageMap.put(sd.getStorageUuid(),
-        new DatanodeStorage(sd.getStorageUuid(),
-            DatanodeStorage.State.NORMAL,
-            storageType));
-    asyncDiskService.addVolume(sd.getCurrentDir());
-    volumes.addVolume(ref);
+    try(AutoCloseableLock lock = datasetLock.acquire()) {
+      DatanodeStorage dnStorage = storageMap.get(sd.getStorageUuid());
+      if (dnStorage != null) {
+        final String errorMsg = String.format(
+            "Found duplicated storage UUID: %s in %s.",
+            sd.getStorageUuid(), sd.getVersionFile());
+        LOG.error(errorMsg);
+        throw new IOException(errorMsg);
+      }
+      volumeMap.addAll(replicaMap);
+      storageMap.put(sd.getStorageUuid(),
+          new DatanodeStorage(sd.getStorageUuid(),
+              DatanodeStorage.State.NORMAL,
+              storageType));
+      asyncDiskService.addVolume(sd.getCurrentDir());
+      volumes.addVolume(ref);
+    }
   }
 
   private void addVolume(Collection<StorageLocation> dataLocations,
@@ -494,7 +504,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
     Map<String, List<ReplicaInfo>> blkToInvalidate = new HashMap<>();
     List<String> storageToRemove = new ArrayList<>();
-    synchronized (this) {
+    try(AutoCloseableLock lock = datasetLock.acquire()) {
       for (int idx = 0; idx < dataStorage.getNumStorageDirs(); idx++) {
         Storage.StorageDirectory sd = dataStorage.getStorageDir(idx);
         final File absRoot = sd.getRoot().getAbsoluteFile();
@@ -540,7 +550,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       }
     }
 
-    synchronized (this) {
+    try(AutoCloseableLock lock = datasetLock.acquire()) {
       for(String storageUuid : storageToRemove) {
         storageMap.remove(storageUuid);
       }
@@ -749,7 +759,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
                                          boolean touch)
       throws IOException {
     final File f;
-    synchronized(this) {
+    try(AutoCloseableLock lock = datasetLock.acquire()) {
       f = getFile(b.getBlockPoolId(), b.getLocalBlock().getBlockId(), touch);
     }
     if (f == null) {
@@ -815,22 +825,25 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
    * Returns handles to the block file and its metadata file
    */
   @Override // FsDatasetSpi
-  public synchronized ReplicaInputStreams getTmpInputStreams(ExtendedBlock b,
+  public ReplicaInputStreams getTmpInputStreams(ExtendedBlock b,
       long blkOffset, long metaOffset) throws IOException {
-    ReplicaInfo info = getReplicaInfo(b);
-    FsVolumeReference ref = info.getVolume().obtainReference();
-    try {
-      InputStream blockInStream = openAndSeek(info.getBlockFile(), blkOffset);
+    try(AutoCloseableLock lock = datasetLock.acquire()) {
+      ReplicaInfo info = getReplicaInfo(b);
+      FsVolumeReference ref = info.getVolume().obtainReference();
       try {
-        InputStream metaInStream = openAndSeek(info.getMetaFile(), metaOffset);
-        return new ReplicaInputStreams(blockInStream, metaInStream, ref);
+        InputStream blockInStream = openAndSeek(info.getBlockFile(), blkOffset);
+        try {
+          InputStream metaInStream =
+              openAndSeek(info.getMetaFile(), metaOffset);
+          return new ReplicaInputStreams(blockInStream, metaInStream, ref);
+        } catch (IOException e) {
+          IOUtils.cleanup(null, blockInStream);
+          throw e;
+        }
       } catch (IOException e) {
-        IOUtils.cleanup(null, blockInStream);
+        IOUtils.cleanup(null, ref);
         throw e;
       }
-    } catch (IOException e) {
-      IOUtils.cleanup(null, ref);
-      throw e;
     }
   }
 
@@ -949,7 +962,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     }
 
     FsVolumeReference volumeRef = null;
-    synchronized (this) {
+    try(AutoCloseableLock lock = datasetLock.acquire()) {
       volumeRef = volumes.getNextVolume(targetStorageType, block.getNumBytes());
     }
     try {
@@ -968,7 +981,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       newReplicaInfo.setNumBytes(blockFiles[1].length());
       // Finalize the copied files
       newReplicaInfo = finalizeReplica(block.getBlockPoolId(), newReplicaInfo);
-      synchronized (this) {
+      try(AutoCloseableLock lock = datasetLock.acquire()) {
         // Increment numBlocks here as this block moved without knowing to BPS
         FsVolumeImpl volume = (FsVolumeImpl) newReplicaInfo.getVolume();
         volume.getBlockPoolSlice(block.getBlockPoolId()).incrNumBlocks();
@@ -1100,41 +1113,43 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
 
   @Override  // FsDatasetSpi
-  public synchronized ReplicaHandler append(ExtendedBlock b,
+  public ReplicaHandler append(ExtendedBlock b,
       long newGS, long expectedBlockLen) throws IOException {
-    // If the block was successfully finalized because all packets
-    // were successfully processed at the Datanode but the ack for
-    // some of the packets were not received by the client. The client 
-    // re-opens the connection and retries sending those packets.
-    // The other reason is that an "append" is occurring to this block.
-    
-    // check the validity of the parameter
-    if (newGS < b.getGenerationStamp()) {
-      throw new IOException("The new generation stamp " + newGS + 
-          " should be greater than the replica " + b + "'s generation stamp");
-    }
-    ReplicaInfo replicaInfo = getReplicaInfo(b);
-    LOG.info("Appending to " + replicaInfo);
-    if (replicaInfo.getState() != ReplicaState.FINALIZED) {
-      throw new ReplicaNotFoundException(
-          ReplicaNotFoundException.UNFINALIZED_REPLICA + b);
-    }
-    if (replicaInfo.getNumBytes() != expectedBlockLen) {
-      throw new IOException("Corrupted replica " + replicaInfo + 
-          " with a length of " + replicaInfo.getNumBytes() + 
-          " expected length is " + expectedBlockLen);
-    }
+    try(AutoCloseableLock lock = datasetLock.acquire()) {
+      // If the block was successfully finalized because all packets
+      // were successfully processed at the Datanode but the ack for
+      // some of the packets were not received by the client. The client
+      // re-opens the connection and retries sending those packets.
+      // The other reason is that an "append" is occurring to this block.
+
+      // check the validity of the parameter
+      if (newGS < b.getGenerationStamp()) {
+        throw new IOException("The new generation stamp " + newGS +
+            " should be greater than the replica " + b + "'s generation stamp");
+      }
+      ReplicaInfo replicaInfo = getReplicaInfo(b);
+      LOG.info("Appending to " + replicaInfo);
+      if (replicaInfo.getState() != ReplicaState.FINALIZED) {
+        throw new ReplicaNotFoundException(
+            ReplicaNotFoundException.UNFINALIZED_REPLICA + b);
+      }
+      if (replicaInfo.getNumBytes() != expectedBlockLen) {
+        throw new IOException("Corrupted replica " + replicaInfo +
+            " with a length of " + replicaInfo.getNumBytes() +
+            " expected length is " + expectedBlockLen);
+      }
 
-    FsVolumeReference ref = replicaInfo.getVolume().obtainReference();
-    ReplicaBeingWritten replica = null;
-    try {
-      replica = append(b.getBlockPoolId(), (FinalizedReplica)replicaInfo, newGS,
-          b.getNumBytes());
-    } catch (IOException e) {
-      IOUtils.cleanup(null, ref);
-      throw e;
+      FsVolumeReference ref = replicaInfo.getVolume().obtainReference();
+      ReplicaBeingWritten replica = null;
+      try {
+        replica = append(b.getBlockPoolId(), (FinalizedReplica) replicaInfo,
+            newGS, b.getNumBytes());
+      } catch (IOException e) {
+        IOUtils.cleanup(null, ref);
+        throw e;
+      }
+      return new ReplicaHandler(replica, ref);
     }
-    return new ReplicaHandler(replica, ref);
   }
   
   /** Append to a finalized replica
@@ -1149,66 +1164,68 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
    * @throws IOException if moving the replica from finalized directory 
    *         to rbw directory fails
    */
-  private synchronized ReplicaBeingWritten append(String bpid,
+  private ReplicaBeingWritten append(String bpid,
       FinalizedReplica replicaInfo, long newGS, long estimateBlockLen)
       throws IOException {
-    // If the block is cached, start uncaching it.
-    cacheManager.uncacheBlock(bpid, replicaInfo.getBlockId());
-
-    // If there are any hardlinks to the block, break them.  This ensures we are
-    // not appending to a file that is part of a previous/ directory.
-    replicaInfo.breakHardLinksIfNeeded();
-    
-    // construct a RBW replica with the new GS
-    File blkfile = replicaInfo.getBlockFile();
-    FsVolumeImpl v = (FsVolumeImpl)replicaInfo.getVolume();
-    long bytesReserved = estimateBlockLen - replicaInfo.getNumBytes();
-    if (v.getAvailable() < bytesReserved) {
-      throw new DiskOutOfSpaceException("Insufficient space for appending to "
-          + replicaInfo);
-    }
-    File newBlkFile = new File(v.getRbwDir(bpid), replicaInfo.getBlockName());
-    File oldmeta = replicaInfo.getMetaFile();
-    ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(
-        replicaInfo.getBlockId(), replicaInfo.getNumBytes(), newGS,
-        v, newBlkFile.getParentFile(), Thread.currentThread(), bytesReserved);
-    File newmeta = newReplicaInfo.getMetaFile();
-
-    // rename meta file to rbw directory
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Renaming " + oldmeta + " to " + newmeta);
-    }
-    try {
-      NativeIO.renameTo(oldmeta, newmeta);
-    } catch (IOException e) {
-      throw new IOException("Block " + replicaInfo + " reopen failed. " +
-                            " Unable to move meta file  " + oldmeta +
-                            " to rbw dir " + newmeta, e);
-    }
+    try(AutoCloseableLock lock = datasetLock.acquire()) {
+      // If the block is cached, start uncaching it.
+      cacheManager.uncacheBlock(bpid, replicaInfo.getBlockId());
+
+      // If there are any hardlinks to the block, break them.  This ensures we
+      // are not appending to a file that is part of a previous/ directory.
+      replicaInfo.breakHardLinksIfNeeded();
+
+      // construct a RBW replica with the new GS
+      File blkfile = replicaInfo.getBlockFile();
+      FsVolumeImpl v = (FsVolumeImpl) replicaInfo.getVolume();
+      long bytesReserved = estimateBlockLen - replicaInfo.getNumBytes();
+      if (v.getAvailable() < bytesReserved) {
+        throw new DiskOutOfSpaceException("Insufficient space for appending to "
+            + replicaInfo);
+      }
+      File newBlkFile = new File(v.getRbwDir(bpid), replicaInfo.getBlockName());
+      File oldmeta = replicaInfo.getMetaFile();
+      ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(
+          replicaInfo.getBlockId(), replicaInfo.getNumBytes(), newGS,
+          v, newBlkFile.getParentFile(), Thread.currentThread(), bytesReserved);
+      File newmeta = newReplicaInfo.getMetaFile();
+
+      // rename meta file to rbw directory
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Renaming " + oldmeta + " to " + newmeta);
+      }
+      try {
+        NativeIO.renameTo(oldmeta, newmeta);
+      } catch (IOException e) {
+        throw new IOException("Block " + replicaInfo + " reopen failed. " +
+            " Unable to move meta file  " + oldmeta +
+            " to rbw dir " + newmeta, e);
+      }
 
-    // rename block file to rbw directory
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Renaming " + blkfile + " to " + newBlkFile
-          + ", file length=" + blkfile.length());
-    }
-    try {
-      NativeIO.renameTo(blkfile, newBlkFile);
-    } catch (IOException e) {
+      // rename block file to rbw directory
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Renaming " + blkfile + " to " + newBlkFile
+            + ", file length=" + blkfile.length());
+      }
       try {
-        NativeIO.renameTo(newmeta, oldmeta);
-      } catch (IOException ex) {
-        LOG.warn("Cannot move meta file " + newmeta + 
-            "back to the finalized directory " + oldmeta, ex);
+        NativeIO.renameTo(blkfile, newBlkFile);
+      } catch (IOException e) {
+        try {
+          NativeIO.renameTo(newmeta, oldmeta);
+        } catch (IOException ex) {
+          LOG.warn("Cannot move meta file " + newmeta +
+              "back to the finalized directory " + oldmeta, ex);
+        }
+        throw new IOException("Block " + replicaInfo + " reopen failed. " +
+            " Unable to move block file " + blkfile +
+            " to rbw dir " + newBlkFile, e);
       }
-      throw new IOException("Block " + replicaInfo + " reopen failed. " +
-                              " Unable to move block file " + blkfile +
-                              " to rbw dir " + newBlkFile, e);
+
+      // Replace finalized replica by a RBW replica in replicas map
+      volumeMap.add(bpid, newReplicaInfo);
+      v.reserveSpaceForReplica(bytesReserved);
+      return newReplicaInfo;
     }
-    
-    // Replace finalized replica by a RBW replica in replicas map
-    volumeMap.add(bpid, newReplicaInfo);
-    v.reserveSpaceForReplica(bytesReserved);
-    return newReplicaInfo;
   }
 
   private static class MustStopExistingWriter extends Exception {
@@ -1278,7 +1295,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
     while (true) {
       try {
-        synchronized (this) {
+        try(AutoCloseableLock lock = datasetLock.acquire()) {
           ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen);
 
           FsVolumeReference ref = replicaInfo.getVolume().obtainReference();
@@ -1310,7 +1327,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     LOG.info("Recover failed close " + b);
     while (true) {
       try {
-        synchronized (this) {
+        try(AutoCloseableLock lock = datasetLock.acquire()) {
           // check replica's state
           ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen);
           // bump the replica's GS
@@ -1357,62 +1374,65 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   }
 
   @Override // FsDatasetSpi
-  public synchronized ReplicaHandler createRbw(
+  public ReplicaHandler createRbw(
       StorageType storageType, ExtendedBlock b, boolean allowLazyPersist)
       throws IOException {
-    ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
-        b.getBlockId());
-    if (replicaInfo != null) {
-      throw new ReplicaAlreadyExistsException("Block " + b +
-      " already exists in state " + replicaInfo.getState() +
-      " and thus cannot be created.");
-    }
-    // create a new block
-    FsVolumeReference ref = null;
-
-    // Use ramdisk only if block size is a multiple of OS page size.
-    // This simplifies reservation for partially used replicas
-    // significantly.
-    if (allowLazyPersist &&
-        lazyWriter != null &&
-        b.getNumBytes() % cacheManager.getOsPageSize() == 0 &&
-        reserveLockedMemory(b.getNumBytes())) {
-      try {
-        // First try to place the block on a transient volume.
-        ref = volumes.getNextTransientVolume(b.getNumBytes());
-        datanode.getMetrics().incrRamDiskBlocksWrite();
-      } catch(DiskOutOfSpaceException de) {
-        // Ignore the exception since we just fall back to persistent storage.
-      } finally {
-        if (ref == null) {
-          cacheManager.release(b.getNumBytes());
+    try(AutoCloseableLock lock = datasetLock.acquire()) {
+      ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
+          b.getBlockId());
+      if (replicaInfo != null) {
+        throw new ReplicaAlreadyExistsException("Block " + b +
+            " already exists in state " + replicaInfo.getState() +
+            " and thus cannot be created.");
+      }
+      // create a new block
+      FsVolumeReference ref = null;
+
+      // Use ramdisk only if block size is a multiple of OS page size.
+      // This simplifies reservation for partially used replicas
+      // significantly.
+      if (allowLazyPersist &&
+          lazyWriter != null &&
+          b.getNumBytes() % cacheManager.getOsPageSize() == 0 &&
+          reserveLockedMemory(b.getNumBytes())) {
+        try {
+          // First try to place the block on a transient volume.
+          ref = volumes.getNextTransientVolume(b.getNumBytes());
+          datanode.getMetrics().incrRamDiskBlocksWrite();
+        } catch (DiskOutOfSpaceException de) {
+          // Ignore the exception since we just fall back to persistent storage.
+        } finally {
+          if (ref == null) {
+            cacheManager.release(b.getNumBytes());
+          }
         }
       }
-    }
 
-    if (ref == null) {
-      ref = volumes.getNextVolume(storageType, b.getNumBytes());
-    }
+      if (ref == null) {
+        ref = volumes.getNextVolume(storageType, b.getNumBytes());
+      }
 
-    FsVolumeImpl v = (FsVolumeImpl) ref.getVolume();
-    // create an rbw file to hold block in the designated volume
+      FsVolumeImpl v = (FsVolumeImpl) ref.getVolume();
+      // create an rbw file to hold block in the designated volume
 
-    if (allowLazyPersist && !v.isTransientStorage()) {
-      datanode.getMetrics().incrRamDiskBlocksWriteFallback();
-    }
+      if (allowLazyPersist && !v.isTransientStorage()) {
+        datanode.getMetrics().incrRamDiskBlocksWriteFallback();
+      }
 
-    File f;
-    try {
-      f = v.createRbwFile(b.getBlockPoolId(), b.getLocalBlock());
-    } catch (IOException e) {
-      IOUtils.cleanup(null, ref);
-      throw e;
-    }
+      File f;
+      try {
+        f = v.createRbwFile(b.getBlockPoolId(), b.getLocalBlock());
+      } catch (IOException e) {
+        IOUtils.cleanup(null, ref);
+        throw e;
+      }
 
-    ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(b.getBlockId(), 
-        b.getGenerationStamp(), v, f.getParentFile(), b.getNumBytes());
-    volumeMap.add(b.getBlockPoolId(), newReplicaInfo);
-    return new ReplicaHandler(newReplicaInfo, ref);
+      ReplicaBeingWritten newReplicaInfo =
+          new ReplicaBeingWritten(b.getBlockId(),
+          b.getGenerationStamp(), v, f.getParentFile(), b.getNumBytes());
+      volumeMap.add(b.getBlockPoolId(), newReplicaInfo);
+      return new ReplicaHandler(newReplicaInfo, ref);
+    }
   }
 
   @Override // FsDatasetSpi
@@ -1423,7 +1443,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
     while (true) {
       try {
-        synchronized (this) {
+        try(AutoCloseableLock lock = datasetLock.acquire()) {
           ReplicaInfo replicaInfo = getReplicaInfo(b.getBlockPoolId(), b.getBlockId());
           
           // check the replica's state
@@ -1444,61 +1464,64 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     }
   }
 
-  private synchronized ReplicaHandler recoverRbwImpl(ReplicaBeingWritten rbw,
+  private ReplicaHandler recoverRbwImpl(ReplicaBeingWritten rbw,
       ExtendedBlock b, long newGS, long minBytesRcvd, long maxBytesRcvd)
       throws IOException {
-    // check generation stamp
-    long replicaGenerationStamp = rbw.getGenerationStamp();
-    if (replicaGenerationStamp < b.getGenerationStamp() ||
-        replicaGenerationStamp > newGS) {
-      throw new ReplicaNotFoundException(
-          ReplicaNotFoundException.UNEXPECTED_GS_REPLICA + b +
-          ". Expected GS range is [" + b.getGenerationStamp() + ", " + 
-          newGS + "].");
-    }
-    
-    // check replica length
-    long bytesAcked = rbw.getBytesAcked();
-    long numBytes = rbw.getNumBytes();
-    if (bytesAcked < minBytesRcvd || numBytes > maxBytesRcvd){
-      throw new ReplicaNotFoundException("Unmatched length replica " + 
-          rbw + ": BytesAcked = " + bytesAcked + 
-          " BytesRcvd = " + numBytes + " are not in the range of [" + 
-          minBytesRcvd + ", " + maxBytesRcvd + "].");
-    }
+    try(AutoCloseableLock lock = datasetLock.acquire()) {
+      // check generation stamp
+      long replicaGenerationStamp = rbw.getGenerationStamp();
+      if (replicaGenerationStamp < b.getGenerationStamp() ||
+          replicaGenerationStamp > newGS) {
+        throw new ReplicaNotFoundException(
+            ReplicaNotFoundException.UNEXPECTED_GS_REPLICA + b +
+                ". Expected GS range is [" + b.getGenerationStamp() + ", " +
+                newGS + "].");
+      }
 
-    FsVolumeReference ref = rbw.getVolume().obtainReference();
-    try {
-      // Truncate the potentially corrupt portion.
-      // If the source was client and the last node in the pipeline was lost,
-      // any corrupt data written after the acked length can go unnoticed.
-      if (numBytes > bytesAcked) {
-        final File replicafile = rbw.getBlockFile();
-        truncateBlock(replicafile, rbw.getMetaFile(), numBytes, bytesAcked);
-        rbw.setNumBytes(bytesAcked);
-        rbw.setLastChecksumAndDataLen(bytesAcked, null);
-      }
-
-      // bump the replica's generation stamp to newGS
-      bumpReplicaGS(rbw, newGS);
-    } catch (IOException e) {
-      IOUtils.cleanup(null, ref);
-      throw e;
+      // check replica length
+      long bytesAcked = rbw.getBytesAcked();
+      long numBytes = rbw.getNumBytes();
+      if (bytesAcked < minBytesRcvd || numBytes > maxBytesRcvd) {
+        throw new ReplicaNotFoundException("Unmatched length replica " +
+            rbw + ": BytesAcked = " + bytesAcked +
+            " BytesRcvd = " + numBytes + " are not in the range of [" +
+            minBytesRcvd + ", " + maxBytesRcvd + "].");
+      }
+
+      FsVolumeReference ref = rbw.getVolume().obtainReference();
+      try {
+        // Truncate the potentially corrupt portion.
+        // If the source was client and the last node in the pipeline was lost,
+        // any corrupt data written after the acked length can go unnoticed.
+        if (numBytes > bytesAcked) {
+          final File replicafile = rbw.getBlockFile();
+          truncateBlock(replicafile, rbw.getMetaFile(), numBytes, bytesAcked);
+          rbw.setNumBytes(bytesAcked);
+          rbw.setLastChecksumAndDataLen(bytesAcked, null);
+        }
+
+        // bump the replica's generation stamp to newGS
+        bumpReplicaGS(rbw, newGS);
+      } catch (IOException e) {
+        IOUtils.cleanup(null, ref);
+        throw e;
+      }
+      return new ReplicaHandler(rbw, ref);
     }
-    return new ReplicaHandler(rbw, ref);
   }
   
   @Override // FsDatasetSpi
-  public synchronized ReplicaInPipeline convertTemporaryToRbw(
+  public ReplicaInPipeline convertTemporaryToRbw(
       final ExtendedBlock b) throws IOException {
-    final long blockId = b.getBlockId();
-    final long expectedGs = b.getGenerationStamp();
-    final long visible = b.getNumBytes();
-    LOG.info("Convert " + b + " from Temporary to RBW, visible length="
-        + visible);
-
-    final ReplicaInPipeline temp;
-    {
+    try(AutoCloseableLock lock = datasetLock.acquire()) {
+      final long blockId = b.getBlockId();
+      final long expectedGs = b.getGenerationStamp();
+      final long visible = b.getNumBytes();
+      LOG.info("Convert " + b + " from Temporary to RBW, visible length="
+          + visible);
+
+      final ReplicaInPipeline temp;
+
       // get replica
       final ReplicaInfo r = volumeMap.get(b.getBlockPoolId(), blockId);
       if (r == null) {
@@ -1510,43 +1533,44 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         throw new ReplicaAlreadyExistsException(
             "r.getState() != ReplicaState.TEMPORARY, r=" + r);
       }
-      temp = (ReplicaInPipeline)r;
-    }
-    // check generation stamp
-    if (temp.getGenerationStamp() != expectedGs) {
-      throw new ReplicaAlreadyExistsException(
-          "temp.getGenerationStamp() != expectedGs = " + expectedGs
-          + ", temp=" + temp);
-    }
+      temp = (ReplicaInPipeline) r;
+
+      // check generation stamp
+      if (temp.getGenerationStamp() != expectedGs) {
+        throw new ReplicaAlreadyExistsException(
+            "temp.getGenerationStamp() != expectedGs = " + expectedGs
+                + ", temp=" + temp);
+      }
 
-    // TODO: check writer?
-    // set writer to the current thread
-    // temp.setWriter(Thread.currentThread());
+      // TODO: check writer?
+      // set writer to the current thread
+      // temp.setWriter(Thread.currentThread());
 
-    // check length
-    final long numBytes = temp.getNumBytes();
-    if (numBytes < visible) {
-      throw new IOException(numBytes + " = numBytes < visible = "
-          + visible + ", temp=" + temp);
-    }
-    // check volume
-    final FsVolumeImpl v = (FsVolumeImpl)temp.getVolume();
-    if (v == null) {
-      throw new IOException("r.getVolume() = null, temp="  + temp);
+      // check length
+      final long numBytes = temp.getNumBytes();
+      if (numBytes < visible) {
+        throw new IOException(numBytes + " = numBytes < visible = "
+            + visible + ", temp=" + temp);
+      }
+      // check volume
+      final FsVolumeImpl v = (FsVolumeImpl) temp.getVolume();
+      if (v == null) {
+        throw new IOException("r.getVolume() = null, temp=" + temp);
+      }
+
+      // move block files to the rbw directory
+      BlockPoolSlice bpslice = v.getBlockPoolSlice(b.getBlockPoolId());
+      final File dest = moveBlockFiles(b.getLocalBlock(), temp.getBlockFile(),
+          bpslice.getRbwDir());
+      // create RBW
+      final ReplicaBeingWritten rbw = new ReplicaBeingWritten(
+          blockId, numBytes, expectedGs,
+          v, dest.getParentFile(), Thread.currentThread(), 0);
+      rbw.setBytesAcked(visible);
+      // overwrite the RBW in the volume map
+      volumeMap.add(b.getBlockPoolId(), rbw);
+      return rbw;
     }
-    
-    // move block files to the rbw directory
-    BlockPoolSlice bpslice = v.getBlockPoolSlice(b.getBlockPoolId());
-    final File dest = moveBlockFiles(b.getLocalBlock(), temp.getBlockFile(), 
-        bpslice.getRbwDir());
-    // create RBW
-    final ReplicaBeingWritten rbw = new ReplicaBeingWritten(
-        blockId, numBytes, expectedGs,
-        v, dest.getParentFile(), Thread.currentThread(), 0);
-    rbw.setBytesAcked(visible);
-    // overwrite the RBW in the volume map
-    volumeMap.add(b.getBlockPoolId(), rbw);
-    return rbw;
   }
 
   @Override // FsDatasetSpi
@@ -1556,7 +1580,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     long writerStopTimeoutMs = datanode.getDnConf().getXceiverStopTimeout();
     ReplicaInfo lastFoundReplicaInfo = null;
     do {
-      synchronized (this) {
+      try(AutoCloseableLock lock = datasetLock.acquire()) {
         ReplicaInfo currentReplicaInfo =
             volumeMap.get(b.getBlockPoolId(), b.getBlockId());
         if (currentReplicaInfo == lastFoundReplicaInfo) {
@@ -1635,72 +1659,82 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
    * Complete the block write!
    */
   @Override // FsDatasetSpi
-  public synchronized void finalizeBlock(ExtendedBlock b) throws IOException {
-    if (Thread.interrupted()) {
-      // Don't allow data modifications from interrupted threads
-      throw new IOException("Cannot finalize block from Interrupted Thread");
-    }
-    ReplicaInfo replicaInfo = getReplicaInfo(b);
-    if (replicaInfo.getState() == ReplicaState.FINALIZED) {
-      // this is legal, when recovery happens on a file that has
-      // been opened for append but never modified
-      return;
+  public void finalizeBlock(ExtendedBlock b) throws IOException {
+    try(AutoCloseableLock lock = datasetLock.acquire()) {
+      if (Thread.interrupted()) {
+        // Don't allow data modifications from interrupted threads
+        throw new IOException("Cannot finalize block from Interrupted Thread");
+      }
+      ReplicaInfo replicaInfo = getReplicaInfo(b);
+      if (replicaInfo.getState() == ReplicaState.FINALIZED) {
+        // this is legal, when recovery happens on a file that has
+        // been opened for append but never modified
+        return;
+      }
+      finalizeReplica(b.getBlockPoolId(), replicaInfo);
     }
-    finalizeReplica(b.getBlockPoolId(), replicaInfo);
   }
   
-  private synchronized FinalizedReplica finalizeReplica(String bpid,
+  private FinalizedReplica finalizeReplica(String bpid,
       ReplicaInfo replicaInfo) throws IOException {
-    FinalizedReplica newReplicaInfo = null;
-    if (replicaInfo.getState() == ReplicaState.RUR &&
-       ((ReplicaUnderRecovery)replicaInfo).getOriginalReplica().getState() == 
-         ReplicaState.FINALIZED) {
-      newReplicaInfo = (FinalizedReplica)
-             ((ReplicaUnderRecovery)replicaInfo).getOriginalReplica();
-    } else {
-      FsVolumeImpl v = (FsVolumeImpl)replicaInfo.getVolume();
-      File f = replicaInfo.getBlockFile();
-      if (v == null) {
-        throw new IOException("No volume for temporary file " + f + 
-            " for block " + replicaInfo);
-      }
+    try(AutoCloseableLock lock = datasetLock.acquire()) {
+      FinalizedReplica newReplicaInfo = null;
+      if (replicaInfo.getState() == ReplicaState.RUR &&
+          ((ReplicaUnderRecovery) replicaInfo).getOriginalReplica().getState()
+              == ReplicaState.FINALIZED) {
+        newReplicaInfo = (FinalizedReplica)
+            ((ReplicaUnderRecovery) replicaInfo).getOriginalReplica();
+      } else {
+        FsVolumeImpl v = (FsVolumeImpl) replicaInfo.getVolume();
+        File f = replicaInfo.getBlockFile();
+        if (v == null) {
+          throw new IOException("No volume for temporary file " + f +
+              " for block " + replicaInfo);
+        }
 
-      File dest = v.addFinalizedBlock(
-          bpid, replicaInfo, f, replicaInfo.getBytesReserved());
-      newReplicaInfo = new FinalizedReplica(replicaInfo, v, dest.getParentFile());
+        File dest = v.addFinalizedBlock(
+            bpid, replicaInfo, f, replicaInfo.getBytesReserved());
+        newReplicaInfo =
+            new FinalizedReplica(replicaInfo, v, dest.getParentFile());
 
-      if (v.isTransientStorage()) {
-        releaseLockedMemory(
-            replicaInfo.getOriginalBytesReserved() - replicaInfo.getNumBytes(),
-            false);
-        ramDiskReplicaTracker.addReplica(
-            bpid, replicaInfo.getBlockId(), v, replicaInfo.getNumBytes());
-        datanode.getMetrics().addRamDiskBytesWrite(replicaInfo.getNumBytes());
+        if (v.isTransientStorage()) {
+          releaseLockedMemory(
+              replicaInfo.getOriginalBytesReserved()
+                  - replicaInfo.getNumBytes(),
+              false);
+          ramDiskReplicaTracker.addReplica(
+              bpid, replicaInfo.getBlockId(), v, replicaInfo.getNumBytes());
+          datanode.getMetrics().addRamDiskBytesWrite(replicaInfo.getNumBytes());
+        }
       }
-    }
-    volumeMap.add(bpid, newReplicaInfo);
+      volumeMap.add(bpid, newReplicaInfo);
 
-    return newReplicaInfo;
+      return newReplicaInfo;
+    }
   }
 
   /**
    * Remove the temporary block file (if any)
    */
   @Override // FsDatasetSpi
-  public synchronized void unfinalizeBlock(ExtendedBlock b) throws IOException {
-    ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), 
-        b.getLocalBlock());
-    if (replicaInfo != null && replicaInfo.getState() == ReplicaState.TEMPORARY) {
-      // remove from volumeMap
-      volumeMap.remove(b.getBlockPoolId(), b.getLocalBlock());
-
-      // delete the on-disk temp file
-      if (delBlockFromDisk(replicaInfo.getBlockFile(), 
-          replicaInfo.getMetaFile(), b.getLocalBlock())) {
-        LOG.warn("Block " + b + " unfinalized and removed. " );
-      }
-      if (replicaInfo.getVolume().isTransientStorage()) {
-        ramDiskReplicaTracker.discardReplica(b.getBlockPoolId(), b.getBlockId(), true);
+  public void unfinalizeBlock(ExtendedBlock b) throws IOException {
+    try(AutoCloseableLock lock = datasetLock.acquire()) {
+      ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
+          b.getLocalBlock());
+      if (replicaInfo != null
+          && replicaInfo.getState() == ReplicaState.TEMPORARY) {
+        // remove from volumeMap
+        volumeMap.remove(b.getBlockPoolId(), b.getLocalBlock());
+
+        // delete the on-disk temp file
+        if (delBlockFromDisk(replicaInfo.getBlockFile(),
+            replicaInfo.getMetaFile(), b.getLocalBlock())) {
+          LOG.warn("Block " + b + " unfinalized and removed. ");
+        }
+        if (replicaInfo.getVolume().isTransientStorage()) {
+          ramDiskReplicaTracker.discardReplica(b.getBlockPoolId(),
+              b.getBlockId(), true);
+        }
       }
     }
   }
@@ -1739,7 +1773,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         new HashMap<String, BlockListAsLongs.Builder>();
 
     List<FsVolumeImpl> curVolumes = null;
-    synchronized(this) {
+    try(AutoCloseableLock lock = datasetLock.acquire()) {
       curVolumes = volumes.getVolumes();
       for (FsVolumeSpi v : curVolumes) {
         builders.put(v.getStorageID(), BlockListAsLongs.builder(maxDataLength));
@@ -1792,31 +1826,36 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
    * Get the list of finalized blocks from in-memory blockmap for a block pool.
    */
   @Override
-  public synchronized List<FinalizedReplica> getFinalizedBlocks(String bpid) {
-    ArrayList<FinalizedReplica> finalized =
-        new ArrayList<FinalizedReplica>(volumeMap.size(bpid));
-    for (ReplicaInfo b : volumeMap.replicas(bpid)) {
-      if(b.getState() == ReplicaState.FINALIZED) {
-        finalized.add(new FinalizedReplica((FinalizedReplica)b));
+  public List<FinalizedReplica> getFinalizedBlocks(String bpid) {
+    try(AutoCloseableLock lock = datasetLock.acquire()) {
+      ArrayList<FinalizedReplica> finalized =
+          new ArrayList<FinalizedReplica>(volumeMap.size(bpid));
+      for (ReplicaInfo b : volumeMap.replicas(bpid)) {
+        if (b.getState() == ReplicaState.FINALIZED) {
+          finalized.add(new FinalizedReplica((FinalizedReplica) b));
+        }
       }
+      return finalized;
     }
-    return finalized;
   }
 
   /**
    * Get the list of finalized blocks from in-memory blockmap for a block pool.
    */
   @Override
-  public synchronized List<FinalizedReplica> getFinalizedBlocksOnPersistentStorage(String bpid) {
-    ArrayList<FinalizedReplica> finalized =
-        new ArrayList<FinalizedReplica>(volumeMap.size(bpid));
-    for (ReplicaInfo b : volumeMap.replicas(bpid)) {
-      if(!b.getVolume().isTransientStorage() &&
-         b.getState() == ReplicaState.FINALIZED) {
-        finalized.add(new FinalizedReplica((FinalizedReplica)b));
+  public List<FinalizedReplica>
+      getFinalizedBlocksOnPersistentStorage(String bpid) {
+    try(AutoCloseableLock lock = datasetLock.acquire()) {
+      ArrayList<FinalizedReplica> finalized =
+          new ArrayList<FinalizedReplica>(volumeMap.size(bpid));
+      for (ReplicaInfo b : volumeMap.replicas(bpid)) {
+        if (!b.getVolume().isTransientStorage() &&
+            b.getState() == ReplicaState.FINALIZED) {
+          finalized.add(new FinalizedReplica((FinalizedReplica) b));
+        }
       }
+      return finalized;
     }
-    return finalized;
   }
 
   /**
@@ -1892,7 +1931,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   File validateBlockFile(String bpid, long blockId) {
     //Should we check for metadata file too?
     final File f;
-    synchronized(this) {
+    try(AutoCloseableLock lock = datasetLock.acquire()) {
       f = getFile(bpid, blockId, false);
     }
     
@@ -1941,7 +1980,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     for (int i = 0; i < invalidBlks.length; i++) {
       final File f;
       final FsVolumeImpl v;
-      synchronized (this) {
+      try(AutoCloseableLock lock = datasetLock.acquire()) {
         final ReplicaInfo info = volumeMap.get(bpid, invalidBlks[i]);
         if (info == null) {
           // It is okay if the block is not found -- it may be deleted earlier.
@@ -2052,7 +2091,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     long length, genstamp;
     Executor volumeExecutor;
 
-    synchronized (this) {
+    try(AutoCloseableLock lock = datasetLock.acquire()) {
       ReplicaInfo info = volumeMap.get(bpid, blockId);
       boolean success = false;
       try {
@@ -2119,9 +2158,11 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   }
 
   @Override // FsDatasetSpi
-  public synchronized boolean contains(final ExtendedBlock block) {
-    final long blockId = block.getLocalBlock().getBlockId();
-    return getFile(block.getBlockPoolId(), blockId, false) != null;
+  public boolean contains(final ExtendedBlock block) {
+    try(AutoCloseableLock lock = datasetLock.acquire()) {
+      final long blockId = block.getLocalBlock().getBlockId();
+      return getFile(block.getBlockPoolId(), blockId, false) != null;
+    }
   }
 
   /**
@@ -2247,7 +2288,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       File diskMetaFile, FsVolumeSpi vol) throws IOException {
     Block corruptBlock = null;
     ReplicaInfo memBlockInfo;
-    synchronized (this) {
+    try(AutoCloseableLock lock = datasetLock.acquire()) {
       memBlockInfo = volumeMap.get(bpid, blockId);
       if (memBlockInfo != null && memBlockInfo.getState() != ReplicaState.FINALIZED) {
         // Block is not finalized - ignore the difference
@@ -2403,9 +2444,11 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   }
 
   @Override 
-  public synchronized String getReplicaString(String bpid, long blockId) {
-    final Replica r = volumeMap.get(bpid, blockId);
-    return r == null? "null": r.toString();
+  public String getReplicaString(String bpid, long blockId) {
+    try(AutoCloseableLock lock = datasetLock.acquire()) {
+      final Replica r = volumeMap.get(bpid, blockId);
+      return r == null ? "null" : r.toString();
+    }
   }
 
   @Override // FsDatasetSpi
@@ -2498,67 +2541,69 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   }
 
   @Override // FsDatasetSpi
-  public synchronized Replica updateReplicaUnderRecovery(
+  public Replica updateReplicaUnderRecovery(
                                     final ExtendedBlock oldBlock,
                                     final long recoveryId,
                                     final long newBlockId,
                                     final long newlength) throws IOException {
-    //get replica
-    final String bpid = oldBlock.getBlockPoolId();
-    final ReplicaInfo replica = volumeMap.get(bpid, oldBlock.getBlockId());
-    LOG.info("updateReplica: " + oldBlock
-                 + ", recoveryId=" + recoveryId
-                 + ", length=" + newlength
-                 + ", replica=" + replica);
+    try(AutoCloseableLock lock = datasetLock.acquire()) {
+      //get replica
+      final String bpid = oldBlock.getBlockPoolId();
+      final ReplicaInfo replica = volumeMap.get(bpid, oldBlock.getBlockId());
+      LOG.info("updateReplica: " + oldBlock
+          + ", recoveryId=" + recoveryId
+          + ", length=" + newlength
+          + ", replica=" + replica);
 
-    //check replica
-    if (replica == null) {
-      throw new ReplicaNotFoundException(oldBlock);
-    }
+      //check replica
+      if (replica == null) {
+        throw new ReplicaNotFoundException(oldBlock);
+      }
 
-    //check replica state
-    if (replica.getState() != ReplicaState.RUR) {
-      throw new IOException("replica.getState() != " + ReplicaState.RUR
-          + ", replica=" + replica);
-    }
+      //check replica state
+      if (replica.getState() != ReplicaState.RUR) {
+        throw new IOException("replica.getState() != " + ReplicaState.RUR
+            + ", replica=" + replica);
+      }
 
-    //check replica's byte on disk
-    if (replica.getBytesOnDisk() != oldBlock.getNumBytes()) {
-      throw new IOException("THIS IS NOT SUPPOSED TO HAPPEN:"
-          + " replica.getBytesOnDisk() != block.getNumBytes(), block="
-          + oldBlock + ", replica=" + replica);
-    }
-
-    //check replica files before update
-    checkReplicaFiles(replica);
-
-    //update replica
-    final FinalizedReplica finalized = updateReplicaUnderRecovery(oldBlock
-        .getBlockPoolId(), (ReplicaUnderRecovery) replica, recoveryId,
-        newBlockId, newlength);
-
-    boolean copyTruncate = newBlockId != oldBlock.getBlockId();
-    if(!copyTruncate) {
-      assert finalized.getBlockId() == oldBlock.getBlockId()
-          && finalized.getGenerationStamp() == recoveryId
-          && finalized.getNumBytes() == newlength
-          : "Replica information mismatched: oldBlock=" + oldBlock
-              + ", recoveryId=" + recoveryId + ", newlength=" + newlength
-              + ", newBlockId=" + newBlockId + ", finalized=" + finalized;
-    } else {
-      assert finalized.getBlockId() == oldBlock.getBlockId()
-          && finalized.getGenerationStamp() == oldBlock.getGenerationStamp()
-          && finalized.getNumBytes() == oldBlock.getNumBytes()
-          : "Finalized and old information mismatched: oldBlock=" + oldBlock
-              + ", genStamp=" + oldBlock.getGenerationStamp()
-              + ", len=" + oldBlock.getNumBytes()
-              + ", finalized=" + finalized;
-    }
+      //check replica's byte on disk
+      if (replica.getBytesOnDisk() != oldBlock.getNumBytes()) {
+        throw new IOException("THIS IS NOT SUPPOSED TO HAPPEN:"
+            + " replica.getBytesOnDisk() != block.getNumBytes(), block="
+            + oldBlock + ", replica=" + replica);
+      }
+
+      //check replica files before update
+      checkReplicaFiles(replica);
+
+      //update replica
+      final FinalizedReplica finalized = updateReplicaUnderRecovery(oldBlock
+              .getBlockPoolId(), (ReplicaUnderRecovery) replica, recoveryId,
+          newBlockId, newlength);
+
+      boolean copyTruncate = newBlockId != oldBlock.getBlockId();
+      if (!copyTruncate) {
+        assert finalized.getBlockId() == oldBlock.getBlockId()
+            && finalized.getGenerationStamp() == recoveryId
+            && finalized.getNumBytes() == newlength
+            : "Replica information mismatched: oldBlock=" + oldBlock
+            + ", recoveryId=" + recoveryId + ", newlength=" + newlength
+            + ", newBlockId=" + newBlockId + ", finalized=" + finalized;
+      } else {
+        assert finalized.getBlockId() == oldBlock.getBlockId()
+            && finalized.getGenerationStamp() == oldBlock.getGenerationStamp()
+            && finalized.getNumBytes() == oldBlock.getNumBytes()
+            : "Finalized and old information mismatched: oldBlock=" + oldBlock
+            + ", genStamp=" + oldBlock.getGenerationStamp()
+            + ", len=" + oldBlock.getNumBytes()
+            + ", finalized=" + finalized;
+      }
 
-    //check replica files after update
-    checkReplicaFiles(finalized);
+      //check replica files after update
+      checkReplicaFiles(finalized);
 
-    return finalized;
+      return finalized;
+    }
   }
 
   private FinalizedReplica updateReplicaUnderRecovery(
@@ -2636,23 +2681,25 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   }
 
   @Override // FsDatasetSpi
-  public synchronized long getReplicaVisibleLength(final ExtendedBlock block)
+  public long getReplicaVisibleLength(final ExtendedBlock block)
   throws IOException {
-    final Replica replica = getReplicaInfo(block.getBlockPoolId(), 
-        block.getBlockId());
-    if (replica.getGenerationStamp() < block.getGenerationStamp()) {
-      throw new IOException(
-          "replica.getGenerationStamp() < block.getGenerationStamp(), block="
-          + block + ", replica=" + replica);
+    try(AutoCloseableLock lock = datasetLock.acquire()) {
+      final Replica replica = getReplicaInfo(block.getBlockPoolId(),
+          block.getBlockId());
+      if (replica.getGenerationStamp() < block.getGenerationStamp()) {
+        throw new IOException(
+            "replica.getGenerationStamp() < block.getGenerationStamp(), block="
+                + block + ", replica=" + replica);
+      }
+      return replica.getVisibleLength();
     }
-    return replica.getVisibleLength();
   }
   
   @Override
   public void addBlockPool(String bpid, Configuration conf)
       throws IOException {
     LOG.info("Adding block pool " + bpid);
-    synchronized(this) {
+    try(AutoCloseableLock lock = datasetLock.acquire()) {
       volumes.addBlockPool(bpid, conf);
       volumeMap.initBlockPool(bpid);
     }
@@ -2660,11 +2707,14 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   }
 
   @Override
-  public synchronized void shutdownBlockPool(String bpid) {
-    LOG.info("Removing block pool " + bpid);
-    Map<DatanodeStorage, BlockListAsLongs> blocksPerVolume =  getBlockReports(bpid);
-    volumeMap.cleanUpBlockPool(bpid);
-    volumes.removeBlockPool(bpid, blocksPerVolume);
+  public void shutdownBlockPool(String bpid) {
+    try(AutoCloseableLock lock = datasetLock.acquire()) {
+      LOG.info("Removing block pool " + bpid);
+      Map<DatanodeStorage, BlockListAsLongs> blocksPerVolume =
+          getBlockReports(bpid);
+      volumeMap.cleanUpBlockPool(bpid);
+      volumes.removeBlockPool(bpid, blocksPerVolume);
+    }
   }
   
   /**
@@ -2727,35 +2777,38 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   }
 
   @Override //FsDatasetSpi
-  public synchronized void deleteBlockPool(String bpid, boolean force)
+  public void deleteBlockPool(String bpid, boolean force)
       throws IOException {
-    List<FsVolumeImpl> curVolumes = volumes.getVolumes();
-    if (!force) {
+    try(AutoCloseableLock lock = datasetLock.acquire()) {
+      List<FsVolumeImpl> curVolumes = volumes.getVolumes();
+      if (!force) {
+        for (FsVolumeImpl volume : curVolumes) {
+          try (FsVolumeReference ref = volume.obtainReference()) {
+            if (!volume.isBPDirEmpty(bpid)) {
+              LOG.warn(bpid
+                  + " has some block files, cannot delete unless forced");
+              throw new IOException("Cannot delete block pool, "
+                  + "it contains some block files");
+            }
+          } catch (ClosedChannelException e) {
+            // ignore.
+          }
+        }
+      }
       for (FsVolumeImpl volume : curVolumes) {
         try (FsVolumeReference ref = volume.obtainReference()) {
-          if (!volume.isBPDirEmpty(bpid)) {
-            LOG.warn(bpid + " has some block files, cannot delete unless forced");
-            throw new IOException("Cannot delete block pool, "
-                + "it contains some block files");
-          }
+          volume.deleteBPDirectories(bpid, force);
         } catch (ClosedChannelException e) {
           // ignore.
         }
       }
     }
-    for (FsVolumeImpl volume : curVolumes) {
-      try (FsVolumeReference ref = volume.obtainReference()) {
-        volume.deleteBPDirectories(bpid, force);
-      } catch (ClosedChannelException e) {
-        // ignore.
-      }
-    }
   }
   
   @Override // FsDatasetSpi
   public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block)
       throws IOException {
-    synchronized(this) {
+    try(AutoCloseableLock lock = datasetLock.acquire()) {
       final Replica replica = volumeMap.get(block.getBlockPoolId(),
           block.getBlockId());
       if (replica == null) {
@@ -2848,7 +2901,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   @Override
   public void onCompleteLazyPersist(String bpId, long blockId,
       long creationTime, File[] savedFiles, FsVolumeImpl targetVolume) {
-    synchronized (FsDatasetImpl.this) {
+    try(AutoCloseableLock lock = datasetLock.acquire()) {
       ramDiskReplicaTracker.recordEndLazyPersist(bpId, blockId, savedFiles);
 
       targetVolume.incDfsUsedAndNumBlocks(bpId, savedFiles[0].length()
@@ -2982,7 +3035,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       try {
         block = ramDiskReplicaTracker.dequeueNextReplicaToPersist();
         if (block != null) {
-          synchronized (FsDatasetImpl.this) {
+          try(AutoCloseableLock lock = datasetLock.acquire()) {
             replicaInfo = volumeMap.get(block.getBlockPoolId(), block.getBlockId());
 
             // If replicaInfo is null, the block was either deleted before
@@ -3052,7 +3105,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         long blockFileUsed, metaFileUsed;
         final String bpid = replicaState.getBlockPoolId();
 
-        synchronized (FsDatasetImpl.this) {
+        try(AutoCloseableLock lock = datasetLock.acquire()) {
           replicaInfo = getReplicaInfo(replicaState.getBlockPoolId(),
                                        replicaState.getBlockId());
           Preconditions.checkState(replicaInfo.getVolume().isTransientStorage());
@@ -3161,7 +3214,12 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       return s != null ? s.contains(blockId) : false;
     }
   }
-  
+
+  @Override
+  public AutoCloseableLock acquireDatasetLock() {
+    return datasetLock.acquire();
+  }
+
   public void removeDeletedBlocks(String bpid, Set<Long> blockIds) {
     synchronized (deletingBlock) {
       Set<Long> s = deletingBlock.get(bpid);
@@ -3229,14 +3287,16 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     this.timer = newTimer;
   }
 
-  synchronized void stopAllDataxceiverThreads(FsVolumeImpl volume) {
-    for (String blockPoolId : volumeMap.getBlockPoolList()) {
-      Collection<ReplicaInfo> replicas = volumeMap.replicas(blockPoolId);
-      for (ReplicaInfo replicaInfo : replicas) {
-        if (replicaInfo instanceof ReplicaInPipeline
-            && replicaInfo.getVolume().equals(volume)) {
-          ReplicaInPipeline replicaInPipeline = (ReplicaInPipeline) replicaInfo;
-          replicaInPipeline.interruptThread();
+  void stopAllDataxceiverThreads(FsVolumeImpl volume) {
+    try(AutoCloseableLock lock = datasetLock.acquire()) {
+      for (String blockPoolId : volumeMap.getBlockPoolList()) {
+        Collection<ReplicaInfo> replicas = volumeMap.replicas(blockPoolId);
+        for (ReplicaInfo replicaInfo : replicas) {
+          if (replicaInfo instanceof ReplicaInPipeline
+              && replicaInfo.getVolume().equals(volume)) {
+            ReplicaInPipeline replicaInPipeline = (ReplicaInPipeline) replicaInfo;
+            replicaInPipeline.interruptThread();
+          }
         }
       }
     }
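
For reference, the pattern this commit converges on, shown as a minimal
sketch that mirrors the BlockSender hunk above (not part of the diff):

    // Before: mutual exclusion via the dataset object's monitor.
    synchronized (datanode.data) {
      replica = getReplica(block, datanode);
    }

    // After: the same critical section through try-with-resources;
    // close() releases the lock even if the body throws.
    try (AutoCloseableLock lock = datanode.data.acquireDatasetLock()) {
      replica = getReplica(block, datanode);
    }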

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ad0ac6cc/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
index 10bae5d..67e7192 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
@@ -56,6 +56,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.util.CloseableReferenceCount;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.util.StringUtils;
@@ -304,7 +305,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
 
   private void decDfsUsedAndNumBlocks(String bpid, long value,
                                       boolean blockFileDeleted) {
-    synchronized(dataset) {
+    try(AutoCloseableLock lock = dataset.acquireDatasetLock()) {
       BlockPoolSlice bp = bpSlices.get(bpid);
       if (bp != null) {
         bp.decDfsUsed(value);
@@ -316,7 +317,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
   }
 
   void incDfsUsedAndNumBlocks(String bpid, long value) {
-    synchronized (dataset) {
+    try(AutoCloseableLock lock = dataset.acquireDatasetLock()) {
       BlockPoolSlice bp = bpSlices.get(bpid);
       if (bp != null) {
         bp.incDfsUsed(value);
@@ -326,7 +327,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
   }
 
   void incDfsUsed(String bpid, long value) {
-    synchronized(dataset) {
+    try(AutoCloseableLock lock = dataset.acquireDatasetLock()) {
       BlockPoolSlice bp = bpSlices.get(bpid);
       if (bp != null) {
         bp.incDfsUsed(value);
@@ -337,7 +338,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
   @VisibleForTesting
   public long getDfsUsed() throws IOException {
     long dfsUsed = 0;
-    synchronized(dataset) {
+    try(AutoCloseableLock lock = dataset.acquireDatasetLock()) {
       for(BlockPoolSlice s : bpSlices.values()) {
         dfsUsed += s.getDfsUsed();
       }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ad0ac6cc/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index 227f4f8..cbc9690 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -62,6 +62,7 @@ import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.metrics2.MetricsCollector;
 import org.apache.hadoop.metrics2.util.MBeans;
+import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.util.DataChecksum;
 
 /**
@@ -113,6 +114,8 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
       "dfs.datanode.simulateddatastorage.state";
   private static final DatanodeStorage.State DEFAULT_STATE =
       DatanodeStorage.State.NORMAL;
+
+  private final AutoCloseableLock datasetLock;
   
   static final byte[] nullCrcFileData;
   static {
@@ -550,6 +553,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
         conf.getLong(CONFIG_PROPERTY_CAPACITY, DEFAULT_CAPACITY),
         conf.getEnum(CONFIG_PROPERTY_STATE, DEFAULT_STATE));
     this.volume = new SimulatedVolume(this.storage);
+    this.datasetLock = new AutoCloseableLock();
   }
 
   public synchronized void injectBlocks(String bpid,
@@ -1365,5 +1369,10 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
   public boolean isDeletingBlock(String bpid, long blockId) {
     throw new UnsupportedOperationException();
   }
+
+  @Override
+  public AutoCloseableLock acquireDatasetLock() {
+    return datasetLock.acquire();
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ad0ac6cc/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
index 513e9a8..a11cfb9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
@@ -85,6 +85,7 @@ import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.Time;
 import org.apache.log4j.Level;
@@ -693,7 +694,7 @@ public class TestBlockRecovery {
             final RecoveringBlock recoveringBlock = new RecoveringBlock(
                 block.getBlock(), locations, block.getBlock()
                     .getGenerationStamp() + 1);
-            synchronized (dataNode.data) {
+            try(AutoCloseableLock lock = dataNode.data.acquireDatasetLock()) {
               Thread.sleep(2000);
               dataNode.initReplicaRecovery(recoveringBlock);
             }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ad0ac6cc/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
index bd99b07..82bbdc4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
@@ -63,6 +63,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.LazyPersistTestCase;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.util.Time;
 import org.junit.Before;
 import org.junit.Test;
@@ -113,7 +114,7 @@ public class TestDirectoryScanner {
 
   /** Truncate a block file */
   private long truncateBlockFile() throws IOException {
-    synchronized (fds) {
+    try(AutoCloseableLock lock = fds.acquireDatasetLock()) {
       for (ReplicaInfo b : FsDatasetTestUtil.getReplicas(fds, bpid)) {
         File f = b.getBlockFile();
         File mf = b.getMetaFile();
@@ -138,7 +139,7 @@ public class TestDirectoryScanner {
 
   /** Delete a block file */
   private long deleteBlockFile() {
-    synchronized(fds) {
+    try(AutoCloseableLock lock = fds.acquireDatasetLock()) {
       for (ReplicaInfo b : FsDatasetTestUtil.getReplicas(fds, bpid)) {
         File f = b.getBlockFile();
         File mf = b.getMetaFile();
@@ -154,7 +155,7 @@ public class TestDirectoryScanner {
 
   /** Delete block meta file */
   private long deleteMetaFile() {
-    synchronized(fds) {
+    try(AutoCloseableLock lock = fds.acquireDatasetLock()) {
       for (ReplicaInfo b : FsDatasetTestUtil.getReplicas(fds, bpid)) {
         File file = b.getMetaFile();
         // Delete a metadata file
@@ -173,7 +174,7 @@ public class TestDirectoryScanner {
    * @throws IOException
    */
   private void duplicateBlock(long blockId) throws IOException {
-    synchronized (fds) {
+    try(AutoCloseableLock lock = fds.acquireDatasetLock()) {
       ReplicaInfo b = FsDatasetTestUtil.fetchReplicaInfo(fds, bpid, blockId);
       try (FsDatasetSpi.FsVolumeReferences volumes =
           fds.getFsVolumeReferences()) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ad0ac6cc/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
index 8997f73..738f338 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
 import org.apache.hadoop.metrics2.MetricsCollector;
+import org.apache.hadoop.util.AutoCloseableLock;
 
 public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> {
 
@@ -448,4 +449,9 @@ public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> {
   public boolean isDeletingBlock(String bpid, long blockId) {
     return false;
   }
+
+  @Override
+  public AutoCloseableLock acquireDatasetLock() {
+    return null;
+  }
 }
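
Note: returning null from acquireDatasetLock() is tolerable only because
try-with-resources skips close() on a null resource; callers of this test
stub simply get no mutual exclusion.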




[3/3] hadoop git commit: HDFS-10742. Measure lock time in FsDatasetImpl. Contributed by Chen Liang.

Posted by ar...@apache.org.
HDFS-10742. Measure lock time in FsDatasetImpl. Contributed by Chen Liang.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/012b266e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/012b266e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/012b266e

Branch: refs/heads/branch-2
Commit: 012b266e5eb4a371955d5d4e128483ed4e75631e
Parents: ad0ac6c
Author: Arpit Agarwal <ar...@apache.org>
Authored: Sat Sep 10 18:04:00 2016 -0700
Committer: Arpit Agarwal <ar...@apache.org>
Committed: Sat Sep 10 18:04:00 2016 -0700

----------------------------------------------------------------------
 .../apache/hadoop/util/AutoCloseableLock.java   |  28 ++-
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |   5 +
 .../apache/hadoop/hdfs/InstrumentedLock.java    | 185 +++++++++++++++++++
 .../datanode/fsdataset/impl/FsDatasetImpl.java  |  11 +-
 .../src/main/resources/hdfs-default.xml         |   7 +
 .../hadoop/hdfs/TestInstrumentedLock.java       | 166 +++++++++++++++++
 6 files changed, 394 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/012b266e/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/AutoCloseableLock.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/AutoCloseableLock.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/AutoCloseableLock.java
index 2aa8578..d920bc6 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/AutoCloseableLock.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/AutoCloseableLock.java
@@ -17,22 +17,33 @@
  */
 package org.apache.hadoop.util;
 
+import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * This is a wrap class of a ReentrantLock. Extending AutoCloseable
  * interface such that the users can use a try-with-resource syntax.
  */
 public class AutoCloseableLock implements AutoCloseable {
 
-  private final ReentrantLock lock;
+  private final Lock lock;
 
   /**
    * Creates an instance of {@code AutoCloseableLock}, initializes
-   * the underlying {@code ReentrantLock} object.
+   * the underlying lock instance with a new {@code ReentrantLock}.
    */
   public AutoCloseableLock() {
-    this.lock = new ReentrantLock();
+    this(new ReentrantLock());
+  }
+
+  /**
+   * Wrap provided Lock instance.
+   * @param lock Lock instance to wrap in AutoCloseable API.
+   */
+  public AutoCloseableLock(Lock lock) {
+    this.lock = lock;
   }
 
   /**
@@ -86,7 +97,7 @@ public class AutoCloseableLock implements AutoCloseable {
 
   /**
    * A wrapper method that makes a call to {@code tryLock()} of
-   * the underlying {@code ReentrantLock} object.
+   * the underlying {@code Lock} object.
    *
    * If the lock is not held by another thread, acquires the lock, set the
    * hold count to one and returns {@code true}.
@@ -116,7 +127,12 @@ public class AutoCloseableLock implements AutoCloseable {
    * @return {@code true} if any thread holds this lock and
    *         {@code false} otherwise
    */
-  public boolean isLocked() {
-    return lock.isLocked();
+  @VisibleForTesting
+  boolean isLocked() {
+    if (lock instanceof ReentrantLock) {
+      return ((ReentrantLock)lock).isLocked();
+    }
+    throw new UnsupportedOperationException();
   }
+
 }
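
With the new constructor, any java.util.concurrent.locks.Lock can be
adapted to try-with-resources. A minimal sketch, where the fair
ReentrantLock is illustrative and not part of this commit:

    import java.util.concurrent.locks.ReentrantLock;
    import org.apache.hadoop.util.AutoCloseableLock;

    AutoCloseableLock lock = new AutoCloseableLock(new ReentrantLock(true));
    try (AutoCloseableLock l = lock.acquire()) {
      // critical section; close() releases the wrapped lock
    }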

http://git-wip-us.apache.org/repos/asf/hadoop/blob/012b266e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 0457741..4ed9fc3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -392,6 +392,11 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String  DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_KEY =
       "dfs.namenode.read-lock-reporting-threshold-ms";
   public static final long    DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_DEFAULT = 5000L;
+  // Minimum interval between two consecutive lock warnings
+  public static final String DFS_LOCK_SUPPRESS_WARNING_INTERVAL_KEY =
+      "dfs.lock.suppress.warning.interval";
+  public static final long DFS_LOCK_SUPPRESS_WARNING_INTERVAL_DEFAULT =
+      10000; //ms
 
   public static final String  DFS_UPGRADE_DOMAIN_FACTOR = "dfs.namenode.upgrade.domain.factor";
   public static final int DFS_UPGRADE_DOMAIN_FACTOR_DEFAULT = DFS_REPLICATION_DEFAULT;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/012b266e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/InstrumentedLock.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/InstrumentedLock.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/InstrumentedLock.java
new file mode 100644
index 0000000..6279e95
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/InstrumentedLock.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Timer;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * This is a debugging class that can be used by callers to track
+ * whether a specific lock is being held for too long and periodically
+ * log a warning and stack trace, if so.
+ *
+ * The logged warnings are throttled so that logs are not spammed.
+ *
+ * A new instance of InstrumentedLock can be created for each object
+ * that needs to be instrumented.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class InstrumentedLock implements Lock {
+
+  private final Lock lock;
+  private final Log logger;
+  private final String name;
+  private final Timer clock;
+
+  /** Minimum gap between two lock warnings. */
+  private final long minLoggingGap;
+  /** Threshold for detecting long lock held time. */
+  private final long lockWarningThreshold;
+
+  // Tracking counters for lock statistics.
+  private volatile long lockAcquireTimestamp;
+  private final AtomicLong lastLogTimestamp;
+  private final AtomicLong warningsSuppressed = new AtomicLong(0);
+
+  /**
+   * Create an instrumented lock instance which logs a warning message
+   * when the lock held time is above the given threshold.
+   *
+   * @param name the identifier of the lock object
+   * @param logger this class does not have its own logger; it will log to
+   *               the given logger instead
+   * @param minLoggingGapMs  the minimum time gap between two log messages,
+   *                         to avoid spamming too many logs
+   * @param lockWarningThresholdMs the time threshold above which a lock
+   *                               hold is considered "too long"
+   */
+  public InstrumentedLock(String name, Log logger, long minLoggingGapMs,
+      long lockWarningThresholdMs) {
+    this(name, logger, new ReentrantLock(),
+        minLoggingGapMs, lockWarningThresholdMs);
+  }
+
+  public InstrumentedLock(String name, Log logger, Lock lock,
+      long minLoggingGapMs, long lockWarningThresholdMs) {
+    this(name, logger, lock,
+        minLoggingGapMs, lockWarningThresholdMs, new Timer());
+  }
+
+  @VisibleForTesting
+  InstrumentedLock(String name, Log logger, Lock lock,
+      long minLoggingGapMs, long lockWarningThresholdMs, Timer clock) {
+    this.name = name;
+    this.lock = lock;
+    this.clock = clock;
+    this.logger = logger;
+    minLoggingGap = minLoggingGapMs;
+    lockWarningThreshold = lockWarningThresholdMs;
+    lastLogTimestamp = new AtomicLong(
+      clock.monotonicNow() - Math.max(minLoggingGap, lockWarningThreshold));
+  }
+
+  @Override
+  public void lock() {
+    lock.lock();
+    lockAcquireTimestamp = clock.monotonicNow();
+  }
+
+  @Override
+  public void lockInterruptibly() throws InterruptedException {
+    lock.lockInterruptibly();
+    lockAcquireTimestamp = clock.monotonicNow();
+  }
+
+  @Override
+  public boolean tryLock() {
+    if (lock.tryLock()) {
+      lockAcquireTimestamp = clock.monotonicNow();
+      return true;
+    }
+    return false;
+  }
+
+  @Override
+  public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
+    if (lock.tryLock(time, unit)) {
+      lockAcquireTimestamp = clock.monotonicNow();
+      return true;
+    }
+    return false;
+  }
+
+  @Override
+  public void unlock() {
+    long localLockReleaseTime = clock.monotonicNow();
+    long localLockAcquireTime = lockAcquireTimestamp;
+    lock.unlock();
+    check(localLockAcquireTime, localLockReleaseTime);
+  }
+
+  @Override
+  public Condition newCondition() {
+    return lock.newCondition();
+  }
+
+  @VisibleForTesting
+  void logWarning(long lockHeldTime, long suppressed) {
+    logger.warn(String.format("Lock held time above threshold: " +
+        "lock identifier: %s " +
+        "lockHeldTimeMs=%d ms. Suppressed %d lock warnings. " +
+        "The stack trace is: %s" ,
+        name, lockHeldTime, suppressed,
+        StringUtils.getStackTrace(Thread.currentThread())));
+  }
+
+  /**
+   * Log a warning if the lock was held for too long.
+   *
+   * Should be invoked by the caller immediately AFTER releasing the lock.
+   *
+   * @param acquireTime  - timestamp just after acquiring the lock.
+   * @param releaseTime - timestamp just before releasing the lock.
+   */
+  private void check(long acquireTime, long releaseTime) {
+    if (!logger.isWarnEnabled()) {
+      return;
+    }
+
+    final long lockHeldTime = releaseTime - acquireTime;
+    if (lockWarningThreshold - lockHeldTime < 0) {
+      long now;
+      long localLastLogTs;
+      do {
+        now = clock.monotonicNow();
+        localLastLogTs = lastLogTimestamp.get();
+        long deltaSinceLastLog = now - localLastLogTs;
+        // decide whether this warning should be logged or suppressed
+        if (deltaSinceLastLog - minLoggingGap < 0) {
+          warningsSuppressed.incrementAndGet();
+          return;
+        }
+      } while (!lastLogTimestamp.compareAndSet(localLastLogTs, now));
+      long suppressed = warningsSuppressed.getAndSet(0);
+      logWarning(lockHeldTime, suppressed);
+    }
+  }
+
+}
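
A hedged usage sketch of the class above, with illustrative constants
(warn about holds longer than 300 ms, at most once per 10 seconds):

    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.apache.hadoop.hdfs.InstrumentedLock;

    Log log = LogFactory.getLog("DatasetLockSample"); // name is illustrative
    InstrumentedLock lock = new InstrumentedLock("dataset", log, 10000, 300);
    lock.lock();
    try {
      // long critical section
    } finally {
      lock.unlock(); // measures the hold time and may log a throttled warning
    }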

http://git-wip-us.apache.org/repos/asf/hadoop/blob/012b266e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index a1c2a46..5b3ebce 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -42,6 +42,7 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.*;
 import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
 
 import javax.management.NotCompliantMBeanException;
 import javax.management.ObjectName;
@@ -62,7 +63,7 @@ import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.ExtendedBlockId;
-import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.InstrumentedLock;
 import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
@@ -282,7 +283,13 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     this.dataStorage = storage;
     this.conf = conf;
     this.smallBufferSize = DFSUtilClient.getSmallBufferSize(conf);
-    this.datasetLock = new AutoCloseableLock();
+    this.datasetLock = new AutoCloseableLock(
+        new InstrumentedLock(getClass().getName(), LOG,
+          conf.getTimeDuration(
+            DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_KEY,
+            DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_DEFAULT,
+            TimeUnit.MILLISECONDS),
+          300));
     // The number of volumes required for operation is the total number
     // of volumes minus the number of failed volumes we can tolerate.
     volFailuresTolerated = datanode.getDnConf().getVolFailuresTolerated();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/012b266e/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 11a2539..44cde11 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -4094,4 +4094,11 @@
     Truststore password for HTTPS SSL configuration
   </description>
 </property>
+
+<property>
+  <name>dfs.lock.suppress.warning.interval</name>
+  <value>10s</value>
+  <description>Instrumentation reporting long critical sections will suppress
+    consecutive warnings within this interval.</description>
+</property>
 </configuration>
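
Operators wanting a different suppression window would override the
property in hdfs-site.xml; a sketch with an arbitrary 30-second value:

    <property>
      <name>dfs.lock.suppress.warning.interval</name>
      <value>30s</value>
    </property>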

http://git-wip-us.apache.org/repos/asf/hadoop/blob/012b266e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestInstrumentedLock.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestInstrumentedLock.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestInstrumentedLock.java
new file mode 100644
index 0000000..1d1a42b
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestInstrumentedLock.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+
+import org.apache.hadoop.util.AutoCloseableLock;
+import org.apache.hadoop.util.Timer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import static org.mockito.Mockito.*;
+import static org.junit.Assert.*;
+
+/**
+ * A test class for InstrumentedLock.
+ */
+public class TestInstrumentedLock {
+
+  static final Log LOG = LogFactory.getLog(TestInstrumentedLock.class);
+
+  @Rule public TestName name = new TestName();
+
+  /**
+   * Test exclusive access of the lock.
+   * @throws Exception
+   */
+  @Test(timeout=10000)
+  public void testMultipleThread() throws Exception {
+    String testname = name.getMethodName();
+    final InstrumentedLock lock = new InstrumentedLock(testname, LOG, 0, 300);
+    lock.lock();
+    try {
+      Thread competingThread = new Thread() {
+        @Override
+        public void run() {
+          assertFalse(lock.tryLock());
+        }
+      };
+      competingThread.start();
+      competingThread.join();
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Test correctness with the try-with-resources syntax.
+   * @throws Exception
+   */
+  @Test(timeout=10000)
+  public void testTryWithResourceSyntax() throws Exception {
+    String testname = name.getMethodName();
+    final AtomicReference<Thread> lockThread = new AtomicReference<>(null);
+    final Lock lock = new InstrumentedLock(testname, LOG, 0, 300) {
+      @Override
+      public void lock() {
+        super.lock();
+        lockThread.set(Thread.currentThread());
+      }
+      @Override
+      public void unlock() {
+        super.unlock();
+        lockThread.set(null);
+      }
+    };
+    AutoCloseableLock acl = new AutoCloseableLock(lock);
+    try (AutoCloseable localLock = acl.acquire()) {
+      assertEquals(acl, localLock);
+      Thread competingThread = new Thread() {
+        @Override
+        public void run() {
+          assertNotEquals(Thread.currentThread(), lockThread.get());
+          assertFalse(lock.tryLock());
+        }
+      };
+      competingThread.start();
+      competingThread.join();
+      assertEquals(Thread.currentThread(), lockThread.get());
+    }
+    assertNull(lockThread.get());
+  }
+
+  /**
+   * Test that the lock logs a warning when the lock held time is greater
+   * than the threshold and does not log a warning otherwise.
+   * @throws Exception
+   */
+  @Test(timeout=10000)
+  public void testLockLongHoldingReport() throws Exception {
+    String testname = name.getMethodName();
+    final AtomicLong time = new AtomicLong(0);
+    Timer mclock = new Timer() {
+      @Override
+      public long monotonicNow() {
+        return time.get();
+      }
+    };
+    Lock mlock = mock(Lock.class);
+
+    final AtomicLong wlogged = new AtomicLong(0);
+    final AtomicLong wsuppresed = new AtomicLong(0);
+    InstrumentedLock lock = new InstrumentedLock(
+        testname, LOG, mlock, 2000, 300, mclock) {
+      @Override
+      void logWarning(long lockHeldTime, long suppressed) {
+        wlogged.incrementAndGet();
+        wsuppresed.set(suppressed);
+      }
+    };
+
+    // do not log a warning when the lock held time is short
+    lock.lock();   // t = 0
+    time.set(200);
+    lock.unlock(); // t = 200
+    assertEquals(0, wlogged.get());
+    assertEquals(0, wsuppresed.get());
+
+    lock.lock();   // t = 200
+    time.set(700);
+    lock.unlock(); // t = 700
+    assertEquals(1, wlogged.get());
+    assertEquals(0, wsuppresed.get());
+
+    // although the lock held time is greater than the threshold, the
+    // warning is suppressed because the minimum logging gap has not elapsed
+    // (not recorded in wsuppressed until the next logged message)
+    lock.lock();   // t = 700
+    time.set(1100);
+    lock.unlock(); // t = 1100
+    assertEquals(1, wlogged.get());
+    assertEquals(0, wsuppresed.get());
+
+    // log a warning message when the lock held time is greater than the
+    // threshold and the logging time gap is satisfied. It should also
+    // report the number of previously suppressed warnings.
+    time.set(2400);
+    lock.lock();   // t = 2400
+    time.set(2800);
+    lock.unlock(); // t = 2800
+    assertEquals(2, wlogged.get());
+    assertEquals(1, wsuppresed.get());
+  }
+
+}
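
A note on the test design: time is driven by an injected Timer whose
monotonicNow() reads an AtomicLong, and the wrapped Lock is a Mockito
mock, so hold times are fully deterministic and no real sleeping or
contention is needed.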




[2/3] hadoop git commit: HDFS-10742. Measure lock time in FsDatasetImpl. Contributed by Chen Liang.

Posted by ar...@apache.org.
HDFS-10742. Measure lock time in FsDatasetImpl. Contributed by Chen Liang.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/04f620c4
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/04f620c4
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/04f620c4

Branch: refs/heads/branch-2.8
Commit: 04f620c4d01c3954c780465087a327bba76fe6df
Parents: d5ea508
Author: Arpit Agarwal <ar...@apache.org>
Authored: Sat Sep 10 18:01:37 2016 -0700
Committer: Arpit Agarwal <ar...@apache.org>
Committed: Sat Sep 10 18:01:37 2016 -0700

----------------------------------------------------------------------
 .../apache/hadoop/util/AutoCloseableLock.java   |  28 ++-
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |   5 +
 .../apache/hadoop/hdfs/InstrumentedLock.java    | 185 +++++++++++++++++++
 .../datanode/fsdataset/impl/FsDatasetImpl.java  |  11 +-
 .../src/main/resources/hdfs-default.xml         |   7 +
 .../hadoop/hdfs/TestInstrumentedLock.java       | 166 +++++++++++++++++
 6 files changed, 394 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/04f620c4/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/AutoCloseableLock.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/AutoCloseableLock.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/AutoCloseableLock.java
index 2aa8578..d920bc6 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/AutoCloseableLock.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/AutoCloseableLock.java
@@ -17,22 +17,33 @@
  */
 package org.apache.hadoop.util;
 
+import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * This is a wrap class of a ReentrantLock. Extending AutoCloseable
  * interface such that the users can use a try-with-resource syntax.
  */
 public class AutoCloseableLock implements AutoCloseable {
 
-  private final ReentrantLock lock;
+  private final Lock lock;
 
   /**
    * Creates an instance of {@code AutoCloseableLock}, initializes
-   * the underlying {@code ReentrantLock} object.
+   * the underlying lock instance with a new {@code ReentrantLock}.
    */
   public AutoCloseableLock() {
-    this.lock = new ReentrantLock();
+    this(new ReentrantLock());
+  }
+
+  /**
+   * Wrap provided Lock instance.
+   * @param lock Lock instance to wrap in AutoCloseable API.
+   */
+  public AutoCloseableLock(Lock lock) {
+    this.lock = lock;
   }
 
   /**
@@ -86,7 +97,7 @@ public class AutoCloseableLock implements AutoCloseable {
 
   /**
    * A wrapper method that makes a call to {@code tryLock()} of
-   * the underlying {@code ReentrantLock} object.
+   * the underlying {@code Lock} object.
    *
    * If the lock is not held by another thread, acquires the lock, set the
    * hold count to one and returns {@code true}.
@@ -116,7 +127,12 @@ public class AutoCloseableLock implements AutoCloseable {
    * @return {@code true} if any thread holds this lock and
    *         {@code false} otherwise
    */
-  public boolean isLocked() {
-    return lock.isLocked();
+  @VisibleForTesting
+  boolean isLocked() {
+    if (lock instanceof ReentrantLock) {
+      return ((ReentrantLock)lock).isLocked();
+    }
+    throw new UnsupportedOperationException();
   }
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/04f620c4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 9eee1ac..382404c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -383,6 +383,11 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String  DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_KEY =
       "dfs.namenode.read-lock-reporting-threshold-ms";
   public static final long    DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_DEFAULT = 5000L;
+  // Minimum interval between two consecutive lock warnings
+  public static final String DFS_LOCK_SUPPRESS_WARNING_INTERVAL_KEY =
+      "dfs.lock.suppress.warning.interval";
+  public static final long DFS_LOCK_SUPPRESS_WARNING_INTERVAL_DEFAULT =
+      10000; //ms
 
   public static final String  DFS_UPGRADE_DOMAIN_FACTOR = "dfs.namenode.upgrade.domain.factor";
   public static final int DFS_UPGRADE_DOMAIN_FACTOR_DEFAULT = DFS_REPLICATION_DEFAULT;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/04f620c4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/InstrumentedLock.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/InstrumentedLock.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/InstrumentedLock.java
new file mode 100644
index 0000000..6279e95
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/InstrumentedLock.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Timer;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * This is a debugging class that can be used by callers to track
+ * whether a specific lock is being held for too long and periodically
+ * log a warning and stack trace, if so.
+ *
+ * The logged warnings are throttled so that logs are not spammed.
+ *
+ * A new instance of InstrumentedLock can be created for each object
+ * that needs to be instrumented.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class InstrumentedLock implements Lock {
+
+  private final Lock lock;
+  private final Log logger;
+  private final String name;
+  private final Timer clock;
+
+  /** Minimum gap between two lock warnings. */
+  private final long minLoggingGap;
+  /** Threshold for detecting long lock held time. */
+  private final long lockWarningThreshold;
+
+  // Tracking counters for lock statistics.
+  private volatile long lockAcquireTimestamp;
+  private final AtomicLong lastLogTimestamp;
+  private final AtomicLong warningsSuppressed = new AtomicLong(0);
+
+  /**
+   * Create an instrumented lock instance which logs a warning message
+   * when the lock held time is above the given threshold.
+   *
+   * @param name the identifier of the lock object
+   * @param logger this class does not have its own logger; it will log to
+   *               the given logger instead
+   * @param minLoggingGapMs  the minimum time gap between two log messages,
+   *                         to avoid spamming too many logs
+   * @param lockWarningThresholdMs the time threshold above which a lock
+   *                               hold is considered "too long"
+   */
+  public InstrumentedLock(String name, Log logger, long minLoggingGapMs,
+      long lockWarningThresholdMs) {
+    this(name, logger, new ReentrantLock(),
+        minLoggingGapMs, lockWarningThresholdMs);
+  }
+
+  public InstrumentedLock(String name, Log logger, Lock lock,
+      long minLoggingGapMs, long lockWarningThresholdMs) {
+    this(name, logger, lock,
+        minLoggingGapMs, lockWarningThresholdMs, new Timer());
+  }
+
+  @VisibleForTesting
+  InstrumentedLock(String name, Log logger, Lock lock,
+      long minLoggingGapMs, long lockWarningThresholdMs, Timer clock) {
+    this.name = name;
+    this.lock = lock;
+    this.clock = clock;
+    this.logger = logger;
+    minLoggingGap = minLoggingGapMs;
+    lockWarningThreshold = lockWarningThresholdMs;
+    lastLogTimestamp = new AtomicLong(
+      clock.monotonicNow() - Math.max(minLoggingGap, lockWarningThreshold));
+  }
+
+  @Override
+  public void lock() {
+    lock.lock();
+    lockAcquireTimestamp = clock.monotonicNow();
+  }
+
+  @Override
+  public void lockInterruptibly() throws InterruptedException {
+    lock.lockInterruptibly();
+    lockAcquireTimestamp = clock.monotonicNow();
+  }
+
+  @Override
+  public boolean tryLock() {
+    if (lock.tryLock()) {
+      lockAcquireTimestamp = clock.monotonicNow();
+      return true;
+    }
+    return false;
+  }
+
+  @Override
+  public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
+    if (lock.tryLock(time, unit)) {
+      lockAcquireTimestamp = clock.monotonicNow();
+      return true;
+    }
+    return false;
+  }
+
+  @Override
+  public void unlock() {
+    long localLockReleaseTime = clock.monotonicNow();
+    long localLockAcquireTime = lockAcquireTimestamp;
+    lock.unlock();
+    check(localLockAcquireTime, localLockReleaseTime);
+  }
+
+  @Override
+  public Condition newCondition() {
+    return lock.newCondition();
+  }
+
+  @VisibleForTesting
+  void logWarning(long lockHeldTime, long suppressed) {
+    logger.warn(String.format("Lock held time above threshold: " +
+        "lock identifier: %s " +
+        "lockHeldTimeMs=%d ms. Suppressed %d lock warnings. " +
+        "The stack trace is: %s" ,
+        name, lockHeldTime, suppressed,
+        StringUtils.getStackTrace(Thread.currentThread())));
+  }
+
+  /**
+   * Log a warning if the lock was held for too long.
+   *
+   * Should be invoked by the caller immediately AFTER releasing the lock.
+   *
+   * @param acquireTime  - timestamp just after acquiring the lock.
+   * @param releaseTime - timestamp just before releasing the lock.
+   */
+  private void check(long acquireTime, long releaseTime) {
+    if (!logger.isWarnEnabled()) {
+      return;
+    }
+
+    final long lockHeldTime = releaseTime - acquireTime;
+    if (lockWarningThreshold - lockHeldTime < 0) {
+      long now;
+      long localLastLogTs;
+      do {
+        now = clock.monotonicNow();
+        localLastLogTs = lastLogTimestamp.get();
+        long deltaSinceLastLog = now - localLastLogTs;
+        // decide whether this warning should be logged or suppressed
+        if (deltaSinceLastLog - minLoggingGap < 0) {
+          warningsSuppressed.incrementAndGet();
+          return;
+        }
+      } while (!lastLogTimestamp.compareAndSet(localLastLogTs, now));
+      long suppressed = warningsSuppressed.getAndSet(0);
+      logWarning(lockHeldTime, suppressed);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/04f620c4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 596ab75..1f0d975 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -42,6 +42,7 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.*;
 import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
 
 import javax.management.NotCompliantMBeanException;
 import javax.management.ObjectName;
@@ -62,7 +63,7 @@ import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.ExtendedBlockId;
-import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.InstrumentedLock;
 import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
@@ -280,7 +281,13 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     this.dataStorage = storage;
     this.conf = conf;
     this.smallBufferSize = DFSUtilClient.getSmallBufferSize(conf);
-    this.datasetLock = new AutoCloseableLock();
+    this.datasetLock = new AutoCloseableLock(
+        new InstrumentedLock(getClass().getName(), LOG,
+          conf.getTimeDuration(
+            DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_KEY,
+            DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_DEFAULT,
+            TimeUnit.MILLISECONDS),
+          300));
     // The number of volumes required for operation is the total number
     // of volumes minus the number of failed volumes we can tolerate.
     volFailuresTolerated = datanode.getDnConf().getVolFailuresTolerated();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/04f620c4/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 19ae973..6984f6f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -3046,4 +3046,11 @@
       Keytab based login can be enabled with dfs.balancer.keytab.enabled.
     </description>
   </property>
+
+<property>
+  <name>dfs.lock.suppress.warning.interval</name>
+  <value>10s</value>
+  <description>Instrumentation reporting long critical sections will suppress
+    consecutive warnings within this interval.</description>
+</property>
 </configuration>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/04f620c4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestInstrumentedLock.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestInstrumentedLock.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestInstrumentedLock.java
new file mode 100644
index 0000000..1d1a42b
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestInstrumentedLock.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+
+import org.apache.hadoop.util.AutoCloseableLock;
+import org.apache.hadoop.util.Timer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import static org.mockito.Mockito.*;
+import static org.junit.Assert.*;
+
+/**
+ * A test class for InstrumentedLock.
+ */
+public class TestInstrumentedLock {
+
+  static final Log LOG = LogFactory.getLog(TestInstrumentedLock.class);
+
+  @Rule public TestName name = new TestName();
+
+  /**
+   * Test exclusive access of the lock.
+   * @throws Exception
+   */
+  @Test(timeout=10000)
+  public void testMultipleThread() throws Exception {
+    String testname = name.getMethodName();
+    final InstrumentedLock lock = new InstrumentedLock(testname, LOG, 0, 300);
+    lock.lock();
+    try {
+      Thread competingThread = new Thread() {
+        @Override
+        public void run() {
+          assertFalse(lock.tryLock());
+        }
+      };
+      competingThread.start();
+      competingThread.join();
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Test correctness with the try-with-resources syntax.
+   * @throws Exception
+   */
+  @Test(timeout=10000)
+  public void testTryWithResourceSyntax() throws Exception {
+    String testname = name.getMethodName();
+    final AtomicReference<Thread> lockThread = new AtomicReference<>(null);
+    final Lock lock = new InstrumentedLock(testname, LOG, 0, 300) {
+      @Override
+      public void lock() {
+        super.lock();
+        lockThread.set(Thread.currentThread());
+      }
+      @Override
+      public void unlock() {
+        super.unlock();
+        lockThread.set(null);
+      }
+    };
+    AutoCloseableLock acl = new AutoCloseableLock(lock);
+    try (AutoCloseable localLock = acl.acquire()) {
+      assertEquals(acl, localLock);
+      Thread competingThread = new Thread() {
+        @Override
+        public void run() {
+          assertNotEquals(Thread.currentThread(), lockThread.get());
+          assertFalse(lock.tryLock());
+        }
+      };
+      competingThread.start();
+      competingThread.join();
+      assertEquals(Thread.currentThread(), lockThread.get());
+    }
+    assertNull(lockThread.get());
+  }
+
+  /**
+   * Test that the lock logs a warning when the lock held time is greater
+   * than the threshold and does not log a warning otherwise.
+   * @throws Exception
+   */
+  @Test(timeout=10000)
+  public void testLockLongHoldingReport() throws Exception {
+    String testname = name.getMethodName();
+    final AtomicLong time = new AtomicLong(0);
+    Timer mclock = new Timer() {
+      @Override
+      public long monotonicNow() {
+        return time.get();
+      }
+    };
+    Lock mlock = mock(Lock.class);
+
+    final AtomicLong wlogged = new AtomicLong(0);
+    final AtomicLong wsuppresed = new AtomicLong(0);
+    InstrumentedLock lock = new InstrumentedLock(
+        testname, LOG, mlock, 2000, 300, mclock) {
+      @Override
+      void logWarning(long lockHeldTime, long suppressed) {
+        wlogged.incrementAndGet();
+        wsuppresed.set(suppressed);
+      }
+    };
+
+    // do not log a warning when the lock held time is short
+    lock.lock();   // t = 0
+    time.set(200);
+    lock.unlock(); // t = 200
+    assertEquals(0, wlogged.get());
+    assertEquals(0, wsuppresed.get());
+
+    lock.lock();   // t = 200
+    time.set(700);
+    lock.unlock(); // t = 700
+    assertEquals(1, wlogged.get());
+    assertEquals(0, wsuppresed.get());
+
+    // although the lock held time is greater than the threshold, the
+    // warning is suppressed because the minimum logging gap has not elapsed
+    // (not recorded in wsuppressed until the next logged message)
+    lock.lock();   // t = 700
+    time.set(1100);
+    lock.unlock(); // t = 1100
+    assertEquals(1, wlogged.get());
+    assertEquals(0, wsuppresed.get());
+
+    // log a warning message when the lock held time is greater than the
+    // threshold and the logging time gap is satisfied. It should also
+    // report the number of previously suppressed warnings.
+    time.set(2400);
+    lock.lock();   // t = 2400
+    time.set(2800);
+    lock.unlock(); // t = 2800
+    assertEquals(2, wlogged.get());
+    assertEquals(1, wsuppresed.get());
+  }
+
+}

