You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by bi...@apache.org on 2021/04/12 23:48:29 UTC

[hadoop] branch trunk updated: HADOOP-16948. Support infinite lease dirs. (#1925)

This is an automated email from the ASF dual-hosted git repository.

billie pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c1fde4f  HADOOP-16948. Support infinite lease dirs. (#1925)
c1fde4f is described below

commit c1fde4fe94f268c6d5515b421ac47345dca8163d
Author: billierinaldi <bi...@apache.org>
AuthorDate: Mon Apr 12 19:47:59 2021 -0400

    HADOOP-16948. Support infinite lease dirs. (#1925)
    
    * HADOOP-16948. Support single writer dirs.
    
    * HADOOP-16948. Fix findbugs and checkstyle problems.
    
    * HADOOP-16948. Fix remaining checkstyle problems.
    
    * HADOOP-16948. Add DurationInfo, retry policy for acquiring lease, and javadocs
    
    * HADOOP-16948. Convert ABFS client to use an executor for lease ops
    
    * HADOOP-16948. Fix ABFS lease test for non-HNS
    
    * HADOOP-16948. Fix checkstyle and javadoc
    
    * HADOOP-16948. Address review comments
    
    * HADOOP-16948. Use daemon threads for ABFS lease ops
    
    * HADOOP-16948. Make lease duration configurable
    
    * HADOOP-16948. Add error messages to test assertions
    
    * HADOOP-16948. Remove extra isSingleWriterKey call
    
    * HADOOP-16948. Use only infinite lease duration due to cost of renewal ops
    
    * HADOOP-16948. Remove acquire/renew/release lease methods
    
    * HADOOP-16948. Rename single writer dirs to infinite lease dirs
    
    * HADOOP-16948. Fix checkstyle
    
    * HADOOP-16948. Wait for acquire lease future
    
    * HADOOP-16948. Add unit test for acquire lease failure
---
 .../hadoop/fs/azurebfs/AbfsConfiguration.java      |  35 +++
 .../hadoop/fs/azurebfs/AzureBlobFileSystem.java    |  21 ++
 .../fs/azurebfs/AzureBlobFileSystemStore.java      |  90 +++++-
 .../fs/azurebfs/constants/AbfsHttpConstants.java   |   5 +
 .../fs/azurebfs/constants/ConfigurationKeys.java   |   9 +
 .../constants/FileSystemConfigurations.java        |   7 +
 .../constants/HttpHeaderConfigurations.java        |   5 +
 .../ConfigurationValidationAnnotations.java        |  16 +
 .../exceptions/AzureBlobFileSystemException.java   |   4 +
 .../services/AppendRequestParameters.java          |   8 +-
 .../IntegerConfigurationBasicValidator.java        |  13 +-
 .../hadoop/fs/azurebfs/services/AbfsClient.java    | 131 +++++++-
 .../hadoop/fs/azurebfs/services/AbfsErrors.java    |  53 ++++
 .../hadoop/fs/azurebfs/services/AbfsLease.java     | 188 ++++++++++++
 .../fs/azurebfs/services/AbfsOutputStream.java     |  37 ++-
 .../azurebfs/services/AbfsOutputStreamContext.java |  18 ++
 .../fs/azurebfs/services/AbfsRestOperation.java    |   1 +
 .../azurebfs/services/AbfsRestOperationType.java   |   3 +-
 .../hadoop-azure/src/site/markdown/abfs.md         |  16 +
 .../fs/azurebfs/ITestAzureBlobFileSystemLease.java | 336 +++++++++++++++++++++
 .../diagnostics/TestConfigurationValidators.java   |  29 +-
 .../fs/azurebfs/services/TestAbfsOutputStream.java |  49 +--
 22 files changed, 1032 insertions(+), 42 deletions(-)

diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
index f36cc7d..0a8224a 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
 import org.apache.hadoop.fs.azurebfs.constants.AuthConfigurations;
 import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.IntegerConfigurationValidatorAnnotation;
+import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.IntegerWithOutlierConfigurationValidatorAnnotation;
 import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.LongConfigurationValidatorAnnotation;
 import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.StringConfigurationValidatorAnnotation;
 import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.Base64StringConfigurationValidatorAnnotation;
@@ -208,6 +209,15 @@ public class AbfsConfiguration{
       DefaultValue = DEFAULT_FS_AZURE_APPEND_BLOB_DIRECTORIES)
   private String azureAppendBlobDirs;
 
+  @StringConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_INFINITE_LEASE_KEY,
+      DefaultValue = DEFAULT_FS_AZURE_INFINITE_LEASE_DIRECTORIES)
+  private String azureInfiniteLeaseDirs;
+
+  @IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_LEASE_THREADS,
+      MinValue = MIN_LEASE_THREADS,
+      DefaultValue = DEFAULT_LEASE_THREADS)
+  private int numLeaseThreads;
+
   @BooleanConfigurationValidatorAnnotation(ConfigurationKey = AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION,
       DefaultValue = DEFAULT_AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION)
   private boolean createRemoteFileSystemDuringInitialization;
@@ -296,6 +306,8 @@ public class AbfsConfiguration{
       field.setAccessible(true);
       if (field.isAnnotationPresent(IntegerConfigurationValidatorAnnotation.class)) {
         field.set(this, validateInt(field));
+      } else if (field.isAnnotationPresent(IntegerWithOutlierConfigurationValidatorAnnotation.class)) {
+        field.set(this, validateIntWithOutlier(field));
       } else if (field.isAnnotationPresent(LongConfigurationValidatorAnnotation.class)) {
         field.set(this, validateLong(field));
       } else if (field.isAnnotationPresent(StringConfigurationValidatorAnnotation.class)) {
@@ -634,6 +646,14 @@ public class AbfsConfiguration{
     return this.azureAppendBlobDirs;
   }
 
+  public String getAzureInfiniteLeaseDirs() {
+    return this.azureInfiniteLeaseDirs;
+  }
+
+  public int getNumLeaseThreads() {
+    return this.numLeaseThreads;
+  }
+
   public boolean getCreateRemoteFileSystemDuringInitialization() {
     // we do not support creating the filesystem when AuthType is SAS
     return this.createRemoteFileSystemDuringInitialization
@@ -843,6 +863,21 @@ public class AbfsConfiguration{
         validator.ThrowIfInvalid()).validate(value);
   }
 
+  int validateIntWithOutlier(Field field) throws IllegalAccessException, InvalidConfigurationValueException {
+    IntegerWithOutlierConfigurationValidatorAnnotation validator =
+        field.getAnnotation(IntegerWithOutlierConfigurationValidatorAnnotation.class);
+    String value = get(validator.ConfigurationKey());
+
+    // validate
+    return new IntegerConfigurationBasicValidator(
+        validator.OutlierValue(),
+        validator.MinValue(),
+        validator.MaxValue(),
+        validator.DefaultValue(),
+        validator.ConfigurationKey(),
+        validator.ThrowIfInvalid()).validate(value);
+  }
+
   long validateLong(Field field) throws IllegalAccessException, InvalidConfigurationValueException {
     LongConfigurationValidatorAnnotation validator = field.getAnnotation(LongConfigurationValidatorAnnotation.class);
     String value = rawConfig.get(validator.ConfigurationKey());
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
index d8a2ed7..30108ed 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
@@ -87,6 +87,7 @@ import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.functional.RemoteIterators;
+import org.apache.hadoop.util.DurationInfo;
 import org.apache.hadoop.util.LambdaUtils;
 import org.apache.hadoop.util.Progressable;
 
@@ -506,6 +507,26 @@ public class AzureBlobFileSystem extends FileSystem {
   }
 
   /**
+   * Break the current lease on an ABFS file if it exists. A lease that is broken cannot be
+   * renewed. A new lease may be obtained on the file immediately.
+   *
+   * @param f file name
+   * @throws IOException on any exception while breaking the lease
+   */
+  public void breakLease(final Path f) throws IOException {
+    LOG.debug("AzureBlobFileSystem.breakLease path: {}", f);
+
+    Path qualifiedPath = makeQualified(f);
+
+    try (DurationInfo ignored = new DurationInfo(LOG, false, "Break lease for %s",
+        qualifiedPath)) {
+      abfsStore.breakLease(qualifiedPath);
+    } catch(AzureBlobFileSystemException ex) {
+      checkException(f, ex);
+    }
+  }
+
+  /**
    * Qualify a path to one which uses this FileSystem and, if relative,
    * made absolute.
    * @param path to qualify.
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
index 75419c2..fa7e12b 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
@@ -39,6 +39,7 @@ import java.text.SimpleDateFormat;
 import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -48,10 +49,14 @@ import java.util.Locale;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.WeakHashMap;
+import java.util.concurrent.ExecutionException;
 
 import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
 import org.apache.hadoop.thirdparty.com.google.common.base.Strings;
+import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.Futures;
+import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListenableFuture;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -100,6 +105,7 @@ import org.apache.hadoop.fs.azurebfs.services.AbfsPermission;
 import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
 import org.apache.hadoop.fs.azurebfs.services.AuthType;
 import org.apache.hadoop.fs.azurebfs.services.ExponentialRetryPolicy;
+import org.apache.hadoop.fs.azurebfs.services.AbfsLease;
 import org.apache.hadoop.fs.azurebfs.services.SharedKeyCredentials;
 import org.apache.hadoop.fs.azurebfs.services.AbfsPerfTracker;
 import org.apache.hadoop.fs.azurebfs.services.AbfsPerfInfo;
@@ -145,8 +151,11 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
   private static final String XMS_PROPERTIES_ENCODING = "ISO-8859-1";
   private static final int GET_SET_AGGREGATE_COUNT = 2;
 
+  private final Map<AbfsLease, Object> leaseRefs;
+
   private final AbfsConfiguration abfsConfiguration;
   private final Set<String> azureAtomicRenameDirSet;
+  private Set<String> azureInfiniteLeaseDirSet;
   private Trilean isNamespaceEnabled;
   private final AuthType authType;
   private final UserGroupInformation userGroupInformation;
@@ -167,6 +176,8 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
     final String fileSystemName = authorityParts[0];
     final String accountName = authorityParts[1];
 
+    leaseRefs = Collections.synchronizedMap(new WeakHashMap<>());
+
     try {
       this.abfsConfiguration = new AbfsConfiguration(configuration, accountName);
     } catch (IllegalAccessException exception) {
@@ -195,6 +206,7 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
 
     this.azureAtomicRenameDirSet = new HashSet<>(Arrays.asList(
         abfsConfiguration.getAzureAtomicRenameDirs().split(AbfsHttpConstants.COMMA)));
+    updateInfiniteLeaseDirs();
     this.authType = abfsConfiguration.getAuthType(accountName);
     boolean usingOauth = (authType == AuthType.OAuth);
     boolean useHttps = (usingOauth || abfsConfiguration.isHttpsAlwaysUsed()) ? true : isSecureScheme;
@@ -246,7 +258,24 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
 
   @Override
   public void close() throws IOException {
-    IOUtils.cleanupWithLogger(LOG, client);
+    List<ListenableFuture<?>> futures = new ArrayList<>();
+    for (AbfsLease lease : leaseRefs.keySet()) {
+      if (lease == null) {
+        continue;
+      }
+      ListenableFuture<?> future = client.submit(() -> lease.free());
+      futures.add(future);
+    }
+    try {
+      Futures.allAsList(futures).get();
+    } catch (InterruptedException e) {
+      LOG.error("Interrupted freeing leases", e);
+      Thread.currentThread().interrupt();
+    } catch (ExecutionException e) {
+      LOG.error("Error freeing leases", e);
+    } finally {
+      IOUtils.cleanupWithLogger(LOG, client);
+    }
   }
 
   byte[] encodeAttribute(String value) throws UnsupportedEncodingException {
@@ -496,12 +525,14 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
       }
       perfInfo.registerResult(op.getResult()).registerSuccess(true);
 
+      AbfsLease lease = maybeCreateLease(relativePath);
+
       return new AbfsOutputStream(
           client,
           statistics,
           relativePath,
           0,
-          populateAbfsOutputStreamContext(isAppendBlob));
+          populateAbfsOutputStreamContext(isAppendBlob, lease));
     }
   }
 
@@ -573,7 +604,8 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
     return op;
   }
 
-  private AbfsOutputStreamContext populateAbfsOutputStreamContext(boolean isAppendBlob) {
+  private AbfsOutputStreamContext populateAbfsOutputStreamContext(boolean isAppendBlob,
+      AbfsLease lease) {
     int bufferSize = abfsConfiguration.getWriteBufferSize();
     if (isAppendBlob && bufferSize > FileSystemConfigurations.APPENDBLOB_MAX_WRITE_BUFFER_SIZE) {
       bufferSize = FileSystemConfigurations.APPENDBLOB_MAX_WRITE_BUFFER_SIZE;
@@ -587,6 +619,7 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
             .withAppendBlob(isAppendBlob)
             .withWriteMaxConcurrentRequestCount(abfsConfiguration.getWriteMaxConcurrentRequestCount())
             .withMaxWriteRequestsToQueue(abfsConfiguration.getMaxWriteRequestsToQueue())
+            .withLease(lease)
             .build();
   }
 
@@ -705,15 +738,29 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
         isAppendBlob = true;
       }
 
+      AbfsLease lease = maybeCreateLease(relativePath);
+
       return new AbfsOutputStream(
           client,
           statistics,
           relativePath,
           offset,
-          populateAbfsOutputStreamContext(isAppendBlob));
+          populateAbfsOutputStreamContext(isAppendBlob, lease));
     }
   }
 
+  /**
+   * Break any current lease on an ABFS file.
+   *
+   * @param path file name
+   * @throws AzureBlobFileSystemException on any exception while breaking the lease
+   */
+  public void breakLease(final Path path) throws AzureBlobFileSystemException {
+    LOG.debug("lease path: {}", path);
+
+    client.breakLease(getRelativePath(path));
+  }
+
   public void rename(final Path source, final Path destination) throws
           AzureBlobFileSystemException {
     final Instant startAggregate = abfsPerfTracker.getLatencyInstant();
@@ -1347,6 +1394,13 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
     return isKeyForDirectorySet(key, azureAtomicRenameDirSet);
   }
 
+  public boolean isInfiniteLeaseKey(String key) {
+    if (azureInfiniteLeaseDirSet.isEmpty()) {
+      return false;
+    }
+    return isKeyForDirectorySet(key, azureInfiniteLeaseDirSet);
+  }
+
   /**
    * A on-off operation to initialize AbfsClient for AzureBlobFileSystem
    * Operations.
@@ -1636,4 +1690,32 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
     this.isNamespaceEnabled = isNamespaceEnabled;
   }
 
+  private void updateInfiniteLeaseDirs() {
+    this.azureInfiniteLeaseDirSet = new HashSet<>(Arrays.asList(
+        abfsConfiguration.getAzureInfiniteLeaseDirs().split(AbfsHttpConstants.COMMA)));
+    // remove the empty string, since isKeyForDirectory returns true for empty strings
+    // and we don't want to default to enabling infinite lease dirs
+    this.azureInfiniteLeaseDirSet.remove("");
+  }
+
+  private AbfsLease maybeCreateLease(String relativePath)
+      throws AzureBlobFileSystemException {
+    boolean enableInfiniteLease = isInfiniteLeaseKey(relativePath);
+    if (!enableInfiniteLease) {
+      return null;
+    }
+    AbfsLease lease = new AbfsLease(client, relativePath);
+    leaseRefs.put(lease, null);
+    return lease;
+  }
+
+  @VisibleForTesting
+  boolean areLeasesFreed() {
+    for (AbfsLease lease : leaseRefs.keySet()) {
+      if (lease != null && !lease.isFreed()) {
+        return false;
+      }
+    }
+    return true;
+  }
 }
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java
index 184657e..5cf7ec5 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java
@@ -39,6 +39,11 @@ public final class AbfsHttpConstants {
   public static final String GET_ACCESS_CONTROL = "getAccessControl";
   public static final String CHECK_ACCESS = "checkAccess";
   public static final String GET_STATUS = "getStatus";
+  public static final String ACQUIRE_LEASE_ACTION = "acquire";
+  public static final String BREAK_LEASE_ACTION = "break";
+  public static final String RELEASE_LEASE_ACTION = "release";
+  public static final String RENEW_LEASE_ACTION = "renew";
+  public static final String DEFAULT_LEASE_BREAK_PERIOD = "0";
   public static final String DEFAULT_TIMEOUT = "90";
   public static final String APPEND_BLOB_TYPE = "appendblob";
   public static final String TOKEN_VERSION = "2";
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
index 02b143c..4fe1d1c 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
@@ -87,6 +87,15 @@ public final class ConfigurationKeys {
   /** Provides a config to provide comma separated path prefixes on which Appendblob based files are created
    *  Default is empty. **/
   public static final String FS_AZURE_APPEND_BLOB_KEY = "fs.azure.appendblob.directories";
+  /** Provides a config to provide comma separated path prefixes which support infinite leases.
+   *  Files under these paths will be leased when created or opened for writing and the lease will
+   *  be released when the file is closed. The lease may be broken with the breakLease method on
+   *  AzureBlobFileSystem. Default is empty.
+   * **/
+  public static final String FS_AZURE_INFINITE_LEASE_KEY = "fs.azure.infinite-lease.directories";
+  /** Provides a number of threads to use for lease operations for infinite lease directories.
+   *  Must be set to a minimum of 1 if infinite lease directories are to be used. Default is 0. **/
+  public static final String FS_AZURE_LEASE_THREADS = "fs.azure.lease.threads";
   public static final String FS_AZURE_READ_AHEAD_QUEUE_DEPTH = "fs.azure.readaheadqueue.depth";
   public static final String FS_AZURE_ALWAYS_READ_BUFFER_SIZE = "fs.azure.read.alwaysReadBufferSize";
   public static final String FS_AZURE_READ_AHEAD_BLOCK_SIZE = "fs.azure.read.readahead.blocksize";
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
index d90f525..040b18a 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
@@ -78,6 +78,13 @@ public final class FileSystemConfigurations {
   public static final boolean DEFAULT_FS_AZURE_ENABLE_CONDITIONAL_CREATE_OVERWRITE = true;
   public static final boolean DEFAULT_FS_AZURE_ENABLE_MKDIR_OVERWRITE = true;
   public static final String DEFAULT_FS_AZURE_APPEND_BLOB_DIRECTORIES = "";
+  public static final String DEFAULT_FS_AZURE_INFINITE_LEASE_DIRECTORIES = "";
+  public static final int DEFAULT_LEASE_THREADS = 0;
+  public static final int MIN_LEASE_THREADS = 0;
+  public static final int DEFAULT_LEASE_DURATION = -1;
+  public static final int INFINITE_LEASE_DURATION = -1;
+  public static final int MIN_LEASE_DURATION = 15;
+  public static final int MAX_LEASE_DURATION = 60;
 
   public static final int DEFAULT_READ_AHEAD_QUEUE_DEPTH = -1;
 
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpHeaderConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpHeaderConfigurations.java
index 27ddcee..2325538 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpHeaderConfigurations.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpHeaderConfigurations.java
@@ -60,6 +60,11 @@ public final class HttpHeaderConfigurations {
   public static final String X_MS_UMASK = "x-ms-umask";
   public static final String X_MS_NAMESPACE_ENABLED = "x-ms-namespace-enabled";
   public static final String X_MS_ABFS_CLIENT_LATENCY = "x-ms-abfs-client-latency";
+  public static final String X_MS_LEASE_ACTION = "x-ms-lease-action";
+  public static final String X_MS_LEASE_DURATION = "x-ms-lease-duration";
+  public static final String X_MS_LEASE_ID = "x-ms-lease-id";
+  public static final String X_MS_PROPOSED_LEASE_ID = "x-ms-proposed-lease-id";
+  public static final String X_MS_LEASE_BREAK_PERIOD = "x-ms-lease-break-period";
 
   private HttpHeaderConfigurations() {}
 }
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/annotations/ConfigurationValidationAnnotations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/annotations/ConfigurationValidationAnnotations.java
index 82c571a..9fbe5a2 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/annotations/ConfigurationValidationAnnotations.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/annotations/ConfigurationValidationAnnotations.java
@@ -46,6 +46,22 @@ public class ConfigurationValidationAnnotations {
     boolean ThrowIfInvalid() default false;
   }
 
+  @Target({ ElementType.FIELD })
+  @Retention(RetentionPolicy.RUNTIME)
+  public @interface IntegerWithOutlierConfigurationValidatorAnnotation {
+    String ConfigurationKey();
+
+    int MaxValue() default Integer.MAX_VALUE;
+
+    int MinValue() default Integer.MIN_VALUE;
+
+    int OutlierValue() default Integer.MIN_VALUE;
+
+    int DefaultValue();
+
+    boolean ThrowIfInvalid() default false;
+  }
+
   /**
    * Describes the requirements when validating the annotated long field.
    */
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/AzureBlobFileSystemException.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/AzureBlobFileSystemException.java
index 9b1bead..d829c5a 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/AzureBlobFileSystemException.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/AzureBlobFileSystemException.java
@@ -37,6 +37,10 @@ public abstract class AzureBlobFileSystemException extends IOException {
     super(message, innerException);
   }
 
+  public AzureBlobFileSystemException(final String message, final Throwable innerThrowable) {
+    super(message, innerThrowable);
+  }
+
   @Override
   public String toString() {
     if (this.getMessage() == null && this.getCause() == null) {
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AppendRequestParameters.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AppendRequestParameters.java
index fb4d29f..7369bfa 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AppendRequestParameters.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AppendRequestParameters.java
@@ -33,17 +33,20 @@ public class AppendRequestParameters {
   private final int length;
   private final Mode mode;
   private final boolean isAppendBlob;
+  private final String leaseId;
 
   public AppendRequestParameters(final long position,
       final int offset,
       final int length,
       final Mode mode,
-      final boolean isAppendBlob) {
+      final boolean isAppendBlob,
+      final String leaseId) {
     this.position = position;
     this.offset = offset;
     this.length = length;
     this.mode = mode;
     this.isAppendBlob = isAppendBlob;
+    this.leaseId = leaseId;
   }
 
   public long getPosition() {
@@ -66,4 +69,7 @@ public class AppendRequestParameters {
     return this.isAppendBlob;
   }
 
+  public String getLeaseId() {
+    return this.leaseId;
+  }
 }
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/diagnostics/IntegerConfigurationBasicValidator.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/diagnostics/IntegerConfigurationBasicValidator.java
index 26c7d2f..9d4beb7 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/diagnostics/IntegerConfigurationBasicValidator.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/diagnostics/IntegerConfigurationBasicValidator.java
@@ -31,11 +31,18 @@ import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidConfigurationVa
 public class IntegerConfigurationBasicValidator extends ConfigurationBasicValidator<Integer> implements ConfigurationValidator {
   private final int min;
   private final int max;
+  private final int outlier;
 
   public IntegerConfigurationBasicValidator(final int min, final int max, final int defaultVal, final String configKey, final boolean throwIfInvalid) {
+    this(min, min, max, defaultVal, configKey, throwIfInvalid);
+  }
+
+  public IntegerConfigurationBasicValidator(final int outlier, final int min, final int max,
+      final int defaultVal, final String configKey, final boolean throwIfInvalid) {
     super(configKey, defaultVal, throwIfInvalid);
     this.min = min;
     this.max = max;
+    this.outlier = outlier;
   }
 
   public Integer validate(final String configValue) throws InvalidConfigurationValueException {
@@ -47,10 +54,14 @@ public class IntegerConfigurationBasicValidator extends ConfigurationBasicValida
     try {
       result = Integer.parseInt(configValue);
       // throw an exception if a 'within bounds' value is missing
-      if (getThrowIfInvalid() && (result < this.min || result > this.max)) {
+      if (getThrowIfInvalid() && (result != outlier) && (result < this.min || result > this.max)) {
         throw new InvalidConfigurationValueException(getConfigKey());
       }
 
+      if (result == outlier) {
+        return result;
+      }
+
       // set the value to the nearest bound if it's out of bounds
       if (result < this.min) {
         return this.min;
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
index 92b24f0..7c8a211 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
@@ -29,16 +29,27 @@ import java.time.Instant;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Locale;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.thirdparty.com.google.common.base.Strings;
-import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory;
-import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
-import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
-import org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams;
+import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.FutureCallback;
+import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.Futures;
+import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListenableFuture;
+import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListenableScheduledFuture;
+import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningScheduledExecutorService;
+import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.MoreExecutors;
+import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
+import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
+import org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriException;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.SASTokenProviderException;
@@ -49,6 +60,8 @@ import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters;
 import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider;
 import org.apache.hadoop.fs.azurebfs.utils.DateTimeUtils;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
 
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.*;
 import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_DELETE_CONSIDERED_IDEMPOTENT;
@@ -76,6 +89,8 @@ public class AbfsClient implements Closeable {
   private SASTokenProvider sasTokenProvider;
   private final AbfsCounters abfsCounters;
 
+  private final ListeningScheduledExecutorService executorService;
+
   private AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials,
                     final AbfsConfiguration abfsConfiguration,
                     final AbfsClientContext abfsClientContext) {
@@ -106,6 +121,11 @@ public class AbfsClient implements Closeable {
     this.userAgent = initializeUserAgent(abfsConfiguration, sslProviderName);
     this.abfsPerfTracker = abfsClientContext.getAbfsPerfTracker();
     this.abfsCounters = abfsClientContext.getAbfsCounters();
+
+    ThreadFactory tf =
+        new ThreadFactoryBuilder().setNameFormat("AbfsClient Lease Ops").setDaemon(true).build();
+    this.executorService = MoreExecutors.listeningDecorator(
+        HadoopExecutors.newScheduledThreadPool(this.abfsConfiguration.getNumLeaseThreads(), tf));
   }
 
   public AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials,
@@ -129,6 +149,7 @@ public class AbfsClient implements Closeable {
     if (tokenProvider instanceof Closeable) {
       IOUtils.cleanupWithLogger(LOG, (Closeable) tokenProvider);
     }
+    HadoopExecutors.shutdown(executorService, LOG, 0, TimeUnit.SECONDS);
   }
 
   public String getFileSystem() {
@@ -317,6 +338,83 @@ public class AbfsClient implements Closeable {
     return op;
   }
 
+  public AbfsRestOperation acquireLease(final String path, int duration) throws AzureBlobFileSystemException {
+    final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+
+    requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ACTION, ACQUIRE_LEASE_ACTION));
+    requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_DURATION, Integer.toString(duration)));
+    requestHeaders.add(new AbfsHttpHeader(X_MS_PROPOSED_LEASE_ID, UUID.randomUUID().toString()));
+
+    final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
+
+    final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
+    final AbfsRestOperation op = new AbfsRestOperation(
+        AbfsRestOperationType.LeasePath,
+        this,
+        HTTP_METHOD_POST,
+        url,
+        requestHeaders);
+    op.execute();
+    return op;
+  }
+
+  public AbfsRestOperation renewLease(final String path, final String leaseId) throws AzureBlobFileSystemException {
+    final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+
+    requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ACTION, RENEW_LEASE_ACTION));
+    requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ID, leaseId));
+
+    final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
+
+    final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
+    final AbfsRestOperation op = new AbfsRestOperation(
+        AbfsRestOperationType.LeasePath,
+        this,
+        HTTP_METHOD_POST,
+        url,
+        requestHeaders);
+    op.execute();
+    return op;
+  }
+
+  public AbfsRestOperation releaseLease(final String path, final String leaseId) throws AzureBlobFileSystemException {
+    final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+
+    requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ACTION, RELEASE_LEASE_ACTION));
+    requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ID, leaseId));
+
+    final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
+
+    final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
+    final AbfsRestOperation op = new AbfsRestOperation(
+        AbfsRestOperationType.LeasePath,
+        this,
+        HTTP_METHOD_POST,
+        url,
+        requestHeaders);
+    op.execute();
+    return op;
+  }
+
+  public AbfsRestOperation breakLease(final String path) throws AzureBlobFileSystemException {
+    final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+
+    requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ACTION, BREAK_LEASE_ACTION));
+    requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_BREAK_PERIOD, DEFAULT_LEASE_BREAK_PERIOD));
+
+    final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
+
+    final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
+    final AbfsRestOperation op = new AbfsRestOperation(
+        AbfsRestOperationType.LeasePath,
+        this,
+        HTTP_METHOD_POST,
+        url,
+        requestHeaders);
+    op.execute();
+    return op;
+  }
+
   public AbfsRestOperation renamePath(String source, final String destination, final String continuation)
           throws AzureBlobFileSystemException {
     final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
@@ -416,6 +514,9 @@ public class AbfsClient implements Closeable {
     // PUT and specify the real method in the X-Http-Method-Override header.
     requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE,
         HTTP_METHOD_PATCH));
