You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by el...@apache.org on 2019/05/14 15:37:27 UTC

[hbase-filesystem] branch master updated: HBASE-22386 HBOSS: Limit depth that listing locks check for other locks

This is an automated email from the ASF dual-hosted git repository.

elserj pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase-filesystem.git


The following commit(s) were added to refs/heads/master by this push:
     new 074c852  HBASE-22386 HBOSS: Limit depth that listing locks check for other locks
074c852 is described below

commit 074c852bd590eec1646ee24ff39fcbc9ce4167ea
Author: Sean Mackrory <ma...@gmail.com>
AuthorDate: Tue May 14 11:36:18 2019 -0400

    HBASE-22386 HBOSS: Limit depth that listing locks check for other locks
    
    Signed-off-by: Josh Elser <el...@apache.org>
---
 .../hbase/oss/HBaseObjectStoreSemantics.java       |  18 +--
 .../hadoop/hbase/oss/sync/TreeLockManager.java     |  44 ++++---
 .../hadoop/hbase/oss/sync/ZKTreeLockManager.java   |  57 +++++----
 .../hbase/oss/sync/LocalTreeLockManager.java       |  41 ++++---
 .../hadoop/hbase/oss/sync/NullTreeLockManager.java |   7 +-
 .../hadoop/hbase/oss/sync/TestTreeLockManager.java | 133 +++++++++++++++++++++
 6 files changed, 233 insertions(+), 67 deletions(-)

diff --git a/hbase-oss/src/main/java/org/apache/hadoop/hbase/oss/HBaseObjectStoreSemantics.java b/hbase-oss/src/main/java/org/apache/hadoop/hbase/oss/HBaseObjectStoreSemantics.java
index f72d7e3..bebc37c 100644
--- a/hbase-oss/src/main/java/org/apache/hadoop/hbase/oss/HBaseObjectStoreSemantics.java
+++ b/hbase-oss/src/main/java/org/apache/hadoop/hbase/oss/HBaseObjectStoreSemantics.java
@@ -53,6 +53,7 @@ import org.apache.hadoop.hbase.oss.sync.AutoLock;
 import org.apache.hadoop.hbase.oss.sync.AutoLock.LockedFSDataOutputStream;
 import org.apache.hadoop.hbase.oss.sync.AutoLock.LockedRemoteIterator;
 import org.apache.hadoop.hbase.oss.sync.TreeLockManager;
