You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2021/01/22 11:07:00 UTC

[hadoop] 06/06: HADOOP-17404. ABFS: Small write - Merge append and flush

This is an automated email from the ASF dual-hosted git repository.

stevel pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit 4865589bb4414c87d9ac02b4323ebbd485348cf7
Author: Sneha Vijayarajan <sn...@gmail.com>
AuthorDate: Thu Jan 7 00:13:37 2021 +0530

    HADOOP-17404. ABFS: Small write - Merge append and flush
    
    - Contributed by Sneha Vijayarajan
    
    (cherry picked from commit b612c310c26394aa406c99d8598c9cb7621df052)
---
 hadoop-tools/hadoop-azure/pom.xml                  |   2 +
 .../src/config/checkstyle-suppressions.xml         |   2 +
 .../hadoop/fs/azurebfs/AbfsConfiguration.java      |   8 +
 .../fs/azurebfs/AzureBlobFileSystemStore.java      |   1 +
 .../fs/azurebfs/constants/AbfsHttpConstants.java   |   1 +
 .../fs/azurebfs/constants/ConfigurationKeys.java   |   9 +
 .../constants/FileSystemConfigurations.java        |   1 +
 .../fs/azurebfs/constants/HttpQueryParams.java     |   1 +
 .../services/AppendRequestParameters.java          |  69 +++
 .../hadoop/fs/azurebfs/services/AbfsClient.java    |  47 +-
 .../fs/azurebfs/services/AbfsOutputStream.java     |  67 ++-
 .../azurebfs/services/AbfsOutputStreamContext.java |  11 +
 .../fs/azurebfs/services/AbfsRestOperation.java    |   2 +-
 .../fs/azurebfs/ITestAbfsNetworkStatistics.java    | 339 +++++++------
 .../fs/azurebfs/ITestSmallWriteOptimization.java   | 523 +++++++++++++++++++++
 .../azurebfs/services/ITestAbfsOutputStream.java   |  17 +-
 .../fs/azurebfs/services/TestAbfsOutputStream.java | 279 ++++++-----
 17 files changed, 1030 insertions(+), 349 deletions(-)

diff --git a/hadoop-tools/hadoop-azure/pom.xml b/hadoop-tools/hadoop-azure/pom.xml
index 7bfce01..6b0599d 100644
--- a/hadoop-tools/hadoop-azure/pom.xml
+++ b/hadoop-tools/hadoop-azure/pom.xml
@@ -555,6 +555,7 @@
                     <exclude>**/azurebfs/ITestAbfsReadWriteAndSeek.java</exclude>
                     <exclude>**/azurebfs/ITestAzureBlobFileSystemListStatus.java</exclude>
                     <exclude>**/azurebfs/extensions/ITestAbfsDelegationTokens.java</exclude>
+                    <exclude>**/azurebfs/ITestSmallWriteOptimization.java</exclude>
                   </excludes>
 
                 </configuration>
@@ -594,6 +595,7 @@
                     <include>**/azurebfs/ITestAbfsReadWriteAndSeek.java</include>
                     <include>**/azurebfs/ITestAzureBlobFileSystemListStatus.java</include>
                     <include>**/azurebfs/extensions/ITestAbfsDelegationTokens.java</include>
+                    <include>**/azurebfs/ITestSmallWriteOptimization.java</include>
                   </includes>
                 </configuration>
               </execution>
diff --git a/hadoop-tools/hadoop-azure/src/config/checkstyle-suppressions.xml b/hadoop-tools/hadoop-azure/src/config/checkstyle-suppressions.xml
index c502361..070c8c1 100644
--- a/hadoop-tools/hadoop-azure/src/config/checkstyle-suppressions.xml
+++ b/hadoop-tools/hadoop-azure/src/config/checkstyle-suppressions.xml
@@ -46,4 +46,6 @@
               files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]AzureBlobFileSystemStore.java"/>
     <suppress checks="ParameterNumber|MagicNumber"
               files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]utils[\\/]Base64.java"/>
+    <suppress checks="ParameterNumber|VisibilityModifier"
+              files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]ITestSmallWriteOptimization.java"/>
 </suppressions>
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
index b1c95d2..5a70323 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
@@ -100,6 +100,10 @@ public class AbfsConfiguration{
       DefaultValue = DEFAULT_WRITE_BUFFER_SIZE)
   private int writeBufferSize;
 
+  @BooleanConfigurationValidatorAnnotation(ConfigurationKey = AZURE_ENABLE_SMALL_WRITE_OPTIMIZATION,
+      DefaultValue = DEFAULT_AZURE_ENABLE_SMALL_WRITE_OPTIMIZATION)
+  private boolean enableSmallWriteOptimization;
+
   @BooleanConfigurationValidatorAnnotation(
       ConfigurationKey = AZURE_READ_SMALL_FILES_COMPLETELY,
       DefaultValue = DEFAULT_READ_SMALL_FILES_COMPLETELY)
@@ -537,6 +541,10 @@ public class AbfsConfiguration{
     return this.writeBufferSize;
   }
 
