You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by in...@apache.org on 2020/04/18 20:52:24 UTC

[hadoop] branch trunk updated: HDFS-15217 Add more information to longest write/read lock held log

This is an automated email from the ASF dual-hosted git repository.

inigoiri pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1824aee  HDFS-15217 Add more information to longest write/read lock held log
1824aee is described below

commit 1824aee9da4056de0fb638906b2172e486bbebe7
Author: Toshihiro Suzuki <br...@gmail.com>
AuthorDate: Sun Apr 19 05:52:07 2020 +0900

    HDFS-15217 Add more information to longest write/read lock held log
---
 .../hadoop/hdfs/server/namenode/FSNamesystem.java  | 301 ++++++++++++++-------
 .../hdfs/server/namenode/FSNamesystemLock.java     | 128 +++++++--
 .../namenode/TestFSNamesystemLockReport.java       | 196 ++++++++++++++
 3 files changed, 499 insertions(+), 126 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 99ad6f2..38d098e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -165,6 +165,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Supplier;
 
 import javax.annotation.Nonnull;
 import javax.management.NotCompliantMBeanException;
@@ -1699,29 +1700,41 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   public void readLock() {
     this.fsLock.readLock();
   }
+
   @Override
   public void readLockInterruptibly() throws InterruptedException {
     this.fsLock.readLockInterruptibly();
   }
+
   @Override
   public void readUnlock() {
     this.fsLock.readUnlock();
   }
+
   public void readUnlock(String opName) {
     this.fsLock.readUnlock(opName);
   }
+
+  public void readUnlock(String opName,
+      Supplier<String> lockReportInfoSupplier) {
+    this.fsLock.readUnlock(opName, lockReportInfoSupplier);
+  }
+
   @Override
   public void writeLock() {
     this.fsLock.writeLock();
   }
+
   @Override
   public void writeLockInterruptibly() throws InterruptedException {
     this.fsLock.writeLockInterruptibly();
   }
+
   @Override
   public void writeUnlock() {
     this.fsLock.writeUnlock();
   }
+
   public void writeUnlock(String opName) {
     this.fsLock.writeUnlock(opName);
   }
@@ -1730,6 +1743,11 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     this.fsLock.writeUnlock(opName, suppressWriteLockReport);
   }
 
+  public void writeUnlock(String opName,
+      Supplier<String> lockReportInfoSupplier) {
+    this.fsLock.writeUnlock(opName, lockReportInfoSupplier);
+  }
+
   @Override
   public boolean hasWriteLock() {
     return this.fsLock.isWriteLockedByCurrentThread();
@@ -1874,7 +1892,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         out.close();
       }
     } finally {
-      readUnlock(operationName);
+      readUnlock(operationName, getLockReportInfoSupplier(null));
     }
     logAuditEvent(true, operationName, null);
   }