+    if (reqParams.getLeaseId() != null) {
+      requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ID, reqParams.getLeaseId()));
+    }
 
     final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
     abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION, APPEND_ACTION);
@@ -492,13 +593,16 @@ public class AbfsClient implements Closeable {
   }
 
   public AbfsRestOperation flush(final String path, final long position, boolean retainUncommittedData,
-                                 boolean isClose, final String cachedSasToken)
+                                 boolean isClose, final String cachedSasToken, final String leaseId)
       throws AzureBlobFileSystemException {
     final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
     // JDK7 does not support PATCH, so to workaround the issue we will use
     // PUT and specify the real method in the X-Http-Method-Override header.
     requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE,
             HTTP_METHOD_PATCH));
+    if (leaseId != null) {
+      requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ID, leaseId));
+    }
 
     final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
     abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION, FLUSH_ACTION);
@@ -1003,4 +1107,21 @@ public class AbfsClient implements Closeable {
   protected AbfsCounters getAbfsCounters() {
     return abfsCounters;
   }
+
+  public int getNumLeaseThreads() {
+    return abfsConfiguration.getNumLeaseThreads();
+  }
+
+  public <V> ListenableScheduledFuture<V> schedule(Callable<V> callable, long delay,
+      TimeUnit timeUnit) {
+    return executorService.schedule(callable, delay, timeUnit);
+  }
+
+  public ListenableFuture<?> submit(Runnable runnable) {
+    return executorService.submit(runnable);
+  }
+
+  public <V> void addCallback(ListenableFuture<V> future, FutureCallback<V> callback) {
+    Futures.addCallback(future, callback, executorService);
+  }
 }
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsErrors.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsErrors.java
new file mode 100644
index 0000000..e15795e
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsErrors.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.azurebfs.services;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_LEASE_THREADS;
+
+/**
+ * ABFS error constants.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public final class AbfsErrors {
+  public static final String ERR_WRITE_WITHOUT_LEASE = "Attempted to write to file without lease";
+  public static final String ERR_LEASE_EXPIRED = "A lease ID was specified, but the lease for the"
+      + " resource has expired";
+  public static final String ERR_NO_LEASE_ID_SPECIFIED = "There is currently a lease on the "
+      + "resource and no lease ID was specified in the request";
+  public static final String ERR_PARALLEL_ACCESS_DETECTED = "Parallel access to the create path "
+      + "detected. Failing request to honor single writer semantics";
+  public static final String ERR_ACQUIRING_LEASE = "Unable to acquire lease";
+  public static final String ERR_LEASE_ALREADY_PRESENT = "There is already a lease present";
+  public static final String ERR_LEASE_NOT_PRESENT = "There is currently no lease on the resource";
+  public static final String ERR_LEASE_ID_NOT_PRESENT = "The lease ID is not present with the "
+      + "specified lease operation";
+  public static final String ERR_LEASE_DID_NOT_MATCH = "The lease ID specified did not match the "
+      + "lease ID for the resource with the specified lease operation";
+  public static final String ERR_LEASE_BROKEN = "The lease ID matched, but the lease has been "
+    + "broken explicitly and cannot be renewed";
+  public static final String ERR_LEASE_FUTURE_EXISTS = "There is already an existing lease "
+      + "operation";
+  public static final String ERR_NO_LEASE_THREADS = "Lease desired but no lease threads "
+      + "configured, set " + FS_AZURE_LEASE_THREADS;
+
+  private AbfsErrors() {}
+}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsLease.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsLease.java
new file mode 100644
index 0000000..97a8b02
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsLease.java
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.FutureCallback;
+import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListenableScheduledFuture;
+import org.apache.hadoop.thirdparty.org.checkerframework.checker.nullness.qual.Nullable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryPolicy;
+
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.INFINITE_LEASE_DURATION;
+import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_ACQUIRING_LEASE;
+import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_LEASE_FUTURE_EXISTS;
+import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_NO_LEASE_THREADS;
+
+/**
+ * AbfsLease manages an Azure blob lease. It acquires an infinite lease on instantiation and
+ * releases the lease when free() is called. Use it to prevent writes to the blob by other
+ * processes that don't have the lease.
+ *
+ * Creating a new Lease object blocks the caller until the Azure blob lease is acquired. It will
+ * retry a fixed number of times before failing if there is a problem acquiring the lease.
+ *
+ * Call free() to release the Lease. If the holder process dies, AzureBlobFileSystem breakLease
+ * will need to be called before another client will be able to write to the file.
+ */
+public final class AbfsLease {
+  private static final Logger LOG = LoggerFactory.getLogger(AbfsLease.class);
+
+  // Number of retries for acquiring lease
+  static final int DEFAULT_LEASE_ACQUIRE_MAX_RETRIES = 7;
+  // Retry interval for acquiring lease in secs
+  static final int DEFAULT_LEASE_ACQUIRE_RETRY_INTERVAL = 10;
+
+  private final AbfsClient client;
+  private final String path;
+
+  // Lease status variables
+  private volatile boolean leaseFreed;
+  private volatile String leaseID = null;
+  private volatile Throwable exception = null;
+  private volatile int acquireRetryCount = 0;
+  private volatile ListenableScheduledFuture<AbfsRestOperation> future = null;
+
+  public static class LeaseException extends AzureBlobFileSystemException {
+    public LeaseException(Throwable t) {
+      super(ERR_ACQUIRING_LEASE + ": " + t, t);
+    }
+
+    public LeaseException(String s) {
+      super(s);
+    }
+  }
+
+  public AbfsLease(AbfsClient client, String path) throws AzureBlobFileSystemException {
+    this(client, path, DEFAULT_LEASE_ACQUIRE_MAX_RETRIES, DEFAULT_LEASE_ACQUIRE_RETRY_INTERVAL);
+  }
+
+  @VisibleForTesting
+  public AbfsLease(AbfsClient client, String path, int acquireMaxRetries,
+      int acquireRetryInterval) throws AzureBlobFileSystemException {
+    this.leaseFreed = false;
+    this.client = client;
+    this.path = path;
+
+    if (client.getNumLeaseThreads() < 1) {
+      throw new LeaseException(ERR_NO_LEASE_THREADS);
+    }
+
+    // Try to get the lease a specified number of times, else throw an error
+    RetryPolicy retryPolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
+        acquireMaxRetries, acquireRetryInterval, TimeUnit.SECONDS);
+    acquireLease(retryPolicy, 0, acquireRetryInterval, 0);
+
+    while (leaseID == null && exception == null) {
+      try {
+        future.get();
+      } catch (Exception e) {
+        LOG.debug("Got exception waiting for acquire lease future. Checking if lease ID or "
+            + "exception have been set", e);
+      }
+    }
+    if (exception != null) {
+      LOG.error("Failed to acquire lease on {}", path);
+      throw new LeaseException(exception);
+    }
+
+    LOG.debug("Acquired lease {} on {}", leaseID, path);
+  }
+
+  private void acquireLease(RetryPolicy retryPolicy, int numRetries, int retryInterval, long delay)
+      throws LeaseException {
+    LOG.debug("Attempting to acquire lease on {}, retry {}", path, numRetries);
+    if (future != null && !future.isDone()) {
+      throw new LeaseException(ERR_LEASE_FUTURE_EXISTS);
+    }
+    future = client.schedule(() -> client.acquireLease(path, INFINITE_LEASE_DURATION),
+        delay, TimeUnit.SECONDS);
+    client.addCallback(future, new FutureCallback<AbfsRestOperation>() {
+      @Override
+      public void onSuccess(@Nullable AbfsRestOperation op) {
+        leaseID = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_LEASE_ID);
+        LOG.debug("Acquired lease {} on {}", leaseID, path);
+      }
+
+      @Override
+      public void onFailure(Throwable throwable) {
+        try {
+          if (RetryPolicy.RetryAction.RetryDecision.RETRY
+              == retryPolicy.shouldRetry(null, numRetries, 0, true).action) {
+            LOG.debug("Failed to acquire lease on {}, retrying: {}", path, throwable);
+            acquireRetryCount++;
+            acquireLease(retryPolicy, numRetries + 1, retryInterval, retryInterval);
+          } else {
+            exception = throwable;
+          }
+        } catch (Exception e) {
+          exception = throwable;
+        }
+      }
+    });
+  }
+
+  /**
+   * Cancel future and free the lease. If an exception occurs while releasing the lease, the error
+   * will be logged. If the lease cannot be released, AzureBlobFileSystem breakLease will need to
+   * be called before another client will be able to write to the file.
+   */
+  public void free() {
+    if (leaseFreed) {
+      return;
+    }
+    try {
+      LOG.debug("Freeing lease: path {}, lease id {}", path, leaseID);
+      if (future != null && !future.isDone()) {
+        future.cancel(true);
+      }
+      client.releaseLease(path, leaseID);
+    } catch (IOException e) {
+      LOG.warn("Exception when trying to release lease {} on {}. Lease will need to be broken: {}",
+          leaseID, path, e.getMessage());
+    } finally {
+      // Even if releasing the lease fails (e.g. because the file was deleted),
+      // make sure to record that we freed the lease
+      leaseFreed = true;
+      LOG.debug("Freed lease {} on {}", leaseID, path);
+    }
+  }
+
+  public boolean isFreed() {
+    return leaseFreed;
+  }
+
+  public String getLeaseID() {
+    return leaseID;
+  }
+
+  @VisibleForTesting
+  public int getAcquireRetryCount() {
+    return acquireRetryCount;
+  }
+}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
index 2d02019..80b35ee 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hadoop.fs.PathIOException;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
 import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters;