+  public boolean isSmallWriteOptimizationEnabled() {
+    return this.enableSmallWriteOptimization;
+  }
+
   public boolean readSmallFilesCompletely() {
     return this.readSmallFilesCompletely;
   }
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
index 869a6f9..c8dd518 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
@@ -578,6 +578,7 @@ public class AzureBlobFileSystemStore implements Closeable {
     return new AbfsOutputStreamContext(abfsConfiguration.getSasTokenRenewPeriodForStreamsInSeconds())
             .withWriteBufferSize(bufferSize)
             .enableFlush(abfsConfiguration.isFlushEnabled())
+            .enableSmallWriteOptimization(abfsConfiguration.isSmallWriteOptimizationEnabled())
             .disableOutputStreamFlush(abfsConfiguration.isOutputStreamFlushDisabled())
             .withStreamStatistics(new AbfsOutputStreamStatisticsImpl())
             .withAppendBlob(isAppendBlob)
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java
index 38b79c9..184657e 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java
@@ -76,6 +76,7 @@ public final class AbfsHttpConstants {
   public static final String AT = "@";
   public static final String HTTP_HEADER_PREFIX = "x-ms-";
   public static final String HASH = "#";
+  public static final String TRUE = "true";
 
   public static final String PLUS_ENCODE = "%20";
   public static final String FORWARD_SLASH_ENCODE = "%2F";
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
index 3e1ff80..cdef9c9 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
@@ -55,6 +55,15 @@ public final class ConfigurationKeys {
   public static final String AZURE_WRITE_MAX_CONCURRENT_REQUESTS = "fs.azure.write.max.concurrent.requests";
   public static final String AZURE_WRITE_MAX_REQUESTS_TO_QUEUE = "fs.azure.write.max.requests.to.queue";
   public static final String AZURE_WRITE_BUFFER_SIZE = "fs.azure.write.request.size";
+  /** If the data written by a Hadoop app is small, i.e. the data size
+   *  (a) before the first HFlush/HSync call, or
+   *  (b) between two consecutive HFlush/HSync calls
+   *  is less than the write buffer size, two separate requests are made:
+   *  one for append and another for flush.
+   *  When the small write optimization is enabled, a single request performs
+   *  both the append and the flush, which reduces the request count.
+   */
+  public static final String AZURE_ENABLE_SMALL_WRITE_OPTIMIZATION = "fs.azure.write.enableappendwithflush";
   public static final String AZURE_READ_BUFFER_SIZE = "fs.azure.read.request.size";
   public static final String AZURE_READ_SMALL_FILES_COMPLETELY = "fs.azure.read.smallfilescompletely";
   public static final String AZURE_READ_OPTIMIZE_FOOTER_READ = "fs.azure.read.optimizefooterread";
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
index 8008206..a23dfd5 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
@@ -56,6 +56,7 @@ public final class FileSystemConfigurations {
   // Default upload and download buffer size
   public static final int DEFAULT_WRITE_BUFFER_SIZE = 8 * ONE_MB;  // 8 MB
   public static final int APPENDBLOB_MAX_WRITE_BUFFER_SIZE = 4 * ONE_MB;  // 4 MB
+  public static final boolean DEFAULT_AZURE_ENABLE_SMALL_WRITE_OPTIMIZATION = false;
   public static final int DEFAULT_READ_BUFFER_SIZE = 4 * ONE_MB;  // 4 MB
   public static final boolean DEFAULT_READ_SMALL_FILES_COMPLETELY = false;
   public static final boolean DEFAULT_OPTIMIZE_FOOTER_READ = false;
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpQueryParams.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpQueryParams.java
index 5a550ac..8a4ca90 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpQueryParams.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpQueryParams.java
@@ -36,6 +36,7 @@ public final class HttpQueryParams {
   public static final String QUERY_PARAM_POSITION = "position";
   public static final String QUERY_PARAM_TIMEOUT = "timeout";
   public static final String QUERY_PARAM_RETAIN_UNCOMMITTED_DATA = "retainUncommittedData";
+  public static final String QUERY_PARAM_FLUSH = "flush";
   public static final String QUERY_PARAM_CLOSE = "close";
   public static final String QUERY_PARAM_UPN = "upn";
   public static final String QUERY_PARAM_BLOBTYPE = "blobtype";
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AppendRequestParameters.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AppendRequestParameters.java
new file mode 100644
index 0000000..fb4d29f
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AppendRequestParameters.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.contracts.services;
+
+/**
+ * Immutable holder for the parameters of a single append request.
+ */
+public class AppendRequestParameters {
+  public enum Mode {
+    APPEND_MODE, // append only
+    FLUSH_MODE, // append and flush in the same request
+    FLUSH_CLOSE_MODE // append, flush and close in the same request
+  }
+
+  private final long position; // sent as the "position" query parameter
+  private final int offset; // offset into the data buffer sent with the request
+  private final int length; // number of bytes of the buffer to send
+  private final Mode mode; // selects the flush/close query parameters
+  private final boolean isAppendBlob; // true when the target is an append blob
+
+  public AppendRequestParameters(final long position,
+      final int offset,
+      final int length,
+      final Mode mode,
+      final boolean isAppendBlob) {
+    this.position = position;
+    this.offset = offset;
+    this.length = length;
+    this.mode = mode;
+    this.isAppendBlob = isAppendBlob;
+  }
+
+  public long getPosition() {
+    return this.position;
+  }
+
+  public int getoffset() { // NOTE(review): lower-case 'o' is relied on by callers
+    return this.offset;
+  }
+
+  public int getLength() {
+    return this.length;
+  }
+
+  public Mode getMode() {
+    return this.mode;
+  }
+
+  public boolean isAppendBlob() {
+    return this.isAppendBlob;
+  }
+
+}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
index db2f44f..bfc11a6 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.fs.azurebfs.contracts.exceptions.SASTokenProviderExcept
 import org.apache.hadoop.fs.azurebfs.extensions.ExtensionHelper;
 import org.apache.hadoop.fs.azurebfs.extensions.SASTokenProvider;
 import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
+import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters;
 import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider;
 import org.apache.hadoop.fs.azurebfs.utils.DateTimeUtils;
 import org.apache.hadoop.io.IOUtils;
@@ -396,17 +397,27 @@ public class AbfsClient implements Closeable {
     return op;
   }
 
-  public AbfsRestOperation append(final String path, final long position, final byte[] buffer, final int offset,
-                                  final int length, final String cachedSasToken, final boolean isAppendBlob) throws AzureBlobFileSystemException {
+  public AbfsRestOperation append(final String path, final byte[] buffer,
+      AppendRequestParameters reqParams, final String cachedSasToken)
+      throws AzureBlobFileSystemException {
     final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
     // JDK7 does not support PATCH, so to workaround the issue we will use
     // PUT and specify the real method in the X-Http-Method-Override header.
     requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE,
-            HTTP_METHOD_PATCH));
+        HTTP_METHOD_PATCH));
 
     final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
     abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION, APPEND_ACTION);
-    abfsUriQueryBuilder.addQuery(QUERY_PARAM_POSITION, Long.toString(position));
+    abfsUriQueryBuilder.addQuery(QUERY_PARAM_POSITION, Long.toString(reqParams.getPosition()));
+
+    if ((reqParams.getMode() == AppendRequestParameters.Mode.FLUSH_MODE) || (
+        reqParams.getMode() == AppendRequestParameters.Mode.FLUSH_CLOSE_MODE)) {
+      abfsUriQueryBuilder.addQuery(QUERY_PARAM_FLUSH, TRUE);
+      if (reqParams.getMode() == AppendRequestParameters.Mode.FLUSH_CLOSE_MODE) {
+        abfsUriQueryBuilder.addQuery(QUERY_PARAM_CLOSE, TRUE);
+      }
+    }
+
     // AbfsInputStream/AbfsOutputStream reuse SAS tokens for better performance
     String sasTokenForReuse = appendSASTokenToQuery(path, SASTokenProvider.WRITE_OPERATION,
         abfsUriQueryBuilder, cachedSasToken);
@@ -414,20 +425,30 @@ public class AbfsClient implements Closeable {
     final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
     final AbfsRestOperation op = new AbfsRestOperation(
         AbfsRestOperationType.Append,
-            this,
-            HTTP_METHOD_PUT,
-            url,
-            requestHeaders, buffer, offset, length, sasTokenForReuse);
+        this,
+        HTTP_METHOD_PUT,
+        url,
+        requestHeaders,
+        buffer,
+        reqParams.getoffset(),
+        reqParams.getLength(),
+        sasTokenForReuse);
     try {
       op.execute();
     } catch (AzureBlobFileSystemException e) {
-      if (isAppendBlob && appendSuccessCheckOp(op, path, (position + length))) {
+      if (reqParams.isAppendBlob()
+          && appendSuccessCheckOp(op, path,
+          (reqParams.getPosition() + reqParams.getLength()))) {
         final AbfsRestOperation successOp = new AbfsRestOperation(
             AbfsRestOperationType.Append,
-                this,
-                HTTP_METHOD_PUT,
-                url,
-                requestHeaders, buffer, offset, length, sasTokenForReuse);
+            this,
+            HTTP_METHOD_PUT,
+            url,
+            requestHeaders,
+            buffer,
+            reqParams.getoffset(),
+            reqParams.getLength(),
+            sasTokenForReuse);
         successOp.hardSetResult(HttpURLConnection.HTTP_OK);
         return successOp;
       }
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
index 01b2fa5..402fdda 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
@@ -35,11 +35,13 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
+import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters;
 import org.apache.hadoop.fs.azurebfs.utils.CachedSASToken;
 import org.apache.hadoop.io.ElasticByteBufferPool;
 import org.apache.hadoop.fs.FileSystem.Statistics;
@@ -48,6 +50,9 @@ import org.apache.hadoop.fs.StreamCapabilities;
 import org.apache.hadoop.fs.Syncable;
 
 import static org.apache.hadoop.io.IOUtils.wrapException;
+import static org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters.Mode.APPEND_MODE;
+import static org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters.Mode.FLUSH_CLOSE_MODE;
+import static org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters.Mode.FLUSH_MODE;
 
 /**
  * The BlobFsOutputStream for Rest AbfsClient.
@@ -60,6 +65,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
   private boolean closed;
   private boolean supportFlush;
   private boolean disableOutputStreamFlush;
+  private boolean enableSmallWriteOptimization;
   private boolean isAppendBlob;
   private volatile IOException lastError;
 
@@ -69,6 +75,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
   private final int bufferSize;
   private byte[] buffer;
   private int bufferIndex;
+  private int numOfAppendsToServerSinceLastFlush;
   private final int maxConcurrentRequestCount;
   private final int maxRequestsThatCanBeQueued;
 
@@ -108,12 +115,15 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
     this.supportFlush = abfsOutputStreamContext.isEnableFlush();
     this.disableOutputStreamFlush = abfsOutputStreamContext
             .isDisableOutputStreamFlush();
+    this.enableSmallWriteOptimization
+        = abfsOutputStreamContext.isEnableSmallWriteOptimization();
     this.isAppendBlob = abfsOutputStreamContext.isAppendBlob();
     this.lastError = null;
     this.lastFlushOffset = 0;
     this.bufferSize = abfsOutputStreamContext.getWriteBufferSize();
     this.buffer = byteBufferPool.getBuffer(false, bufferSize).array();
     this.bufferIndex = 0;
+    this.numOfAppendsToServerSinceLastFlush = 0;
     this.writeOperations = new ConcurrentLinkedDeque<>();
     this.outputStreamStatistics = abfsOutputStreamContext.getStreamStatistics();
 
@@ -309,8 +319,29 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
 
   private synchronized void flushInternal(boolean isClose) throws IOException {
     maybeThrowLastError();
+
+    // if it's a flush after a write < buffer size, send flush param in append
+    if (!isAppendBlob
+        && enableSmallWriteOptimization
+        && (numOfAppendsToServerSinceLastFlush == 0) // there are no ongoing store writes
+        && (writeOperations.size() == 0) // double checking no appends in progress
+        && (bufferIndex > 0)) { // there is some data that is pending to be written
+      smallWriteOptimizedflushInternal(isClose);
+      return;
+    }
+
     writeCurrentBufferToService();
     flushWrittenBytesToService(isClose);
+    numOfAppendsToServerSinceLastFlush = 0;
+  }
+
+  private synchronized void smallWriteOptimizedflushInternal(boolean isClose) throws IOException {
+    // writeCurrentBufferToService will increment numOfAppendsToServerSinceLastFlush
+    writeCurrentBufferToService(true, isClose);
+    waitForAppendsToComplete();
+    shrinkWriteOperationQueue();
+    maybeThrowLastError();
+    numOfAppendsToServerSinceLastFlush = 0;
   }
 
   private synchronized void flushInternalAsync() throws IOException {
@@ -335,8 +366,9 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
     AbfsPerfTracker tracker = client.getAbfsPerfTracker();
     try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker,
             "writeCurrentBufferToService", "append")) {
-      AbfsRestOperation op = client.append(path, offset, bytes, 0,
-          bytesLength, cachedSasToken.get(), this.isAppendBlob);
+      AppendRequestParameters reqParams = new AppendRequestParameters(offset, 0,
+          bytesLength, APPEND_MODE, true);
+      AbfsRestOperation op = client.append(path, bytes, reqParams, cachedSasToken.get());
       cachedSasToken.update(op.getSasToken());
       outputStreamStatistics.uploadSuccessful(bytesLength);
       perfInfo.registerResult(op.getResult());
@@ -358,6 +390,10 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
   }
 
   private synchronized void writeCurrentBufferToService() throws IOException {
+    writeCurrentBufferToService(false, false);
+  }
+
+  private synchronized void writeCurrentBufferToService(boolean isFlush, boolean isClose) throws IOException {
     if (this.isAppendBlob) {
       writeAppendBlobCurrentBufferToService();
       return;
@@ -367,6 +403,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
       return;
     }
     outputStreamStatistics.writeCurrentBuffer();
+    numOfAppendsToServerSinceLastFlush++;
 
     final byte[] bytes = buffer;
     final int bytesLength = bufferIndex;
@@ -388,8 +425,19 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
         AbfsPerfTracker tracker = client.getAbfsPerfTracker();
         try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker,
                 "writeCurrentBufferToService", "append")) {
-          AbfsRestOperation op = client.append(path, offset, bytes, 0,
-                  bytesLength, cachedSasToken.get(), false);
+          AppendRequestParameters.Mode
+              mode = APPEND_MODE;
+          if (isFlush & isClose) {
+            mode = FLUSH_CLOSE_MODE;
+          } else if (isFlush) {
+            mode = FLUSH_MODE;
+          }
+
+          AppendRequestParameters reqParams = new AppendRequestParameters(
+              offset, 0, bytesLength, mode, false);
+          AbfsRestOperation op = client.append(path, bytes, reqParams,
+              cachedSasToken.get());
+
           cachedSasToken.update(op.getSasToken());
           perfInfo.registerResult(op.getResult());
           byteBufferPool.putBuffer(ByteBuffer.wrap(bytes));
@@ -410,7 +458,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
     shrinkWriteOperationQueue();
   }
 
-  private synchronized void flushWrittenBytesToService(boolean isClose) throws IOException {
+  private synchronized void waitForAppendsToComplete() throws IOException {
     for (WriteOperation writeOperation : writeOperations) {
       try {
         writeOperation.task.get();
@@ -428,6 +476,10 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
         throw lastError;
       }
     }
+  }
+
+  private synchronized void flushWrittenBytesToService(boolean isClose) throws IOException {
+    waitForAppendsToComplete();
     flushWrittenBytesToServiceInternal(position, false, isClose);
   }
 
@@ -558,6 +610,11 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
     return maxRequestsThatCanBeQueued;
   }
 
+  @VisibleForTesting
+  Boolean isAppendBlobStream() {
+    return isAppendBlob;
+  }
+
   /**
    * Appending AbfsOutputStream statistics to base toString().
    *
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java
index 2dce5dc..925cd4f 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java
@@ -27,6 +27,8 @@ public class AbfsOutputStreamContext extends AbfsStreamContext {
 
   private boolean enableFlush;
 
+  private boolean enableSmallWriteOptimization;
+
   private boolean disableOutputStreamFlush;
 
   private AbfsOutputStreamStatistics streamStatistics;
@@ -52,6 +54,11 @@ public class AbfsOutputStreamContext extends AbfsStreamContext {
     return this;
   }
 
+  public AbfsOutputStreamContext enableSmallWriteOptimization(final boolean enableSmallWriteOptimization) {
+    this.enableSmallWriteOptimization = enableSmallWriteOptimization;
+    return this;
+  }
+
   public AbfsOutputStreamContext disableOutputStreamFlush(
           final boolean disableOutputStreamFlush) {
     this.disableOutputStreamFlush = disableOutputStreamFlush;
@@ -114,4 +121,8 @@ public class AbfsOutputStreamContext extends AbfsStreamContext {
   public int getMaxWriteRequestsToQueue() {
     return this.maxWriteRequestsToQueue;
   }
+
+  public boolean isEnableSmallWriteOptimization() {
+    return this.enableSmallWriteOptimization;
+  }
 }
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
index 83c76f5..24ec292 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
@@ -290,7 +290,7 @@ public class AbfsRestOperation {
       AbfsClientThrottlingIntercept.updateMetrics(operationType, httpOperation);
     }
 
-    LOG.debug("HttpRequest: {}", httpOperation.toString());
+    LOG.debug("HttpRequest: {}: {}", operationType, httpOperation.toString());
 
     if (client.getRetryPolicy().shouldRetry(retryCount, httpOperation.getStatusCode())) {
       return false;
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsNetworkStatistics.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsNetworkStatistics.java
index c2dbe93..66b8da8 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsNetworkStatistics.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsNetworkStatistics.java
@@ -33,14 +33,16 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
 import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
 
+import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.BYTES_RECEIVED;
 import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CONNECTIONS_MADE;
+import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.GET_RESPONSES;
 import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.SEND_REQUESTS;
 
 public class ITestAbfsNetworkStatistics extends AbstractAbfsIntegrationTest {
 
   private static final Logger LOG =
       LoggerFactory.getLogger(ITestAbfsNetworkStatistics.class);
-  private static final int LARGE_OPERATIONS = 10;
+  private static final int WRITE_OPERATION_LOOP_COUNT = 10;
 
   public ITestAbfsNetworkStatistics() throws Exception {
   }
@@ -58,117 +60,126 @@ public class ITestAbfsNetworkStatistics extends AbstractAbfsIntegrationTest {
     Map<String, Long> metricMap;
     Path sendRequestPath = path(getMethodName());
     String testNetworkStatsString = "http_send";
-    long connectionsMade, requestsSent, bytesSent;
 
     metricMap = fs.getInstrumentationMap();
-    long connectionsMadeBeforeTest = metricMap
-        .get(CONNECTIONS_MADE.getStatName());
-    long requestsMadeBeforeTest = metricMap.get(SEND_REQUESTS.getStatName());
-
-    /*
-     * Creating AbfsOutputStream will result in 1 connection made and 1 send
-     * request.
-     */
+    long expectedConnectionsMade = metricMap.get(CONNECTIONS_MADE.getStatName());
+    long expectedRequestsSent = metricMap.get(SEND_REQUESTS.getStatName());
+    long expectedBytesSent = 0;
+
+    // --------------------------------------------------------------------
+     // Operation: Creating AbfsOutputStream
     try (AbfsOutputStream out = createAbfsOutputStreamWithFlushEnabled(fs,
         sendRequestPath)) {
+       // Network stats calculation: For Creating AbfsOutputStream:
+       // 1 create request = 1 connection made and 1 send request
+      expectedConnectionsMade++;
+      expectedRequestsSent++;
+      // --------------------------------------------------------------------
+
+      // Operation: Write small data
+      // Network stats calculation: No additions.
+      // Data written is less than the buffer size and hence will not
+      // trigger any append request to store
       out.write(testNetworkStatsString.getBytes());
+      // --------------------------------------------------------------------
 
-      /*
-       * Flushes all outstanding data (i.e. the current unfinished packet)
-       * from the client into the service on all DataNode replicas.
-       */
+       // Operation: HFlush
+       // Flushes all outstanding data (i.e. the current unfinished packet)
+       // from the client into the service on all DataNode replicas.
       out.hflush();
-
-      metricMap = fs.getInstrumentationMap();
-
       /*
-       * Testing the network stats with 1 write operation.
-       *
-       * connections_made : (connections made above) + 2(flush).
+       * Network stats calculation:
+       * 3 possibilities here:
+       * A. As there is pending data to be written to store, this will result in:
+       *    1 append + 1 flush = 2 connections and 2 send requests
        *
-       * send_requests : (requests sent above) + 2(flush).
+       * B. If config "fs.azure.enable.small.write.optimization" is enabled, append
+       *   and flush call will be merged for small data in buffer in this test.
+       *   In which case it will be:
+       *   1 append+flush request = 1 connection and 1 send request
        *
-       * bytes_sent : bytes wrote in AbfsOutputStream.
+       * C. If the path is configured for append Blob files to be used, hflush
+       *   is a no-op. So in this case:
+       *   1 append = 1 connection and 1 send request
        */
-      long extraCalls = 0;
-      if (!fs.getAbfsStore()
-          .isAppendBlobKey(fs.makeQualified(sendRequestPath).toString())) {
-        // no network calls are made for hflush in case of appendblob
-        extraCalls++;
+      if (fs.getAbfsStore().isAppendBlobKey(fs.makeQualified(sendRequestPath).toString())
+          || (this.getConfiguration().isSmallWriteOptimizationEnabled())) {
+        expectedConnectionsMade++;
+        expectedRequestsSent++;
+      } else {
+        expectedConnectionsMade += 2;
+        expectedRequestsSent += 2;
       }
-      long expectedConnectionsMade = connectionsMadeBeforeTest + extraCalls + 2;
-      long expectedRequestsSent = requestsMadeBeforeTest + extraCalls + 2;
-      connectionsMade = assertAbfsStatistics(CONNECTIONS_MADE,
+      expectedBytesSent += testNetworkStatsString.getBytes().length;
+      // --------------------------------------------------------------------
+
+      // Assertions
+      metricMap = fs.getInstrumentationMap();
+      assertAbfsStatistics(CONNECTIONS_MADE,
           expectedConnectionsMade, metricMap);
-      requestsSent = assertAbfsStatistics(SEND_REQUESTS, expectedRequestsSent,
+      assertAbfsStatistics(SEND_REQUESTS, expectedRequestsSent,
           metricMap);
-      bytesSent = assertAbfsStatistics(AbfsStatistic.BYTES_SENT,
-          testNetworkStatsString.getBytes().length, metricMap);
+      assertAbfsStatistics(AbfsStatistic.BYTES_SENT,
+          expectedBytesSent, metricMap);
     }
 
-    // To close the AbfsOutputStream 1 connection is made and 1 request is sent.
-    connectionsMade++;
-    requestsSent++;
-
+    // --------------------------------------------------------------------
+    // Operation: AbfsOutputStream close.
+    // Network Stats calculation: 1 flush (with close) is sent.
+    // 1 flush request = 1 connection and 1 send request
+    expectedConnectionsMade++;
+    expectedRequestsSent++;
+    // --------------------------------------------------------------------
 
+    // Operation: Re-create the file / create overwrite scenario
     try (AbfsOutputStream out = createAbfsOutputStreamWithFlushEnabled(fs,
         sendRequestPath)) {
-
-      // Is a file overwrite case
-      long createRequestCalls = 1;
-      long createTriggeredGFSForETag = 0;
+      /*
+       * Network Stats calculation: create overwrite
+       * There are 2 possibilities here.
+       * A. create overwrite results in 1 server call
+       *    create with overwrite=true = 1 connection and 1 send request
+       *
+       * B. If config "fs.azure.enable.conditional.create.overwrite" is enabled,
+       *    create overwrite=false (will fail in this case as file is indeed present)
+       *    + getFileStatus to fetch the file ETag
+       *    + create overwrite=true
+       *    = 3 connections and 2 send requests
+       */
       if (this.getConfiguration().isConditionalCreateOverwriteEnabled()) {
-        createRequestCalls += 1;
-        createTriggeredGFSForETag = 1;
+        expectedConnectionsMade += 3;
+        expectedRequestsSent += 2;
+      } else {
+        expectedConnectionsMade += 1;
+        expectedRequestsSent += 1;
       }
+      // --------------------------------------------------------------------
 
-      for (int i = 0; i < LARGE_OPERATIONS; i++) {
+      // Operation: Multiple small appends + hflush
+      for (int i = 0; i < WRITE_OPERATION_LOOP_COUNT; i++) {
         out.write(testNetworkStatsString.getBytes());
-
-        /*
-         * 1 flush call would create 2 connections and 2 send requests.
-         * when hflush() is called it will essentially trigger append() and
-         * flush() inside AbfsRestOperation. Both of which calls
-         * executeHttpOperation() method which creates a connection and sends
-         * requests.
-         */
+        // Network stats calculation: no-op. Small write
         out.hflush();
+        // Network stats calculation: Hflush
+        // refer to previous comments for hFlush network stats calculation
+        // possibilities
+        if (fs.getAbfsStore().isAppendBlobKey(fs.makeQualified(sendRequestPath).toString())
+            || (this.getConfiguration().isSmallWriteOptimizationEnabled())) {
+          expectedConnectionsMade++;
+          expectedRequestsSent++;
+        } else {
+          expectedConnectionsMade += 2;
+          expectedRequestsSent += 2;
+        }
+        expectedBytesSent += testNetworkStatsString.getBytes().length;
       }
+      // --------------------------------------------------------------------
 
+      // Assertions
       metricMap = fs.getInstrumentationMap();
-
-      /*
-       * Testing the network stats with Large amount of bytes sent.
-       *
-       * connections made : connections_made(Last assertion) + 1
-       * (AbfsOutputStream) + LARGE_OPERATIONS * 2(flush).
-       *
-       * send requests : requests_sent(Last assertion) + 1(AbfsOutputStream) +
-       * LARGE_OPERATIONS * 2(flush).
-       *
-       * bytes sent : bytes_sent(Last assertion) + LARGE_OPERATIONS * (bytes
-       * wrote each time).
-       *
-       */
-
-      connectionsMade += createRequestCalls + createTriggeredGFSForETag;
-      requestsSent += createRequestCalls;
-      if (fs.getAbfsStore().isAppendBlobKey(fs.makeQualified(sendRequestPath).toString())) {
-        // no network calls are made for hflush in case of appendblob
-        assertAbfsStatistics(CONNECTIONS_MADE,
-            connectionsMade + LARGE_OPERATIONS, metricMap);
-        assertAbfsStatistics(SEND_REQUESTS,
-            requestsSent + LARGE_OPERATIONS, metricMap);
-      } else {
-        assertAbfsStatistics(CONNECTIONS_MADE,
-            connectionsMade + LARGE_OPERATIONS * 2, metricMap);
-        assertAbfsStatistics(SEND_REQUESTS,
-            requestsSent + LARGE_OPERATIONS * 2, metricMap);
-      }
-      assertAbfsStatistics(AbfsStatistic.BYTES_SENT,
-          bytesSent + LARGE_OPERATIONS * (testNetworkStatsString.getBytes().length),
-          metricMap);
-
+      assertAbfsStatistics(CONNECTIONS_MADE, expectedConnectionsMade, metricMap);
+      assertAbfsStatistics(SEND_REQUESTS, expectedRequestsSent, metricMap);
+      assertAbfsStatistics(AbfsStatistic.BYTES_SENT, expectedBytesSent, metricMap);
     }
 
   }
@@ -185,130 +196,100 @@ public class ITestAbfsNetworkStatistics extends AbstractAbfsIntegrationTest {
     Path getResponsePath = path(getMethodName());
     Map<String, Long> metricMap;
     String testResponseString = "some response";
-    long getResponses, bytesReceived;
 
     FSDataOutputStream out = null;
     FSDataInputStream in = null;
-    try {
+    long expectedConnectionsMade;
+    long expectedGetResponses;
+    long expectedBytesReceived;
 
-      /*
-       * Creating a File and writing some bytes in it.
-       *
-       * get_response : 3(getFileSystem) + 1(OutputStream creation) + 2
-       * (Writing data in Data store).
-       *
-       */
+    try {
+      // Creating a File and writing some bytes in it.
       out = fs.create(getResponsePath);
       out.write(testResponseString.getBytes());
       out.hflush();
 
+      // Set metric baseline
       metricMap = fs.getInstrumentationMap();
-      long getResponsesBeforeTest = metricMap
-          .get(CONNECTIONS_MADE.getStatName());
+      long bytesWrittenToFile = testResponseString.getBytes().length;
+      expectedConnectionsMade = metricMap.get(CONNECTIONS_MADE.getStatName());
+      expectedGetResponses = metricMap.get(CONNECTIONS_MADE.getStatName());
+      expectedBytesReceived = metricMap.get(BYTES_RECEIVED.getStatName());
 
-      // open would require 1 get response.
+      // --------------------------------------------------------------------
+      // Operation: Create AbfsInputStream
       in = fs.open(getResponsePath);
-      // read would require 1 get response and also get the bytes received.
-      int result = in.read();
-
-      // Confirming read isn't -1.
-      LOG.info("Result of read operation : {}", result);
+      // Network stats calculation: For Creating AbfsInputStream:
+      // 1 GetFileStatus request to fetch file size = 1 connection and 1 get response
+      expectedConnectionsMade++;
+      expectedGetResponses++;
+      // --------------------------------------------------------------------
 
+      // Operation: Read
+      int result = in.read();
+      // Network stats calculation: For read:
+      // 1 read request = 1 connection and 1 get response
+      expectedConnectionsMade++;
+      expectedGetResponses++;
+      expectedBytesReceived += bytesWrittenToFile;
+      // --------------------------------------------------------------------
+
+      // Assertions
       metricMap = fs.getInstrumentationMap();
-
-      /*
-       * Testing values of statistics after writing and reading a buffer.
-       *
-       * get_responses - (above operations) + 1(open()) + 1 (read()).;
-       *
-       * bytes_received - This should be equal to bytes sent earlier.
-       */
-      long extraCalls = 0;
-      if (!fs.getAbfsStore()
-          .isAppendBlobKey(fs.makeQualified(getResponsePath).toString())) {
-        // no network calls are made for hflush in case of appendblob
-        extraCalls++;
-      }
-      long expectedGetResponses = getResponsesBeforeTest + extraCalls + 1;
-      getResponses = assertAbfsStatistics(AbfsStatistic.GET_RESPONSES,
-          expectedGetResponses, metricMap);
-
-      // Testing that bytes received is equal to bytes sent.
-      long bytesSend = metricMap.get(AbfsStatistic.BYTES_SENT.getStatName());
-      bytesReceived = assertAbfsStatistics(AbfsStatistic.BYTES_RECEIVED,
-          bytesSend,
-          metricMap);
-
+      assertAbfsStatistics(CONNECTIONS_MADE, expectedConnectionsMade, metricMap);
+      assertAbfsStatistics(GET_RESPONSES, expectedGetResponses, metricMap);
+      assertAbfsStatistics(AbfsStatistic.BYTES_RECEIVED, expectedBytesReceived, metricMap);
     } finally {
       IOUtils.cleanupWithLogger(LOG, out, in);
     }
 
-    // To close the streams 1 response is received.
-    getResponses++;
+    // --------------------------------------------------------------------
+    // Operation: AbfsOutputStream close.
+    // Network Stats calculation: no op.
+    // --------------------------------------------------------------------
 
     try {
 
-      /*
-       * Creating a file and writing buffer into it.
-       * This is a file recreate, so it will trigger
-       * 2 extra calls if create overwrite is off by default.
-       * Also recording the buffer for future read() call.
-       * This creating outputStream and writing requires 2 *
-       * (LARGE_OPERATIONS) get requests.
-       */
+      // Recreate file with different file size
+      // [Create and append related network stats checks are done in
+      // test method testAbfsHttpSendStatistics]
       StringBuilder largeBuffer = new StringBuilder();
       out = fs.create(getResponsePath);
 
-      long createRequestCalls = 1;
-      if (this.getConfiguration().isConditionalCreateOverwriteEnabled()) {
-        createRequestCalls += 2;
-      }
-
-      for (int i = 0; i < LARGE_OPERATIONS; i++) {
+      for (int i = 0; i < WRITE_OPERATION_LOOP_COUNT; i++) {
         out.write(testResponseString.getBytes());
         out.hflush();
         largeBuffer.append(testResponseString);
       }
 
-      // Open requires 1 get_response.
+      // sync back to metric baseline
+      metricMap = fs.getInstrumentationMap();
+      expectedConnectionsMade = metricMap.get(CONNECTIONS_MADE.getStatName());
+      expectedGetResponses = metricMap.get(GET_RESPONSES.getStatName());
+      // --------------------------------------------------------------------
+      // Operation: Create AbfsInputStream
       in = fs.open(getResponsePath);
-
-      /*
-       * Reading the file which was written above. This read() call would
-       * read bytes equal to the bytes that was written above.
-       * Get response would be 1 only.
-       */
-      in.read(0, largeBuffer.toString().getBytes(), 0,
-          largeBuffer.toString().getBytes().length);
-
+      // Network stats calculation: For Creating AbfsInputStream:
+      // 1 GetFileStatus for file size = 1 connection and 1 get response
+      expectedConnectionsMade++;
+      expectedGetResponses++;
+      // --------------------------------------------------------------------
+
+      // Operation: Read
+      in.read(0, largeBuffer.toString().getBytes(), 0, largeBuffer.toString().getBytes().length);
+      // Network stats calculation: Total data written is still lesser than
+      // a buffer size. Hence will trigger only one read to store. So result is:
+      // 1 read request = 1 connection and 1 get response
+      expectedConnectionsMade++;
+      expectedGetResponses++;
+      expectedBytesReceived += (WRITE_OPERATION_LOOP_COUNT * testResponseString.getBytes().length);
+      // --------------------------------------------------------------------
+
+      // Assertions
       metricMap = fs.getInstrumentationMap();
-
-      /*
-       * Testing the statistics values after writing and reading a large buffer.
-       *
-       * get_response : get_responses(Last assertion) + 1
-       * (OutputStream) + 2 * LARGE_OPERATIONS(Writing and flushing
-       * LARGE_OPERATIONS times) + 1(open()) + 1(read()) +
-       * 1 (createOverwriteTriggeredGetForeTag).
-       *
-       * bytes_received : bytes_received(Last assertion) + LARGE_OPERATIONS *
-       * bytes wrote each time (bytes_received is equal to bytes wrote in the
-       * File).
-       *
-       */
-      assertAbfsStatistics(AbfsStatistic.BYTES_RECEIVED,
-          bytesReceived + LARGE_OPERATIONS * (testResponseString.getBytes().length),
-          metricMap);
-      if (fs.getAbfsStore().isAppendBlobKey(fs.makeQualified(getResponsePath).toString())) {
-        // no network calls are made for hflush in case of appendblob
-        assertAbfsStatistics(AbfsStatistic.GET_RESPONSES,
-            getResponses + 3 + LARGE_OPERATIONS, metricMap);
-      } else {
-        assertAbfsStatistics(AbfsStatistic.GET_RESPONSES,
-            getResponses + 2 + createRequestCalls + 2 * LARGE_OPERATIONS,
-            metricMap);
-      }
-
+      assertAbfsStatistics(CONNECTIONS_MADE, expectedConnectionsMade, metricMap);
+      assertAbfsStatistics(GET_RESPONSES, expectedGetResponses, metricMap);
+      assertAbfsStatistics(AbfsStatistic.BYTES_RECEIVED, expectedBytesReceived, metricMap);
     } finally {
       IOUtils.cleanupWithLogger(LOG, out, in);
     }
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestSmallWriteOptimization.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestSmallWriteOptimization.java
new file mode 100644
index 0000000..fce2b68
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestSmallWriteOptimization.java
@@ -0,0 +1,523 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs;
+
+import java.util.Arrays;
+import java.util.Random;
+import java.util.UUID;
+import java.util.Map;
+import java.io.IOException;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Assume;
+import org.junit.runners.Parameterized;
+import org.junit.runner.RunWith;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys;
+
+import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.BYTES_SENT;
+import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CONNECTIONS_MADE;
+import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.SEND_REQUESTS;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_AZURE_ENABLE_SMALL_WRITE_OPTIMIZATION;
+import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_TEST_APPENDBLOB_ENABLED;
+
+/**
+ * Test combination for small writes with flush and close operations.
+ * This test class formulates an append test flow to assert on various scenarios.
+ * Test stages:
+ * 1. Pre-create test file of required size. This is determined by
+ * startingFileSize parameter. If it is 0, then pre-creation is skipped.
+ *
+ * 2. Formulate an append loop or iteration. An iteration will do N writes
+ * (determined by numOfClientWrites parameter) with each writing X bytes
+ * (determined by recurringClientWriteSize parameter).
+ *
+ * 3. Determine total number of append iterations needed by a test.
+ * If intention is to close the outputStream right after append, setting
+ * directCloseTest parameter will determine 1 append test iteration with an
+ * ending close.
+ * Else, it will execute TEST_FLUSH_ITERATION number of test iterations, with
+ * each doing appends, hflush/hsync and then close.
+ *
+ * 4. Execute test iterations with asserts on number of store requests made and
+ * validating file content.
+ */
+@RunWith(Parameterized.class)
+public class ITestSmallWriteOptimization extends AbstractAbfsScaleTest {
+  private static final int ONE_MB = 1024 * 1024;
+  private static final int TWO_MB = 2 * ONE_MB;
+  private static final int TEST_BUFFER_SIZE = TWO_MB;
+  private static final int HALF_TEST_BUFFER_SIZE = TWO_MB / 2;
+  private static final int QUARTER_TEST_BUFFER_SIZE = TWO_MB / 4;
+  private static final int TEST_FLUSH_ITERATION = 2;
+
+  @Parameterized.Parameter
+  public String testScenario;
+
+  @Parameterized.Parameter(1)
+  public boolean enableSmallWriteOptimization;
+
+  /**
+   * If true, will initiate close after appends. (That is, no explicit hflush or
+   * hsync calls will be made from client app.)
+   */
+  @Parameterized.Parameter(2)
+  public boolean directCloseTest;
+
+  /**
+   * If non-zero, test file should be created as pre-requisite with this size.
+   */
+  @Parameterized.Parameter(3)
+  public Integer startingFileSize;
+
+  /**
+   * Determines the write sizes to be issued by client app.
+   */
+  @Parameterized.Parameter(4)
+  public Integer recurringClientWriteSize;
+
+  /**
+   * Determines the number of Client writes to make.
+   */
+  @Parameterized.Parameter(5)
+  public Integer numOfClientWrites;
+
+  /**
+   * True, if the small write optimization is supposed to be effective in
+   * the scenario.
+   */
+  @Parameterized.Parameter(6)
+  public boolean flushExpectedToBeMergedWithAppend;
+
+  /**
+   * Row layout: testScenario, enableSmallWriteOptimization, directCloseTest,
+   * startingFileSize, recurringClientWriteSize, numOfClientWrites,
+   * flushExpectedToBeMergedWithAppend.
+   */
+  @Parameterized.Parameters(name = "{0}")
+  public static Iterable<Object[]> params() {
+    return Arrays.asList(new Object[][]{
+            // Buffer Size Write tests
+            { "OptmON_FlushCloseTest_EmptyFile_BufferSizeWrite",
+                true, false, 0, TEST_BUFFER_SIZE, 1, false
+            },
+            { "OptmON_FlushCloseTest_NonEmptyFile_BufferSizeWrite",
+                true, false, 2 * TEST_BUFFER_SIZE, TEST_BUFFER_SIZE, 1, false
+            },
+            { "OptmON_CloseTest_EmptyFile_BufferSizeWrite",
+                true, true, 0, TEST_BUFFER_SIZE, 1, false
+            },
+            { "OptmON_CloseTest_NonEmptyFile_BufferSizeWrite",
+                true, true, 2 * TEST_BUFFER_SIZE, TEST_BUFFER_SIZE, 1, false
+            },
+            { "OptmOFF_FlushCloseTest_EmptyFile_BufferSizeWrite",
+                false, false, 0, TEST_BUFFER_SIZE, 1, false
+            },
+            { "OptmOFF_FlushCloseTest_NonEmptyFile_BufferSizeWrite",
+                false, false, 2 * TEST_BUFFER_SIZE, TEST_BUFFER_SIZE, 1, false
+            },
+            { "OptmOFF_CloseTest_EmptyFile_BufferSizeWrite",
+                false, true, 0, TEST_BUFFER_SIZE, 1, false
+            },
+            { "OptmOFF_CloseTest_NonEmptyFile_BufferSizeWrite",
+                false, true, 2 * TEST_BUFFER_SIZE, TEST_BUFFER_SIZE, 1, false
+            },
+            // Less than buffer size write tests
+            { "OptmON_FlushCloseTest_EmptyFile_LessThanBufferSizeWrite",
+                true, false, 0, HALF_TEST_BUFFER_SIZE, 1, true
+            },
+            { "OptmON_FlushCloseTest_NonEmptyFile_LessThanBufferSizeWrite",
+                true, false, 2 * TEST_BUFFER_SIZE,
+                HALF_TEST_BUFFER_SIZE, 1, true
+            },
+            { "OptmON_CloseTest_EmptyFile_LessThanBufferSizeWrite",
+                true, true, 0, HALF_TEST_BUFFER_SIZE, 1, true
+            },
+            { "OptmON_CloseTest_NonEmptyFile_LessThanBufferSizeWrite",
+                true, true, 2 * TEST_BUFFER_SIZE,
+                HALF_TEST_BUFFER_SIZE, 1, true
+            },
+            { "OptmOFF_FlushCloseTest_EmptyFile_LessThanBufferSizeWrite",
+                false, false, 0, HALF_TEST_BUFFER_SIZE, 1, false
+            },
+            { "OptmOFF_FlushCloseTest_NonEmptyFile_LessThanBufferSizeWrite",
+                false, false, 2 * TEST_BUFFER_SIZE,
+                HALF_TEST_BUFFER_SIZE, 1, false
+            },
+            { "OptmOFF_CloseTest_EmptyFile_LessThanBufferSizeWrite",
+                false, true, 0, HALF_TEST_BUFFER_SIZE, 1, false
+            },
+            { "OptmOFF_CloseTest_NonEmptyFile_LessThanBufferSizeWrite",
+                false, true, 2 * TEST_BUFFER_SIZE,
+                HALF_TEST_BUFFER_SIZE, 1, false
+            },
+            // Multiple small writes still less than buffer size
+            { "OptmON_FlushCloseTest_EmptyFile_MultiSmallWritesStillLessThanBufferSize",
+                true, false, 0, QUARTER_TEST_BUFFER_SIZE, 3, true
+            },
+            { "OptmON_FlushCloseTest_NonEmptyFile_MultiSmallWritesStillLessThanBufferSize",
+                true, false, 2 * TEST_BUFFER_SIZE,
+                QUARTER_TEST_BUFFER_SIZE, 3, true
+            },
+            { "OptmON_CloseTest_EmptyFile_MultiSmallWritesStillLessThanBufferSize",
+                true, true, 0, QUARTER_TEST_BUFFER_SIZE, 3, true
+            },
+            { "OptmON_CloseTest_NonEmptyFile_MultiSmallWritesStillLessThanBufferSize",
+                true, true, 2 * TEST_BUFFER_SIZE,
+                QUARTER_TEST_BUFFER_SIZE, 3, true
+            },
+            { "OptmOFF_FlushCloseTest_EmptyFile_MultiSmallWritesStillLessThanBufferSize",
+                false, false, 0, QUARTER_TEST_BUFFER_SIZE, 3, false
+            },
+            { "OptmOFF_FlushCloseTest_NonEmptyFile_MultiSmallWritesStillLessThanBufferSize",
+                false, false, 2 * TEST_BUFFER_SIZE,
+                QUARTER_TEST_BUFFER_SIZE, 3, false
+            },
+            { "OptmOFF_CloseTest_EmptyFile_MultiSmallWritesStillLessThanBufferSize",
+                false, true, 0, QUARTER_TEST_BUFFER_SIZE, 3, false
+            },
+            { "OptmOFF_CloseTest_NonEmptyFile_MultiSmallWritesStillLessThanBufferSize",
+                false, true, 2 * TEST_BUFFER_SIZE,
+                QUARTER_TEST_BUFFER_SIZE, 3, false
+            },
+            // Multiple full buffer writes
+            { "OptmON_FlushCloseTest_EmptyFile_MultiBufferSizeWrite",
+                true, false, 0, TEST_BUFFER_SIZE, 3, false
+            },
+            { "OptmON_FlushCloseTest_NonEmptyFile_MultiBufferSizeWrite",
+                true, false, 2 * TEST_BUFFER_SIZE, TEST_BUFFER_SIZE, 3, false
+            },
+            { "OptmON_CloseTest_EmptyFile_MultiBufferSizeWrite",
+                true, true, 0, TEST_BUFFER_SIZE, 3, false
+            },
+            { "OptmON_CloseTest_NonEmptyFile_MultiBufferSizeWrite",
+                true, true, 2 * TEST_BUFFER_SIZE, TEST_BUFFER_SIZE, 3, false
+            },
+            { "OptmOFF_FlushCloseTest_EmptyFile_MultiBufferSizeWrite",
+                false, false, 0, TEST_BUFFER_SIZE, 3, false
+            },
+            { "OptmOFF_FlushCloseTest_NonEmptyFile_MultiBufferSizeWrite",
+                false, false, 2 * TEST_BUFFER_SIZE, TEST_BUFFER_SIZE, 3, false
+            },
+            { "OptmOFF_CloseTest_EmptyFile_MultiBufferSizeWrite",
+                false, true, 0, TEST_BUFFER_SIZE, 3, false
+            },
+            { "OptmOFF_CloseTest_NonEmptyFile_MultiBufferSizeWrite",
+                false, true, 2 * TEST_BUFFER_SIZE, TEST_BUFFER_SIZE, 3, false
+            },
+            // Multiple full buffers triggered and data less than buffer size pending
+            { "OptmON_FlushCloseTest_EmptyFile_BufferAndExtraWrite",
+                true, false, 0,
+                TEST_BUFFER_SIZE + QUARTER_TEST_BUFFER_SIZE,
+                3, false
+            },
+            { "OptmON_FlushCloseTest_NonEmptyFile_BufferAndExtraWrite",
+                true, false, 2 * TEST_BUFFER_SIZE,
+                TEST_BUFFER_SIZE + QUARTER_TEST_BUFFER_SIZE,
+                3, false
+            },
+            { "OptmON_CloseTest_EmptyFile__BufferAndExtraWrite",
+                true, true, 0,
+                TEST_BUFFER_SIZE + QUARTER_TEST_BUFFER_SIZE,
+                3, false
+            },
+            { "OptmON_CloseTest_NonEmptyFile_BufferAndExtraWrite",
+                true, true, 2 * TEST_BUFFER_SIZE,
+                TEST_BUFFER_SIZE + QUARTER_TEST_BUFFER_SIZE,
+                3, false
+            },
+            { "OptmOFF_FlushCloseTest_EmptyFile_BufferAndExtraWrite",
+                false, false, 0,
+                TEST_BUFFER_SIZE + QUARTER_TEST_BUFFER_SIZE,
+                3, false
+            },
+            { "OptmOFF_FlushCloseTest_NonEmptyFile_BufferAndExtraWrite",
+                false, false, 2 * TEST_BUFFER_SIZE,
+                TEST_BUFFER_SIZE + QUARTER_TEST_BUFFER_SIZE,
+                3, false
+            },
+            { "OptmOFF_CloseTest_EmptyFile_BufferAndExtraWrite",
+                false, true, 0,
+                TEST_BUFFER_SIZE + QUARTER_TEST_BUFFER_SIZE,
+                3, false
+            },
+            { "OptmOFF_CloseTest_NonEmptyFile_BufferAndExtraWrite",
+                false, true, 2 * TEST_BUFFER_SIZE,
+                TEST_BUFFER_SIZE + QUARTER_TEST_BUFFER_SIZE,
+                3, false
+            },
+            // 0 byte tests
+            { "OptmON_FlushCloseTest_EmptyFile_0ByteWrite",
+                true, false, 0, 0, 1, false
+            },
+            { "OptmON_FlushCloseTest_NonEmptyFile_0ByteWrite",
+                true, false, 2 * TEST_BUFFER_SIZE, 0, 1, false
+            },
+            { "OptmON_CloseTest_EmptyFile_0ByteWrite",
+                true, true, 0, 0, 1, false
+            },
+            { "OptmON_CloseTest_NonEmptyFile_0ByteWrite",
+                true, true, 2 * TEST_BUFFER_SIZE, 0, 1, false
+            },
+            { "OptmOFF_FlushCloseTest_EmptyFile_0ByteWrite",
+                false, false, 0, 0, 1, false
+            },
+            { "OptmOFF_FlushCloseTest_NonEmptyFile_0ByteWrite",
+                false, false, 2 * TEST_BUFFER_SIZE, 0, 1, false
+            },
+            { "OptmOFF_CloseTest_EmptyFile_0ByteWrite",
+                false, true, 0, 0, 1, false
+            },
+            { "OptmOFF_CloseTest_NonEmptyFile_0ByteWrite",
+                false, true, 2 * TEST_BUFFER_SIZE, 0, 1, false
+            },
+        });
+  }
+  public ITestSmallWriteOptimization() throws Exception {
+    // No setup of its own; the superclass constructor is invoked implicitly.
+  }
+
+  /**
+   * Runs one parameterized scenario using TEST_BUFFER_SIZE write buffers.
+   */
+  @Test
+  public void testSmallWriteOptimization() throws IOException {
+    // Tests with Optimization should only run if service has the feature on by
+    // default. Default settings will be turned on when server support is
+    // available on all store prod regions.
+    if (enableSmallWriteOptimization) {
+      Assume.assumeTrue(DEFAULT_AZURE_ENABLE_SMALL_WRITE_OPTIMIZATION);
+    }
+
+    final AzureBlobFileSystem currentfs = this.getFileSystem();
+    Configuration config = currentfs.getConf();
+    // This optimization doesn't take effect when append blob is on. Compare by
+    // value: == on Strings checks object identity, not contents.
+    Assume.assumeFalse("true".equals(config.get(FS_AZURE_TEST_APPENDBLOB_ENABLED)));
+
+    config.set(ConfigurationKeys.AZURE_WRITE_BUFFER_SIZE, Integer.toString(TEST_BUFFER_SIZE));
+    config.set(ConfigurationKeys.AZURE_ENABLE_SMALL_WRITE_OPTIMIZATION, Boolean.toString(enableSmallWriteOptimization));
+    final AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.get(
+        currentfs.getUri(), config);
+
+    formulateSmallWriteTestAppendPattern(fs, startingFileSize,
+        recurringClientWriteSize, numOfClientWrites,
+        directCloseTest, flushExpectedToBeMergedWithAppend);
+  }
+
+  /**
+   * Writes the pattern and checks request stats and content. Direct-close
+   * tests append then close; others repeat append + hflush, then close.
+   * @param fs filesystem to write to and to read the metrics from
+   * @param startingFileSize bytes the file holds before stats are sampled
+   * @param recurringWriteSize size in bytes of each client write
+   * @param numOfWrites number of client writes per iteration
+   * @param isDirectCloseTest true to close right after one round of writes
+   * @param flushExpectedToBeMergedWithAppend expected merge outcome per round
+   * @throws IOException if a filesystem or stream operation fails
+   */
+  private void formulateSmallWriteTestAppendPattern(final AzureBlobFileSystem fs,
+      int startingFileSize,
+      int recurringWriteSize,
+      int numOfWrites,
+      boolean isDirectCloseTest,
+      boolean flushExpectedToBeMergedWithAppend) throws IOException {
+
+    int totalDataToBeAppended = 0;
+    int testIteration = 0;
+    int dataWrittenPerIteration = (numOfWrites * recurringWriteSize);
+
+    if (isDirectCloseTest) {
+      totalDataToBeAppended = dataWrittenPerIteration;
+      testIteration = 1;
+    } else {
+      testIteration = TEST_FLUSH_ITERATION;
+      totalDataToBeAppended = testIteration * dataWrittenPerIteration;
+    }
+
+    int totalFileSize = totalDataToBeAppended + startingFileSize;
+    // Random buffer sized to the final file. It is both the source of every
+    // write and the reference for content validation.
+    final byte[] writeBuffer = new byte[totalFileSize];
+    new Random().nextBytes(writeBuffer);
+    int writeBufferCursor = 0;
+
+    Path testPath = new Path(getMethodName() + UUID.randomUUID().toString());
+    FSDataOutputStream opStream;
+
+    if (startingFileSize > 0) {
+      writeBufferCursor += createFileWithStartingTestSize(fs, writeBuffer, writeBufferCursor, testPath,
+          startingFileSize);
+      opStream = fs.append(testPath);
+    } else {
+      opStream = fs.create(testPath);
+    }
+
+    final int writeBufferSize = fs.getAbfsStore()
+        .getAbfsConfiguration()
+        .getWriteBufferSize();
+    long expectedTotalRequestsMade = fs.getInstrumentationMap()
+        .get(CONNECTIONS_MADE.getStatName());
+    long expectedRequestsMadeWithData = fs.getInstrumentationMap()
+        .get(SEND_REQUESTS.getStatName());
+    long expectedBytesSent = fs.getInstrumentationMap()
+        .get(BYTES_SENT.getStatName());
+
+    while (testIteration > 0) {
+      // issue numOfWrites client writes of recurringWriteSize bytes each
+      writeBufferCursor += executeWritePattern(opStream, writeBuffer,
+          writeBufferCursor, numOfWrites, recurringWriteSize);
+      // both operands are ints, so the division truncates before Math.floor
+      int numOfBuffersWrittenToStore = (int) Math.floor(
+          dataWrittenPerIteration / writeBufferSize);
+      int dataSizeWrittenToStore = numOfBuffersWrittenToStore * writeBufferSize;
+      int pendingDataToStore = dataWrittenPerIteration - dataSizeWrittenToStore;
+
+      expectedTotalRequestsMade += numOfBuffersWrittenToStore;
+      expectedRequestsMadeWithData += numOfBuffersWrittenToStore;
+      expectedBytesSent += dataSizeWrittenToStore;
+
+      if (isDirectCloseTest) {
+        opStream.close();
+      } else {
+        opStream.hflush();
+      }
+
+      boolean wasDataPendingToBeWrittenToServer = (pendingDataToStore > 0);
+      // Small write optimization will only work if
+      // a. config for small write optimization is on
+      // b. no buffer writes have been triggered since last flush
+      // c. there is some pending data in buffer to write to store
+      final boolean smallWriteOptimizationEnabled = fs.getAbfsStore()
+          .getAbfsConfiguration()
+          .isSmallWriteOptimizationEnabled();
+      boolean flushWillBeMergedWithAppend = smallWriteOptimizationEnabled
+          && (numOfBuffersWrittenToStore == 0)
+          && (wasDataPendingToBeWrittenToServer);
+
+      Assertions.assertThat(flushWillBeMergedWithAppend)
+          .describedAs(flushExpectedToBeMergedWithAppend
+              ? "Flush was to be merged with Append"
+              : "Flush should not have been merged with Append")
+          .isEqualTo(flushExpectedToBeMergedWithAppend);
+
+      int totalAppendFlushCalls = (flushWillBeMergedWithAppend
+          ? 1 // 1 append (with flush and close param)
+          : (wasDataPendingToBeWrittenToServer)
+              ? 2 // 1 append + 1 flush (with close)
+              : 1); // 1 flush (with close)
+
+      expectedTotalRequestsMade += totalAppendFlushCalls;
+      expectedRequestsMadeWithData += totalAppendFlushCalls;
+      expectedBytesSent += wasDataPendingToBeWrittenToServer
+          ? pendingDataToStore
+          : 0;
+
+      assertOpStats(fs.getInstrumentationMap(), expectedTotalRequestsMade,
+          expectedRequestsMadeWithData, expectedBytesSent);
+
+      if (isDirectCloseTest) {
+        // stream already closed
+        validateStoreAppends(fs, testPath, totalFileSize, writeBuffer);
+        return;
+      }
+
+      testIteration--;
+    }
+
+    opStream.close();
+    expectedTotalRequestsMade += 1;
+    expectedRequestsMadeWithData += 1;
+    // no change in expectedBytesSent
+    assertOpStats(fs.getInstrumentationMap(), expectedTotalRequestsMade, expectedRequestsMadeWithData, expectedBytesSent);
+
+    validateStoreAppends(fs, testPath, totalFileSize, writeBuffer);
+  }
+
+  /**
+   * Creates testPath from startingFileSize bytes of writeBuffer (closing the
+   * stream even on failure), asserts its length, returns the bytes written.
+   */
+  private int createFileWithStartingTestSize(AzureBlobFileSystem fs, byte[] writeBuffer,
+      int writeBufferCursor, Path testPath, int startingFileSize)
+      throws IOException {
+    int bytesWritten;
+    try (FSDataOutputStream opStream = fs.create(testPath)) {
+      bytesWritten = executeWritePattern(opStream, writeBuffer,
+          writeBufferCursor, 1, startingFileSize);
+    }
+    Assertions.assertThat(fs.getFileStatus(testPath).getLen())
+        .describedAs("File should be of size %d at the start of test.",
+            startingFileSize)
+        .isEqualTo(startingFileSize);
+    return bytesWritten;
+  }
+
+  /**
+   * Asserts the file's length is totalFileSize and content is bufferWritten.
+   */
+  private void validateStoreAppends(AzureBlobFileSystem fs, Path testPath,
+      int totalFileSize, byte[] bufferWritten) throws IOException {
+    Assertions.assertThat(fs.getFileStatus(testPath).getLen())
+        .describedAs("File should be of size %d at the end of test.",
+            totalFileSize)
+        .isEqualTo(totalFileSize);
+    byte[] fileReadFromStore = new byte[totalFileSize];
+    // read() may return fewer bytes than asked; readFully loops until filled
+    try (org.apache.hadoop.fs.FSDataInputStream in = fs.open(testPath)) {
+      in.readFully(fileReadFromStore);
+    }
+    assertArrayEquals("Test file content incorrect", bufferWritten,
+        fileReadFromStore);
+  }
+
+  /**
+   * Checks the three request counters in metricMap via assertAbfsStatistics.
+   */
+  private void assertOpStats(final Map<String, Long> metricMap,
+      final long expectedTotalRequestsMade,
+      final long expectedRequestsMadeWithData, final long expectedBytesSent) {
+    assertAbfsStatistics(CONNECTIONS_MADE, expectedTotalRequestsMade, metricMap);
+    assertAbfsStatistics(SEND_REQUESTS, expectedRequestsMadeWithData, metricMap);
+    assertAbfsStatistics(BYTES_SENT, expectedBytesSent, metricMap);
+  }
+
+  /**
+   * Writes writeSize bytes from buffer to opStream writeLoopCount times,
+   * advancing from startOffset, and returns the total bytes written.
+   */
+  private int executeWritePattern(FSDataOutputStream opStream,
+      byte[] buffer,
+      int startOffset,
+      int writeLoopCount,
+      int writeSize)
+      throws IOException {
+    int cursor = startOffset;
+    for (int remaining = writeLoopCount; remaining > 0; remaining--) {
+      opStream.write(buffer, cursor, writeSize);
+      cursor += writeSize;
+    }
+    return cursor - startOffset;
+  }
+}
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java
index 7f91116..fff0051 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java
@@ -44,10 +44,16 @@ public class ITestAbfsOutputStream extends AbstractAbfsIntegrationTest {
     final AzureBlobFileSystem fs = getFileSystem(conf);
     try (FSDataOutputStream out = fs.create(TEST_FILE_PATH)) {
     AbfsOutputStream stream = (AbfsOutputStream) out.getWrappedStream();
+
+      int maxConcurrentRequests
+          = getConfiguration().getWriteMaxConcurrentRequestCount();
+      if (stream.isAppendBlobStream()) {
+        maxConcurrentRequests = 1;
+      }
+
     Assertions.assertThat(stream.getMaxConcurrentRequestCount()).describedAs(
-        "maxConcurrentRequests should be " + getConfiguration()
-            .getWriteMaxConcurrentRequestCount())
-        .isEqualTo(getConfiguration().getWriteMaxConcurrentRequestCount());
+        "maxConcurrentRequests should be " + maxConcurrentRequests)
+        .isEqualTo(maxConcurrentRequests);
     Assertions.assertThat(stream.getMaxRequestsThatCanBeQueued()).describedAs(
         "maxRequestsToQueue should be " + getConfiguration()
             .getMaxWriteRequestsToQueue())
@@ -67,6 +73,11 @@ public class ITestAbfsOutputStream extends AbstractAbfsIntegrationTest {
     final AzureBlobFileSystem fs = getFileSystem(conf);
     FSDataOutputStream out = fs.create(TEST_FILE_PATH);
     AbfsOutputStream stream = (AbfsOutputStream) out.getWrappedStream();
+
+    if (stream.isAppendBlobStream()) {
+      maxConcurrentRequests = 1;
+    }
+
     Assertions.assertThat(stream.getMaxConcurrentRequestCount())
         .describedAs("maxConcurrentRequests should be " + maxConcurrentRequests)
         .isEqualTo(maxConcurrentRequests);
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java
index aab0248..1e6b8ef 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.fs.azurebfs.services;
 
 import java.io.IOException;
 import java.util.Arrays;
-import java.util.HashSet;
 import java.util.Random;
 
 import org.junit.Test;
@@ -28,19 +27,22 @@ import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 
 import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
+import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters;
 import org.apache.hadoop.conf.Configuration;
 
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.refEq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.anyInt;
 import static org.mockito.Mockito.anyBoolean;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.anyString;
 import static org.mockito.Mockito.anyLong;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters.Mode.APPEND_MODE;
 
 public final class TestAbfsOutputStream {
 
@@ -83,22 +85,15 @@ public final class TestAbfsOutputStream {
     abfsConf = new AbfsConfiguration(conf, accountName1);
     AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf);
     when(client.getAbfsPerfTracker()).thenReturn(tracker);
-    when(client.append(anyString(), anyLong(), any(byte[].class), anyInt(), anyInt(), any(), anyBoolean())).thenReturn(op);
+    when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any())).thenReturn(op);
     when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any())).thenReturn(op);
 
-    AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0, populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false));
+    AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0,
+        populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false));
     final byte[] b = new byte[WRITE_SIZE];
     new Random().nextBytes(b);
     out.write(b);
     out.hsync();
-    ArgumentCaptor<String> acString = ArgumentCaptor.forClass(String.class);
-    ArgumentCaptor<Long> acLong = ArgumentCaptor.forClass(Long.class);
-    ArgumentCaptor<Integer> acBufferOffset = ArgumentCaptor.forClass(Integer.class);
-    ArgumentCaptor<Integer> acBufferLength = ArgumentCaptor.forClass(Integer.class);
-    ArgumentCaptor<byte[]> acByteArray = ArgumentCaptor.forClass(byte[].class);
-    ArgumentCaptor<Boolean> acAppendBlobAppend = ArgumentCaptor.forClass(Boolean.class);
-    ArgumentCaptor<String> acSASToken = ArgumentCaptor.forClass(String.class);
-
 
     final byte[] b1 = new byte[2*WRITE_SIZE];
     new Random().nextBytes(b1);
@@ -108,13 +103,18 @@ public final class TestAbfsOutputStream {
 
     out.hsync();
 
-    verify(client, times(2)).append(acString.capture(), acLong.capture(), acByteArray.capture(), acBufferOffset.capture(), acBufferLength.capture(),
-                                    acSASToken.capture(), acAppendBlobAppend.capture());
-    assertThat(Arrays.asList(PATH, PATH)).describedAs("Path of the requests").isEqualTo(acString.getAllValues());
-    assertThat(Arrays.asList(Long.valueOf(0), Long.valueOf(WRITE_SIZE))).describedAs("Write Position").isEqualTo(acLong.getAllValues());
-    assertThat(Arrays.asList(0, 0)).describedAs("Buffer Offset").isEqualTo(acBufferOffset.getAllValues());
-    assertThat(Arrays.asList(WRITE_SIZE, 2*WRITE_SIZE)).describedAs("Buffer length").isEqualTo(acBufferLength.getAllValues());
-
+    AppendRequestParameters firstReqParameters = new AppendRequestParameters(
+        0, 0, WRITE_SIZE, APPEND_MODE, false);
+    AppendRequestParameters secondReqParameters = new AppendRequestParameters(
+        WRITE_SIZE, 0, 2 * WRITE_SIZE, APPEND_MODE, false);
+
+    verify(client, times(1)).append(
+        eq(PATH), any(byte[].class), refEq(firstReqParameters), any());
+    verify(client, times(1)).append(
+        eq(PATH), any(byte[].class), refEq(secondReqParameters), any());
+    // confirm there were only 2 invocations in all
+    verify(client, times(2)).append(
+        eq(PATH), any(byte[].class), any(), any());
   }
 
   /**
@@ -132,10 +132,11 @@ public final class TestAbfsOutputStream {
     AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf);
 
     when(client.getAbfsPerfTracker()).thenReturn(tracker);
-    when(client.append(anyString(), anyLong(), any(byte[].class), anyInt(), anyInt(), any(), anyBoolean())).thenReturn(op);
+    when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any())).thenReturn(op);
     when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any())).thenReturn(op);
 
-    AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0, populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false));
+    AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0,
+        populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false));
     final byte[] b = new byte[WRITE_SIZE];
     new Random().nextBytes(b);
 
@@ -144,33 +145,29 @@ public final class TestAbfsOutputStream {
     }
     out.close();
 
-    ArgumentCaptor<String> acString = ArgumentCaptor.forClass(String.class);
-    ArgumentCaptor<Long> acLong = ArgumentCaptor.forClass(Long.class);
-    ArgumentCaptor<Integer> acBufferOffset = ArgumentCaptor.forClass(Integer.class);
-    ArgumentCaptor<Integer> acBufferLength = ArgumentCaptor.forClass(Integer.class);
-    ArgumentCaptor<byte[]> acByteArray = ArgumentCaptor.forClass(byte[].class);
-    ArgumentCaptor<Boolean> acAppendBlobAppend = ArgumentCaptor.forClass(Boolean.class);
-    ArgumentCaptor<String> acSASToken = ArgumentCaptor.forClass(String.class);
-
-    verify(client, times(2)).append(acString.capture(), acLong.capture(), acByteArray.capture(), acBufferOffset.capture(), acBufferLength.capture(),
-                                    acSASToken.capture(), acAppendBlobAppend.capture());
-    assertThat(Arrays.asList(PATH, PATH)).describedAs("Path").isEqualTo(acString.getAllValues());
-    assertThat(new HashSet<Long>(Arrays.asList(Long.valueOf(0), Long.valueOf(BUFFER_SIZE)))).describedAs("Position").isEqualTo(new HashSet<Long>(
-               acLong.getAllValues()));
-    assertThat(Arrays.asList(0, 0)).describedAs("Buffer Offset").isEqualTo(acBufferOffset.getAllValues());
-    assertThat(new HashSet<Integer>(Arrays.asList(BUFFER_SIZE, 5*WRITE_SIZE-BUFFER_SIZE))).describedAs("Buffer Length").isEqualTo(new HashSet<Integer>(
-               acBufferLength.getAllValues()));
-
-    ArgumentCaptor<String> acFlushString = ArgumentCaptor.forClass(String.class);
-    ArgumentCaptor<Long> acFlushLong = ArgumentCaptor.forClass(Long.class);
+    AppendRequestParameters firstReqParameters = new AppendRequestParameters(
+        0, 0, BUFFER_SIZE, APPEND_MODE, false);
+    AppendRequestParameters secondReqParameters = new AppendRequestParameters(
+        BUFFER_SIZE, 0, 5*WRITE_SIZE-BUFFER_SIZE, APPEND_MODE, false);
+
+    verify(client, times(1)).append(
+        eq(PATH), any(byte[].class), refEq(firstReqParameters), any());
+    verify(client, times(1)).append(
+        eq(PATH), any(byte[].class), refEq(secondReqParameters), any());
+    // confirm there were only 2 invocations in all
+    verify(client, times(2)).append(
+        eq(PATH), any(byte[].class), any(), any());
+
+    ArgumentCaptor<String> acFlushPath = ArgumentCaptor.forClass(String.class);
+    ArgumentCaptor<Long> acFlushPosition = ArgumentCaptor.forClass(Long.class);
     ArgumentCaptor<Boolean> acFlushRetainUnCommittedData = ArgumentCaptor.forClass(Boolean.class);
     ArgumentCaptor<Boolean> acFlushClose = ArgumentCaptor.forClass(Boolean.class);
     ArgumentCaptor<String> acFlushSASToken = ArgumentCaptor.forClass(String.class);
 
-    verify(client, times(1)).flush(acFlushString.capture(), acFlushLong.capture(), acFlushRetainUnCommittedData.capture(), acFlushClose.capture(),
-                                   acFlushSASToken.capture());
-    assertThat(Arrays.asList(PATH)).describedAs("path").isEqualTo(acFlushString.getAllValues());
-    assertThat(Arrays.asList(Long.valueOf(5*WRITE_SIZE))).describedAs("position").isEqualTo(acFlushLong.getAllValues());
+    verify(client, times(1)).flush(acFlushPath.capture(), acFlushPosition.capture(), acFlushRetainUnCommittedData.capture(), acFlushClose.capture(),
+        acFlushSASToken.capture());
+    assertThat(Arrays.asList(PATH)).describedAs("path").isEqualTo(acFlushPath.getAllValues());
+    assertThat(Arrays.asList(Long.valueOf(5*WRITE_SIZE))).describedAs("position").isEqualTo(acFlushPosition.getAllValues());
     assertThat(Arrays.asList(false)).describedAs("RetainUnCommittedData flag").isEqualTo(acFlushRetainUnCommittedData.getAllValues());
     assertThat(Arrays.asList(true)).describedAs("Close flag").isEqualTo(acFlushClose.getAllValues());
   }
@@ -191,12 +188,13 @@ public final class TestAbfsOutputStream {
     AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf);
 
     when(client.getAbfsPerfTracker()).thenReturn(tracker);
-    when(client.append(anyString(), anyLong(), any(byte[].class), anyInt(), anyInt(), any(), anyBoolean())).thenReturn(op);
+    when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any())).thenReturn(op);
     when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any())).thenReturn(op);
     when(op.getSasToken()).thenReturn("testToken");
     when(op.getResult()).thenReturn(httpOp);
 
-    AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0, populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false));
+    AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0,
+        populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false));
     final byte[] b = new byte[BUFFER_SIZE];
     new Random().nextBytes(b);
 
@@ -205,35 +203,31 @@ public final class TestAbfsOutputStream {
     }
     out.close();
 
-    ArgumentCaptor<String> acString = ArgumentCaptor.forClass(String.class);
-    ArgumentCaptor<Long> acLong = ArgumentCaptor.forClass(Long.class);
-    ArgumentCaptor<Integer> acBufferOffset = ArgumentCaptor.forClass(Integer.class);
-    ArgumentCaptor<Integer> acBufferLength = ArgumentCaptor.forClass(Integer.class);
-    ArgumentCaptor<byte[]> acByteArray = ArgumentCaptor.forClass(byte[].class);
-    ArgumentCaptor<Boolean> acAppendBlobAppend = ArgumentCaptor.forClass(Boolean.class);
-    ArgumentCaptor<String> acSASToken = ArgumentCaptor.forClass(String.class);
-
-    verify(client, times(2)).append(acString.capture(), acLong.capture(), acByteArray.capture(), acBufferOffset.capture(), acBufferLength.capture(),
-                                    acSASToken.capture(), acAppendBlobAppend.capture());
-    assertThat(Arrays.asList(PATH, PATH)).describedAs("path").isEqualTo(acString.getAllValues());
-    assertThat(new HashSet<Long>(Arrays.asList(Long.valueOf(0), Long.valueOf(BUFFER_SIZE)))).describedAs("Position").isEqualTo(new HashSet<Long>(
-               acLong.getAllValues()));
-    assertThat(Arrays.asList(0, 0)).describedAs("Buffer Offset").isEqualTo(acBufferOffset.getAllValues());
-    assertThat(Arrays.asList(BUFFER_SIZE, BUFFER_SIZE)).describedAs("Buffer Length").isEqualTo(acBufferLength.getAllValues());
-
-    ArgumentCaptor<String> acFlushString = ArgumentCaptor.forClass(String.class);
-    ArgumentCaptor<Long> acFlushLong = ArgumentCaptor.forClass(Long.class);
+    AppendRequestParameters firstReqParameters = new AppendRequestParameters(
+        0, 0, BUFFER_SIZE, APPEND_MODE, false);
+    AppendRequestParameters secondReqParameters = new AppendRequestParameters(
+        BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false);
+
+    verify(client, times(1)).append(
+        eq(PATH), any(byte[].class), refEq(firstReqParameters), any());
+    verify(client, times(1)).append(
+        eq(PATH), any(byte[].class), refEq(secondReqParameters), any());
+    // confirm there were only 2 invocations in all
+    verify(client, times(2)).append(
+        eq(PATH), any(byte[].class), any(), any());
+
+    ArgumentCaptor<String> acFlushPath = ArgumentCaptor.forClass(String.class);
+    ArgumentCaptor<Long> acFlushPosition = ArgumentCaptor.forClass(Long.class);
     ArgumentCaptor<Boolean> acFlushRetainUnCommittedData = ArgumentCaptor.forClass(Boolean.class);
     ArgumentCaptor<Boolean> acFlushClose = ArgumentCaptor.forClass(Boolean.class);
     ArgumentCaptor<String> acFlushSASToken = ArgumentCaptor.forClass(String.class);
 
-    verify(client, times(1)).flush(acFlushString.capture(), acFlushLong.capture(), acFlushRetainUnCommittedData.capture(), acFlushClose.capture(),
-                                   acFlushSASToken.capture());
-    assertThat(Arrays.asList(PATH)).describedAs("path").isEqualTo(acFlushString.getAllValues());
-    assertThat(Arrays.asList(Long.valueOf(2*BUFFER_SIZE))).describedAs("position").isEqualTo(acFlushLong.getAllValues());
+    verify(client, times(1)).flush(acFlushPath.capture(), acFlushPosition.capture(), acFlushRetainUnCommittedData.capture(), acFlushClose.capture(),
+        acFlushSASToken.capture());
+    assertThat(Arrays.asList(PATH)).describedAs("path").isEqualTo(acFlushPath.getAllValues());
+    assertThat(Arrays.asList(Long.valueOf(2*BUFFER_SIZE))).describedAs("position").isEqualTo(acFlushPosition.getAllValues());
     assertThat(Arrays.asList(false)).describedAs("RetainUnCommittedData flag").isEqualTo(acFlushRetainUnCommittedData.getAllValues());
     assertThat(Arrays.asList(true)).describedAs("Close flag").isEqualTo(acFlushClose.getAllValues());
-
   }
 
   /**
@@ -252,12 +246,13 @@ public final class TestAbfsOutputStream {
     AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf);
 
     when(client.getAbfsPerfTracker()).thenReturn(tracker);
-    when(client.append(anyString(), anyLong(), any(byte[].class), anyInt(), anyInt(), any(), anyBoolean())).thenReturn(op);
+    when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any())).thenReturn(op);
     when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any())).thenReturn(op);
     when(op.getSasToken()).thenReturn("testToken");
     when(op.getResult()).thenReturn(httpOp);
 
-    AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0, populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false));
+    AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0,
+        populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false));
     final byte[] b = new byte[BUFFER_SIZE];
     new Random().nextBytes(b);
 
@@ -266,22 +261,18 @@ public final class TestAbfsOutputStream {
     }
     Thread.sleep(1000);
 
-    ArgumentCaptor<String> acString = ArgumentCaptor.forClass(String.class);
-    ArgumentCaptor<Long> acLong = ArgumentCaptor.forClass(Long.class);
-    ArgumentCaptor<Integer> acBufferOffset = ArgumentCaptor.forClass(Integer.class);
-    ArgumentCaptor<Integer> acBufferLength = ArgumentCaptor.forClass(Integer.class);
-    ArgumentCaptor<byte[]> acByteArray = ArgumentCaptor.forClass(byte[].class);
-    ArgumentCaptor<Boolean> acAppendBlobAppend = ArgumentCaptor.forClass(Boolean.class);
-    ArgumentCaptor<String> acSASToken = ArgumentCaptor.forClass(String.class);
-
-    verify(client, times(2)).append(acString.capture(), acLong.capture(), acByteArray.capture(), acBufferOffset.capture(), acBufferLength.capture(),
-                                    acSASToken.capture(), acAppendBlobAppend.capture());
-    assertThat(Arrays.asList(PATH, PATH)).describedAs("File Path").isEqualTo(acString.getAllValues());
-    assertThat(new HashSet<Long>(Arrays.asList(Long.valueOf(0), Long.valueOf(BUFFER_SIZE)))).describedAs("Position in file").isEqualTo(
-               new HashSet<Long>(acLong.getAllValues()));
-    assertThat(Arrays.asList(0, 0)).describedAs("buffer offset").isEqualTo(acBufferOffset.getAllValues());
-    assertThat(Arrays.asList(BUFFER_SIZE, BUFFER_SIZE)).describedAs("buffer length").isEqualTo(acBufferLength.getAllValues());
-
+    AppendRequestParameters firstReqParameters = new AppendRequestParameters(
+        0, 0, BUFFER_SIZE, APPEND_MODE, false);
+    AppendRequestParameters secondReqParameters = new AppendRequestParameters(
+        BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false);
+
+    verify(client, times(1)).append(
+        eq(PATH), any(byte[].class), refEq(firstReqParameters), any());
+    verify(client, times(1)).append(
+        eq(PATH), any(byte[].class), refEq(secondReqParameters), any());
+    // confirm there were only 2 invocations in all
+    verify(client, times(2)).append(
+        eq(PATH), any(byte[].class), any(), any());
   }
 
   /**
@@ -299,10 +290,11 @@ public final class TestAbfsOutputStream {
     AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf);
 
     when(client.getAbfsPerfTracker()).thenReturn(tracker);
-    when(client.append(anyString(), anyLong(), any(byte[].class), anyInt(), anyInt(), any(), anyBoolean())).thenReturn(op);
+    when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any())).thenReturn(op);
     when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any())).thenReturn(op);
 
-    AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0, populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, true));
+    AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0,
+        populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, true));
     final byte[] b = new byte[BUFFER_SIZE];
     new Random().nextBytes(b);
 
@@ -311,22 +303,18 @@ public final class TestAbfsOutputStream {
     }
     Thread.sleep(1000);
 
-    ArgumentCaptor<String> acString = ArgumentCaptor.forClass(String.class);
-    ArgumentCaptor<Long> acLong = ArgumentCaptor.forClass(Long.class);
-    ArgumentCaptor<Integer> acBufferOffset = ArgumentCaptor.forClass(Integer.class);
-    ArgumentCaptor<Integer> acBufferLength = ArgumentCaptor.forClass(Integer.class);
-    ArgumentCaptor<byte[]> acByteArray = ArgumentCaptor.forClass(byte[].class);
-    ArgumentCaptor<Boolean> acAppendBlobAppend = ArgumentCaptor.forClass(Boolean.class);
-    ArgumentCaptor<String> acSASToken = ArgumentCaptor.forClass(String.class);
-
-    verify(client, times(2)).append(acString.capture(), acLong.capture(), acByteArray.capture(), acBufferOffset.capture(), acBufferLength.capture(),
-                                    acSASToken.capture(), acAppendBlobAppend.capture());
-    assertThat(Arrays.asList(PATH, PATH)).describedAs("File Path").isEqualTo(acString.getAllValues());
-    assertThat(Arrays.asList(Long.valueOf(0), Long.valueOf(BUFFER_SIZE))).describedAs("File Position").isEqualTo(acLong.getAllValues());
-    assertThat(Arrays.asList(0, 0)).describedAs("Buffer Offset").isEqualTo(acBufferOffset.getAllValues());
-    assertThat(Arrays.asList(BUFFER_SIZE, BUFFER_SIZE)).describedAs("Buffer Length").isEqualTo(acBufferLength.getAllValues());
-    assertThat(Arrays.asList(true, true)).describedAs("is AppendBlob Append").isEqualTo(acAppendBlobAppend.getAllValues());
-
+    AppendRequestParameters firstReqParameters = new AppendRequestParameters(
+        0, 0, BUFFER_SIZE, APPEND_MODE, true);
+    AppendRequestParameters secondReqParameters = new AppendRequestParameters(
+        BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, true);
+
+    verify(client, times(1)).append(
+        eq(PATH), any(byte[].class), refEq(firstReqParameters), any());
+    verify(client, times(1)).append(
+        eq(PATH), any(byte[].class), refEq(secondReqParameters), any());
+    // confirm there were only 2 invocations in all
+    verify(client, times(2)).append(
+        eq(PATH), any(byte[].class), any(), any());
   }
 
   /**
@@ -337,6 +325,7 @@ public final class TestAbfsOutputStream {
 
     AbfsClient client = mock(AbfsClient.class);
     AbfsRestOperation op = mock(AbfsRestOperation.class);
+    when(op.getSasToken()).thenReturn("");
     AbfsConfiguration abfsConf;
     final Configuration conf = new Configuration();
     conf.set(accountKey1, accountValue1);
@@ -344,10 +333,11 @@ public final class TestAbfsOutputStream {
     AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf);
 
     when(client.getAbfsPerfTracker()).thenReturn(tracker);
-    when(client.append(anyString(), anyLong(), any(byte[].class), anyInt(), anyInt(), any(), anyBoolean())).thenReturn(op);
+    when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any())).thenReturn(op);
     when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any())).thenReturn(op);
 
-    AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0, populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false));
+    AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0,
+        populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false));
     final byte[] b = new byte[BUFFER_SIZE];
     new Random().nextBytes(b);
 
@@ -356,35 +346,31 @@ public final class TestAbfsOutputStream {
     }
     out.hflush();
 
-    ArgumentCaptor<String> acString = ArgumentCaptor.forClass(String.class);
-    ArgumentCaptor<Long> acLong = ArgumentCaptor.forClass(Long.class);
-    ArgumentCaptor<Integer> acBufferOffset = ArgumentCaptor.forClass(Integer.class);
-    ArgumentCaptor<Integer> acBufferLength = ArgumentCaptor.forClass(Integer.class);
-    ArgumentCaptor<byte[]> acByteArray = ArgumentCaptor.forClass(byte[].class);
-    ArgumentCaptor<Boolean> acAppendBlobAppend = ArgumentCaptor.forClass(Boolean.class);
-    ArgumentCaptor<String> acSASToken = ArgumentCaptor.forClass(String.class);
-
-    verify(client, times(2)).append(acString.capture(), acLong.capture(), acByteArray.capture(), acBufferOffset.capture(), acBufferLength.capture(),
-                                    acSASToken.capture(), acAppendBlobAppend.capture());
-    assertThat(Arrays.asList(PATH, PATH)).describedAs("File Path").isEqualTo(acString.getAllValues());
-    assertThat(new HashSet<Long>(Arrays.asList(Long.valueOf(0), Long.valueOf(BUFFER_SIZE)))).describedAs("File Position").isEqualTo(
-               new HashSet<Long>(acLong.getAllValues()));
-    assertThat(Arrays.asList(0, 0)).describedAs("Buffer Offset").isEqualTo(acBufferOffset.getAllValues());
-    assertThat(Arrays.asList(BUFFER_SIZE, BUFFER_SIZE)).describedAs("Buffer Length").isEqualTo(acBufferLength.getAllValues());
-
-    ArgumentCaptor<String> acFlushString = ArgumentCaptor.forClass(String.class);
-    ArgumentCaptor<Long> acFlushLong = ArgumentCaptor.forClass(Long.class);
+    AppendRequestParameters firstReqParameters = new AppendRequestParameters(
+        0, 0, BUFFER_SIZE, APPEND_MODE, false);
+    AppendRequestParameters secondReqParameters = new AppendRequestParameters(
+        BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false);
+
+    verify(client, times(1)).append(
+        eq(PATH), any(byte[].class), refEq(firstReqParameters), any());
+    verify(client, times(1)).append(
+        eq(PATH), any(byte[].class), refEq(secondReqParameters), any());
+    // confirm there were only 2 invocations in all
+    verify(client, times(2)).append(
+        eq(PATH), any(byte[].class), any(), any());
+
+    ArgumentCaptor<String> acFlushPath = ArgumentCaptor.forClass(String.class);
+    ArgumentCaptor<Long> acFlushPosition = ArgumentCaptor.forClass(Long.class);
     ArgumentCaptor<Boolean> acFlushRetainUnCommittedData = ArgumentCaptor.forClass(Boolean.class);
     ArgumentCaptor<Boolean> acFlushClose = ArgumentCaptor.forClass(Boolean.class);
     ArgumentCaptor<String> acFlushSASToken = ArgumentCaptor.forClass(String.class);
 
-    verify(client, times(1)).flush(acFlushString.capture(), acFlushLong.capture(), acFlushRetainUnCommittedData.capture(), acFlushClose.capture(),
-                                   acFlushSASToken.capture());
-    assertThat(Arrays.asList(PATH)).describedAs("path").isEqualTo(acFlushString.getAllValues());
-    assertThat(Arrays.asList(Long.valueOf(2*BUFFER_SIZE))).describedAs("position").isEqualTo(acFlushLong.getAllValues());
+    verify(client, times(1)).flush(acFlushPath.capture(), acFlushPosition.capture(), acFlushRetainUnCommittedData.capture(), acFlushClose.capture(),
+        acFlushSASToken.capture());
+    assertThat(Arrays.asList(PATH)).describedAs("path").isEqualTo(acFlushPath.getAllValues());
+    assertThat(Arrays.asList(Long.valueOf(2*BUFFER_SIZE))).describedAs("position").isEqualTo(acFlushPosition.getAllValues());
     assertThat(Arrays.asList(false)).describedAs("RetainUnCommittedData flag").isEqualTo(acFlushRetainUnCommittedData.getAllValues());
     assertThat(Arrays.asList(false)).describedAs("Close flag").isEqualTo(acFlushClose.getAllValues());
-
   }
 
   /**
@@ -401,10 +387,11 @@ public final class TestAbfsOutputStream {
     abfsConf = new AbfsConfiguration(conf, accountName1);
     AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf);
     when(client.getAbfsPerfTracker()).thenReturn(tracker);
-    when(client.append(anyString(), anyLong(), any(byte[].class), anyInt(), anyInt(), any(), anyBoolean())).thenReturn(op);
+    when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any())).thenReturn(op);
     when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any())).thenReturn(op);
 
-    AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0, populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false));
+    AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0,
+        populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false));
     final byte[] b = new byte[BUFFER_SIZE];
     new Random().nextBytes(b);
 
@@ -415,21 +402,17 @@ public final class TestAbfsOutputStream {
     out.flush();
     Thread.sleep(1000);
 
-    ArgumentCaptor<String> acString = ArgumentCaptor.forClass(String.class);
-    ArgumentCaptor<Long> acLong = ArgumentCaptor.forClass(Long.class);
-    ArgumentCaptor<Integer> acBufferOffset = ArgumentCaptor.forClass(Integer.class);
-    ArgumentCaptor<Integer> acBufferLength = ArgumentCaptor.forClass(Integer.class);
-    ArgumentCaptor<byte[]> acByteArray = ArgumentCaptor.forClass(byte[].class);
-    ArgumentCaptor<Boolean> acAppendBlobAppend = ArgumentCaptor.forClass(Boolean.class);
-    ArgumentCaptor<String> acSASToken = ArgumentCaptor.forClass(String.class);
-
-    verify(client, times(2)).append(acString.capture(), acLong.capture(), acByteArray.capture(), acBufferOffset.capture(), acBufferLength.capture(),
-                                    acSASToken.capture(), acAppendBlobAppend.capture());
-    assertThat(Arrays.asList(PATH, PATH)).describedAs("path").isEqualTo(acString.getAllValues());
-    assertThat(new HashSet<Long>(Arrays.asList(Long.valueOf(0), Long.valueOf(BUFFER_SIZE)))).describedAs("Position").isEqualTo(
-               new HashSet<Long>(acLong.getAllValues()));
-    assertThat(Arrays.asList(0, 0)).describedAs("Buffer Offset").isEqualTo(acBufferOffset.getAllValues());
-    assertThat(Arrays.asList(BUFFER_SIZE, BUFFER_SIZE)).describedAs("Buffer Length").isEqualTo(acBufferLength.getAllValues());
-
+    AppendRequestParameters firstReqParameters = new AppendRequestParameters(
+        0, 0, BUFFER_SIZE, APPEND_MODE, false);
+    AppendRequestParameters secondReqParameters = new AppendRequestParameters(
+        BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false);
+
+    verify(client, times(1)).append(
+        eq(PATH), any(byte[].class), refEq(firstReqParameters), any());
+    verify(client, times(1)).append(
+        eq(PATH), any(byte[].class), refEq(secondReqParameters), any());
+    // confirm there were only 2 invocations in all
+    verify(client, times(2)).append(
+        eq(PATH), any(byte[].class), any(), any());
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org