@@ -1925,7 +1943,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
           }
         }
       } finally {
-        readUnlock(operationName);
+        readUnlock(operationName, getLockReportInfoSupplier(null));
       }
     } catch (AccessControlException e) {
       logAuditEvent(false, operationName, null);
@@ -1999,7 +2017,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
    */
   void setPermission(String src, FsPermission permission) throws IOException {
     final String operationName = "setPermission";
-    FileStatus auditStat;
+    FileStatus auditStat = null;
     checkOperation(OperationCategory.WRITE);
     final FSPermissionChecker pc = getPermissionChecker();
     FSPermissionChecker.setOperationType(operationName);
@@ -2010,7 +2028,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         checkNameNodeSafeMode("Cannot set permission for " + src);
         auditStat = FSDirAttrOp.setPermission(dir, pc, src, permission);
       } finally {
-        writeUnlock(operationName);
+        writeUnlock(operationName,
+            getLockReportInfoSupplier(src, null, auditStat));
       }
     } catch (AccessControlException e) {
       logAuditEvent(false, operationName, src);
@@ -2030,7 +2049,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   void setOwner(String src, String username, String group)
       throws IOException {
     final String operationName = "setOwner";
-    FileStatus auditStat;
+    FileStatus auditStat = null;
     checkOperation(OperationCategory.WRITE);
     final FSPermissionChecker pc = getPermissionChecker();
     FSPermissionChecker.setOperationType(operationName);
@@ -2041,7 +2060,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         checkNameNodeSafeMode("Cannot set owner for " + src);
         auditStat = FSDirAttrOp.setOwner(dir, pc, src, username, group);
       } finally {
-        writeUnlock(operationName);
+        writeUnlock(operationName,
+            getLockReportInfoSupplier(src, null, auditStat));
       }
     } catch (AccessControlException e) {
       logAuditEvent(false, operationName, src);
@@ -2095,7 +2115,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
           }
         }
       } finally {
-        readUnlock(operationName);
+        readUnlock(operationName, getLockReportInfoSupplier(srcArg));
       }
     } catch (AccessControlException e) {
       logAuditEvent(false, operationName, srcArg);
@@ -2125,7 +2145,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
             }
           }
         } finally {
-          writeUnlock(operationName);
+          writeUnlock(operationName, getLockReportInfoSupplier(srcArg));
         }
       } catch (Throwable e) {
         LOG.warn("Failed to update the access time of " + src, e);
@@ -2174,6 +2194,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     final FSPermissionChecker pc = getPermissionChecker();
     FSPermissionChecker.setOperationType(operationName);
     checkOperation(OperationCategory.WRITE);
+    String srcsStr = Arrays.toString(srcs);
     try {
       writeLock();
       try {
@@ -2181,15 +2202,15 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         checkNameNodeSafeMode("Cannot concat " + target);
         stat = FSDirConcatOp.concat(dir, pc, target, srcs, logRetryCache);
       } finally {
-        writeUnlock(operationName);
+        writeUnlock(operationName,
+            getLockReportInfoSupplier(srcsStr, target, stat));
       }
     } catch (AccessControlException ace) {
-      logAuditEvent(false, operationName, Arrays.toString(srcs),
-          target, stat);
+      logAuditEvent(false, operationName, srcsStr, target, stat);
       throw ace;
     }
     getEditLog().logSync();
-    logAuditEvent(true, operationName, Arrays.toString(srcs), target, stat);
+    logAuditEvent(true, operationName, srcsStr, target, stat);
   }
 
   /**
@@ -2199,7 +2220,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
    */
   void setTimes(String src, long mtime, long atime) throws IOException {
     final String operationName = "setTimes";
-    FileStatus auditStat;
+    FileStatus auditStat = null;
     checkOperation(OperationCategory.WRITE);
     final FSPermissionChecker pc = getPermissionChecker();
     FSPermissionChecker.setOperationType(operationName);
@@ -2210,7 +2231,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         checkNameNodeSafeMode("Cannot set times " + src);
         auditStat = FSDirAttrOp.setTimes(dir, pc, src, mtime, atime);
       } finally {
-        writeUnlock(operationName);
+        writeUnlock(operationName,
+            getLockReportInfoSupplier(src, null, auditStat));
       }
     } catch (AccessControlException e) {
       logAuditEvent(false, operationName, src);
@@ -2235,7 +2257,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
 
     final String operationName = "truncate";
     requireEffectiveLayoutVersionForFeature(Feature.TRUNCATE);
-    final FSDirTruncateOp.TruncateResult r;
+    FSDirTruncateOp.TruncateResult r = null;
     try {
       NameNode.stateChangeLog.debug(
           "DIR* NameSystem.truncate: src={} newLength={}", src, newLength);
@@ -2254,7 +2276,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         r = FSDirTruncateOp.truncate(this, src, newLength, clientName,
             clientMachine, mtime, toRemoveBlocks, pc);
       } finally {
-        writeUnlock(operationName);
+        FileStatus status = r != null ? r.getFileStatus() : null;
+        writeUnlock(operationName,
+            getLockReportInfoSupplier(src, null, status));
       }
       getEditLog().logSync();
       if (!toRemoveBlocks.getToDeleteList().isEmpty()) {
@@ -2290,7 +2314,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         auditStat = FSDirSymlinkOp.createSymlinkInt(this, target, link,
             dirPerms, createParent, logRetryCache);
       } finally {
-        writeUnlock(operationName);
+        writeUnlock(operationName,
+            getLockReportInfoSupplier(link, target, auditStat));
       }
     } catch (AccessControlException e) {
       logAuditEvent(false, operationName, link, target, null);
@@ -2329,7 +2354,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         success = FSDirAttrOp.setReplication(dir, pc, blockManager, src,
             replication);
       } finally {
-        writeUnlock(operationName);
+        writeUnlock(operationName, getLockReportInfoSupplier(src));
       }
     } catch (AccessControlException e) {
       logAuditEvent(false, operationName, src);
@@ -2376,7 +2401,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     checkStoragePolicyEnabled("set storage policy", true);
     final FSPermissionChecker pc = getPermissionChecker();
     FSPermissionChecker.setOperationType(operationName);
-    FileStatus auditStat;
+    FileStatus auditStat = null;
     try {
       writeLock();
       try {
@@ -2385,7 +2410,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         auditStat = FSDirAttrOp.setStoragePolicy(dir, pc, blockManager, src,
             policyName);
       } finally {
-        writeUnlock(operationName);
+        writeUnlock(operationName,
+            getLockReportInfoSupplier(src, null, auditStat));
       }
     } catch (AccessControlException e) {
       logAuditEvent(false, operationName, src);
@@ -2408,7 +2434,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     // make sure storage policy is enabled, otherwise
     // there is no need to satisfy storage policy.
     checkStoragePolicyEnabled("satisfy storage policy", false);
-    FileStatus auditStat;
+    FileStatus auditStat = null;
     validateStoragePolicySatisfy();
     try {
       writeLock();
@@ -2418,7 +2444,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         auditStat = FSDirSatisfyStoragePolicyOp.satisfyStoragePolicy(
             dir, blockManager, src, logRetryCache);
       } finally {
-        writeUnlock(operationName);
+        writeUnlock(operationName,
+            getLockReportInfoSupplier(src, null, auditStat));
       }
     } catch (AccessControlException e) {
       logAuditEvent(false, operationName, src);
@@ -2456,7 +2483,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     checkStoragePolicyEnabled("unset storage policy", true);
     final FSPermissionChecker pc = getPermissionChecker();
     FSPermissionChecker.setOperationType(operationName);
-    FileStatus auditStat;
+    FileStatus auditStat = null;
     try {
       writeLock();
       try {
@@ -2464,7 +2491,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         checkNameNodeSafeMode("Cannot unset storage policy for " + src);
         auditStat = FSDirAttrOp.unsetStoragePolicy(dir, pc, blockManager, src);
       } finally {
-        writeUnlock(operationName);
+        writeUnlock(operationName,
+            getLockReportInfoSupplier(src, null, auditStat));
       }
     } catch (AccessControlException e) {
       logAuditEvent(false, operationName, src);
@@ -2690,7 +2718,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         dir.writeUnlock();
       }
     } finally {
-      writeUnlock("create");
+      writeUnlock("create", getLockReportInfoSupplier(src, null, stat));
       // There might be transactions logged while trying to recover the lease.
       // They need to be sync'ed even when an exception was thrown.
       if (!skipSync) {
@@ -2874,7 +2902,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         skipSync = true;
         throw se;
       } finally {
-        writeUnlock(operationName);
+        writeUnlock(operationName, getLockReportInfoSupplier(srcArg));
         // There might be transactions logged while trying to recover the lease
         // They need to be sync'ed even when an exception was thrown.
         if (!skipSync) {
@@ -3174,7 +3202,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         checkNameNodeSafeMode("Cannot rename " + src);
         ret = FSDirRenameOp.renameToInt(dir, pc, src, dst, logRetryCache);
       } finally {
-        writeUnlock(operationName);
+        FileStatus status = ret != null ? ret.auditStat : null;
+        writeUnlock(operationName,
+            getLockReportInfoSupplier(src, dst, status));
       }
     } catch (AccessControlException e)  {
       logAuditEvent(false, operationName, src, dst, null);
@@ -3204,7 +3234,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         res = FSDirRenameOp.renameToInt(dir, pc, src, dst, logRetryCache,
             options);
       } finally {
-        writeUnlock(operationName);
+        FileStatus status = res != null ? res.auditStat : null;
+        writeUnlock(operationName,
+            getLockReportInfoSupplier(src, dst, status));
       }
     } catch (AccessControlException e) {
       logAuditEvent(false, operationName + " (options=" +
@@ -3246,7 +3278,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
             this, pc, src, recursive, logRetryCache);
         ret = toRemovedBlocks != null;
       } finally {
-        writeUnlock(operationName);
+        writeUnlock(operationName, getLockReportInfoSupplier(src));
       }
     } catch (AccessControlException e) {
       logAuditEvent(false, operationName, src);
@@ -3352,7 +3384,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         stat = FSDirStatAndListingOp.getFileInfo(
             dir, pc, src, resolveLink, needLocation, needBlockToken);
       } finally {
-        readUnlock(operationName);
+        readUnlock(operationName, getLockReportInfoSupplier(src));
       }
     } catch (AccessControlException e) {
       logAuditEvent(false, operationName, src);
@@ -3377,7 +3409,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         checkOperation(OperationCategory.READ);
         success = FSDirStatAndListingOp.isFileClosed(dir, pc, src);
       } finally {
-        readUnlock(operationName);
+        readUnlock(operationName, getLockReportInfoSupplier(src));
       }
     } catch (AccessControlException e) {
       logAuditEvent(false, operationName, src);
@@ -3407,7 +3439,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         auditStat = FSDirMkdirOp.mkdirs(this, pc, src, permissions,
             createParent);
       } finally {
-        writeUnlock(operationName);
+        writeUnlock(operationName,
+            getLockReportInfoSupplier(src, null, auditStat));
       }
     } catch (AccessControlException e) {
       logAuditEvent(false, operationName, src);
@@ -3444,7 +3477,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         checkOperation(OperationCategory.READ);
         cs = FSDirStatAndListingOp.getContentSummary(dir, pc, src);
       } finally {
-        readUnlock(operationName);
+        readUnlock(operationName, getLockReportInfoSupplier(src));
       }
     } catch (AccessControlException ace) {
       logAuditEvent(false, operationName, src);
@@ -3480,7 +3513,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         checkOperation(OperationCategory.READ);
         quotaUsage = FSDirStatAndListingOp.getQuotaUsage(dir, pc, src);
       } finally {
-        readUnlock(operationName);
+        readUnlock(operationName, getLockReportInfoSupplier(src));
       }
     } catch (AccessControlException ace) {
       logAuditEvent(false, operationName, src);
@@ -3517,7 +3550,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         FSDirAttrOp.setQuota(dir, pc, src, nsQuota, ssQuota, type,
             allowOwnerSetQuota);
       } finally {
-        writeUnlock(operationName);
+        writeUnlock(operationName, getLockReportInfoSupplier(src));
       }
     } catch (AccessControlException ace) {
       logAuditEvent(false, operationName, src);
@@ -4051,7 +4084,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         checkOperation(NameNode.OperationCategory.READ);
         dl = getListingInt(dir, pc, src, startAfter, needLocation);
       } finally {
-        readUnlock(operationName);
+        readUnlock(operationName, getLockReportInfoSupplier(src));
       }
     } catch (AccessControlException e) {
       logAuditEvent(false, operationName, src);
@@ -4196,7 +4229,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
             returnedStartAfter);
       }
     } finally {
-      readUnlock(operationName);
+      readUnlock(operationName,
+          getLockReportInfoSupplier(Arrays.toString(srcs)));
     }
     for (int i = startSrcsIndex; i < srcsIndex; i++) {
       logAuditEvent(true, operationName, srcs[i]);
@@ -4783,7 +4817,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         arr[i].setNumBlocks(results.get(i).numBlocks());
       }
     } finally {
-      readUnlock(operationName);
+      readUnlock(operationName, getLockReportInfoSupplier(null));
     }
     logAuditEvent(true, operationName, null);
     return arr;