+import org.apache.hadoop.hbase.oss.sync.TreeLockManager.Depth;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
@@ -533,7 +534,7 @@ public class HBaseObjectStoreSemantics extends FileSystem {
 
   public FileStatus[] listStatus(Path f) throws FileNotFoundException,
         IOException {
-    try (AutoLock l = sync.lockListing(f)) {
+    try (AutoLock l = sync.lockListing(f, Depth.DIRECTORY)) {
       return fs.listStatus(f);
     }
   }
@@ -547,21 +548,21 @@ public class HBaseObjectStoreSemantics extends FileSystem {
 
   public FileStatus[] listStatus(Path f, PathFilter filter)
         throws FileNotFoundException, IOException {
-    try (AutoLock l = sync.lockListing(f)) {
+    try (AutoLock l = sync.lockListing(f, Depth.DIRECTORY)) {
       return fs.listStatus(f, filter);
     }
   }
 
   public FileStatus[] listStatus(Path[] files)
         throws FileNotFoundException, IOException {
-    try (AutoLock l = sync.lockListings(files)) {
+    try (AutoLock l = sync.lockListings(files, Depth.DIRECTORY)) {
       return fs.listStatus(files);
     }
   }
 
   public FileStatus[] listStatus(Path[] files, PathFilter filter)
         throws FileNotFoundException, IOException {
-    try (AutoLock l = sync.lockListings(files)) {
+    try (AutoLock l = sync.lockListings(files, Depth.DIRECTORY)) {
       return fs.listStatus(files, filter);
     }
   }
@@ -579,7 +580,7 @@ public class HBaseObjectStoreSemantics extends FileSystem {
 
   public RemoteIterator<LocatedFileStatus> listLocatedStatus(final Path f)
         throws FileNotFoundException, IOException {
-    AutoLock lock = sync.lockListing(f);
+    AutoLock lock = sync.lockListing(f, Depth.DIRECTORY);
     try {
       RemoteIterator<LocatedFileStatus> iterator = fs.listLocatedStatus(f);
       return new LockedRemoteIterator<LocatedFileStatus>(iterator, lock);
@@ -591,7 +592,7 @@ public class HBaseObjectStoreSemantics extends FileSystem {
 
   public RemoteIterator<FileStatus> listStatusIterator(final Path p)
         throws FileNotFoundException, IOException {
-    AutoLock lock = sync.lockListing(p);
+    AutoLock lock = sync.lockListing(p, Depth.DIRECTORY);
     try {
       RemoteIterator<FileStatus> iterator = fs.listStatusIterator(p);
       return new LockedRemoteIterator<FileStatus>(iterator, lock);
@@ -604,7 +605,8 @@ public class HBaseObjectStoreSemantics extends FileSystem {
   public RemoteIterator<LocatedFileStatus> listFiles(
         final Path f, final boolean recursive)
         throws FileNotFoundException, IOException {
-    AutoLock lock = sync.lockListing(f);
+    Depth depth = recursive ? Depth.RECURSIVE : Depth.DIRECTORY;
+    AutoLock lock = sync.lockListing(f, depth);
     try {
       RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(f, recursive);
       return new LockedRemoteIterator<LocatedFileStatus>(iterator, lock);
@@ -847,7 +849,7 @@ public class HBaseObjectStoreSemantics extends FileSystem {
 
   public Path createSnapshot(Path path, String snapshotName)
         throws IOException {
-    try (AutoLock l = sync.lockListing(path)) {
+    try (AutoLock l = sync.lockListing(path, Depth.RECURSIVE)) {
       return fs.createSnapshot(path, snapshotName);
     }
   }
diff --git a/hbase-oss/src/main/java/org/apache/hadoop/hbase/oss/sync/TreeLockManager.java b/hbase-oss/src/main/java/org/apache/hadoop/hbase/oss/sync/TreeLockManager.java
index feb5f16..73be990 100644
--- a/hbase-oss/src/main/java/org/apache/hadoop/hbase/oss/sync/TreeLockManager.java
+++ b/hbase-oss/src/main/java/org/apache/hadoop/hbase/oss/sync/TreeLockManager.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.oss.Constants;
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
 import org.slf4j.Logger;
@@ -46,6 +47,10 @@ public abstract class TreeLockManager {
   private static final Logger LOG =
         LoggerFactory.getLogger(TreeLockManager.class);
 
+  public static enum Depth {
+    DIRECTORY, RECURSIVE
+  }
+
   public static synchronized TreeLockManager get(FileSystem fs)
         throws IOException {
     Configuration conf = fs.getConf();
@@ -78,7 +83,7 @@ public abstract class TreeLockManager {
    * and filesystems, and assumes the URI scheme mapping is consistent
    * everywhere.
    */
-  private Path norm(Path path) {
+  public Path norm(Path path) {
     URI uri = fs.makeQualified(path).toUri();
     String uriScheme = uri.getScheme();
     String uriHost = uri.getHost();
@@ -96,6 +101,7 @@ public abstract class TreeLockManager {
    * Convenience function for calling norm on an array. Returned copy of the
    * array will also be sorted for deadlock avoidance.
    */
+  @VisibleForTesting
   private Path[] norm(Path[] paths) {
     Path[] newPaths = new Path[paths.length];
     for (int i = 0; i < paths.length; i++) {
@@ -177,7 +183,8 @@ public abstract class TreeLockManager {
    * @param p Path to check
    * @return True if a lock is found, false otherwise
    */
-  protected abstract boolean writeLockBelow(Path p) throws IOException;
+  @VisibleForTesting
+  public abstract boolean writeLockBelow(Path p, Depth depth) throws IOException;
 
   /**
    * Checks for the presence of a write lock on all child directories of the
@@ -186,7 +193,8 @@ public abstract class TreeLockManager {
    * @param p Path to check
    * @return True if a lock is found, false otherwise
    */
-  protected abstract boolean readLockBelow(Path p) throws IOException;
+  @VisibleForTesting
+  public abstract boolean readLockBelow(Path p, Depth depth) throws IOException;
 
   /**
    * Recursively cleans up locks that won't be used again.
@@ -224,13 +232,13 @@ public abstract class TreeLockManager {
    *
    * @param path Path to lock
    */
-  protected void treeWriteLock(Path p) throws IOException {
+  protected void treeWriteLock(Path p, Depth depth) throws IOException {
     int outerRetries = 0;
     do {
       int innerRetries = 0;
       do {
         // If there's already a write-lock above or below us in the tree, wait for it to leave
-        if (writeLockAbove(p) || writeLockBelow(p)) {
+        if (writeLockAbove(p) || writeLockBelow(p, depth)) {
           LOG.warn("Blocked on some parent write lock, waiting: {}", p);
           continue;
         }
@@ -239,7 +247,7 @@ public abstract class TreeLockManager {
       // Try obtain the write lock just for our node
       writeLock(p);
       // If there's now a write-lock above or below us in the tree, release and retry
-      if (writeLockAbove(p) || writeLockBelow(p)) {
+      if (writeLockAbove(p) || writeLockBelow(p, depth)) {
         LOG.warn("Blocked on some other write lock, retrying: {}", p);
         writeUnlock(p);
         continue;
@@ -250,7 +258,7 @@ public abstract class TreeLockManager {
     // Once we know we're the only write-lock in our path, drain all read-locks below
     int drainReadLocksRetries = 0;
     do {
-      if (readLockBelow(p)) {
+      if (readLockBelow(p, depth)) {
         LOG.warn("Blocked on some child read lock, writing: {}", p);
         continue;
       }
@@ -303,7 +311,9 @@ public abstract class TreeLockManager {
   public AutoLock lockWrite(Path rawPath) throws IOException {
     Path path = norm(rawPath);
     LOG.debug("About to lock for create / write: {}", rawPath);
-    treeWriteLock(path);
+    // Depth should not matter here, as this is only called on files, not
+    // directories.
+    treeWriteLock(path, Depth.RECURSIVE);
     return new AutoLock() {
       public void close() throws IOException {
         LOG.debug("About to unlock after create / write: {}", path);
@@ -323,7 +333,7 @@ public abstract class TreeLockManager {
   public AutoLock lockDelete(Path rawPath) throws IOException {
     Path path = norm(rawPath);
     LOG.debug("About to lock for delete: {}", path);
-    treeWriteLock(path);
+    treeWriteLock(path, Depth.RECURSIVE);
     return new AutoLock() {
       public void close() throws IOException {
         LOG.debug("About to recursively delete locks: {}", path);
@@ -343,10 +353,10 @@ public abstract class TreeLockManager {
    * @param path Root of the listing operation
    * @return AutoCloseable to release this path
    */
-  public AutoLock lockListing(Path rawPath) throws IOException {
+  public AutoLock lockListing(Path rawPath, Depth depth) throws IOException {
     Path path = norm(rawPath);
     LOG.debug("About to lock for listing: {}", path);
-    treeWriteLock(path);
+    treeWriteLock(path, depth);
     return new AutoLock() {
       public void close() throws IOException {
         LOG.debug("About to unlock after listing: {}", path);
@@ -362,11 +372,11 @@ public abstract class TreeLockManager {
    * @param paths
    * @return AutoCloseable that encapsulate all paths
    */
-  public AutoLock lockListings(Path[] rawPaths) throws IOException {
+  public AutoLock lockListings(Path[] rawPaths, Depth depth) throws IOException {
     Path[] paths = norm(rawPaths);
     for (int i = 0; i < paths.length; i++) {
       LOG.debug("About to lock for listings: {}", paths[i]);
-      treeWriteLock(paths[i]);
+      treeWriteLock(paths[i], depth);
     }
     return new AutoLock() {
       public void close() throws IOException {
@@ -406,11 +416,11 @@ public abstract class TreeLockManager {
     Path dst = norm(rawDst);
     LOG.debug("About to lock for rename: from {} to {}", src, dst);
     if (src.compareTo(dst) < 0) {
-      treeWriteLock(src);
-      treeWriteLock(dst);
+      treeWriteLock(src, Depth.RECURSIVE);
+      treeWriteLock(dst, Depth.RECURSIVE);
     } else {
-      treeWriteLock(dst);
-      treeWriteLock(src);
+      treeWriteLock(dst, Depth.RECURSIVE);
+      treeWriteLock(src, Depth.RECURSIVE);
     }
     return new AutoLock() {
       public void close() throws IOException {
diff --git a/hbase-oss/src/main/java/org/apache/hadoop/hbase/oss/sync/ZKTreeLockManager.java b/hbase-oss/src/main/java/org/apache/hadoop/hbase/oss/sync/ZKTreeLockManager.java
index b1dbcb5..3214e2e 100644
--- a/hbase-oss/src/main/java/org/apache/hadoop/hbase/oss/sync/ZKTreeLockManager.java
+++ b/hbase-oss/src/main/java/org/apache/hadoop/hbase/oss/sync/ZKTreeLockManager.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.oss.Constants;
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
 import org.apache.zookeeper.KeeperException;
@@ -167,14 +168,18 @@ public class ZKTreeLockManager extends TreeLockManager {
   }
 
   @Override
-  protected boolean writeLockBelow(Path p) throws IOException {
-    boolean b = writeLockBelow(p, true);
+  @VisibleForTesting
+  public boolean writeLockBelow(Path p, Depth depth) throws IOException {
+    int maxLevel = (depth == Depth.DIRECTORY) ? 1 : Integer.MAX_VALUE;
+    boolean b = writeLockBelow(p, 0, maxLevel);
     return b;
   }
 
   @Override
-  protected boolean readLockBelow(Path p) throws IOException {
-    boolean b = readLockBelow(p, true);
+  @VisibleForTesting
+  public boolean readLockBelow(Path p, Depth depth) throws IOException {
+    int maxLevel = (depth == Depth.DIRECTORY) ? 1 : Integer.MAX_VALUE;
+    boolean b = readLockBelow(p, 0, maxLevel);
     return b;
   }
 
@@ -214,19 +219,21 @@ public class ZKTreeLockManager extends TreeLockManager {
     return 0 == other.toString().indexOf(parent.toString());
   }
 
-  private boolean writeLockBelow(Path p, boolean firstLevel) throws IOException {
+  private boolean writeLockBelow(Path p, int level, int maxLevel) throws IOException {
     try {
-      if (!firstLevel && isLocked(get(p).writeLock())) {
+      if (level > 0 && isLocked(get(p).writeLock())) {
         return true;
       }
-      List<String> children = curator.getChildren().forPath(p.toString());
-      for (String child : children) {
-        if (child.equals(lockSubZnode)) {
-          continue;
-        }
-        if (writeLockBelow(new Path(p, child), false)) {
-          LOG.warn("Parent write lock currently held: {}", p);
-          return true;
+      if (level < maxLevel) {
+        List<String> children = curator.getChildren().forPath(p.toString());
+        for (String child : children) {
+          if (child.equals(lockSubZnode)) {
+            continue;
+          }
+          if (writeLockBelow(new Path(p, child), level+1, maxLevel)) {
+            LOG.warn("Parent write lock currently held: {}", p);
+            return true;
+          }
         }
       }
     } catch (KeeperException.NoNodeException e) {
@@ -237,19 +244,21 @@ public class ZKTreeLockManager extends TreeLockManager {
     return false;
   }
 
-  private boolean readLockBelow(Path p, boolean firstLevel) throws IOException {
+  private boolean readLockBelow(Path p, int level, int maxLevel) throws IOException {
     try {
-      if (!firstLevel && isLocked(get(p).readLock())) {
+      if (level > 0 && isLocked(get(p).readLock())) {
         return true;
       }
-      List<String> children = curator.getChildren().forPath(p.toString());
-      for (String child : children) {
-        if (child.equals(lockSubZnode)) {
-          continue;
-        }
-        if (readLockBelow(new Path(p, child), false)) {
-          LOG.warn("Child read lock currently held: {}", p);
-          return true;
+      if (level < maxLevel) {
+        List<String> children = curator.getChildren().forPath(p.toString());
+        for (String child : children) {
+          if (child.equals(lockSubZnode)) {
+            continue;
+          }
+          if (readLockBelow(new Path(p, child), level+1, maxLevel)) {
+            LOG.warn("Child read lock currently held: {}", p);
+            return true;
+          }
         }
       }
     } catch (KeeperException.NoNodeException e) {
diff --git a/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/sync/LocalTreeLockManager.java b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/sync/LocalTreeLockManager.java
index 60b213c..372fa2e 100644
--- a/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/sync/LocalTreeLockManager.java
+++ b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/sync/LocalTreeLockManager.java
@@ -28,6 +28,7 @@ import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -101,15 +102,19 @@ public class LocalTreeLockManager extends TreeLockManager {
   }
 
   @Override
-  protected boolean writeLockBelow(Path p) throws IOException {
+  @VisibleForTesting
+  public boolean writeLockBelow(Path p, Depth depth) throws IOException {
     createLocksIfNeeded(p);
-    return writeLockBelow(p, true);
+    int maxLevel = (depth == Depth.DIRECTORY) ? 1 : Integer.MAX_VALUE;
+    return writeLockBelow(p, 0, maxLevel);
   }
 
   @Override
-  protected boolean readLockBelow(Path p) throws IOException {
+  @VisibleForTesting
+  public boolean readLockBelow(Path p, Depth depth) throws IOException {
     createLocksIfNeeded(p);
-    return readLockBelow(p, true);
+    int maxLevel = (depth == Depth.DIRECTORY) ? 1 : Integer.MAX_VALUE;
+    return readLockBelow(p, 0, maxLevel);
   }
 
   @Override
@@ -167,33 +172,37 @@ public class LocalTreeLockManager extends TreeLockManager {
     }
   }
 
-  private synchronized boolean writeLockBelow(Path p, boolean firstLevel) {
+  private synchronized boolean writeLockBelow(Path p, int level, int maxLevel) {
     LockNode currentNode = index.get(p);
-    if (!firstLevel && currentNode.lock.isWriteLocked() &&
+    if (level > 0 && currentNode.lock.isWriteLocked() &&
           !currentNode.lock.isWriteLockedByCurrentThread()) {
       LOG.warn("Child write lock current held: {}", p);
       return true;
     }
-    Set<Path> childPaths = currentNode.children.keySet();
-    for (Path child : childPaths) {
-      if (writeLockBelow(child, false)) {
-        return true;
+    if (level <= maxLevel) {
+      Set<Path> childPaths = currentNode.children.keySet();
+      for (Path child : childPaths) {
+        if (writeLockBelow(child, level+1, maxLevel)) {
+          return true;
+        }
       }
     }
     return false;
   }
 
   // TODO will return true even if current thread has a read lock below...
-  private synchronized boolean readLockBelow(Path p, boolean firstLevel) {
+  private synchronized boolean readLockBelow(Path p, int level, int maxLevel) {
     LockNode currentNode = index.get(p);
-    if (!firstLevel && currentNode.lock.getReadLockCount() > 0) {
+    if (level > 0 && currentNode.lock.getReadLockCount() > 0) {
       LOG.warn("Child read lock currently held: {}", p);
       return true;
     }
-    Set<Path> childPaths = index.get(p).children.keySet();
-    for (Path child : childPaths) {
-      if (readLockBelow(child, false)) {
-        return true;
+    if (level <= maxLevel) {
+      Set<Path> childPaths = index.get(p).children.keySet();
+      for (Path child : childPaths) {
+        if (readLockBelow(child, level+1, maxLevel)) {
+          return true;
+        }
       }
     }
     return false;
diff --git a/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/sync/NullTreeLockManager.java b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/sync/NullTreeLockManager.java
index 9d94077..c9f3fbb 100644
--- a/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/sync/NullTreeLockManager.java
+++ b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/sync/NullTreeLockManager.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.oss.sync;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 
 /**
  * Bypasses all synchronization to effectively make HBOSS operations no-ops.
@@ -56,12 +57,14 @@ public class NullTreeLockManager extends TreeLockManager {
   }
 
   @Override
-  protected boolean writeLockBelow(Path p) {
+  @VisibleForTesting
+  public boolean writeLockBelow(Path p, Depth depth) {
     return false;
   }
 
   @Override
-  protected boolean readLockBelow(Path p) {
+  @VisibleForTesting
+  public boolean readLockBelow(Path p, Depth depth) {
     return false;
   }
 
diff --git a/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/sync/TestTreeLockManager.java b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/sync/TestTreeLockManager.java
new file mode 100644
index 0000000..5efc349
--- /dev/null
+++ b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/sync/TestTreeLockManager.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.oss;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.hbase.oss.sync.AutoLock;
+import org.apache.hadoop.hbase.oss.sync.TreeLockManager;
+import org.apache.hadoop.hbase.oss.sync.TreeLockManager.Depth;
+import org.apache.hadoop.fs.Path;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestTreeLockManager extends HBaseObjectStoreSemanticsTest {
+  public static final Logger LOG =
+        LoggerFactory.getLogger(TestTreeLockManager.class);
+
+  @Test
+  public void testLockBelowChecks() throws Exception {
+    Path parent = testPath("testListingLevels");
+    Path child = new Path(parent, "child");
+    Path deepWrite = new Path(child, "w");
+    Path deepRead = new Path(child, "r");
+
+    AutoLock readLock = sync.lock(deepRead);
+    AutoLock writeLock = sync.lockWrite(deepWrite);
+
+    final int FALSE = 0;
+    final int TRUE = 1;
+    final int UNSET = 2;
+
+    AtomicInteger dirDepthDeeperReadLock = new AtomicInteger(UNSET);
+    AtomicInteger recDepthDeeperReadLock = new AtomicInteger(UNSET);
+    AtomicInteger dirDepthDeeperWriteLock = new AtomicInteger(UNSET);
+    AtomicInteger recDepthDeeperWriteLock = new AtomicInteger(UNSET);
+    AtomicInteger dirDepthShallowReadLock = new AtomicInteger(UNSET);
+    AtomicInteger recDepthShallowReadLock = new AtomicInteger(UNSET);
+    AtomicInteger dirDepthShallowWriteLock = new AtomicInteger(UNSET);
+    AtomicInteger recDepthShallowWriteLock = new AtomicInteger(UNSET);
+    AtomicBoolean threadCompletedSuccessfully = new AtomicBoolean(false);
+
+    try {
+      Runnable r = new Runnable() {
+        public void run() {
+          try {
+            dirDepthDeeperReadLock.set(
+                  sync.readLockBelow(sync.norm(parent), Depth.DIRECTORY) ?
+                  TRUE : FALSE);
+            recDepthDeeperReadLock.set(
+                  sync.readLockBelow(sync.norm(parent), Depth.RECURSIVE) ?
+                  TRUE : FALSE);
+            dirDepthDeeperWriteLock.set(
+                  sync.writeLockBelow(sync.norm(parent), Depth.DIRECTORY) ?
+                  TRUE : FALSE);
+            recDepthDeeperWriteLock.set(
+                  sync.writeLockBelow(sync.norm(parent), Depth.RECURSIVE) ?
+                  TRUE : FALSE);
+            dirDepthShallowReadLock.set(
+                  sync.readLockBelow(sync.norm(child), Depth.DIRECTORY) ?
+                  TRUE : FALSE);
+            recDepthShallowReadLock.set(
+                  sync.readLockBelow(sync.norm(child), Depth.RECURSIVE) ?
+                  TRUE : FALSE);
+            dirDepthShallowWriteLock.set(
+                  sync.writeLockBelow(sync.norm(child), Depth.DIRECTORY) ?
+                  TRUE : FALSE);
+            recDepthShallowWriteLock.set(
+                  sync.writeLockBelow(sync.norm(child), Depth.RECURSIVE) ?
+                  TRUE : FALSE);
+            threadCompletedSuccessfully.set(true);
+          } catch (IOException e) {
+            LOG.error("Exception in side-thread: {}", e);
+            e.printStackTrace();
+          }
+        }
+      };
+      Thread t = new Thread(r);
+      t.start();
+      t.join();
+
+      // Asserts have to be checked from a different thread than the locks were
+      // acquired in, because the locks are reentrant.
+      Assert.assertTrue("Test thread did not complete successfully; see logs " +
+          "for exceptions.", threadCompletedSuccessfully.get());
+      Assert.assertEquals(
+          "Directory-depth check found read-lock 2 levels down; shouldn't have",
+          FALSE, dirDepthDeeperReadLock.get());
+      Assert.assertEquals(
+          "Recursive check failed to find found write-lock 2 levels down",
+          TRUE, recDepthDeeperReadLock.get());
+      Assert.assertEquals(
+          "Directory-depth check found write-lock 2 levels down; shouldn't have",
+          FALSE, dirDepthDeeperWriteLock.get());
+      Assert.assertEquals(
+          "Recursive check failed to find found write-lock 2 levels down",
+          TRUE, recDepthDeeperWriteLock.get());
+      Assert.assertEquals(
+          "Directory-depth check failed to find found read-lock 1 level down",
+          TRUE, dirDepthShallowReadLock.get());
+      Assert.assertEquals(
+          "Directory-depth check failed to find found read-lock 1 level down",
+          TRUE, recDepthShallowReadLock.get());
+      Assert.assertEquals(
+          "Recursive check failed to find found write-lock 1 level down",
+          TRUE, dirDepthShallowWriteLock.get());
+      Assert.assertEquals(
+          "Recursive check failed to find found write-lock 1 level down",
+          TRUE, recDepthShallowWriteLock.get());
+    } finally {
+      readLock.close();
+      writeLock.close();
+    }
+  }
+}