You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2022/09/10 00:49:38 UTC

[hbase-filesystem] branch master updated: HBASE-26483. [HBOSS] add support for createFile() and openFile(path) (#35)

This is an automated email from the ASF dual-hosted git repository.

apurtell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase-filesystem.git


The following commit(s) were added to refs/heads/master by this push:
     new 16de7b2  HBASE-26483. [HBOSS] add support for createFile() and openFile(path)  (#35)
16de7b2 is described below

commit 16de7b2e56e6996c50838115fc51558c0771e733
Author: Steve Loughran <st...@cloudera.com>
AuthorDate: Sat Sep 10 01:49:33 2022 +0100

    HBASE-26483. [HBOSS] add support for createFile() and openFile(path)  (#35)
    
    * HBASE-26483 [HBOSS] add lock around openFile operation
    
    * Rebase onto 3.3 only code
    * add cleverness about whether to pass filestatus down to s3
      (that is safety checks except if something other than hive is
       actually wrapping s3a file status entries)
    * lots of improvement on passthrough, including of getters, so that tests
      can see what happened.
    * test explicitly makes sure that on s3a, non s3a status entries don't get
      passed down; hadoop 3.3.x releases may fail there, whereas on later
      builds they just ignore it (as does ABFS and its status type).
    * TestZKLockManagerConfig hardened to work in IDE test runs with
       FS instance caching
    * IOStatistics Awareness and passthrough; used in tests for logging.
    * Adding the check that the FileStatus path must equal
       that of status.getPath before passdown, again, something
       the early openFile impls checked for (now they only validate
       the final filename element for equality).
    
    Signed-off-by: Andrew Purtell <ap...@apache.org>
---
 .../org/apache/hadoop/hbase/oss/Constants.java     |   8 +
 .../hadoop/hbase/oss/FileStatusBindingSupport.java | 100 ++++
 .../hadoop/hbase/oss/FunctionsRaisingIOE.java      |  70 +++
 .../hbase/oss/HBaseObjectStoreSemantics.java       |  73 ++-
 .../hadoop/hbase/oss/LockedCreateFileBuilder.java  | 320 +++++++++++++
 .../hadoop/hbase/oss/LockedOpenFileBuilder.java    | 105 ++++
 .../hadoop/hbase/oss/LockingFSBuilderWrapper.java  | 273 +++++++++++
 .../org/apache/hadoop/hbase/oss/sync/AutoLock.java |  14 +-
 .../hbase/oss/HBaseObjectStoreSemanticsTest.java   |  19 +-
 .../hadoop/hbase/oss/TestCreateNonRecursive.java   |  45 +-
 .../hadoop/hbase/oss/TestFileBuilderAPI.java       | 530 +++++++++++++++++++++
 .../apache/hadoop/hbase/oss/TestZNodeCleanup.java  |   8 +-
 .../hadoop/hbase/oss/contract/HBOSSContract.java   |   3 +-
 .../contract/TestHBOSSContractGetFileStatus.java   |   2 +-
 .../hbase/oss/contract/TestHBOSSContractOpen.java  |  11 +
 .../oss/contract/TestHBOSSContractOpenS3A.java     |  14 +-
 .../hbase/oss/sync/LocalTreeLockManager.java       |  12 +-
 .../hbase/oss/sync/TestZKLockManagerConfig.java    |   8 +-
 hbase-oss/src/test/resources/log4j.properties      |   5 +-
 19 files changed, 1577 insertions(+), 43 deletions(-)

diff --git a/hbase-oss/src/main/java/org/apache/hadoop/hbase/oss/Constants.java b/hbase-oss/src/main/java/org/apache/hadoop/hbase/oss/Constants.java
index 1384061..3f055b4 100644
--- a/hbase-oss/src/main/java/org/apache/hadoop/hbase/oss/Constants.java
+++ b/hbase-oss/src/main/java/org/apache/hadoop/hbase/oss/Constants.java
@@ -34,4 +34,12 @@ public class Constants  {
   public static final String CONTRACT_TEST_SCHEME = "fs.contract.test.fs.scheme";
 
   public static final String WAIT_INTERVAL_WARN = "fs.hboss.lock-wait.interval.warning";
+
+  /**
+   * PathCapabilities probe for hboss support: if the probe succeeds then
+   * the client is using HBoss
+   */
+  public static final String CAPABILITY_HBOSS =
+      "org.apache.hadoop.hbase.hboss";
+
 }
diff --git a/hbase-oss/src/main/java/org/apache/hadoop/hbase/oss/FileStatusBindingSupport.java b/hbase-oss/src/main/java/org/apache/hadoop/hbase/oss/FileStatusBindingSupport.java
new file mode 100644
index 0000000..b3d5428
--- /dev/null
+++ b/hbase-oss/src/main/java/org/apache/hadoop/hbase/oss/FileStatusBindingSupport.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.oss;
+
+import java.util.function.BiFunction;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Class to make sure that the right filestatus
+ * types for s3a are passed down *or not set at all*.
+ * This is because hadoop 3.3.0 was fussy about the type of
+ * the status.
+ */
+public class FileStatusBindingSupport {
+
+  public static final String S3AFS = "org.apache.hadoop.fs.s3a.S3AFileSystem";
+  private final boolean isS3A;
+  private final BiFunction<Path, FileStatus, Boolean> propagateStatusProbe;
+
+  /**
+   * Create for a target filesystem.
+   * @param fs filesystem
+   */
+  public FileStatusBindingSupport(FileSystem fs) {
+    this(fs.getClass().getName());
+  }
+
+  public FileStatusBindingSupport(String name) {
+    isS3A = name.equals(S3AFS);
+    propagateStatusProbe = isS3A
+        ? FileStatusBindingSupport::allowOnlyS3A
+        : FileStatusBindingSupport::allowAll;
+  }
+
+  /**
+   * Is the FS s3a?
+   *
+   * @return true if the fs classname is that of s3afs.
+   */
+  public boolean isS3A() {
+    return isS3A;
+  }
+
+  /**
+   * Get the status probe of the fs.
+   *
+   * @return the status probe
+   */
+  public BiFunction<Path, FileStatus, Boolean> getPropagateStatusProbe() {
+    return propagateStatusProbe;
+  }
+
+  /**
+   * A status is allowed if non-null and its path matches that of the file
+   * to open.
+   *
+   * @param p path
+   * @param st status
+   *
+   * @return true if the status should be passed down.
+   */
+  public static boolean allowAll(Path p, FileStatus st) {
+    return st != null && p.equals(st.getPath());
+  }
+
+  /**
+   * A status is allowed if it meets the criteria of
+   * {@link #allowAll(Path, FileStatus)} and the
+   * status is an S3AFileStatus.
+   *
+   * @param p path
+   * @param st status
+   *
+   * @return true if the status should be passed down.
+   */
+  public static boolean allowOnlyS3A(Path p, FileStatus st) {
+    return allowAll(p, st)
+        && st.getClass().getName().equals("org.apache.hadoop.fs.s3a.S3AFileStatus");
+  }
+
+}
diff --git a/hbase-oss/src/main/java/org/apache/hadoop/hbase/oss/FunctionsRaisingIOE.java b/hbase-oss/src/main/java/org/apache/hadoop/hbase/oss/FunctionsRaisingIOE.java
new file mode 100644
index 0000000..9f208cb
--- /dev/null
+++ b/hbase-oss/src/main/java/org/apache/hadoop/hbase/oss/FunctionsRaisingIOE.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.oss;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Support for functional programming/lambda-expressions.
+ * Lifted from org.apache.hadoop.fs.impl.FunctionsRaisingIOE;
+ * it's in 3.3.0+, but now deprecated; having a copy isolates
+ * from any changes
+ */
+public final class FunctionsRaisingIOE {
+
+  private FunctionsRaisingIOE() {
+  }
+
+  /**
+   * Function of arity 1 which may raise an IOException.
+   * @param <T> type of arg1
+   * @param <R> type of return value.
+   */
+  @FunctionalInterface
+  public interface FunctionRaisingIOE<T, R> {
+
+    R apply(T t) throws IOException;
+  }
+
+  /**
+   * Function of arity 2 which may raise an IOException.
+   * @param <T> type of arg1
+   * @param <U> type of arg2
+   * @param <R> type of return value.
+   */
+  @FunctionalInterface
+  public interface BiFunctionRaisingIOE<T, U, R> {
+
+    R apply(T t, U u) throws IOException;
+  }
+
+  /**
+   * This is a callable which only raises an IOException.
+   * @param <R> return type
+   */
+  @FunctionalInterface
+  public interface CallableRaisingIOE<R> {
+
+    R apply() throws IOException;
+  }
+
+}
diff --git a/hbase-oss/src/main/java/org/apache/hadoop/hbase/oss/HBaseObjectStoreSemantics.java b/hbase-oss/src/main/java/org/apache/hadoop/hbase/oss/HBaseObjectStoreSemantics.java
index 65eeb16..a7d23ad 100644
--- a/hbase-oss/src/main/java/org/apache/hadoop/hbase/oss/HBaseObjectStoreSemantics.java
+++ b/hbase-oss/src/main/java/org/apache/hadoop/hbase/oss/HBaseObjectStoreSemantics.java
@@ -32,17 +32,20 @@ import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSDataOutputStreamBuilder;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FileChecksum;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FilterFileSystem;
 import org.apache.hadoop.fs.FsStatus;
+import org.apache.hadoop.fs.FutureDataInputStreamBuilder;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Options.ChecksumOpt;
 import org.apache.hadoop.fs.ParentNotDirectoryException;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.PathHandle;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.UnsupportedFileSystemException;
 import org.apache.hadoop.fs.XAttrSetFlag;
@@ -50,6 +53,8 @@ import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.AclStatus;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.IOStatisticsSource;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.oss.metrics.MetricsOSSSource;
 import org.apache.hadoop.hbase.oss.metrics.MetricsOSSSourceImpl;
@@ -65,6 +70,9 @@ import org.apache.yetus.audience.InterfaceStability;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.retrieveIOStatistics;
+import static org.apache.hadoop.hbase.oss.Constants.CAPABILITY_HBOSS;
+
 /**
  * A FileSystem implementation that layers locking logic on top of another,
  * underlying implementation. The appropriate lock on a path is acquired before
@@ -95,13 +103,26 @@ import org.slf4j.LoggerFactory;
  */
 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
 @InterfaceStability.Unstable
-public class HBaseObjectStoreSemantics extends FilterFileSystem {
+public class HBaseObjectStoreSemantics extends FilterFileSystem
+    implements IOStatisticsSource {
+
   private static final Logger LOG =
         LoggerFactory.getLogger(HBaseObjectStoreSemantics.class);
 
   private TreeLockManager sync;
   private MetricsOSSSource metrics;
 
+  private FileStatusBindingSupport bindingSupport;
+
+  /**
+   * Get the tree lock for this instance; null until
+   * initialized.
+   * @return the lock manager.
+   */
+  private TreeLockManager getSync() {
+    return sync;
+  }
+
   public void initialize(URI name, Configuration conf) throws IOException {
     setConf(conf);
 
@@ -122,6 +143,7 @@ public class HBaseObjectStoreSemantics extends FilterFileSystem {
     fs = FileSystem.get(name, internalConf);
     sync = TreeLockManager.get(fs);
     metrics = MetricsOSSSourceImpl.getInstance();
+    bindingSupport = new FileStatusBindingSupport(fs);
   }
 
   @InterfaceAudience.Private
@@ -916,4 +938,53 @@ public class HBaseObjectStoreSemantics extends FilterFileSystem {
       fs.removeXAttr(path, name);
     }
   }
+
+  @Override
+  public boolean hasPathCapability(final Path path, final String capability) throws IOException {
+    if (CAPABILITY_HBOSS.equalsIgnoreCase(capability)) {
+      return true;
+    }
+    return fs.hasPathCapability(path, capability);
+  }
+
+  @Override
+  public FutureDataInputStreamBuilder openFile(final Path path)
+      throws IOException, UnsupportedOperationException {
+    return new LockedOpenFileBuilder(
+        path,
+        getSync(),
+        fs.openFile(path),
+        bindingSupport.getPropagateStatusProbe());
+  }
+
+  /**
+   * This is rarely supported, and as there's no way to
+   * get the path from a pathHandle, impossible to lock.
+   * @param pathHandle path
+   * @return never returns successfully.
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public FutureDataInputStreamBuilder openFile(final PathHandle pathHandle)
+      throws IOException, UnsupportedOperationException {
+    throw new UnsupportedOperationException("openFile(PathHandle) unsupported");
+  }
+
+  @Override
+  public FSDataOutputStreamBuilder createFile(final Path path) {
+    return new LockedCreateFileBuilder(this,
+        path,
+        getSync(),
+        super.createFile(path));
+  }
+
+  /**
+   * Return the IOStatistics of the wrapped FS, or null if
+   * it doesn't have the API.
+   * @return any IOStatistics of the wrapped FS.
+   */
+  @Override
+  public IOStatistics getIOStatistics() {
+    return retrieveIOStatistics(fs);
+  }
 }
diff --git a/hbase-oss/src/main/java/org/apache/hadoop/hbase/oss/LockedCreateFileBuilder.java b/hbase-oss/src/main/java/org/apache/hadoop/hbase/oss/LockedCreateFileBuilder.java
new file mode 100644
index 0000000..2384423
--- /dev/null
+++ b/hbase-oss/src/main/java/org/apache/hadoop/hbase/oss/LockedCreateFileBuilder.java
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.oss;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.EnumSet;
+import java.util.Set;
+import javax.annotation.Nonnull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSDataOutputStreamBuilder;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hbase.oss.sync.AutoLock;
+import org.apache.hadoop.hbase.oss.sync.TreeLockManager;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * A locked output stream builder for createFile().
+ * The lock is acquired in the {@link #build()} call
+ * and kept in the output stream where it is held
+ * until closed.
+ * As the lock is by thread, the same file can be opened
+ * more than once by that same thread.
+ *
+ * Not all the getter methods in the wrapped builder
+ * are accessible, so to make them retrievable for
+ * testing, the superclass versions of them are called
+ * as well as the wrapped class being invoked.
+ */
+@SuppressWarnings("rawtypes")
+class LockedCreateFileBuilder
+    extends FSDataOutputStreamBuilder<FSDataOutputStream, LockedCreateFileBuilder> {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(LockedCreateFileBuilder.class);
+
+  /**
+   * Path of the file.
+   */
+  private final Path path;
+
+  /**
+   * Lock manager.
+   */
+  private final TreeLockManager sync;
+
+  private final FSDataOutputStreamBuilder wrapped;
+
+  /**
+   * Constructor adding locking to a builder.
+   * @param fileSystem FS
+   * @param path path
+   * @param sync sync point
+   * @param wrapped wrapped stream builder.
+   */
+  LockedCreateFileBuilder(
+      @Nonnull final FileSystem fileSystem,
+      final Path path,
+      final TreeLockManager sync,
+      final FSDataOutputStreamBuilder wrapped) {
+    super(fileSystem, path);
+    this.path = path;
+    this.sync = sync;
+    this.wrapped = wrapped;
+  }
+
+  @Override
+  public FSDataOutputStream build() throws IOException {
+    LOG.debug("Building output stream for {}", path);
+    AutoLock lock = sync.lockWrite(path); // held by the returned stream on success
+    try {
+      FSDataOutputStream stream = getWrapped().build();
+      return new AutoLock.LockedFSDataOutputStream(stream, lock);
+    } catch (IOException | RuntimeException e) { // unchecked failures must not leak the lock
+      lock.close(); // no stream was handed out, so nothing else would ever release it
+      throw e;
+    }
+  }
+
+  @Override
+  public String toString() {
+    return "LockedCreateFileBuilder{" +
+        "path=" + path +
+        "} " + super.toString();
+  }
+
+
+
+  @Override
+  public LockedCreateFileBuilder overwrite(final boolean overwrite) {
+    super.overwrite(overwrite);
+    getWrapped().overwrite(overwrite);
+    return getThisBuilder();
+  }
+
+  @Override
+  public LockedCreateFileBuilder permission(@Nonnull final FsPermission perm) {
+    super.permission(perm);
+    getWrapped().permission(perm);
+    return getThisBuilder();
+  }
+
+  @Override
+  public LockedCreateFileBuilder bufferSize(final int bufSize) {
+    super.bufferSize(bufSize);
+    getWrapped().bufferSize(bufSize);
+    return getThisBuilder();
+  }
+
+  @Override
+  public LockedCreateFileBuilder replication(final short replica) {
+    super.replication(replica); // keep the superclass getter in sync, as the other setters do
+    getWrapped().replication(replica);
+    return getThisBuilder();
+  }
+
+  @Override
+  public LockedCreateFileBuilder blockSize(final long blkSize) {
+    super.blockSize(blkSize);
+    getWrapped().blockSize(blkSize);
+    return getThisBuilder();
+  }
+
+  @Override
+  public LockedCreateFileBuilder recursive() {
+    super.recursive();
+    getWrapped().recursive();
+    return getThisBuilder();
+  }
+
+  @Override
+  public LockedCreateFileBuilder progress(@Nonnull final Progressable prog) {
+    super.progress(prog);
+    getWrapped().progress(prog);
+    return getThisBuilder();
+  }
+
+  @Override
+  public LockedCreateFileBuilder checksumOpt(
+      @Nonnull final Options.ChecksumOpt chksumOpt) {
+    super.checksumOpt(chksumOpt);
+    getWrapped().checksumOpt(chksumOpt);
+    return getThisBuilder();
+  }
+
+  @Override
+  public LockedCreateFileBuilder getThisBuilder() {
+    return this;
+  }
+
+  @Override
+  public LockedCreateFileBuilder opt(@Nonnull final String key,
+      @Nonnull final String value) {
+    getWrapped().opt(key, value);
+    return getThisBuilder();
+  }
+
+  @Override
+  public LockedCreateFileBuilder opt(@Nonnull final String key, final boolean value) {
+    getWrapped().opt(key, value);
+    return getThisBuilder();
+  }
+
+  @Override
+  public LockedCreateFileBuilder opt(@Nonnull final String key, final int value) {
+    getWrapped().opt(key, value);
+    return getThisBuilder();
+  }
+
+  @Override
+  public LockedCreateFileBuilder opt(@Nonnull final String key, final float value) {
+    getWrapped().opt(key, value);
+    return getThisBuilder();
+  }
+
+  @Override
+  public LockedCreateFileBuilder opt(@Nonnull final String key, final double value) {
+    getWrapped().opt(key, value);
+    return getThisBuilder();
+  }
+
+  @Override
+  public LockedCreateFileBuilder opt(@Nonnull final String key,
+      @Nonnull final String... values) {
+    getWrapped().opt(key, values);
+    return getThisBuilder();
+  }
+
+  @Override
+  public LockedCreateFileBuilder must(@Nonnull final String key,
+      @Nonnull final String value) {
+    getWrapped().must(key, value);
+    return getThisBuilder();
+  }
+
+  @Override
+  public LockedCreateFileBuilder must(@Nonnull final String key, final boolean value) {
+    getWrapped().must(key, value);
+    return getThisBuilder();
+  }
+
+  @Override
+  public LockedCreateFileBuilder must(@Nonnull final String key, final int value) {
+    getWrapped().must(key, value);
+    return getThisBuilder();
+  }
+
+  @Override
+  public LockedCreateFileBuilder must(@Nonnull final String key, final float value) {
+    getWrapped().must(key, value);
+    return getThisBuilder();
+  }
+
+  @Override
+  public LockedCreateFileBuilder must(@Nonnull final String key, final double value) {
+    getWrapped().must(key, value);
+    return getThisBuilder();
+  }
+
+  @Override
+  public LockedCreateFileBuilder must(@Nonnull final String key,
+      @Nonnull final String... values) {
+    getWrapped().must(key, values);
+    return getThisBuilder();
+  }
+
+  /**
+   * Configure with a long value.
+   * opt(String, Long) was not on the original interface.
+   * It is implemented in the wrapper by converting
+   * to a string and calling the wrapper's
+   * {@code #opt(String, String)}.
+   */
+  public LockedCreateFileBuilder opt(@Nonnull String key, long value) {
+    return opt(key, Long.toString(value));
+  }
+
+  /**
+   * Configure with a long value.
+   * must(String, Long) was not on the original interface.
+   * It is implemented in the wrapper by converting
+   * to a string and calling the wrapper's
+   * {@code #must(String, String)}.
+   */
+  public LockedCreateFileBuilder must(@Nonnull String key, long value) {
+    return must(key, Long.toString(value));
+  }
+
+  @Override
+  public Configuration getOptions() {
+    return getWrapped().getOptions();
+  }
+
+  @Override
+  public Set<String> getMandatoryKeys() {
+    return getWrapped().getMandatoryKeys();
+  }
+
+  /**
+   * Can't call into the wrapped class, so reimplement.
+   * {@inheritDoc}
+   */
+  @Override
+  public void rejectUnknownMandatoryKeys(final Collection<String> knownKeys,
+      final String extraErrorText)
+      throws IllegalArgumentException {
+    rejectUnknownMandatoryKeys(getMandatoryKeys(),
+        knownKeys, extraErrorText);
+  }
+
+  /**
+   * The wrapped builder.
+   */
+  public FSDataOutputStreamBuilder getWrapped() {
+    return wrapped;
+  }
+
+  @Override
+  public LockedCreateFileBuilder create() {
+    super.create();
+    getWrapped().create();
+    return getThisBuilder();
+  }
+
+  @Override
+  public LockedCreateFileBuilder append() {
+    super.append(); // was super.create(): mirror the same operation on this builder's state
+    getWrapped().append();
+    return getThisBuilder();
+  }
+
+  @Override
+  public Progressable getProgress() {
+    return super.getProgress();
+  }
+}
diff --git a/hbase-oss/src/main/java/org/apache/hadoop/hbase/oss/LockedOpenFileBuilder.java b/hbase-oss/src/main/java/org/apache/hadoop/hbase/oss/LockedOpenFileBuilder.java
new file mode 100644
index 0000000..b88e01a
--- /dev/null
+++ b/hbase-oss/src/main/java/org/apache/hadoop/hbase/oss/LockedOpenFileBuilder.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.oss;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiFunction;
+import javax.annotation.Nonnull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FutureDataInputStreamBuilder;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.oss.sync.TreeLockManager;
+
+/**
+ * An input stream builder which locks the path to read on the build() call.
+ * all builder operations must return this instance so that the final
+ * build will acquire the lock.
+ *
+ * There's a bit of extra complexity around the file status
+ * as s3a in 3.3.0 overreacts to the passed in status not being
+ * S3AFileStatus by raising an exception.
+ * Later versions just ignore the value in the build() phase if it isn't.
+ * A predicate can be passed in to determine if the status
+ * should be added.
+ */
+public class LockedOpenFileBuilder extends
+    LockingFSBuilderWrapper<CompletableFuture<FSDataInputStream>, FutureDataInputStreamBuilder>
+    implements FutureDataInputStreamBuilder {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(LockedOpenFileBuilder.class);
+
+  private final BiFunction<Path, FileStatus, Boolean> propagateStatusProbe;
+
+  private boolean wasStatusPropagated;
+
+  public LockedOpenFileBuilder(@Nonnull final Path path,
+      final TreeLockManager sync,
+      final FutureDataInputStreamBuilder wrapped,
+      final BiFunction<Path, FileStatus, Boolean> propagateStatusProbe) {
+    super(path, sync, wrapped,
+        LockedOpenFileBuilder::complete);
+    this.propagateStatusProbe = propagateStatusProbe;
+  }
+
+  /**
+   * Update the file status if the status probe is happy.
+   *
+   * @param status status.
+   *
+   * @return the builder.
+   */
+  public FutureDataInputStreamBuilder withFileStatus(
+      final FileStatus status) {
+    if (status != null && propagateStatusProbe.apply(getPath(), status)) {
+      wasStatusPropagated = true;
+      getWrapped().withFileStatus(status);
+    }
+
+    return getThisBuilder();
+  }
+
+  /**
+   * The completion operation simply logs the value at debug.
+   * The open may include an async HEAD call, or skip the probe
+   * entirely.
+   *
+   * @param in input
+   *
+   * @return the input
+   */
+  private static CompletableFuture<FSDataInputStream> complete(
+      CompletableFuture<FSDataInputStream> in) {
+    LOG.debug("Created input stream {}", in);
+    return in;
+  }
+
+  /**
+   * Was the status passed down to the wrapped builder?
+   * @return true if the status was propagated.
+   */
+  public boolean wasStatusPropagated() {
+    return wasStatusPropagated;
+  }
+}
diff --git a/hbase-oss/src/main/java/org/apache/hadoop/hbase/oss/LockingFSBuilderWrapper.java b/hbase-oss/src/main/java/org/apache/hadoop/hbase/oss/LockingFSBuilderWrapper.java
new file mode 100644
index 0000000..c84e675
--- /dev/null
+++ b/hbase-oss/src/main/java/org/apache/hadoop/hbase/oss/LockingFSBuilderWrapper.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.oss;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Set;
+
+import javax.annotation.Nonnull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSBuilder;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.impl.AbstractFSBuilderImpl;
+import org.apache.hadoop.hbase.oss.sync.AutoLock;
+import org.apache.hadoop.hbase.oss.sync.TreeLockManager;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * a builder which wraps another FSBuilder and locks the
+ * final build operation.
+ * It also supports a transform of the wrapped result
+ * for advanced processing.
+ *
+ * @param <S> type of built item
+ * @param <B> builder interface
+ */
+public class LockingFSBuilderWrapper<S, B extends FSBuilder<S, B>>
+    extends AbstractFSBuilderImpl<S, B> {
+
+  private static final Logger LOG =
+        LoggerFactory.getLogger(LockingFSBuilderWrapper.class);
+  /**
+   * Target path.
+   */
+  private final Path path;
+
+  /**
+   * Lock.
+   */
+  private final TreeLockManager sync;
+
+  /**
+   * Wrapped builder.
+   */
+  private final B wrapped;
+
+  /**
+   * A function which is invoked on the output of the wrapped build,
+   * inside the lock operation.
+   */
+  private final FunctionsRaisingIOE.FunctionRaisingIOE<S, S> afterBuildTransform;
+
+  /**
+   * Constructor.
+   * @param path Target path.
+   * @param sync Lock.
+   * @param wrapped Wrapped builder.
+   * @param afterBuildTransform a function which is invoked on the output of the
+   * wrapped build, inside the lock operation.
+   *
+   */
+  public LockingFSBuilderWrapper(@Nonnull final Path path,
+      final TreeLockManager sync,
+      final B wrapped,
+      final FunctionsRaisingIOE.FunctionRaisingIOE<S, S> afterBuildTransform) {
+
+    super(path);
+    this.sync = requireNonNull(sync);
+    this.path = requireNonNull(path);
+    this.wrapped = requireNonNull(wrapped);
+    this.afterBuildTransform = requireNonNull(afterBuildTransform);
+  }
+
+  /**
+   * Run the wrapped build and the transform while holding the path lock.
+   * @return the transformed result of the wrapped build.
+   * @throws IOException failure.
+   */
+  @Override
+  public S build() throws IOException {
+    LOG.debug("building stream for {}:", path);
+    try (AutoLock ignored = sync.lock(path)) {
+      final S built = wrapped.build();
+      final S transformed = afterBuildTransform.apply(built);
+      LOG.debug("result is {}:", transformed);
+      return transformed;
+    }
+  }
+
+  /**
+   * Get the wrapped builder.
+   * @return wrapped builder.
+   */
+  protected B getWrapped() {
+    return wrapped;
+  }
+
+  /**
+   * Get the wrapped builder cast to {@link AbstractFSBuilderImpl}.
+   * @return wrapped builder, as the (raw) implementation class.
+   */
+  protected AbstractFSBuilderImpl getWrappedAsBuilderImpl() {
+    return (AbstractFSBuilderImpl)wrapped;
+  }
+
+
+  @Override
+  public B opt(@Nonnull final String key,
+      @Nonnull final String value) {
+    LOG.debug("{}: option {}=\"{}\"", path, key, value);
+    wrapped.opt(key, value);
+    return getThisBuilder();
+  }
+
+  @Override
+  public B opt(@Nonnull final String key, final boolean value) {
+    LOG.debug("{}: option {}=\"{}\"", path, key, value);
+    wrapped.opt(key, value);
+    return getThisBuilder();
+  }
+
+  @Override
+  public B opt(@Nonnull final String key, final int value) {
+    LOG.debug("{}: option {}=\"{}\"", path, key, value);
+    wrapped.opt(key, value);
+    return getThisBuilder();
+  }
+
+  @Override
+  public B opt(@Nonnull final String key, final float value) {
+    LOG.debug("{}: option {}=\"{}\"", path, key, value);
+    wrapped.opt(key, value);
+    return getThisBuilder();
+  }
+
+  @Override
+  public B opt(@Nonnull final String key, final double value) {
+    LOG.debug("{}: option {}=\"{}\"", path, key, value);
+    wrapped.opt(key, value);
+    return getThisBuilder();
+  }
+
+  @Override
+  public B opt(@Nonnull final String key,
+      @Nonnull final String... values) {
+    LOG.debug("{}: option {}=(values)", path, key);
+    wrapped.opt(key, values);
+    return getThisBuilder();
+  }
+
+  @Override
+  public B must(@Nonnull final String key,
+      @Nonnull final String value) {
+    wrapped.must(key, value);
+    return getThisBuilder();
+  }
+
+  @Override
+  public B must(@Nonnull final String key, final boolean value) {
+    LOG.debug("{}: must {}=\"{}\"", path, key, value);
+    wrapped.must(key, value);
+    return getThisBuilder();
+  }
+
+  @Override
+  public B must(@Nonnull final String key, final int value) {
+    LOG.debug("{}: must {}=\"{}\"", path, key, value);
+    wrapped.must(key, value);
+    return getThisBuilder();
+  }
+
+  @Override
+  public B must(@Nonnull final String key, final float value) {
+    LOG.debug("{}: must {}=\"{}\"", path, key, value);
+    wrapped.must(key, value);
+    return getThisBuilder();
+  }
+
+  @Override
+  public B must(@Nonnull final String key, final double value) {
+    LOG.debug("{}: must {}=\"{}\"", path, key, value);
+    wrapped.must(key, value);
+    return getThisBuilder();
+  }
+
+  @Override
+  public B must(@Nonnull final String key,
+      @Nonnull final String... values) {
+    LOG.debug("{}: must {}=(values)", path, key);
+    wrapped.must(key, values);
+    return getThisBuilder();
+  }
+
+  /**
+   * Configure with a long value.
+   * opt(String, Long) was not on the original interface,
+   * though it is in recent hadoop builds.
+   * It is implemented in the wrapper by converting
+   * to a string and calling the wrapper's
+   * {@code #opt(String, String)}.
+   * It is NOT declared @Override so still compiles and
+   * runs against hadoop 3.3.0.
+   * @param key key to set
+   * @param value long value
+   * @return the builder
+   */
+  public B opt(@Nonnull String key, long value) {
+    return opt(key, Long.toString(value));
+  }
+
+  /**
+   * Configure with a long value.
+   * must(String, Long) was not on the original interface,
+   * though it is in recent hadoop builds.
+   * It is implemented in the wrapper by converting
+   * to a string and calling the wrapper's
+   * {@code #must(String, String)}.
+   * It is NOT declared @Override so still compiles and
+   * runs against hadoop 3.3.0.
+   * @param key key to set
+   * @param value long value
+   * @return the builder
+   */
+  public B must(@Nonnull String key, long value) {
+    return must(key, Long.toString(value));
+  }
+
+  @Override
+  public String toString() {
+    final StringBuilder text = new StringBuilder("LockingFSBuilderWrapper{");
+    text.append("path=").append(path);
+    text.append(", wrapped=").append(wrapped).append("} ");
+    return text.append(super.toString()).toString();
+  }
+
+  @Override
+  public Configuration getOptions() {
+    return getWrappedAsBuilderImpl().getOptions();
+  }
+
+  @Override
+  public Set<String> getMandatoryKeys() {
+    return getWrappedAsBuilderImpl().getMandatoryKeys();
+  }
+
+  @Override
+  protected void rejectUnknownMandatoryKeys(final Collection<String> knownKeys,
+      final String extraErrorText)
+      throws IllegalArgumentException {
+    super.rejectUnknownMandatoryKeys(knownKeys, extraErrorText);
+  }
+}
diff --git a/hbase-oss/src/main/java/org/apache/hadoop/hbase/oss/sync/AutoLock.java b/hbase-oss/src/main/java/org/apache/hadoop/hbase/oss/sync/AutoLock.java
index 18eb8cf..3a4859b 100644
--- a/hbase-oss/src/main/java/org/apache/hadoop/hbase/oss/sync/AutoLock.java
+++ b/hbase-oss/src/main/java/org/apache/hadoop/hbase/oss/sync/AutoLock.java
@@ -24,6 +24,7 @@ import java.util.NoSuchElementException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.statistics.IOStatistics;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
 
@@ -116,8 +117,8 @@ public interface AutoLock extends AutoCloseable {
     }
 
     private final FSDataOutputStream stream;
-    private AutoLock lock;
-    private AtomicBoolean closed = new AtomicBoolean(false);
+    private final AutoLock lock;
+    private final AtomicBoolean closed = new AtomicBoolean(false);
 
     private void checkClosed() throws IOException {
       if (closed.get()) {
@@ -160,7 +161,9 @@ public interface AutoLock extends AutoCloseable {
 
     @Override
     public String toString() {
-      return "LockedFSDataOutputStream:" + stream.toString();
+      return "LockedFSDataOutputStream:"
+          + " closed=" + closed.get() + " "
+          + stream.toString();
     }
 
     @Override
@@ -190,5 +193,10 @@ public interface AutoLock extends AutoCloseable {
       checkClosed();
       stream.setDropBehind(dropBehind);
     }
+
+    @Override
+    public IOStatistics getIOStatistics() {
+      return stream.getIOStatistics();
+    }
   }
 }
diff --git a/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/HBaseObjectStoreSemanticsTest.java b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/HBaseObjectStoreSemanticsTest.java
index a2c83f0..84e7c10 100644
--- a/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/HBaseObjectStoreSemanticsTest.java
+++ b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/HBaseObjectStoreSemanticsTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hbase.oss;
 
+import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString;
 import static org.apache.hadoop.hbase.oss.TestUtils.addContract;
 
 import org.apache.hadoop.conf.Configuration;
@@ -27,10 +28,14 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
 import org.junit.After;
 import org.junit.Before;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
 public class HBaseObjectStoreSemanticsTest {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(HBaseObjectStoreSemanticsTest.class);
 
   protected HBaseObjectStoreSemantics hboss = null;
   protected TreeLockManager sync = null;
@@ -49,15 +54,27 @@ public class HBaseObjectStoreSemanticsTest {
 
   @Before
   public void setup() throws Exception {
-    Configuration conf = new Configuration();
+    Configuration conf = createConfiguration();
     addContract(conf);
     hboss = TestUtils.getFileSystem(conf);
     sync = hboss.getLockManager();
     hboss.mkdirs(testPathRoot());
   }
 
+  /**
+   * Create the configuration for the test FS.
+   * @return a configuration.
+   */
+  protected Configuration createConfiguration() {
+    return new Configuration();
+  }
+
   @After
   public void tearDown() throws Exception {
+    if (hboss != null) {
+      LOG.info("Store statistics {}",
+          ioStatisticsToPrettyString(hboss.getIOStatistics()));
+    }
     TestUtils.cleanup(hboss);
   }
 }
diff --git a/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/TestCreateNonRecursive.java b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/TestCreateNonRecursive.java
index 6b5987a..81f5ea0 100644
--- a/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/TestCreateNonRecursive.java
+++ b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/TestCreateNonRecursive.java
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.hbase.oss;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Executors;
@@ -30,6 +29,8 @@ import org.apache.hadoop.fs.Path;
 import org.junit.Assert;
 import org.junit.Test;
 
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
 public class TestCreateNonRecursive extends HBaseObjectStoreSemanticsTest {
 
   @Test
@@ -41,15 +42,8 @@ public class TestCreateNonRecursive extends HBaseObjectStoreSemanticsTest {
       out = hboss.createNonRecursive(serialPath, false, 1024, (short)1, 1024, null);
       out.close();
 
-      boolean exceptionThrown = false;
-      try {
-        out = hboss.createNonRecursive(serialPath, false, 1024, (short)1, 1024, null);
-      } catch(FileAlreadyExistsException e) {
-        exceptionThrown = true;
-      }
-      if (!exceptionThrown) {
-        Assert.fail("Second call to createNonRecursive should throw FileAlreadyExistsException, but didn't.");
-      }
+      intercept(FileAlreadyExistsException.class, () ->
+        hboss.createNonRecursive(serialPath, false, 1024, (short)1, 1024, null));
     } finally {
       hboss.delete(serialPath);
     }
@@ -60,32 +54,30 @@ public class TestCreateNonRecursive extends HBaseObjectStoreSemanticsTest {
     int experiments = 10;
     int experimentSize = 10;
     for (int e = 0; e < experiments; e++) {
-      ArrayList<Callable<Boolean>> callables = new ArrayList<Callable<Boolean>>(experimentSize);
-      ArrayList<Future<Boolean>> futures = new ArrayList<Future<Boolean>>(experimentSize);
+      ArrayList<Callable<Boolean>> callables = new ArrayList<>(experimentSize);
+      ArrayList<Future<Boolean>> futures = new ArrayList<>(experimentSize);
 
       Path parallelPath = testPath("testCreateNonRecursiveParallel" + e);
       ThreadPoolExecutor executor = (ThreadPoolExecutor)Executors.newFixedThreadPool(10);
       executor.prestartAllCoreThreads();
       for (int i = 0; i < experimentSize; i++) {
-        callables.add(new Callable<Boolean>() {
-          public Boolean call() throws IOException {
-            FSDataOutputStream out = null;
-            boolean exceptionThrown = false;
-            try {
-              out = hboss.createNonRecursive(parallelPath, false, 1024, (short)1, 1024, null);
-            } catch(FileAlreadyExistsException e) {
-              exceptionThrown = true;
-            } finally {
-              if (out != null) {
-                out.close();
-              }
+        callables.add(() -> {
+          FSDataOutputStream out = null;
+          boolean exceptionThrown = false;
+          try {
+            out = hboss.createNonRecursive(parallelPath, false, 1024, (short)1, 1024, null);
+          } catch(FileAlreadyExistsException e1) {
+            exceptionThrown = true;
+          } finally {
+            if (out != null) {
+              out.close();
             }
-            return exceptionThrown;
           }
+          return exceptionThrown;
         });
       }
       try {
-        for (Callable callable : callables) {
+        for (Callable<Boolean> callable : callables) {
           // This is in a separate loop to try and get them all as overlapped as possible
           futures.add(executor.submit(callable));
         }
@@ -101,6 +93,7 @@ public class TestCreateNonRecursive extends HBaseObjectStoreSemanticsTest {
               experimentSize - 1, exceptionsThrown);
       } finally {
         hboss.delete(parallelPath);
+        executor.shutdown();
       }
     }
   }
diff --git a/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/TestFileBuilderAPI.java b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/TestFileBuilderAPI.java
new file mode 100644
index 0000000..ec7a21a
--- /dev/null
+++ b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/TestFileBuilderAPI.java
@@ -0,0 +1,530 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.oss;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSDataOutputStreamBuilder;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FutureDataInputStreamBuilder;
+import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.s3a.S3AFileStatus;
+import org.apache.hadoop.hbase.oss.sync.AutoLock;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.util.Progressable;
+
+import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString;
+import static org.apache.hadoop.hbase.oss.FileStatusBindingSupport.S3AFS;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+/**
+ * test the createFile() and openFile builder APIs,
+ * where the existence checks
+ * take place in build(), not the FS API call.
+ * This means the lock checking needs to be postponed.
+ */
+public class TestFileBuilderAPI extends HBaseObjectStoreSemanticsTest {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestFileBuilderAPI.class);
+  public static final int SLEEP_INTERVAL = 5_000;
+  public static final int TIMEOUT = 15;
+  public static final String EXPERIMENTAL_FADVISE = "fs.s3a.experimental.input.fadvise";
+
+  /**
+   * Prefix for all standard filesystem options: {@value}.
+   */
+  private static final String FILESYSTEM_OPTION = "fs.option.";
+
+  /**
+   * Prefix for all openFile options: {@value}.
+   */
+  public static final String FS_OPTION_OPENFILE =
+      FILESYSTEM_OPTION + "openfile.";
+
+  /**
+   * OpenFile option for file length: {@value}.
+   */
+  public static final String FS_OPTION_OPENFILE_LENGTH =
+      FS_OPTION_OPENFILE + "length";
+
+  private final ExecutorService executor = Executors.newSingleThreadExecutor();
+
+  /**
+   * Create the file via the builder in separate threads, verifying that
+   * the builder executed in the second thread is blocked
+   * until the first thread finishes its write.
+   */
+  @Test
+  public void testCreateOverlappingBuilders() throws Exception {
+    Path path = testPath("testCreateOverlappingBuilders");
+
+    FSDataOutputStream stream = null;
+    try {
+      FSDataOutputStreamBuilder builder = hboss.createFile(path)
+          .overwrite(false);
+
+      LOG.info("building {}:", builder);
+      stream = builder.build();
+
+      Assertions.assertThat(stream)
+          .describedAs("expected a LockedFSDataOutputStream")
+          .isInstanceOf(AutoLock.LockedFSDataOutputStream.class);
+
+      LOG.info("Output stream  {}:", stream);
+      stream.write(0);
+
+      // submit the writer into a new thread
+
+      Future<Long> fut = executor.submit(() -> {
+        try {
+          FSDataOutputStreamBuilder builder1 = hboss.createFile(path)
+              .overwrite(true);
+          LOG.info("Inner thread building {}", builder1);
+          try (FSDataOutputStream out = builder1.build()) {
+            long executionTime = System.currentTimeMillis();
+            LOG.info("Inner thread file created at {}", executionTime);
+            out.write(Bytes.toBytes("localhost"));
+            out.flush();
+            return executionTime;
+          }
+        } catch (Exception e) {
+          LOG.error("Caught exception", e);
+          throw e;
+        }
+      });
+
+      long streamCloseTimestamp = sleep();
+      LOG.info("main thread closing stream");
+      stream.close();
+      // try twice for rigorousness
+      stream.close();
+      long threadTimestamp = fut.get(TIMEOUT, TimeUnit.SECONDS);
+      Assertions.assertThat(threadTimestamp)
+          .describedAs("timestamp of file creation in the second thread")
+          .isGreaterThanOrEqualTo(streamCloseTimestamp);
+
+    } finally {
+      IOUtils.cleanupWithLogger(LOG, stream);
+      hboss.delete(path, false);
+    }
+  }
+
+  /**
+   * Create the file via the builder in separate threads,
+   * each without overwrite.
+   * verify that the builder executed in the second thread
+   * failed because the file exists.
+   * Because in s3 the file doesn't exist until the write is finished,
+   * this verifies that all existence probes are delayed until the
+   * first write finishes
+   */
+  @Test
+  public void testCreateNoOverwrite() throws Exception {
+    Path path = testPath("testCreateNoOverwrite");
+
+    FSDataOutputStream stream = null;
+    try {
+      FSDataOutputStreamBuilder builder = hboss.createFile(path)
+          .overwrite(false);
+
+      LOG.info("building {}:", builder);
+      stream = builder.build();
+
+      LOG.info("Output stream  {}:", stream);
+      stream.write(0);
+
+      // submit the writer into a new thread
+
+      Future<Long> fut = executor.submit(() -> {
+        FSDataOutputStreamBuilder builder1 = hboss.createFile(path)
+            .overwrite(false);
+        LOG.info("Inner thread building {}", builder1);
+        try (FSDataOutputStream out = builder1.build()) {
+          long executionTime = System.currentTimeMillis();
+          LOG.info("Inner thread file created at {}", executionTime);
+          out.write(Bytes.toBytes("localhost"));
+          out.flush();
+          return executionTime;
+        }
+      });
+
+      sleep();
+      stream.close();
+
+      // the operation failed.
+      ExecutionException ex = intercept(ExecutionException.class, () ->
+          fut.get(TIMEOUT, TimeUnit.SECONDS));
+
+      // and the inner exception was an overwrite failure
+      // therefore the second file exists
+      intercept(FileAlreadyExistsException.class, () -> {
+        throw (Exception) ex.getCause();
+      });
+
+    } finally {
+      IOUtils.cleanupWithLogger(LOG, stream);
+      hboss.delete(path, false);
+    }
+  }
+
+  /**
+   * Create the file via the builder in one thread;
+   * read in the second.
+   * Compare the output.
+   */
+  @Test
+  public void testCreateAndRead() throws Exception {
+    Path path = testPath("testCreateAndRead");
+
+    FSDataOutputStream stream = null;
+    try {
+      // do a full chain of options. so if there's a problem
+      // with passthrough, we will see
+      // the must calls get overridden by the opt ones called after,
+      // so don't fail on creation
+      //
+      // FSDataOutputStreamBuilder has some problems with type
+      // once some of the builder methods are called.
+      FSDataOutputStreamBuilder builder = (FSDataOutputStreamBuilder)
+          hboss.createFile(path)
+              .overwrite(false)
+              .replication((short) 1)
+              .bufferSize(20_000)
+              .permission(FsPermission.getDefault())
+              .recursive()
+              .blockSize(32_000_000)
+              .checksumOpt(Options.ChecksumOpt.createDisabled())
+              .must("int", 0)
+              .must("long", 0L)
+              .must("string", "s")
+              .must("double", 0.2f)
+              .must("bool", false)
+              .opt("int", 0)
+              .opt("long", 0L)
+              .opt("string", "s")
+              .opt("double", 0.2f)
+              .opt("bool", false);
+
+      assertLockedCreateBuilder(builder);
+
+      LOG.info("building {}:", builder);
+      stream = builder.build();
+
+      LOG.info("Output stream  {}:", stream);
+      byte[] output = Bytes.toBytes("localhost");
+      stream.write(output);
+
+      // submit the writer into a new thread
+
+      Future<Long> fut = executor.submit(() -> {
+        FutureDataInputStreamBuilder builder1 = hboss.openFile(path)
+            .must("int", 0)
+            .must("long", 0L)
+            .must("string", "s")
+            .must("double", 0.2f)
+            .must("bool", false)
+            .opt("int", 0)
+            .opt("long", 0L)
+            .opt("string", "s")
+            .opt("double", 0.2f)
+            .opt("bool", false);
+        LOG.info("Inner thread building {}", builder1);
+        assertLockedOpenBuilder(builder1);
+        try (FSDataInputStream in = builder1.build().get()) {
+          long executionTime = System.currentTimeMillis();
+          LOG.info("Inner thread file created at {}", executionTime);
+          byte[] expected = new byte[output.length];
+          in.readFully(0, expected);
+          Assertions.assertThat(expected)
+              .describedAs("data read from %s", path)
+              .isEqualTo(output);
+          LOG.info("Stream statistics {}",
+              ioStatisticsToPrettyString(in.getIOStatistics()));
+          return executionTime;
+        }
+      });
+      long streamCloseTimestamp = sleep();
+      LOG.info("main thread closing stream");
+      stream.close();
+      long threadTimestamp = fut.get(TIMEOUT, TimeUnit.SECONDS);
+      Assertions.assertThat(threadTimestamp)
+          .describedAs("timestamp of file creation in the second thread")
+          .isGreaterThanOrEqualTo(streamCloseTimestamp);
+
+    } finally {
+      IOUtils.cleanupWithLogger(LOG, stream);
+      hboss.delete(path, false);
+    }
+  }
+
+  /**
+   * Validate all the builder options and that they
+   * get through to the interior.
+   */
+  @Test
+  public void testCreateOptionPassthrough() {
+    Path path = testPath("testCreateOptionPassthrough");
+    AtomicLong progress = new AtomicLong();
+
+    LockedCreateFileBuilder builder = (LockedCreateFileBuilder)
+        hboss.createFile(path)
+            .overwrite(false)
+            .create()
+            .replication((short) 1)
+            .bufferSize(20_000)
+            .permission(FsPermission.getDefault())
+            .recursive()
+            .blockSize(32_000_000)
+            .checksumOpt(Options.ChecksumOpt.createDisabled())
+            .progress(progress::incrementAndGet)
+            .must("m1", 0)
+            .must("m2", 0L)
+            .must("m3", "s")
+            .must("m4", 0.2f)
+            .must("m5", false)
+            .opt("o1", 0)
+            .opt("o2", 0L)
+            .opt("o3", "s")
+            .opt("o4", 0.2f)
+            .opt("o5", false);
+
+    // validate the view from the builder
+    assertMandatoryKeysComplete(builder.getMandatoryKeys());
+    assertAllKeysComplete(builder.getOptions());
+    // and that the wrapped class is happy
+    assertMandatoryKeysComplete(builder.getWrapped().getMandatoryKeys());
+    assertAllKeysComplete(builder.getWrapped().getOptions());
+    Progressable progressable = builder.getProgress();
+    Assertions.assertThat(progressable)
+        .describedAs("progress callback")
+        .isNotNull();
+    progressable.progress();
+    Assertions.assertThat(progress.get())
+        .describedAs("progress callback counter")
+        .isEqualTo(1);
+  }
+
+  /**
+   * Validate all the builder options and that they
+   * get through to the interior.
+   */
+  @Test
+  public void testOpenOptionPassthrough() throws Exception {
+    Path path = testPath("testOpenOptionPassthrough");
+
+    LockedOpenFileBuilder builder = (LockedOpenFileBuilder)
+        hboss.openFile(path)
+            .withFileStatus(null)
+            .must("m1", 0)
+            .must("m2", 0L)
+            .must("m3", "s")
+            .must("m4", 0.2f)
+            .must("m5", false)
+            .opt("o1", 0)
+            .opt("o2", 0L)
+            .opt("o3", "s")
+            .opt("o4", 0.2f)
+            .opt("o5", false);
+
+    // validate the view from the builder
+    assertMandatoryKeysComplete(builder.getMandatoryKeys());
+    assertAllKeysComplete(builder.getOptions());
+    Assertions.assertThat(builder.wasStatusPropagated())
+        .describedAs("was status propagated")
+        .isFalse();
+  }
+
+  /**
+   * Use the filestatus with the read.
+   * on s3 this should go all the way through
+   */
+  @Test
+  public void testOpenOptionWithStatus() throws Exception {
+    Path path = testPath("testOpenOptionWithStatus");
+    try {
+      ContractTestUtils.touch(hboss, path);
+      FileStatus status = hboss.getFileStatus(path);
+
+      LockedOpenFileBuilder builder =
+          (LockedOpenFileBuilder) hboss.openFile(path)
+              .withFileStatus(status)
+              .opt(EXPERIMENTAL_FADVISE, "random");
+      Assertions.assertThat(builder.wasStatusPropagated())
+          .describedAs("was status %s propagated", status)
+          .isTrue();
+      readEmptyFile(builder);
+      // if the fs was s3a, look more closely
+
+      FileStatusBindingSupport bindingSupport =
+          new FileStatusBindingSupport(hboss.getRawFileSystem());
+      if (bindingSupport.isS3A()) {
+        // try again with a different status
+        LOG.info("Opening s3a file with different status than {}", status);
+        LockedOpenFileBuilder builder2 =
+            (LockedOpenFileBuilder) hboss.openFile(path)
+                .withFileStatus(new FileStatus(status))
+                .must(EXPERIMENTAL_FADVISE, "sequential")
+                .opt(FS_OPTION_OPENFILE_LENGTH, status.getLen());  // HADOOP-16202
+        Assertions.assertThat(builder2.wasStatusPropagated())
+              .describedAs("was status %s propagated", status)
+              .isFalse();
+        readEmptyFile(builder2);
+      }
+    } finally {
+      hboss.delete(path, false);
+    }
+
+  }
+
+  /**
+   * Test the status propagation logic for non-s3a filesystems.
+   */
+  @Test
+  public void testAllowAllBinding() throws Throwable {
+    FileStatusBindingSupport bindingSupport = new FileStatusBindingSupport(
+        "org.apache.hadoop.fs.LocalFileSystem");
+    Assertions.assertThat(bindingSupport.isS3A())
+        .describedAs("binding to local fs must not be s3a")
+        .isFalse();
+    Path p = new Path("file://path/file");
+    FileStatus st = new FileStatus(0, false, 1, 1, 0, p);
+    Assertions.assertThat(bindingSupport.getPropagateStatusProbe().apply(p, st))
+        .describedAs("allow(%s, %s)", p, st)
+        .isTrue();
+    Path other = new Path("file://other");
+    Assertions.assertThat(bindingSupport.getPropagateStatusProbe().apply(other, st))
+        .describedAs("allow(%s, %s)", other, st)
+        .isFalse();
+  }
+  /**
+   * Test the status propagation logic for an s3a filesystem.
+   */
+  @Test
+  public void testAllowS3ABinding() throws Throwable {
+    FileStatusBindingSupport bindingSupport = new FileStatusBindingSupport(S3AFS);
+    Assertions.assertThat(bindingSupport.isS3A())
+        .describedAs("binding to S3A fs")
+        .isTrue();
+    Path p = new Path("s3a://bucket/path/file");
+    FileStatus st = new S3AFileStatus(false, p, "owner");
+    Assertions.assertThat(bindingSupport.getPropagateStatusProbe().apply(p, st))
+        .describedAs("allow(%s, %s)", p, st)
+        .isTrue();
+    Path other = new Path("file://other");
+    Assertions.assertThat(bindingSupport.getPropagateStatusProbe().apply(other, st))
+        .describedAs("allow(%s, %s)", other, st)
+        .isFalse();
+
+    // status instances which are not s3a file statuses are skipped
+    FileStatus st2 =  new FileStatus(0, false, 1, 1, 0, p);
+    Assertions.assertThat(bindingSupport.getPropagateStatusProbe().apply(p, st2))
+        .describedAs("allow(%s, %s)", p, st2)
+        .isFalse();
+  }
+
+  /**
+   * Open and read an empty file.
+   * @param builder builder.
+   */
+  private void readEmptyFile(LockedOpenFileBuilder builder)
+      throws IOException, ExecutionException, InterruptedException {
+    try (FSDataInputStream in = builder.build().get()) {
+      Assertions.assertThat(in.read())
+          .describedAs("read of empty file from %s", in)
+          .isEqualTo(-1);
+      LOG.info("Stream statistics {}",
+          ioStatisticsToPrettyString(in.getIOStatistics()));
+    }
+  }
+
+
+  /**
+   * Assert that the optional and mandatory keys are present.
+   *
+   * @param options options to scan.
+   */
+  private void assertAllKeysComplete(final Configuration options) {
+    List<String> list = new ArrayList<>();
+    options.iterator().forEachRemaining(e ->
+        list.add(e.getKey()));
+    Assertions.assertThat(list)
+        .describedAs("all keys in the builder")
+        .containsExactlyInAnyOrder(
+            "m1", "m2", "m3", "m4", "m5",
+            "o1", "o2", "o3", "o4", "o5");
+  }
+
+  /**
+   * Assert that the mandatory keys are present.
+   *
+   * @param mandatoryKeys mandatory keys
+   */
+  private void assertMandatoryKeysComplete(final Set<String> mandatoryKeys) {
+    Assertions.assertThat(mandatoryKeys)
+        .describedAs("mandatory keys in the builder")
+        .containsExactlyInAnyOrder("m1", "m2", "m3", "m4", "m5");
+  }
+
+  private void assertLockedOpenBuilder(final FutureDataInputStreamBuilder builder1) {
+    Assertions.assertThat(builder1)
+        .isInstanceOf(LockedOpenFileBuilder.class);
+  }
+
+  private void assertLockedCreateBuilder(final FSDataOutputStreamBuilder builder) {
+    Assertions.assertThat(builder)
+        .isInstanceOf(LockedCreateFileBuilder.class);
+  }
+
+  /**
+   * Log, sleep for {@link #SLEEP_INTERVAL} milliseconds, log again.
+   *
+   * @return current time in millis, read after waking.
+   *
+   * @throws InterruptedException if the sleep was interrupted.
+   */
+  private long sleep() throws InterruptedException {
+    LOG.info("main thread sleeping");
+    TimeUnit.MILLISECONDS.sleep(SLEEP_INTERVAL);
+    LOG.info("main thread awake");
+    return System.currentTimeMillis();
+  }
+
+}
diff --git a/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/TestZNodeCleanup.java b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/TestZNodeCleanup.java
index bf1dec6..e1b76ed 100644
--- a/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/TestZNodeCleanup.java
+++ b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/TestZNodeCleanup.java
@@ -75,9 +75,11 @@ public class TestZNodeCleanup extends HBaseObjectStoreSemanticsTest {
 
   @After
   public void teardown() throws Exception {
-    String zkRoot = lockManager.norm(TestUtils.testPathRoot(hboss)).toString();
-    LOG.info("Dumping contents of ZooKeeper after test from {}", zkRoot);
-    printZkBFS(zkRoot);
+    if (lockManager != null) {
+      String zkRoot = lockManager.norm(TestUtils.testPathRoot(hboss)).toString();
+      LOG.info("Dumping contents of ZooKeeper after test from {}", zkRoot);
+      printZkBFS(zkRoot);
+    }
     if (zk != null) {
       zk.close();
       zk = null;
diff --git a/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/contract/HBOSSContract.java b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/contract/HBOSSContract.java
index d37252d..be84e4f 100644
--- a/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/contract/HBOSSContract.java
+++ b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/contract/HBOSSContract.java
@@ -70,8 +70,7 @@ public class HBOSSContract extends AbstractFSContract {
       try {
         fs = TestUtils.getFileSystem(conf);
       } catch (Exception e) {
-        LOG.error(e.getMessage());
-        e.printStackTrace();
+        LOG.error(e.toString(), e);
         throw new IOException("Failed to get FS", e);
       }
     }
diff --git a/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/contract/TestHBOSSContractGetFileStatus.java b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/contract/TestHBOSSContractGetFileStatus.java
index 3425afc..f5c2ad9 100644
--- a/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/contract/TestHBOSSContractGetFileStatus.java
+++ b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/contract/TestHBOSSContractGetFileStatus.java
@@ -33,7 +33,7 @@ public class TestHBOSSContractGetFileStatus extends AbstractContractGetFileStatu
       TestUtils.disableFilesystemCaching(conf);
     } catch (Exception e) {
       e.printStackTrace();
-      fail("Exception configuring FS: " + e);
+      throw new AssertionError("Exception configuring FS: " + e, e);
     }
     return conf;
   }
diff --git a/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/contract/TestHBOSSContractOpen.java b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/contract/TestHBOSSContractOpen.java
index 33d684d..fa7196d 100644
--- a/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/contract/TestHBOSSContractOpen.java
+++ b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/contract/TestHBOSSContractOpen.java
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.hbase.oss.contract;
 
+import org.junit.Test;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.contract.AbstractContractOpenTest;
 import org.apache.hadoop.fs.contract.AbstractFSContract;
@@ -41,4 +43,13 @@ public class TestHBOSSContractOpen extends AbstractContractOpenTest {
     TestUtils.runIfS3(false, conf);
     return contract;
   }
+
+  /**
+   * HADOOP-16202 cut this test as it is now OK to
+   * pass down a null status.
+   */
+  @Test
+  public void testOpenFileNullStatus() {
+    // Intentionally empty; presumably shadows an inherited test of this name -- confirm.
+  }
 }
diff --git a/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/contract/TestHBOSSContractOpenS3A.java b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/contract/TestHBOSSContractOpenS3A.java
index cbf5b65..7f8c2dd 100644
--- a/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/contract/TestHBOSSContractOpenS3A.java
+++ b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/contract/TestHBOSSContractOpenS3A.java
@@ -18,8 +18,9 @@
 
 package org.apache.hadoop.hbase.oss.contract;
 
+import org.junit.Test;
+
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.contract.AbstractContractOpenTest;
 import org.apache.hadoop.fs.contract.AbstractFSContract;
 import org.apache.hadoop.hbase.oss.TestUtils;
 
@@ -29,7 +30,7 @@ import org.apache.hadoop.hbase.oss.TestUtils;
  * areZeroByteFilesEncrypted() method to always return true.
  * TestHBOSSContractOpen remains to be run in the general case.
  */
-public class TestHBOSSContractOpenS3A extends AbstractContractOpenTest {
+public class TestHBOSSContractOpenS3A extends TestHBOSSContractOpen {
 
   @Override
   protected Configuration createConfiguration() {
@@ -50,4 +51,13 @@ public class TestHBOSSContractOpenS3A extends AbstractContractOpenTest {
   protected boolean areZeroByteFilesEncrypted() {
     return true;
   }
+
+  /**
+   * HADOOP-16202 cut this test as it is now OK to
+   * pass down a null status.
+   */
+  @Test
+  public void testOpenFileNullStatus() {
+    // Intentionally empty; same no-op as in the superclass TestHBOSSContractOpen.
+  }
 }
diff --git a/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/sync/LocalTreeLockManager.java b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/sync/LocalTreeLockManager.java
index d7f5eb0..def2a99 100644
--- a/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/sync/LocalTreeLockManager.java
+++ b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/sync/LocalTreeLockManager.java
@@ -47,7 +47,9 @@ public class LocalTreeLockManager extends TreeLockManager {
   @Override
   protected void writeLock(Path p) {
     createLocksIfNeeded(p);
-    index.get(p).lock.writeLock().lock();
+    LockNode node = index.get(p); // NOTE(review): assumes createLocksIfNeeded(p) added p to index
+    LOG.debug("Acquiring write lock on {}", node);
+    node.lock.writeLock().lock();
   }
 
   @Override
@@ -56,6 +58,7 @@ public class LocalTreeLockManager extends TreeLockManager {
       LockNode node = index.get(p);
       // Node to unlock may already be gone after deletes
       if (node != null) {
+        LOG.debug("Releasing write lock on {}", node);
         node.lock.writeLock().unlock();
       }
     } catch(IllegalMonitorStateException e) {
@@ -141,6 +144,13 @@ public class LocalTreeLockManager extends TreeLockManager {
     public Map<Path, LockNode> children = new HashMap<>();
     public ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
 
+    @Override
+    public String toString() { // rendered when a LockNode is logged, e.g. the write-lock debug messages
+      return "LockNode{" +
+          "path=" + path +
+          ", lock=" + lock +
+          '}';
+    }
   }
 
   private Map<Path, LockNode> index = new HashMap<>();
diff --git a/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/sync/TestZKLockManagerConfig.java b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/sync/TestZKLockManagerConfig.java
index 107aafc..dacf541 100644
--- a/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/sync/TestZKLockManagerConfig.java
+++ b/hbase-oss/src/test/java/org/apache/hadoop/hbase/oss/sync/TestZKLockManagerConfig.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hbase.oss.sync;
 
+import java.net.URI;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
@@ -50,8 +52,10 @@ public class TestZKLockManagerConfig {
       conf.unset(Constants.ZK_CONN_STRING);
       assertNull(conf.get(Constants.ZK_CONN_STRING));
       
-      // Get a LocalFS -- we don't really care about it, just passing it to the lockManager.
-      FileSystem fs = LocalFileSystem.get(conf);
+      // Create a LocalFS -- we don't really care about it, just passing it to the lockManager.
+      // why a new instance? to avoid problems with cached fs instances across tests.
+      FileSystem fs = new LocalFileSystem();
+      fs.initialize(URI.create("file:///"), conf);
 
       // Initializing the ZKTreeLockManager should succeed, even when we only have
       // the hbase.zookeeper.quorum config property set.
diff --git a/hbase-oss/src/test/resources/log4j.properties b/hbase-oss/src/test/resources/log4j.properties
index 108b3f9..151741e 100644
--- a/hbase-oss/src/test/resources/log4j.properties
+++ b/hbase-oss/src/test/resources/log4j.properties
@@ -18,4 +18,7 @@ log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
 log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} (%F:%M(%L)) - %m%n
 log4j.logger.org.apache.hadoop=DEBUG
 log4j.logger.org.apache.hadoop.metrics2=WARN
-log4j.logger.org.apache.hadoop.fs=WARN
\ No newline at end of file
+log4j.logger.org.apache.hadoop.fs=WARN
+log4j.logger.org.apache.hadoop.util=WARN
+log4j.logger.org.apache.hadoop.fs.s3a=INFO
+log4j.logger.org.apache.hadoop.hbase.oss=DEBUG