@@ -4801,7 +4835,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       final DatanodeManager dm = getBlockManager().getDatanodeManager();      
       reports = dm.getDatanodeStorageReport(type);
     } finally {
-      readUnlock(operationName);
+      readUnlock(operationName, getLockReportInfoSupplier(null));
     }
     logAuditEvent(true, operationName, null);
     return reports;
@@ -4830,7 +4864,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       }
       saved = getFSImage().saveNamespace(timeWindow, txGap, this);
     } finally {
-      readUnlock(operationName);
+      readUnlock(operationName, getLockReportInfoSupplier(null));
       cpUnlock();
     }
     if (saved) {
@@ -4864,7 +4898,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         getFSImage().getStorage().setRestoreFailedStorage(val);
       }
     } finally {
-      writeUnlock(operationName);
+      writeUnlock(operationName, getLockReportInfoSupplier(null));
       cpUnlock();
     }
     logAuditEvent(true, operationName, null);
@@ -4885,7 +4919,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       checkOperation(OperationCategory.UNCHECKED);
       getFSImage().finalizeUpgrade(this.isHaEnabled() && inActiveState());
     } finally {
-      writeUnlock(operationName);
+      writeUnlock(operationName, getLockReportInfoSupplier(null));
       cpUnlock();
     }
     logAuditEvent(true, operationName, null);
@@ -5011,7 +5045,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       NameNode.stateChangeLog.info("STATE* Safe mode is ON.\n" +
           getSafeModeTip());
     } finally {
-      writeUnlock("enterSafeMode");
+      writeUnlock("enterSafeMode", getLockReportInfoSupplier(null));
     }
   }
 
@@ -5031,7 +5065,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         startSecretManagerIfNecessary();
       }
     } finally {
-      writeUnlock("leaveSafeMode");
+      writeUnlock("leaveSafeMode", getLockReportInfoSupplier(null));
     }
   }
 
@@ -5078,7 +5112,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       }
       result = getFSImage().rollEditLog(getEffectiveLayoutVersion());
     } finally {
-      writeUnlock(operationName);
+      writeUnlock(operationName, getLockReportInfoSupplier(null));
     }
     logAuditEvent(true, operationName, null);
     return result;
@@ -6016,7 +6050,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   Token<DelegationTokenIdentifier> getDelegationToken(Text renewer)
       throws IOException {
     final String operationName = "getDelegationToken";
-    final String tokenId;
+    String tokenId = null;
     Token<DelegationTokenIdentifier> token;
     checkOperation(OperationCategory.WRITE);
     writeLock();
@@ -6047,7 +6081,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       getEditLog().logGetDelegationToken(dtId, expiryTime);
       tokenId = dtId.toStringStable();
     } finally {
-      writeUnlock(operationName);
+      writeUnlock(operationName, getLockReportInfoSupplier(tokenId));
     }
     getEditLog().logSync();
     logAuditEvent(true, operationName, tokenId);
@@ -6064,7 +6098,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   long renewDelegationToken(Token<DelegationTokenIdentifier> token)
       throws InvalidToken, IOException {
     final String operationName = "renewDelegationToken";
-    String tokenId;
+    String tokenId = null;
     long expiryTime;
     checkOperation(OperationCategory.WRITE);
     try {
@@ -6085,7 +6119,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         getEditLog().logRenewDelegationToken(id, expiryTime);
         tokenId = id.toStringStable();
       } finally {
-        writeUnlock(operationName);
+        writeUnlock(operationName, getLockReportInfoSupplier(tokenId));
       }
     } catch (AccessControlException ace) {
       final DelegationTokenIdentifier id = DFSUtil.decodeDelegationToken(token);
@@ -6106,7 +6140,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   void cancelDelegationToken(Token<DelegationTokenIdentifier> token)
       throws IOException {
     final String operationName = "cancelDelegationToken";
-    String tokenId;
+    String tokenId = null;
     checkOperation(OperationCategory.WRITE);
     try {
       writeLock();
@@ -6119,7 +6153,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         getEditLog().logCancelDelegationToken(id);
         tokenId = id.toStringStable();
       } finally {
-        writeUnlock(operationName);
+        writeUnlock(operationName, getLockReportInfoSupplier(tokenId));
       }
     } catch (AccessControlException ace) {
       final DelegationTokenIdentifier id = DFSUtil.decodeDelegationToken(token);
@@ -6831,7 +6865,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       checkNameNodeSafeMode("Cannot allow snapshot for " + path);
       FSDirSnapshotOp.allowSnapshot(dir, snapshotManager, path);
     } finally {
-      writeUnlock(operationName);
+      writeUnlock(operationName, getLockReportInfoSupplier(path));
     }
     getEditLog().logSync();
     logAuditEvent(true, operationName, path, null, null);
@@ -6848,7 +6882,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       checkNameNodeSafeMode("Cannot disallow snapshot for " + path);
       FSDirSnapshotOp.disallowSnapshot(dir, snapshotManager, path);
     } finally {
-      writeUnlock(operationName);
+      writeUnlock(operationName, getLockReportInfoSupplier(path));
     }
     getEditLog().logSync();
     logAuditEvent(true, operationName, path, null, null);