@@ -53,6 +54,7 @@ import org.apache.hadoop.fs.FSExceptionMessages;
 import org.apache.hadoop.fs.StreamCapabilities;
 import org.apache.hadoop.fs.Syncable;
 
+import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_WRITE_WITHOUT_LEASE;
 import static org.apache.hadoop.fs.impl.StoreImplementationUtils.isProbeForSyncable;
 import static org.apache.hadoop.io.IOUtils.wrapException;
 import static org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters.Mode.APPEND_MODE;
@@ -92,6 +94,9 @@ public class AbfsOutputStream extends OutputStream implements Syncable,
   // SAS tokens can be re-used until they expire
   private CachedSASToken cachedSasToken;
 
+  private AbfsLease lease;
+  private String leaseId;
+
   /**
    * Queue storing buffers with the size of the Azure block ready for
    * reuse. The pool allows reusing the blocks instead of allocating new
@@ -142,6 +147,10 @@ public class AbfsOutputStream extends OutputStream implements Syncable,
     }
     this.maxRequestsThatCanBeQueued = abfsOutputStreamContext
         .getMaxWriteRequestsToQueue();
+
+    this.lease = abfsOutputStreamContext.getLease();
+    this.leaseId = abfsOutputStreamContext.getLeaseId();
+
     this.threadExecutor
         = new ThreadPoolExecutor(maxConcurrentRequestCount,
         maxConcurrentRequestCount,
@@ -203,6 +212,10 @@ public class AbfsOutputStream extends OutputStream implements Syncable,
       throw new IndexOutOfBoundsException();
     }
 
+    if (hasLease() && isLeaseFreed()) {
+      throw new PathIOException(path, ERR_WRITE_WITHOUT_LEASE);
+    }
+
     int currentOffset = off;
     int writableBytes = bufferSize - bufferIndex;
     int numberOfBytesToWrite = length;
@@ -306,6 +319,10 @@ public class AbfsOutputStream extends OutputStream implements Syncable,
       // See HADOOP-16785
       throw wrapException(path, e.getMessage(), e);
     } finally {
+      if (hasLease()) {
+        lease.free();
+        lease = null;
+      }
       lastError = new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
       buffer = null;
       bufferIndex = 0;
@@ -372,7 +389,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable,
     try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker,
             "writeCurrentBufferToService", "append")) {
       AppendRequestParameters reqParams = new AppendRequestParameters(offset, 0,
-          bytesLength, APPEND_MODE, true);
+          bytesLength, APPEND_MODE, true, leaseId);
       AbfsRestOperation op = client.append(path, bytes, reqParams, cachedSasToken.get());
       cachedSasToken.update(op.getSasToken());
       if (outputStreamStatistics != null) {
@@ -448,7 +465,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable,
                       mode = FLUSH_MODE;
                     }
                     AppendRequestParameters reqParams = new AppendRequestParameters(
-                        offset, 0, bytesLength, mode, false);
+                        offset, 0, bytesLength, mode, false, leaseId);
                     AbfsRestOperation op = client.append(path, bytes, reqParams,
                         cachedSasToken.get());
                     cachedSasToken.update(op.getSasToken());
@@ -517,7 +534,8 @@ public class AbfsOutputStream extends OutputStream implements Syncable,
     AbfsPerfTracker tracker = client.getAbfsPerfTracker();
     try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker,
             "flushWrittenBytesToServiceInternal", "flush")) {
-      AbfsRestOperation op = client.flush(path, offset, retainUncommitedData, isClose, cachedSasToken.get());
+      AbfsRestOperation op = client.flush(path, offset, retainUncommitedData, isClose,
+          cachedSasToken.get(), leaseId);
       cachedSasToken.update(op.getSasToken());
       perfInfo.registerResult(op.getResult()).registerSuccess(true);
     } catch (AzureBlobFileSystemException ex) {
@@ -637,6 +655,19 @@ public class AbfsOutputStream extends OutputStream implements Syncable,
     return ioStatistics;
   }
 
+  @VisibleForTesting
+  public boolean isLeaseFreed() {
+    if (lease == null) {
+      return true;
+    }
+    return lease.isFreed();
+  }
+
+  @VisibleForTesting
+  public boolean hasLease() {
+    return lease != null;
+  }
+
   /**
    * Appending AbfsOutputStream statistics to base toString().
    *
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java
index 925cd4f..48f6f54 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java
@@ -39,6 +39,8 @@ public class AbfsOutputStreamContext extends AbfsStreamContext {
 
   private int maxWriteRequestsToQueue;
 
+  private AbfsLease lease;
+
   public AbfsOutputStreamContext(final long sasTokenRenewPeriodForStreamsInSeconds) {
     super(sasTokenRenewPeriodForStreamsInSeconds);
   }
@@ -94,6 +96,11 @@ public class AbfsOutputStreamContext extends AbfsStreamContext {
     return this;
   }
 
+  public AbfsOutputStreamContext withLease(final AbfsLease lease) {
+    this.lease = lease;
+    return this;
+  }
+
   public int getWriteBufferSize() {
     return writeBufferSize;
   }
@@ -125,4 +132,15 @@ public class AbfsOutputStreamContext extends AbfsStreamContext {
   public boolean isEnableSmallWriteOptimization() {
     return this.enableSmallWriteOptimization;
   }
+
+  public AbfsLease getLease() {
+    return this.lease;
+  }
+
+  public String getLeaseId() {
+    if (this.lease == null) {
+      return null;
+    }
+    return this.lease.getLeaseID();
+  }
 }
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
index 584b71f..b046cbc 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
@@ -131,6 +131,7 @@ public class AbfsRestOperation {
     this.url = url;
     this.requestHeaders = requestHeaders;
     this.hasRequestBody = (AbfsHttpConstants.HTTP_METHOD_PUT.equals(method)
+            || AbfsHttpConstants.HTTP_METHOD_POST.equals(method)
             || AbfsHttpConstants.HTTP_METHOD_PATCH.equals(method));
     this.sasToken = sasToken;
     this.abfsCounters = client.getAbfsCounters();
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperationType.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperationType.java
index d303186..830297f 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperationType.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperationType.java
@@ -40,5 +40,6 @@ public enum AbfsRestOperationType {
     Flush,
     ReadFile,
     DeletePath,
-    CheckAccess
+    CheckAccess,
+    LeasePath,
 }
diff --git a/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md b/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md
index 33d4a0f..6be5952 100644
--- a/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md
+++ b/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md
@@ -887,6 +887,22 @@ enabled for your Azure Storage account."
 The directories can be specified as comma separated values. By default the value
 is "/hbase"
 
+### <a name="infiniteleaseoptions"></a> Infinite Lease Options
+`fs.azure.infinite-lease.directories`: Directories for infinite lease support
+can be specified comma separated in this config. By default, multiple
+clients will be able to write to the same file simultaneously. When writing
+to files contained within the directories specified in this config, the
+client will obtain a lease on the file that will prevent any other clients
+from writing to the file. When the output stream is closed, the lease will be
+released. To revoke a client's write access for a file, the
+AzureBlobFilesystem breakLease method may be called. If the client dies
+before the file can be closed and the lease released, breakLease will need to
+be called before another client will be able to write to the file.
+
+`fs.azure.lease.threads`: This is the size of the thread pool that will be
+used for lease operations for infinite lease directories. By default the value
+is 0, so it must be set to at least 1 to support infinite lease directories.
+
 ### <a name="perfoptions"></a> Perf Options
 
 #### <a name="abfstracklatencyoptions"></a> 1. HTTP Request Tracking Options
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemLease.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemLease.java
new file mode 100644
index 0000000..9857da8
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemLease.java
@@ -0,0 +1,336 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.azurebfs;
+
+import java.io.IOException;
+import java.util.concurrent.RejectedExecutionException;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
+import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
+import org.apache.hadoop.fs.azurebfs.services.AbfsLease;
+import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.LambdaTestUtils;
+
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.spy;
+
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_INFINITE_LEASE_KEY;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_LEASE_THREADS;
+import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_TEST_NAMESPACE_ENABLED_ACCOUNT;
+import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_ACQUIRING_LEASE;
+import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_LEASE_EXPIRED;
+import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_LEASE_NOT_PRESENT;
+import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_NO_LEASE_ID_SPECIFIED;
+import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_NO_LEASE_THREADS;
+import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_PARALLEL_ACCESS_DETECTED;
+
+/**
+ * Test lease operations.
+ */
+public class ITestAzureBlobFileSystemLease extends AbstractAbfsIntegrationTest {
+  private static final int TEST_EXECUTION_TIMEOUT = 30 * 1000;
+  private static final int LONG_TEST_EXECUTION_TIMEOUT = 90 * 1000;
+  private static final String TEST_FILE = "testfile";
+  private final boolean isHNSEnabled;
+
+  public ITestAzureBlobFileSystemLease() throws Exception {
+    super();
+
+    this.isHNSEnabled = getConfiguration()
+        .getBoolean(FS_AZURE_TEST_NAMESPACE_ENABLED_ACCOUNT, false);
+  }
+
+  private AzureBlobFileSystem getCustomFileSystem(Path infiniteLeaseDirs, int numLeaseThreads) throws Exception {
+    Configuration conf = getRawConfiguration();
+    conf.setBoolean(String.format("fs.%s.impl.disable.cache", getAbfsScheme()), true);
+    conf.set(FS_AZURE_INFINITE_LEASE_KEY, infiniteLeaseDirs.toUri().getPath());
+    conf.setInt(FS_AZURE_LEASE_THREADS, numLeaseThreads);
+    return getFileSystem(conf);
+  }
+
+  @Test(timeout = TEST_EXECUTION_TIMEOUT)
+  public void testNoInfiniteLease() throws IOException {
+    final Path testFilePath = new Path(path(methodName.getMethodName()), TEST_FILE);
+    final AzureBlobFileSystem fs = getFileSystem();
+    fs.mkdirs(testFilePath.getParent());
+    try (FSDataOutputStream out = fs.create(testFilePath)) {
+      Assert.assertFalse("Output stream should not have lease",
+          ((AbfsOutputStream) out.getWrappedStream()).hasLease());
+    }
+    Assert.assertTrue("Store leases were not freed", fs.getAbfsStore().areLeasesFreed());
+  }
+
+  @Test(timeout = TEST_EXECUTION_TIMEOUT)
+  public void testNoLeaseThreads() throws Exception {
+    final Path testFilePath = new Path(path(methodName.getMethodName()), TEST_FILE);
+    final AzureBlobFileSystem fs = getCustomFileSystem(testFilePath.getParent(), 0);
+    fs.mkdirs(testFilePath.getParent());
+    LambdaTestUtils.intercept(IOException.class, ERR_NO_LEASE_THREADS, () -> {
+      try (FSDataOutputStream out = fs.create(testFilePath)) {
+      }
+      return "No failure when lease requested with 0 lease threads";
+    });
+  }
+
+  @Test(timeout = TEST_EXECUTION_TIMEOUT)
+  public void testOneWriter() throws Exception {
+    final Path testFilePath = new Path(path(methodName.getMethodName()), TEST_FILE);
+    final AzureBlobFileSystem fs = getCustomFileSystem(testFilePath.getParent(), 1);
+    fs.mkdirs(testFilePath.getParent());
+
+    FSDataOutputStream out = fs.create(testFilePath);
+    Assert.assertTrue("Output stream should have lease",
+        ((AbfsOutputStream) out.getWrappedStream()).hasLease());
+    out.close();
+    Assert.assertFalse("Output stream should not have lease",
+        ((AbfsOutputStream) out.getWrappedStream()).hasLease());
+    Assert.assertTrue("Store leases were not freed", fs.getAbfsStore().areLeasesFreed());
+  }
+
+  @Test(timeout = TEST_EXECUTION_TIMEOUT)
+  public void testSubDir() throws Exception {
+    final Path testFilePath = new Path(new Path(path(methodName.getMethodName()), "subdir"),
+        TEST_FILE);
+    final AzureBlobFileSystem fs =
+        getCustomFileSystem(testFilePath.getParent().getParent(), 1);
+    fs.mkdirs(testFilePath.getParent().getParent());
+
+    FSDataOutputStream out = fs.create(testFilePath);
+    Assert.assertTrue("Output stream should have lease",
+        ((AbfsOutputStream) out.getWrappedStream()).hasLease());
+    out.close();
+    Assert.assertFalse("Output stream should not have lease",
+        ((AbfsOutputStream) out.getWrappedStream()).hasLease());
+    Assert.assertTrue("Store leases were not freed", fs.getAbfsStore().areLeasesFreed());
+  }
+
+  @Test(timeout = TEST_EXECUTION_TIMEOUT)
+  public void testTwoCreate() throws Exception {
+    final Path testFilePath = new Path(path(methodName.getMethodName()), TEST_FILE);
+    final AzureBlobFileSystem fs = getCustomFileSystem(testFilePath.getParent(), 1);
+    fs.mkdirs(testFilePath.getParent());
+
+    try (FSDataOutputStream out = fs.create(testFilePath)) {
+      LambdaTestUtils.intercept(IOException.class, isHNSEnabled ? ERR_PARALLEL_ACCESS_DETECTED
+          : ERR_NO_LEASE_ID_SPECIFIED, () -> {
+        try (FSDataOutputStream out2 = fs.create(testFilePath)) {
+        }
+        return "Expected second create on infinite lease dir to fail";
+      });
+    }
+    Assert.assertTrue("Store leases were not freed", fs.getAbfsStore().areLeasesFreed());
+  }
+
+  private void twoWriters(AzureBlobFileSystem fs, Path testFilePath, boolean expectException) throws Exception {
+    try (FSDataOutputStream out = fs.create(testFilePath)) {
+      try (FSDataOutputStream out2 = fs.append(testFilePath)) {
+        out2.writeInt(2);
+        out2.hsync();
+      } catch (IOException e) {
+        if (expectException) {
+          GenericTestUtils.assertExceptionContains(ERR_ACQUIRING_LEASE, e);
+        } else {
+          throw e;
+        }
+      }
+      out.writeInt(1);
+      out.hsync();
+    }
+
+    Assert.assertTrue("Store leases were not freed", fs.getAbfsStore().areLeasesFreed());
+  }
+
+  @Test(timeout = TEST_EXECUTION_TIMEOUT)
+  public void testTwoWritersCreateAppendNoInfiniteLease() throws Exception {
+    final Path testFilePath = new Path(path(methodName.getMethodName()), TEST_FILE);
+    final AzureBlobFileSystem fs = getFileSystem();
+    fs.mkdirs(testFilePath.getParent());
+
+    twoWriters(fs, testFilePath, false);
+  }
+
+  @Test(timeout = LONG_TEST_EXECUTION_TIMEOUT)
+  public void testTwoWritersCreateAppendWithInfiniteLeaseEnabled() throws Exception {
+    final Path testFilePath = new Path(path(methodName.getMethodName()), TEST_FILE);
+    final AzureBlobFileSystem fs = getCustomFileSystem(testFilePath.getParent(), 1);
+    fs.mkdirs(testFilePath.getParent());
+
+    twoWriters(fs, testFilePath, true);
+  }
+
+  @Test(timeout = TEST_EXECUTION_TIMEOUT)
+  public void testLeaseFreedOnClose() throws Exception {
+    final Path testFilePath = new Path(path(methodName.getMethodName()), TEST_FILE);
+    final AzureBlobFileSystem fs = getCustomFileSystem(testFilePath.getParent(), 1);
+    fs.mkdirs(testFilePath.getParent());
+
+    FSDataOutputStream out;
+    out = fs.create(testFilePath);
+    out.write(0);
+    Assert.assertTrue("Output stream should have lease",
+        ((AbfsOutputStream) out.getWrappedStream()).hasLease());
+    out.close();
+    Assert.assertFalse("Output stream should not have lease after close",
+        ((AbfsOutputStream) out.getWrappedStream()).hasLease());
+    Assert.assertTrue("Store leases were not freed", fs.getAbfsStore().areLeasesFreed());
+  }
+
+  @Test(timeout = TEST_EXECUTION_TIMEOUT)
+  public void testWriteAfterBreakLease() throws Exception {
+    final Path testFilePath = new Path(path(methodName.getMethodName()), TEST_FILE);
+    final AzureBlobFileSystem fs = getCustomFileSystem(testFilePath.getParent(), 1);
+    fs.mkdirs(testFilePath.getParent());
+
+    FSDataOutputStream out;
+    out = fs.create(testFilePath);
+    out.write(0);
+    out.hsync();
+
+    fs.breakLease(testFilePath);
+
+    LambdaTestUtils.intercept(IOException.class, ERR_LEASE_EXPIRED, () -> {
+      out.write(1);
+      out.hsync();
+      return "Expected exception on write after lease break but got " + out;
+    });
+
+    LambdaTestUtils.intercept(IOException.class, ERR_LEASE_EXPIRED, () -> {
+      out.close();
+      return "Expected exception on close after lease break but got " + out;
+    });
+
+    Assert.assertTrue("Output stream lease should be freed",
+        ((AbfsOutputStream) out.getWrappedStream()).isLeaseFreed());
+
+    try (FSDataOutputStream out2 = fs.append(testFilePath)) {
+      out2.write(2);
+      out2.hsync();
+    }
+
+    Assert.assertTrue("Store leases were not freed", fs.getAbfsStore().areLeasesFreed());
+  }
+
+  @Test(timeout = LONG_TEST_EXECUTION_TIMEOUT)
+  public void testLeaseFreedAfterBreak() throws Exception {
+    final Path testFilePath = new Path(path(methodName.getMethodName()), TEST_FILE);
+    final AzureBlobFileSystem fs = getCustomFileSystem(testFilePath.getParent(), 1);
+    fs.mkdirs(testFilePath.getParent());
+
+    FSDataOutputStream out = fs.create(testFilePath);
+    out.write(0);
+
+    fs.breakLease(testFilePath);
+
+    LambdaTestUtils.intercept(IOException.class, ERR_LEASE_EXPIRED, () -> {
+      out.close();
+      return "Expected exception on close after lease break but got " + out;
+    });
+
+    Assert.assertTrue("Output stream lease should be freed",
+        ((AbfsOutputStream) out.getWrappedStream()).isLeaseFreed());
+
+    Assert.assertTrue("Store leases were not freed", fs.getAbfsStore().areLeasesFreed());
+  }
+
+  @Test(timeout = TEST_EXECUTION_TIMEOUT)
+  public void testInfiniteLease() throws Exception {
+    final Path testFilePath = new Path(path(methodName.getMethodName()), TEST_FILE);
+    final AzureBlobFileSystem fs = getCustomFileSystem(testFilePath.getParent(), 1);
+    fs.mkdirs(testFilePath.getParent());
+
+    try (FSDataOutputStream out = fs.create(testFilePath)) {
+      Assert.assertTrue("Output stream should have lease",
+          ((AbfsOutputStream) out.getWrappedStream()).hasLease());
+      out.write(0);
+    }
+    Assert.assertTrue(fs.getAbfsStore().areLeasesFreed());
+
+    try (FSDataOutputStream out = fs.append(testFilePath)) {
+      Assert.assertTrue("Output stream should have lease",
+          ((AbfsOutputStream) out.getWrappedStream()).hasLease());
+      out.write(1);
+    }
+    Assert.assertTrue("Store leases were not freed", fs.getAbfsStore().areLeasesFreed());
+  }
+
+  @Test(timeout = TEST_EXECUTION_TIMEOUT)
+  public void testFileSystemClose() throws Exception {
+    final Path testFilePath = new Path(path(methodName.getMethodName()), TEST_FILE);
+    final AzureBlobFileSystem fs = getCustomFileSystem(testFilePath.getParent(), 1);
+    fs.mkdirs(testFilePath.getParent());
+
+    FSDataOutputStream out = fs.create(testFilePath);
+    out.write(0);
+    Assert.assertFalse("Store leases should exist", fs.getAbfsStore().areLeasesFreed());
+    fs.close();
+    Assert.assertTrue("Store leases were not freed", fs.getAbfsStore().areLeasesFreed());
+
+    LambdaTestUtils.intercept(IOException.class, isHNSEnabled ? ERR_LEASE_NOT_PRESENT
+        : ERR_LEASE_EXPIRED, () -> {
+      out.close();
+      return "Expected exception on close after closed FS but got " + out;
+    });
+
+    LambdaTestUtils.intercept(RejectedExecutionException.class, () -> {
+      try (FSDataOutputStream out2 = fs.append(testFilePath)) {
+      }
+      return "Expected exception on new append after closed FS";
+    });
+  }
+
+  @Test(timeout = TEST_EXECUTION_TIMEOUT)
+  public void testAcquireRetry() throws Exception {
+    final Path testFilePath = new Path(path(methodName.getMethodName()), TEST_FILE);
+    final AzureBlobFileSystem fs = getCustomFileSystem(testFilePath.getParent(), 1);
+    fs.mkdirs(testFilePath.getParent());
+    fs.createNewFile(testFilePath);
+
+    AbfsLease lease = new AbfsLease(fs.getAbfsClient(), testFilePath.toUri().getPath());
+    Assert.assertNotNull("Did not successfully lease file", lease.getLeaseID());
+    lease.free();
+    Assert.assertEquals("Unexpected acquire retry count", 0, lease.getAcquireRetryCount());
+
+    AbfsClient mockClient = spy(fs.getAbfsClient());
+
+    doThrow(new AbfsLease.LeaseException("failed to acquire 1"))
+        .doThrow(new AbfsLease.LeaseException("failed to acquire 2"))
+        .doCallRealMethod()
+        .when(mockClient).acquireLease(anyString(), anyInt());
+
+    lease = new AbfsLease(mockClient, testFilePath.toUri().getPath(), 5, 1);
+    Assert.assertNotNull("Acquire lease should have retried", lease.getLeaseID());
+    lease.free();
+    Assert.assertEquals("Unexpected acquire retry count", 2, lease.getAcquireRetryCount());
+
+    doThrow(new AbfsLease.LeaseException("failed to acquire"))
+        .when(mockClient).acquireLease(anyString(), anyInt());
+
+    LambdaTestUtils.intercept(AzureBlobFileSystemException.class, () -> {
+      new AbfsLease(mockClient, testFilePath.toUri().getPath(), 5, 1);
+    });
+  }
+}
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/diagnostics/TestConfigurationValidators.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/diagnostics/TestConfigurationValidators.java
index f02eadc..6a02435 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/diagnostics/TestConfigurationValidators.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/diagnostics/TestConfigurationValidators.java
@@ -24,11 +24,14 @@ import org.junit.Test;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidConfigurationValueException;
 import org.apache.hadoop.fs.azurebfs.utils.Base64;
 