@@ -6874,7 +6908,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         snapshotPath = FSDirSnapshotOp.createSnapshot(dir, pc,
             snapshotManager, snapshotRoot, snapshotName, logRetryCache);
       } finally {
-        writeUnlock(operationName);
+        writeUnlock(operationName, getLockReportInfoSupplier(snapshotRoot));
       }
     } catch (AccessControlException ace) {
       logAuditEvent(false, operationName, snapshotRoot);
@@ -6911,7 +6945,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         FSDirSnapshotOp.renameSnapshot(dir, pc, snapshotManager, path,
             snapshotOldName, snapshotNewName, logRetryCache);
       } finally {
-        writeUnlock(operationName);
+        writeUnlock(operationName, getLockReportInfoSupplier(oldSnapshotRoot,
+            newSnapshotRoot));
       }
     } catch (AccessControlException ace) {
       logAuditEvent(false, operationName, oldSnapshotRoot,
@@ -6944,7 +6979,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         status = FSDirSnapshotOp.getSnapshottableDirListing(dir, pc,
             snapshotManager);
       } finally {
-        readUnlock(operationName);
+        readUnlock(operationName, getLockReportInfoSupplier(null));
       }
     } catch (AccessControlException ace) {
       logAuditEvent(false, operationName, null, null, null);
@@ -6989,7 +7024,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         diffs = FSDirSnapshotOp.getSnapshotDiffReport(dir, pc, snapshotManager,
             path, fromSnapshot, toSnapshot);
       } finally {
-        readUnlock(operationName);
+        readUnlock(operationName, getLockReportInfoSupplier(fromSnapshotRoot,
+            toSnapshotRoot));
       }
     } catch (AccessControlException ace) {
       logAuditEvent(false, operationName, fromSnapshotRoot,
@@ -7064,7 +7100,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
                 fromSnapshot, toSnapshot, startPath, index,
                 snapshotDiffReportLimit);
       } finally {
-        readUnlock(operationName);
+        readUnlock(operationName, getLockReportInfoSupplier(fromSnapshotRoot,
+            toSnapshotRoot));
       }
     } catch (AccessControlException ace) {
       logAuditEvent(false, operationName, fromSnapshotRoot, toSnapshotRoot,
@@ -7100,7 +7137,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         blocksToBeDeleted = FSDirSnapshotOp.deleteSnapshot(dir, pc,
             snapshotManager, snapshotRoot, snapshotName, logRetryCache);
       } finally {
-        writeUnlock(operationName);
+        writeUnlock(operationName, getLockReportInfoSupplier(rootPath));
       }
     } catch (AccessControlException ace) {
       logAuditEvent(false, operationName, rootPath, null, null);
@@ -7140,7 +7177,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       boolean hasRollbackImage = this.getFSImage().hasRollbackFSImage();
       rollingUpgradeInfo.setCreatedRollbackImages(hasRollbackImage);
     } finally {
-      readUnlock(operationName);
+      readUnlock(operationName, getLockReportInfoSupplier(null));
     }
     logAuditEvent(true, operationName, null, null, null);
     return rollingUpgradeInfo;
@@ -7170,7 +7207,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         getFSImage().rollEditLog(getEffectiveLayoutVersion());
       }
     } finally {
-      writeUnlock(operationName);
+      writeUnlock(operationName, getLockReportInfoSupplier(null));
     }
 
     getEditLog().logSync();
@@ -7359,7 +7396,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       getFSImage().renameCheckpoint(NameNodeFile.IMAGE_ROLLBACK,
           NameNodeFile.IMAGE);
     } finally {
-      writeUnlock(operationName);
+      writeUnlock(operationName, getLockReportInfoSupplier(null));
     }
 
     if (!haEnabled) {
@@ -7394,7 +7431,10 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         effectiveDirective = FSNDNCacheOp.addCacheDirective(this, cacheManager,
             directive, flags, logRetryCache);
       } finally {
-        writeUnlock(operationName);
+        effectiveDirectiveStr = effectiveDirective != null ?
+            effectiveDirective.toString() : null;
+        writeUnlock(operationName,
+            getLockReportInfoSupplier(effectiveDirectiveStr));
       }
     } catch (AccessControlException ace) {
       logAuditEvent(false, operationName, null);
@@ -7423,7 +7463,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         FSNDNCacheOp.modifyCacheDirective(this, cacheManager, directive, flags,
             logRetryCache);
       } finally {
-        writeUnlock(operationName);
+        writeUnlock(operationName,
+            getLockReportInfoSupplier(idStr, directive.toString()));
       }
     } catch (AccessControlException ace) {
       logAuditEvent(false, operationName, idStr,
@@ -7448,7 +7489,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         FSNDNCacheOp.removeCacheDirective(this, cacheManager, id,
             logRetryCache);
       } finally {
-        writeUnlock(operationName);
+        writeUnlock(operationName, getLockReportInfoSupplier(idStr));
       }
     } catch (AccessControlException ace) {
       logAuditEvent(false, operationName, idStr, null, null);
@@ -7472,7 +7513,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         results = FSNDNCacheOp.listCacheDirectives(this, cacheManager, startId,
             filter);
       } finally {
-        readUnlock(operationName);
+        readUnlock(operationName,
+            getLockReportInfoSupplier(filter.toString()));
       }
     } catch (AccessControlException ace) {
       logAuditEvent(false, operationName, filter.toString());
@@ -7498,7 +7540,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
             logRetryCache);
         poolInfoStr = info.toString();
       } finally {
-        writeUnlock(operationName);
+        writeUnlock(operationName, getLockReportInfoSupplier(poolInfoStr));
       }
     } catch (AccessControlException ace) {
       logAuditEvent(false, operationName, poolInfoStr);
@@ -7523,7 +7565,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
             + (req == null ? null : req.getPoolName()));
         FSNDNCacheOp.modifyCachePool(this, cacheManager, req, logRetryCache);
       } finally {
-        writeUnlock(operationName);
+        writeUnlock(operationName, getLockReportInfoSupplier(poolNameStr,
+            req == null ? null : req.toString()));
       }
     } catch (AccessControlException ace) {
       logAuditEvent(false, operationName, poolNameStr,
@@ -7549,7 +7592,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         FSNDNCacheOp.removeCachePool(this, cacheManager, cachePoolName,
             logRetryCache);
       } finally {
-        writeUnlock(operationName);
+        writeUnlock(operationName, getLockReportInfoSupplier(poolNameStr));
       }
     } catch (AccessControlException ace) {
       logAuditEvent(false, operationName, poolNameStr);
@@ -7572,7 +7615,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         checkOperation(OperationCategory.READ);
         results = FSNDNCacheOp.listCachePools(this, cacheManager, prevKey);
       } finally {
-        readUnlock(operationName);
+        readUnlock(operationName, getLockReportInfoSupplier(null));
       }
     } catch (AccessControlException ace) {
       logAuditEvent(false, operationName, null);
@@ -7596,7 +7639,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         checkNameNodeSafeMode("Cannot modify ACL entries on " + src);
         auditStat = FSDirAclOp.modifyAclEntries(dir, pc, src, aclSpec);
       } finally {
-        writeUnlock(operationName);
+        writeUnlock(operationName,
+            getLockReportInfoSupplier(src, null, auditStat));
       }
     } catch (AccessControlException e) {
       logAuditEvent(false, operationName, src);
@@ -7620,7 +7664,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         checkNameNodeSafeMode("Cannot remove ACL entries on " + src);
         auditStat = FSDirAclOp.removeAclEntries(dir, pc, src, aclSpec);
       } finally {
-        writeUnlock(operationName);
+        writeUnlock(operationName,
+            getLockReportInfoSupplier(src, null, auditStat));
       }
     } catch (AccessControlException e) {
       logAuditEvent(false, operationName, src);
@@ -7643,7 +7688,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         checkNameNodeSafeMode("Cannot remove default ACL entries on " + src);
         auditStat = FSDirAclOp.removeDefaultAcl(dir, pc, src);
       } finally {
-        writeUnlock(operationName);
+        writeUnlock(operationName,
+            getLockReportInfoSupplier(src, null, auditStat));
       }
     } catch (AccessControlException e) {
       logAuditEvent(false, operationName, src);
@@ -7666,7 +7712,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         checkNameNodeSafeMode("Cannot remove ACL on " + src);
         auditStat = FSDirAclOp.removeAcl(dir, pc, src);
       } finally {
-        writeUnlock(operationName);
+        writeUnlock(operationName,
+            getLockReportInfoSupplier(src, null, auditStat));
       }
     } catch (AccessControlException e) {
       logAuditEvent(false, operationName, src);
@@ -7689,7 +7736,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         checkNameNodeSafeMode("Cannot set ACL on " + src);
         auditStat = FSDirAclOp.setAcl(dir, pc, src, aclSpec);
       } finally {
-        writeUnlock(operationName);
+        writeUnlock(operationName,
+            getLockReportInfoSupplier(src, null, auditStat));
       }
     } catch (AccessControlException e) {
       logAuditEvent(false, operationName, src);
@@ -7711,7 +7759,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         checkOperation(OperationCategory.READ);
         ret = FSDirAclOp.getAclStatus(dir, pc, src);
       } finally {
-        readUnlock(operationName);
+        readUnlock(operationName, getLockReportInfoSupplier(src));
       }
     } catch(AccessControlException ace) {
       logAuditEvent(false, operationName, src);
@@ -7736,7 +7784,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       boolean logRetryCache) throws IOException, UnresolvedLinkException,
           SafeModeException, AccessControlException {
     final String operationName = "createEncryptionZone";
-    final FileStatus resultingStat;
+    FileStatus resultingStat = null;
     try {
       Metadata metadata = FSDirEncryptionZoneOp.ensureKeyIsInitialized(dir,
           keyName, src);
@@ -7751,7 +7799,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         resultingStat = FSDirEncryptionZoneOp.createEncryptionZone(dir, src,
             pc, metadata.getCipher(), keyName, logRetryCache);
       } finally {
-        writeUnlock(operationName);
+        writeUnlock(operationName,
+            getLockReportInfoSupplier(src, null, resultingStat));
       }
     } catch (AccessControlException e) {
       logAuditEvent(false, operationName, src);
@@ -7786,7 +7835,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         resultingStat = ezForPath.getValue();
         encryptionZone = ezForPath.getKey();
       } finally {
-        readUnlock(operationName);
+        readUnlock(operationName,
+            getLockReportInfoSupplier(srcArg, null, resultingStat));
       }
     } catch (AccessControlException ace) {
       logAuditEvent(false, operationName, srcArg, null, resultingStat);
@@ -7812,7 +7862,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       success = true;
       return ret;
     } finally {
-      readUnlock(operationName);
+      readUnlock(operationName, getLockReportInfoSupplier(null));
       logAuditEvent(success, operationName, null);
     }
   }
@@ -7850,7 +7900,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       success = true;
       return ret;
     } finally {
-      readUnlock(operationName);
+      readUnlock(operationName, getLockReportInfoSupplier(null));
       logAuditEvent(success, operationName, null);
     }
   }
@@ -7905,7 +7955,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         getEditLog().logSetXAttrs(zone, xattrs, logRetryCache);
       }
     } finally {
-      writeUnlock();
+      writeUnlock(action + "reencryption", getLockReportInfoSupplier(zone));
     }
     getEditLog().logSync();
   }
@@ -7938,7 +7988,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       logAuditEvent(false, operationName, srcArg);
       throw ace;
     } finally {
-      writeUnlock(operationName);
+      writeUnlock(operationName,
+          getLockReportInfoSupplier(srcArg, null, resultingStat));
     }
     getEditLog().logSync();
     logAuditEvent(true, operationName, srcArg, null, resultingStat);
@@ -7976,7 +8027,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         }
       }
     } finally {
-      writeUnlock(operationName);
+      writeUnlock(operationName,
+          getLockReportInfoSupplier(addECPolicyNames.toString()));
     }
     getEditLog().logSync();
     logAuditEvent(true, operationName, addECPolicyNames.toString());
@@ -8003,7 +8055,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       FSDirErasureCodingOp.removeErasureCodingPolicy(this, ecPolicyName,
           logRetryCache);
     } finally {
-      writeUnlock(operationName);
+      writeUnlock(operationName, getLockReportInfoSupplier(ecPolicyName));
     }
     getEditLog().logSync();
     logAuditEvent(true, operationName, ecPolicyName, null, null);
@@ -8032,7 +8084,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         success = FSDirErasureCodingOp.enableErasureCodingPolicy(this,
             ecPolicyName, logRetryCache);
       } finally {
-        writeUnlock(operationName);
+        writeUnlock(operationName, getLockReportInfoSupplier(ecPolicyName));
       }
     } catch (AccessControlException ace) {
       logAuditEvent(false, operationName, ecPolicyName);
@@ -8068,7 +8120,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         success = FSDirErasureCodingOp.disableErasureCodingPolicy(this,
             ecPolicyName, logRetryCache);
       } finally {
-        writeUnlock(operationName);
+        writeUnlock(operationName, getLockReportInfoSupplier(ecPolicyName));
       }
     } catch (AccessControlException ace) {
       logAuditEvent(false, operationName, ecPolicyName);
@@ -8104,7 +8156,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       resultingStat = FSDirErasureCodingOp.unsetErasureCodingPolicy(this,
           srcArg, pc, logRetryCache);
     } finally {
-      writeUnlock(operationName);
+      writeUnlock(operationName,
+          getLockReportInfoSupplier(srcArg, null, resultingStat));
     }
     getEditLog().logSync();
     logAuditEvent(true, operationName, srcArg, null, resultingStat);
@@ -8146,14 +8199,14 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
             .getECTopologyVerifierResult(numOfRacks, numOfDataNodes, policies);
       }
     } finally {
-      readUnlock();
+      readUnlock(operationName, getLockReportInfoSupplier(null));
     }
     logAuditEvent(true, operationName, null);
     return result;
   }
 
   /**
-   * Get the erasure coding policy information for specified path
+   * Get the erasure coding policy information for specified path.
    */
   ErasureCodingPolicy getErasureCodingPolicy(String src)
       throws AccessControlException, UnresolvedLinkException, IOException {
@@ -8171,7 +8224,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       success = true;
       return ret;
     } finally {
-      readUnlock(operationName);
+      readUnlock(operationName, getLockReportInfoSupplier(src));
       logAuditEvent(success, operationName, src);
     }
   }
@@ -8192,7 +8245,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       success = true;
       return ret;
     } finally {
-      readUnlock(operationName);
+      readUnlock(operationName, getLockReportInfoSupplier(null));
       logAuditEvent(success, operationName, null);
     }
   }
@@ -8213,7 +8266,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       success = true;
       return ret;
     } finally {
-      readUnlock(operationName);
+      readUnlock(operationName, getLockReportInfoSupplier(null));
       logAuditEvent(success, operationName, null);
     }
   }