-import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MIN_BUFFER_SIZE;
-import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MAX_BUFFER_SIZE;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_LEASE_DURATION;
 import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_READ_BUFFER_SIZE;
 import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_WRITE_BUFFER_SIZE;
-
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.INFINITE_LEASE_DURATION;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MAX_BUFFER_SIZE;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MAX_LEASE_DURATION;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MIN_BUFFER_SIZE;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MIN_LEASE_DURATION;
 
 /**
  * Test configuration validators.
@@ -59,6 +62,26 @@ public class TestConfigurationValidators extends Assert {
   }
 
   @Test
+  public void testIntegerWithOutlierConfigValidator() throws Exception {
+    IntegerConfigurationBasicValidator integerConfigurationValidator = new IntegerConfigurationBasicValidator(
+        INFINITE_LEASE_DURATION, MIN_LEASE_DURATION, MAX_LEASE_DURATION, DEFAULT_LEASE_DURATION, FAKE_KEY,
+        false);
+
+    assertEquals(INFINITE_LEASE_DURATION, (int) integerConfigurationValidator.validate("-1"));
+    assertEquals(DEFAULT_LEASE_DURATION, (int) integerConfigurationValidator.validate(null));
+    assertEquals(MIN_LEASE_DURATION, (int) integerConfigurationValidator.validate("15"));
+    assertEquals(MAX_LEASE_DURATION, (int) integerConfigurationValidator.validate("60"));
+  }
+
+  @Test(expected = InvalidConfigurationValueException.class)
+  public void testIntegerWithOutlierConfigValidatorThrowsIfMissingValidValue() throws Exception {
+    IntegerConfigurationBasicValidator integerConfigurationValidator = new IntegerConfigurationBasicValidator(
+        INFINITE_LEASE_DURATION, MIN_LEASE_DURATION, MAX_LEASE_DURATION, DEFAULT_LEASE_DURATION, FAKE_KEY,
+        true);
+    integerConfigurationValidator.validate("14");
+  }
+
+  @Test
   public void testLongConfigValidator() throws Exception {
     LongConfigurationBasicValidator longConfigurationValidator = new LongConfigurationBasicValidator(
         MIN_BUFFER_SIZE, MAX_BUFFER_SIZE, DEFAULT_WRITE_BUFFER_SIZE, FAKE_KEY, false);
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java
index 1e6b8ef..f4243bc 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters;
 import org.apache.hadoop.conf.Configuration;
 
 import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.isNull;
 import static org.mockito.ArgumentMatchers.refEq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -86,7 +87,7 @@ public final class TestAbfsOutputStream {
     AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf);
     when(client.getAbfsPerfTracker()).thenReturn(tracker);
     when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any())).thenReturn(op);
-    when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any())).thenReturn(op);
+    when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any(), isNull())).thenReturn(op);
 
     AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0,
         populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false));
@@ -104,9 +105,9 @@ public final class TestAbfsOutputStream {
     out.hsync();
 
     AppendRequestParameters firstReqParameters = new AppendRequestParameters(
-        0, 0, WRITE_SIZE, APPEND_MODE, false);
+        0, 0, WRITE_SIZE, APPEND_MODE, false, null);
     AppendRequestParameters secondReqParameters = new AppendRequestParameters(
-        WRITE_SIZE, 0, 2 * WRITE_SIZE, APPEND_MODE, false);
+        WRITE_SIZE, 0, 2 * WRITE_SIZE, APPEND_MODE, false, null);
 
     verify(client, times(1)).append(
         eq(PATH), any(byte[].class), refEq(firstReqParameters), any());
@@ -133,7 +134,7 @@ public final class TestAbfsOutputStream {
 
     when(client.getAbfsPerfTracker()).thenReturn(tracker);
     when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any())).thenReturn(op);
-    when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any())).thenReturn(op);
+    when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any(), isNull())).thenReturn(op);
 
     AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0,
         populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false));
@@ -146,9 +147,9 @@ public final class TestAbfsOutputStream {
     out.close();
 
     AppendRequestParameters firstReqParameters = new AppendRequestParameters(
-        0, 0, BUFFER_SIZE, APPEND_MODE, false);
+        0, 0, BUFFER_SIZE, APPEND_MODE, false, null);
     AppendRequestParameters secondReqParameters = new AppendRequestParameters(
-        BUFFER_SIZE, 0, 5*WRITE_SIZE-BUFFER_SIZE, APPEND_MODE, false);
+        BUFFER_SIZE, 0, 5*WRITE_SIZE-BUFFER_SIZE, APPEND_MODE, false, null);
 
     verify(client, times(1)).append(
         eq(PATH), any(byte[].class), refEq(firstReqParameters), any());
@@ -165,7 +166,7 @@ public final class TestAbfsOutputStream {
     ArgumentCaptor<String> acFlushSASToken = ArgumentCaptor.forClass(String.class);
 
     verify(client, times(1)).flush(acFlushPath.capture(), acFlushPosition.capture(), acFlushRetainUnCommittedData.capture(), acFlushClose.capture(),
-        acFlushSASToken.capture());
+        acFlushSASToken.capture(), isNull());
     assertThat(Arrays.asList(PATH)).describedAs("path").isEqualTo(acFlushPath.getAllValues());
     assertThat(Arrays.asList(Long.valueOf(5*WRITE_SIZE))).describedAs("position").isEqualTo(acFlushPosition.getAllValues());
     assertThat(Arrays.asList(false)).describedAs("RetainUnCommittedData flag").isEqualTo(acFlushRetainUnCommittedData.getAllValues());
@@ -189,7 +190,7 @@ public final class TestAbfsOutputStream {
 
     when(client.getAbfsPerfTracker()).thenReturn(tracker);
     when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any())).thenReturn(op);
-    when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any())).thenReturn(op);
+    when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any(), isNull())).thenReturn(op);
     when(op.getSasToken()).thenReturn("testToken");
     when(op.getResult()).thenReturn(httpOp);
 
@@ -204,9 +205,9 @@ public final class TestAbfsOutputStream {
     out.close();
 
     AppendRequestParameters firstReqParameters = new AppendRequestParameters(
-        0, 0, BUFFER_SIZE, APPEND_MODE, false);
+        0, 0, BUFFER_SIZE, APPEND_MODE, false, null);
     AppendRequestParameters secondReqParameters = new AppendRequestParameters(
-        BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false);
+        BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false, null);
 
     verify(client, times(1)).append(
         eq(PATH), any(byte[].class), refEq(firstReqParameters), any());
@@ -223,7 +224,7 @@ public final class TestAbfsOutputStream {
     ArgumentCaptor<String> acFlushSASToken = ArgumentCaptor.forClass(String.class);
 
     verify(client, times(1)).flush(acFlushPath.capture(), acFlushPosition.capture(), acFlushRetainUnCommittedData.capture(), acFlushClose.capture(),
-        acFlushSASToken.capture());
+        acFlushSASToken.capture(), isNull());
     assertThat(Arrays.asList(PATH)).describedAs("path").isEqualTo(acFlushPath.getAllValues());
     assertThat(Arrays.asList(Long.valueOf(2*BUFFER_SIZE))).describedAs("position").isEqualTo(acFlushPosition.getAllValues());
     assertThat(Arrays.asList(false)).describedAs("RetainUnCommittedData flag").isEqualTo(acFlushRetainUnCommittedData.getAllValues());
@@ -247,7 +248,7 @@ public final class TestAbfsOutputStream {
 
     when(client.getAbfsPerfTracker()).thenReturn(tracker);
     when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any())).thenReturn(op);
-    when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any())).thenReturn(op);
+    when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any(), isNull())).thenReturn(op);
     when(op.getSasToken()).thenReturn("testToken");
     when(op.getResult()).thenReturn(httpOp);
 
@@ -262,9 +263,9 @@ public final class TestAbfsOutputStream {
     Thread.sleep(1000);
 
     AppendRequestParameters firstReqParameters = new AppendRequestParameters(
-        0, 0, BUFFER_SIZE, APPEND_MODE, false);
+        0, 0, BUFFER_SIZE, APPEND_MODE, false, null);
     AppendRequestParameters secondReqParameters = new AppendRequestParameters(
-        BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false);
+        BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false, null);
 
     verify(client, times(1)).append(
         eq(PATH), any(byte[].class), refEq(firstReqParameters), any());
@@ -291,7 +292,7 @@ public final class TestAbfsOutputStream {
 
     when(client.getAbfsPerfTracker()).thenReturn(tracker);
     when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any())).thenReturn(op);
-    when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any())).thenReturn(op);
+    when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any(), isNull())).thenReturn(op);
 
     AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0,
         populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, true));
@@ -304,9 +305,9 @@ public final class TestAbfsOutputStream {
     Thread.sleep(1000);
 
     AppendRequestParameters firstReqParameters = new AppendRequestParameters(
-        0, 0, BUFFER_SIZE, APPEND_MODE, true);
+        0, 0, BUFFER_SIZE, APPEND_MODE, true, null);
     AppendRequestParameters secondReqParameters = new AppendRequestParameters(
-        BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, true);
+        BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, true, null);
 
     verify(client, times(1)).append(
         eq(PATH), any(byte[].class), refEq(firstReqParameters), any());
@@ -334,7 +335,7 @@ public final class TestAbfsOutputStream {
 
     when(client.getAbfsPerfTracker()).thenReturn(tracker);
     when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any())).thenReturn(op);
-    when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any())).thenReturn(op);
+    when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any(), isNull())).thenReturn(op);
 
     AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0,
         populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false));
@@ -347,9 +348,9 @@ public final class TestAbfsOutputStream {
     out.hflush();
 
     AppendRequestParameters firstReqParameters = new AppendRequestParameters(
-        0, 0, BUFFER_SIZE, APPEND_MODE, false);
+        0, 0, BUFFER_SIZE, APPEND_MODE, false, null);
     AppendRequestParameters secondReqParameters = new AppendRequestParameters(
-        BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false);
+        BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false, null);
 
     verify(client, times(1)).append(
         eq(PATH), any(byte[].class), refEq(firstReqParameters), any());
@@ -366,7 +367,7 @@ public final class TestAbfsOutputStream {
     ArgumentCaptor<String> acFlushSASToken = ArgumentCaptor.forClass(String.class);
 
     verify(client, times(1)).flush(acFlushPath.capture(), acFlushPosition.capture(), acFlushRetainUnCommittedData.capture(), acFlushClose.capture(),
-        acFlushSASToken.capture());
+        acFlushSASToken.capture(), isNull());
     assertThat(Arrays.asList(PATH)).describedAs("path").isEqualTo(acFlushPath.getAllValues());
     assertThat(Arrays.asList(Long.valueOf(2*BUFFER_SIZE))).describedAs("position").isEqualTo(acFlushPosition.getAllValues());
     assertThat(Arrays.asList(false)).describedAs("RetainUnCommittedData flag").isEqualTo(acFlushRetainUnCommittedData.getAllValues());
@@ -388,7 +389,7 @@ public final class TestAbfsOutputStream {
     AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf);
     when(client.getAbfsPerfTracker()).thenReturn(tracker);
     when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any())).thenReturn(op);
-    when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any())).thenReturn(op);
+    when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any(), isNull())).thenReturn(op);
 
     AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0,
         populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false));
@@ -403,9 +404,9 @@ public final class TestAbfsOutputStream {
     Thread.sleep(1000);
 
     AppendRequestParameters firstReqParameters = new AppendRequestParameters(
-        0, 0, BUFFER_SIZE, APPEND_MODE, false);
+        0, 0, BUFFER_SIZE, APPEND_MODE, false, null);
     AppendRequestParameters secondReqParameters = new AppendRequestParameters(
-        BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false);
+        BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false, null);
 
     verify(client, times(1)).append(
         eq(PATH), any(byte[].class), refEq(firstReqParameters), any());

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org