@@ -8234,7 +8287,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         auditStat = FSDirXAttrOp.setXAttr(dir, pc, src, xAttr, flag,
             logRetryCache);
       } finally {
-        writeUnlock(operationName);
+        writeUnlock(operationName,
+            getLockReportInfoSupplier(src, null, auditStat));
       }
     } catch (AccessControlException e) {
       logAuditEvent(false, operationName, src);
@@ -8257,7 +8311,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         checkOperation(OperationCategory.READ);
         fsXattrs = FSDirXAttrOp.getXAttrs(dir, pc, src, xAttrs);
       } finally {
-        readUnlock(operationName);
+        readUnlock(operationName, getLockReportInfoSupplier(src));
       }
     } catch (AccessControlException e) {
       logAuditEvent(false, operationName, src);
@@ -8279,7 +8333,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         checkOperation(OperationCategory.READ);
         fsXattrs = FSDirXAttrOp.listXAttrs(dir, pc, src);
       } finally {
-        readUnlock(operationName);
+        readUnlock(operationName, getLockReportInfoSupplier(src));
       }
     } catch (AccessControlException e) {
       logAuditEvent(false, operationName, src);
@@ -8304,7 +8358,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         auditStat = FSDirXAttrOp.removeXAttr(dir, pc, src, xAttr,
             logRetryCache);
       } finally {
-        writeUnlock(operationName);
+        writeUnlock(operationName,
+            getLockReportInfoSupplier(src, null, auditStat));
       }
     } catch (AccessControlException e) {
       logAuditEvent(false, operationName, src);
@@ -8356,7 +8411,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
           dir.checkPathAccess(pc, iip, mode);
         }
       } finally {
-        readUnlock(operationName);
+        readUnlock(operationName, getLockReportInfoSupplier(src));
       }
     } catch (AccessControlException e) {
       logAuditEvent(false, operationName, src);
@@ -8365,6 +8420,56 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     logAuditEvent(true, operationName, src);
   }
 
+  private Supplier<String> getLockReportInfoSupplier(String src) {
+    return getLockReportInfoSupplier(src, null);
+  }
+
+  private Supplier<String> getLockReportInfoSupplier(String src, String dst) {
+    return getLockReportInfoSupplier(src, dst, (FileStatus) null);
+  }
+
+  private Supplier<String> getLockReportInfoSupplier(String src, String dst,
+      HdfsFileStatus stat) {
+    FileStatus status = null;
+    if (stat != null) {
+      Path symlink = stat.isSymlink()
+          ? new Path(DFSUtilClient.bytes2String(stat.getSymlinkInBytes()))
+          : null;
+      Path path = new Path(src);
+      status = new FileStatus(stat.getLen(), stat.isDirectory(),
+        stat.getReplication(), stat.getBlockSize(),
+        stat.getModificationTime(),
+        stat.getAccessTime(), stat.getPermission(), stat.getOwner(),
+        stat.getGroup(), symlink, path);
+    }
+    return getLockReportInfoSupplier(src, dst, status);
+  }
+
+  private Supplier<String> getLockReportInfoSupplier(String src, String dst,
+      FileStatus status) {
+    return () -> {
+      UserGroupInformation ugi = Server.getRemoteUser();
+      String userName = ugi != null ? ugi.toString() : null;
+      InetAddress addr = Server.getRemoteIp();
+      StringBuilder sb = new StringBuilder();
+      String s = escapeJava(src);
+      String d = escapeJava(dst);
+      sb.append("ugi=").append(userName).append(",")
+          .append("ip=").append(addr).append(",")
+          .append("src=").append(s).append(",")
+          .append("dst=").append(d).append(",");
+      if (null == status) {
+        sb.append("perm=null");
+      } else {
+        sb.append("perm=")
+            .append(status.getOwner()).append(":")
+            .append(status.getGroup()).append(":")
+            .append(status.getPermission());
+      }
+      return sb.toString();
+    };
+  }
+
   /**
    * FSNamesystem Default AuditLogger implementation;used when no access logger
    * is defined in the config file. It can also be explicitly listed in the
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystemLock.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystemLock.java
index b87117c..8ed17bc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystemLock.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystemLock.java
@@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Supplier;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.conf.Configuration;
@@ -106,8 +107,8 @@ class FSNamesystemLock {
    * lock was held since the last report.
    */
   private final AtomicReference<LockHeldInfo> longestReadLockHeldInfo =
-      new AtomicReference<>(new LockHeldInfo(0, 0, null));
-  private LockHeldInfo longestWriteLockHeldInfo = new LockHeldInfo(0, 0, null);
+      new AtomicReference<>(new LockHeldInfo());
+  private LockHeldInfo longestWriteLockHeldInfo = new LockHeldInfo();
 
   @VisibleForTesting
   static final String OP_NAME_OTHER = "OTHER";
@@ -159,10 +160,15 @@ class FSNamesystemLock {
   }
 
   public void readUnlock() {
-    readUnlock(OP_NAME_OTHER);
+    readUnlock(OP_NAME_OTHER, null);
   }
 
   public void readUnlock(String opName) {
+    readUnlock(opName, null);
+  }
+
+  public void readUnlock(String opName,
+      Supplier<String> lockReportInfoSupplier) {
     final boolean needReport = coarseLock.getReadHoldCount() == 1;
     final long readLockIntervalNanos =
         timer.monotonicNowNanos() - readLockHeldTimeStampNanos.get();
@@ -176,13 +182,25 @@ class FSNamesystemLock {
     final long readLockIntervalMs =
         TimeUnit.NANOSECONDS.toMillis(readLockIntervalNanos);
     if (needReport && readLockIntervalMs >= this.readLockReportingThresholdMs) {
-      LockHeldInfo localLockHeldInfo;
-      do {
-        localLockHeldInfo = longestReadLockHeldInfo.get();
-      } while (localLockHeldInfo.getIntervalMs() - readLockIntervalMs < 0 &&
-          !longestReadLockHeldInfo.compareAndSet(localLockHeldInfo,
+      String lockReportInfo = null;
+      boolean done = false;
+      while (!done) {
+        LockHeldInfo localLockHeldInfo = longestReadLockHeldInfo.get();
+        if (localLockHeldInfo.getIntervalMs() <= readLockIntervalMs) {
+          if (lockReportInfo == null) {
+            lockReportInfo = lockReportInfoSupplier != null ? " (" +
+                lockReportInfoSupplier.get() + ")" : "";
+          }
+          if (longestReadLockHeldInfo.compareAndSet(localLockHeldInfo,
               new LockHeldInfo(currentTimeMs, readLockIntervalMs,
-                  StringUtils.getStackTrace(Thread.currentThread()))));
+              StringUtils.getStackTrace(Thread.currentThread()), opName,
+              lockReportInfo))) {
+            done = true;
+          }
+        } else {
+          done = true;
+        }
+      }
 
       long localTimeStampOfLastReadLockReport;
       long nowMs;
@@ -199,12 +217,13 @@ class FSNamesystemLock {
           localTimeStampOfLastReadLockReport, nowMs));
       int numSuppressedWarnings = numReadLockWarningsSuppressed.getAndSet(0);
       LockHeldInfo lockHeldInfo =
-          longestReadLockHeldInfo.getAndSet(new LockHeldInfo(0, 0, null));
+          longestReadLockHeldInfo.getAndSet(new LockHeldInfo());
       FSNamesystem.LOG.info(
           "\tNumber of suppressed read-lock reports: {}"
-              + "\n\tLongest read-lock held at {} for {}ms via {}",
+              + "\n\tLongest read-lock held at {} for {}ms by {}{} via {}",
           numSuppressedWarnings, Time.formatTime(lockHeldInfo.getStartTimeMs()),
-          lockHeldInfo.getIntervalMs(), lockHeldInfo.getStackTrace());
+          lockHeldInfo.getIntervalMs(), lockHeldInfo.getOpName(),
+          lockHeldInfo.getLockReportInfo(), lockHeldInfo.getStackTrace());
     }
   }
   
@@ -218,20 +237,44 @@ class FSNamesystemLock {
 
   /**
    * Unlocks FSNameSystem write lock. This internally calls {@link
-   * FSNamesystemLock#writeUnlock(String, boolean)}
+   * FSNamesystemLock#writeUnlock(String, boolean, Supplier)}
    */
   public void writeUnlock() {
-    writeUnlock(OP_NAME_OTHER, false);
+    writeUnlock(OP_NAME_OTHER, false, null);
   }
 
   /**
    * Unlocks FSNameSystem write lock. This internally calls {@link
-   * FSNamesystemLock#writeUnlock(String, boolean)}
+   * FSNamesystemLock#writeUnlock(String, boolean, Supplier)}
    *
    * @param opName Operation name.
    */
   public void writeUnlock(String opName) {
-    writeUnlock(opName, false);
+    writeUnlock(opName, false, null);
+  }
+
+  /**
+   * Unlocks FSNameSystem write lock. This internally calls {@link
+   * FSNamesystemLock#writeUnlock(String, boolean, Supplier)}
+   *
+   * @param opName Operation name.
+   * @param lockReportInfoSupplier The info shown in the lock report
+   */
+  public void writeUnlock(String opName,
+      Supplier<String> lockReportInfoSupplier) {
+    writeUnlock(opName, false, lockReportInfoSupplier);
+  }
+
+  /**
+   * Unlocks FSNameSystem write lock. This internally calls {@link
+   * FSNamesystemLock#writeUnlock(String, boolean, Supplier)}
+   *
+   * @param opName Operation name.
+   * @param suppressWriteLockReport When false, event of write lock being held
+   * for long time will be logged in logs and metrics.
+   */
+  public void writeUnlock(String opName, boolean suppressWriteLockReport) {
+    writeUnlock(opName, suppressWriteLockReport, null);
   }
 
   /**
@@ -240,8 +283,10 @@ class FSNamesystemLock {
    * @param opName Operation name
    * @param suppressWriteLockReport When false, event of write lock being held
    * for long time will be logged in logs and metrics.
+   * @param lockReportInfoSupplier The info shown in the lock report
    */
-  public void writeUnlock(String opName, boolean suppressWriteLockReport) {
+  private void writeUnlock(String opName, boolean suppressWriteLockReport,
+      Supplier<String> lockReportInfoSupplier) {
     final boolean needReport = !suppressWriteLockReport && coarseLock
         .getWriteHoldCount() == 1 && coarseLock.isWriteLockedByCurrentThread();
     final long writeLockIntervalNanos =
@@ -253,10 +298,13 @@ class FSNamesystemLock {
     LogAction logAction = LogThrottlingHelper.DO_NOT_LOG;
     if (needReport &&
         writeLockIntervalMs >= this.writeLockReportingThresholdMs) {
-      if (longestWriteLockHeldInfo.getIntervalMs() < writeLockIntervalMs) {
-        longestWriteLockHeldInfo =
-            new LockHeldInfo(currentTimeMs, writeLockIntervalMs,
-                StringUtils.getStackTrace(Thread.currentThread()));
+      if (longestWriteLockHeldInfo.getIntervalMs() <= writeLockIntervalMs) {
+        String lockReportInfo = lockReportInfoSupplier != null ? " (" +
+            lockReportInfoSupplier.get() + ")" : "";
+        longestWriteLockHeldInfo = new LockHeldInfo(currentTimeMs,
+            writeLockIntervalMs,
+            StringUtils.getStackTrace(Thread.currentThread()), opName,
+            lockReportInfo);
       }
 
       logAction = writeLockReportLogger
@@ -265,7 +313,7 @@ class FSNamesystemLock {
 
     LockHeldInfo lockHeldInfo = longestWriteLockHeldInfo;
     if (logAction.shouldLog()) {
-      longestWriteLockHeldInfo = new LockHeldInfo(0, 0, null);
+      longestWriteLockHeldInfo = new LockHeldInfo();
     }
 
     coarseLock.writeLock().unlock();
@@ -277,11 +325,12 @@ class FSNamesystemLock {
     if (logAction.shouldLog()) {
       FSNamesystem.LOG.info(
           "\tNumber of suppressed write-lock reports: {}"
-              + "\n\tLongest write-lock held at {} for {}ms via {}"
+              + "\n\tLongest write-lock held at {} for {}ms by {}{} via {}"
               + "\n\tTotal suppressed write-lock held time: {}",
           logAction.getCount() - 1,
           Time.formatTime(lockHeldInfo.getStartTimeMs()),
-          lockHeldInfo.getIntervalMs(), lockHeldInfo.getStackTrace(),
+          lockHeldInfo.getIntervalMs(), lockHeldInfo.getOpName(),
+          lockHeldInfo.getLockReportInfo(), lockHeldInfo.getStackTrace(),
           logAction.getStats(0).getSum() - lockHeldInfo.getIntervalMs());
     }
   }
@@ -383,16 +432,31 @@ class FSNamesystemLock {
    */
   private static class LockHeldInfo {
     /** Lock held start time. */
-    private Long startTimeMs;
+    private final Long startTimeMs;
     /** Lock held time. */
-    private Long intervalMs;
+    private final Long intervalMs;
     /** The stack trace lock was held. */
-    private String stackTrace;
+    private final String stackTrace;
+    /** The operation name. */
+    private final String opName;
+    /** The info shown in a lock report. */
+    private final String lockReportInfo;
+
+    LockHeldInfo() {
+      this.startTimeMs = 0L;
+      this.intervalMs = 0L;
+      this.stackTrace = null;
+      this.opName = null;
+      this.lockReportInfo = null;
+    }
 
-    LockHeldInfo(long startTimeMs, long intervalMs, String stackTrace) {
+    LockHeldInfo(long startTimeMs, long intervalMs, String stackTrace,
+        String opName, String lockReportInfo) {
       this.startTimeMs = startTimeMs;
       this.intervalMs = intervalMs;
       this.stackTrace = stackTrace;
+      this.opName = opName;
+      this.lockReportInfo = lockReportInfo;
     }
 
     public Long getStartTimeMs() {
@@ -406,5 +470,13 @@ class FSNamesystemLock {
     public String getStackTrace() {
       return this.stackTrace;
     }
+
+    public String getOpName() {
+      return opName;
+    }
+
+    public String getLockReportInfo() {
+      return lockReportInfo;
+    }
   }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSNamesystemLockReport.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSNamesystemLockReport.java
new file mode 100644
index 0000000..ef1ed9b
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSNamesystemLockReport.java
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.namenode;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.LoggerFactory;
+
+import java.security.PrivilegedExceptionAction;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_WRITE_LOCK_REPORTING_THRESHOLD_MS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_KEY;
+import static org.junit.Assert.assertTrue;
+
+public class TestFSNamesystemLockReport {
+
+  @FunctionalInterface
+  private interface SupplierWithException<T> {
+    T get() throws Exception;
+  }
+
+  @FunctionalInterface
+  private interface Procedure {
+    void invoke() throws Exception;
+  }
+
+  private Configuration conf;
+  private MiniDFSCluster cluster;
+  private FileSystem fs;
+  private UserGroupInformation userGroupInfo;
+  private GenericTestUtils.LogCapturer logs;
+
+  @Before
+  public void setUp() throws Exception {
+    conf = new HdfsConfiguration();
+    conf.set(DFS_PERMISSIONS_SUPERUSERGROUP_KEY, "hadoop");
+
+    // Make the lock report always shown
+    conf.setLong(DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_KEY, 0);
+    conf.setLong(DFS_NAMENODE_WRITE_LOCK_REPORTING_THRESHOLD_MS_KEY, 0);
+    conf.setLong(DFS_LOCK_SUPPRESS_WARNING_INTERVAL_KEY, 0);
+
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(4).build();
+    fs = cluster.getFileSystem();
+
+    userGroupInfo = UserGroupInformation.createUserForTesting("bob",
+        new String[] {"hadoop"});
+
+    logs = GenericTestUtils.LogCapturer.captureLogs(FSNamesystem.LOG);
+    GenericTestUtils
+        .setLogLevel(LoggerFactory.getLogger(FSNamesystem.class.getName()),
+        org.slf4j.event.Level.INFO);
+  }
+
+  @After
+  public void cleanUp() throws Exception {
+    if (fs != null) {
+      fs.close();
+      fs = null;
+    }
+    if (cluster != null) {
+      cluster.shutdown();
+      cluster = null;
+    }
+  }
+
+  @Test
+  public void test() throws Exception {
+    FileSystem userfs = DFSTestUtil.getFileSystemAs(userGroupInfo, conf);
+
+    // The log output should contain "by create (ugi=bob (auth:SIMPLE),
+    // ip=/127.0.0.1,src=/file,dst=null,perm=bob:hadoop:rw-r--r--)"
+    FSDataOutputStream os = testLockReport(() ->
+        userfs.create(new Path("/file")),
+        ".* by create \\(ugi=bob \\(auth:SIMPLE\\)," +
+        "ip=/\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3},src=/file,dst=null," +
+        "perm=bob:hadoop:rw-r--r--\\) .*");
+    os.close();
+
+    // The log output should contain "by open (ugi=bob (auth:SIMPLE),
+    // ip=/127.0.0.1,src=/file,dst=null,perm=null)"
+    FSDataInputStream is = testLockReport(() -> userfs.open(new Path("/file")),
+        ".* by open \\(ugi=bob \\(auth:SIMPLE\\)," +
+        "ip=/\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3},src=/file,dst=null," +
+        "perm=null\\) .*");
+    is.close();
+
+    // The log output should contain "by setPermission (ugi=bob (auth:SIMPLE),
+    // ip=/127.0.0.1,src=/file,dst=null,perm=bob:hadoop:-w----r-T)"
+    testLockReport(() ->
+        userfs.setPermission(new Path("/file"), new FsPermission(644)),
+        ".* by setPermission \\(ugi=bob \\(auth:SIMPLE\\)," +
+        "ip=/\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3},src=/file,dst=null," +
+        "perm=bob:hadoop:-w----r-T\\) .*");
+
+    // The log output should contain "by setOwner (ugi=bob (auth:SIMPLE),
+    // ip=/127.0.0.1,src=/file,dst=null,perm=alice:group1:-w----r-T)"
+    testLockReport(() -> userfs.setOwner(new Path("/file"), "alice", "group1"),
+        ".* by setOwner \\(ugi=bob \\(auth:SIMPLE\\)," +
+        "ip=/\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3},src=/file,dst=null," +
+        "perm=alice:group1:-w----r-T\\) .*");
+
+    // The log output should contain "by listStatus (ugi=bob (auth:SIMPLE),
+    // ip=/127.0.0.1,src=/,dst=null,perm=null)"
+    testLockReport(() -> userfs.listStatus(new Path("/")),
+        ".* by listStatus \\(ugi=bob \\(auth:SIMPLE\\)," +
+        "ip=/\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3},src=/,dst=null," +
+        "perm=null\\) .*");
+
+    // The log output should contain "by getfileinfo (ugi=bob (auth:SIMPLE),
+    // ip=/127.0.0.1,src=/file,dst=null,perm=null)"
+    testLockReport(() -> userfs.getFileStatus(new Path("/file")),
+        ".* by getfileinfo \\(ugi=bob \\(auth:SIMPLE\\)," +
+        "ip=/\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3},src=/file,dst=null," +
+        "perm=null\\) .*");
+
+    // The log output should contain "by mkdirs (ugi=bob (auth:SIMPLE),
+    // ip=/127.0.0.1,src=/dir,dst=null,perm=bob:hadoop:rwxr-xr-x)"
+    testLockReport(() -> userfs.mkdirs(new Path("/dir")),
+        ".* by mkdirs \\(ugi=bob \\(auth:SIMPLE\\)," +
+        "ip=/\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3},src=/dir,dst=null," +
+        "perm=bob:hadoop:rwxr-xr-x\\) .*");
+
+    // The log output should contain "by rename (ugi=bob (auth:SIMPLE),
+    // ip=/127.0.0.1,src=/file,dst=/file2,perm=alice:group1:-w----r-T)"
+    testLockReport(() -> userfs.rename(new Path("/file"), new Path("/file2")),
+        ".* by rename \\(ugi=bob \\(auth:SIMPLE\\)," +
+        "ip=/\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3},src=/file,dst=/file2," +
+        "perm=alice:group1:-w----r-T\\) .*");
+
+    // The log output should contain "by delete (ugi=bob (auth:SIMPLE),
+    // ip=/127.0.0.1,src=/file2,dst=null,perm=null)"
+    testLockReport(() -> userfs.delete(new Path("/file2"), false),
+        ".* by delete \\(ugi=bob \\(auth:SIMPLE\\)," +
+        "ip=/\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3},src=/file2,dst=null," +
+        "perm=null\\) .*");
+  }
+
+  private void testLockReport(Procedure procedure,
+      String expectedLockReportRegex) throws Exception {
+    logs.clearOutput();
+    userGroupInfo.doAs((PrivilegedExceptionAction<Void>) () -> {
+      procedure.invoke();
+      return null;
+    });
+    assertTrue(matches(expectedLockReportRegex));
+  }
+
+  private <T> T testLockReport(SupplierWithException<T> supplier,
+      String expectedLockReportRegex) throws Exception {
+    logs.clearOutput();
+    T ret = userGroupInfo.doAs((PrivilegedExceptionAction<T>) supplier::get);
+    assertTrue(matches(expectedLockReportRegex));
+    return ret;
+  }
+
+  private boolean matches(String regex) {
+    // Check for each line
+    for (String line : logs.getOutput().split(System.lineSeparator())) {
+      if (line.matches(regex)) {
+        return true;
+      }
+    }
+    return false;
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org