Posted to common-commits@hadoop.apache.org by st...@apache.org on 2021/01/22 11:06:54 UTC

[hadoop] branch branch-3.3 updated (1520b84 -> 4865589)

This is an automated email from the ASF dual-hosted git repository.

stevel pushed a change to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git.


    from 1520b84  YARN-10519. Refactor QueueMetricsForCustomResources class to move to yarn-common package. Contributed by Minni Mittal
     new a44890e  HADOOP-17296. ABFS: Force reads to be always of buffer size.
     new d3caa15  HADOOP-17413. Release elastic byte buffer pool at close
     new 5f312a0  HADOOP-17422: ABFS: Set default ListMaxResults to max server limit (#2535) Contributed by Sumangala Patki
     new f3a0ca6  HADOOP-17407. ABFS: Fix NPE on delete idempotency flow
     new cb67292  HADOOP-17347. ABFS: Read optimizations
     new 4865589  HADOOP-17404. ABFS: Small write - Merge append and flush

The 6 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 hadoop-tools/hadoop-azure/pom.xml                  |   2 +
 .../src/config/checkstyle-suppressions.xml         |   2 +
 .../hadoop/fs/azurebfs/AbfsConfiguration.java      |  54 +++
 .../fs/azurebfs/AzureBlobFileSystemStore.java      |   6 +
 .../fs/azurebfs/constants/AbfsHttpConstants.java   |   1 +
 .../fs/azurebfs/constants/ConfigurationKeys.java   |  13 +
 .../constants/FileSystemConfigurations.java        |  12 +-
 .../fs/azurebfs/constants/HttpQueryParams.java     |   1 +
 .../services/AppendRequestParameters.java          |  69 +++
 .../hadoop/fs/azurebfs/services/AbfsClient.java    |  50 +-
 .../fs/azurebfs/services/AbfsHttpOperation.java    |  39 +-
 .../fs/azurebfs/services/AbfsInputStream.java      | 231 ++++++++-
 .../azurebfs/services/AbfsInputStreamContext.java  |  62 +++
 .../fs/azurebfs/services/AbfsOutputStream.java     |  70 ++-
 .../azurebfs/services/AbfsOutputStreamContext.java |  11 +
 .../fs/azurebfs/services/AbfsRestOperation.java    |   2 +-
 .../fs/azurebfs/services/ReadBufferManager.java    | 105 ++++-
 .../hadoop-azure/src/site/markdown/abfs.md         |  18 +-
 .../fs/azurebfs/AbstractAbfsIntegrationTest.java   |   8 +
 .../fs/azurebfs/ITestAbfsNetworkStatistics.java    | 339 +++++++------
 .../azurebfs/ITestAzureBlobFileSystemDelete.java   |  31 +-
 .../ITestAzureBlobFileSystemListStatus.java        |   8 +-
 .../ITestAzureBlobFileSystemRandomRead.java        | 244 ++++++++--
 .../fs/azurebfs/ITestSmallWriteOptimization.java   | 523 +++++++++++++++++++++
 .../fs/azurebfs/services/ITestAbfsInputStream.java | 256 ++++++++++
 .../services/ITestAbfsInputStreamReadFooter.java   | 358 ++++++++++++++
 .../ITestAbfsInputStreamSmallFileReads.java        | 326 +++++++++++++
 .../azurebfs/services/ITestAbfsOutputStream.java   |  17 +-
 .../fs/azurebfs/services/TestAbfsClient.java       |  46 ++
 .../fs/azurebfs/services/TestAbfsInputStream.java  | 223 ++++++++-
 .../fs/azurebfs/services/TestAbfsOutputStream.java | 279 ++++++-----
 .../fs/azurebfs/services/TestAbfsPerfTracker.java  |  13 +
 .../hadoop/fs/azurebfs/utils/TestMockHelpers.java  |  59 +++
 33 files changed, 3031 insertions(+), 447 deletions(-)
 create mode 100644 hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AppendRequestParameters.java
 create mode 100644 hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestSmallWriteOptimization.java
 create mode 100644 hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStream.java
 create mode 100644 hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStreamReadFooter.java
 create mode 100644 hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStreamSmallFileReads.java
 create mode 100644 hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/TestMockHelpers.java


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[hadoop] 02/06: HADOOP-17413. Release elastic byte buffer pool at close

Posted by st...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

stevel pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit d3caa1552b143b1d578d69d3786ccdedf66e4557
Author: Sneha Vijayarajan <sn...@gmail.com>
AuthorDate: Tue Dec 15 10:15:37 2020 +0530

    HADOOP-17413. Release elastic byte buffer pool at close
    
    - Contributed by Sneha Vijayarajan
    
    (cherry picked from commit 5bf977e6b16287d7d140dd96dad66d0fce213954)
---
 .../java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java  | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
index b53b2b2..01b2fa5 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
@@ -85,7 +85,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
    * blocks. After the data is sent to the service, the buffer is returned
    * back to the queue
    */
-  private final ElasticByteBufferPool byteBufferPool
+  private ElasticByteBufferPool byteBufferPool
           = new ElasticByteBufferPool();
 
   private final Statistics statistics;
@@ -297,6 +297,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
       bufferIndex = 0;
       closed = true;
       writeOperations.clear();
+      byteBufferPool = null;
       if (!threadExecutor.isShutdown()) {
         threadExecutor.shutdownNow();
       }
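
For context, a minimal sketch of the pattern this commit applies. ElasticByteBufferPool caches returned buffers, so a stream that holds a final reference to its pool keeps that memory reachable for as long as the stream object is. Dropping the reference in close() lets the cached buffers be garbage collected. The surrounding class below is illustrative, not the real AbfsOutputStream; only the pool API comes from the diff above.

    import java.nio.ByteBuffer;
    import org.apache.hadoop.io.ElasticByteBufferPool;

    // Illustrative stream skeleton showing the release-at-close pattern.
    class PooledStream implements AutoCloseable {
      // Non-final so the reference can be dropped on close (the change above).
      private ElasticByteBufferPool byteBufferPool = new ElasticByteBufferPool();
      private byte[] buffer = byteBufferPool.getBuffer(false, 8 * 1024 * 1024).array();

      void finishWrite() {
        // After upload, buffers go back to the pool, which caches them.
        byteBufferPool.putBuffer(ByteBuffer.wrap(buffer));
      }

      @Override
      public void close() {
        // Drop the references so the cached buffers become collectable.
        byteBufferPool = null;
        buffer = null;
      }
    }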




[hadoop] 04/06: HADOOP-17407. ABFS: Fix NPE on delete idempotency flow

Posted by st...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

stevel pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit f3a0ca66c2d50ac6605010d970a8dbb4ceeeac1d
Author: Sneha Vijayarajan <sn...@gmail.com>
AuthorDate: Sat Jan 2 23:52:10 2021 +0530

    HADOOP-17407. ABFS: Fix NPE on delete idempotency flow
    
    - Contributed by Sneha Vijayarajan
    
    (cherry picked from commit 5ca1ea89b3f57017768ae4d8002f353e3d240e07)
---
 .../hadoop/fs/azurebfs/services/AbfsClient.java    |  3 ++
 .../fs/azurebfs/services/AbfsHttpOperation.java    | 39 ++++++++++++--
 .../azurebfs/ITestAzureBlobFileSystemDelete.java   | 31 ++++++++++--
 .../fs/azurebfs/services/TestAbfsClient.java       | 46 +++++++++++++++++
 .../fs/azurebfs/services/TestAbfsPerfTracker.java  | 13 +++++
 .../hadoop/fs/azurebfs/utils/TestMockHelpers.java  | 59 ++++++++++++++++++++++
 6 files changed, 183 insertions(+), 8 deletions(-)

diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
index 7722c62..db2f44f 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
@@ -383,6 +383,7 @@ public class AbfsClient implements Closeable {
               HttpHeaderConfigurations.LAST_MODIFIED);
 
           if (DateTimeUtils.isRecentlyModified(lmt, renameRequestStartTime)) {
+            LOG.debug("Returning success response from rename idempotency logic");
             return destStatusOp;
           }
         }
@@ -450,6 +451,7 @@ public class AbfsClient implements Closeable {
         String fileLength = destStatusOp.getResult().getResponseHeader(
             HttpHeaderConfigurations.CONTENT_LENGTH);
         if (length <= Long.parseLong(fileLength)) {
+          LOG.debug("Returning success response from append blob idempotency code");
           return true;
         }
       }
@@ -627,6 +629,7 @@ public class AbfsClient implements Closeable {
           op.getUrl(),
           op.getRequestHeaders());
       successOp.hardSetResult(HttpURLConnection.HTTP_OK);
+      LOG.debug("Returning success response from delete idempotency logic");
       return successOp;
     }
 
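The three debug lines above mark the points where idempotency handling short-circuits a retried request. As a hedged sketch of the delete case (shape inferred from this diff and the test below, not a verbatim copy of the Hadoop source): a DELETE that comes back 404 on a retried attempt is converted into a fixed 200 result, since the first, timed-out attempt may already have removed the path.

    // Sketch only; isARetriedRequest() is assumed from context.
    public AbfsRestOperation deleteIdempotencyCheckOp(final AbfsRestOperation op) {
      if (op.isARetriedRequest()
          && op.getResult().getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) {
        // The earlier attempt may have deleted the path; report success.
        final AbfsRestOperation successOp = new AbfsRestOperation(
            AbfsRestOperationType.DeletePath, this, HTTP_METHOD_DELETE,
            op.getUrl(), op.getRequestHeaders());
        successOp.hardSetResult(HttpURLConnection.HTTP_OK);
        LOG.debug("Returning success response from delete idempotency logic");
        return successOp;
      }
      return op;
    }
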
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java
index 51d0fb1..720b99b 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java
@@ -86,12 +86,23 @@ public class AbfsHttpOperation implements AbfsPerfLoggable {
   private long sendRequestTimeMs;
   private long recvResponseTimeMs;
 
-  public static AbfsHttpOperation getAbfsHttpOperationWithFixedResult(final URL url,
-      final String method, final int httpStatus) {
-       return new AbfsHttpOperation(url, method, httpStatus);
+  public static AbfsHttpOperation getAbfsHttpOperationWithFixedResult(
+      final URL url,
+      final String method,
+      final int httpStatus) {
+    AbfsHttpOperationWithFixedResult httpOp
+        = new AbfsHttpOperationWithFixedResult(url, method, httpStatus);
+    return httpOp;
   }
 
-  private AbfsHttpOperation(final URL url, final String method,
+  /**
+   * Constructor for FixedResult instance, avoiding connection init.
+   * @param url request url
+   * @param method Http method
+   * @param httpStatus HttpStatus
+   */
+  protected AbfsHttpOperation(final URL url,
+      final String method,
       final int httpStatus) {
     this.isTraceEnabled = LOG.isTraceEnabled();
     this.url = url;
@@ -547,4 +558,24 @@ public class AbfsHttpOperation implements AbfsPerfLoggable {
     return this.maskedEncodedUrl;
   }
 
+  public static class AbfsHttpOperationWithFixedResult extends AbfsHttpOperation {
+    /**
+     * Creates an instance to represent fixed results.
+     * This is used in idempotency handling.
+     *
+     * @param url The full URL including query string parameters.
+     * @param method The HTTP method (PUT, PATCH, POST, GET, HEAD, or DELETE).
+     * @param httpStatus StatusCode to hard set
+     */
+    public AbfsHttpOperationWithFixedResult(final URL url,
+        final String method,
+        final int httpStatus) {
+      super(url, method, httpStatus);
+    }
+
+    @Override
+    public String getResponseHeader(final String httpHeader) {
+      return "";
+    }
+  }
 }
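
This subclass is the actual NPE fix: a fixed-result operation never performs an HTTP exchange, so its response fields stay unset, and callers that read headers off it would otherwise dereference null. A hedged usage sketch (the URL is hypothetical):

    URL url = new URL("https://myaccount.dfs.core.windows.net/container/path");
    AbfsHttpOperation op =
        AbfsHttpOperation.getAbfsHttpOperationWithFixedResult(url, "DELETE", 200);

    // Safe even though no request was sent: the fixed-result subclass
    // returns "" from getResponseHeader() instead of null.
    String lastModified = op.getResponseHeader("Last-Modified");
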
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java
index 2f2a619..9bd82db 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java
@@ -35,6 +35,8 @@ import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
 import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation;
 import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
 import org.apache.hadoop.fs.azurebfs.services.TestAbfsClient;
+import org.apache.hadoop.fs.azurebfs.services.TestAbfsPerfTracker;
+import org.apache.hadoop.fs.azurebfs.utils.TestMockHelpers;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
@@ -44,11 +46,14 @@ import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
 import static java.net.HttpURLConnection.HTTP_OK;
 
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doCallRealMethod;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_DELETE;
 import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_DELETE_CONSIDERED_IDEMPOTENT;
+import static org.apache.hadoop.fs.azurebfs.services.AbfsRestOperationType.DeletePath;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.assertDeleted;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.assertPathDoesNotExist;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
@@ -213,6 +218,12 @@ public class ITestAzureBlobFileSystemDelete extends
         this.getConfiguration());
 
     // Case 1: Not a retried case should throw error back
+    // Add asserts at AzureBlobFileSystemStore and AbfsClient levels
+    intercept(AbfsRestOperationException.class,
+        () -> fs.getAbfsStore().delete(
+            new Path("/NonExistingPath"),
+            false));
+
     intercept(AbfsRestOperationException.class,
         () -> client.deletePath(
         "/NonExistingPath",
@@ -223,13 +234,22 @@ public class ITestAzureBlobFileSystemDelete extends
     AbfsClient mockClient = TestAbfsClient.getMockAbfsClient(
         fs.getAbfsStore().getClient(),
         this.getConfiguration());
+    AzureBlobFileSystemStore mockStore = mock(AzureBlobFileSystemStore.class);
+    mockStore = TestMockHelpers.setClassField(AzureBlobFileSystemStore.class, mockStore,
+        "client", mockClient);
+    mockStore = TestMockHelpers.setClassField(AzureBlobFileSystemStore.class,
+        mockStore,
+        "abfsPerfTracker",
+        TestAbfsPerfTracker.getAPerfTrackerInstance(this.getConfiguration()));
+    doCallRealMethod().when(mockStore).delete(new Path("/NonExistingPath"), false);
 
     // Case 2: Mimic retried case
     // Idempotency check on Delete always returns success
-    AbfsRestOperation idempotencyRetOp = mock(AbfsRestOperation.class);
-    AbfsHttpOperation http200Op = mock(AbfsHttpOperation.class);
-    when(http200Op.getStatusCode()).thenReturn(HTTP_OK);
-    when(idempotencyRetOp.getResult()).thenReturn(http200Op);
+    AbfsRestOperation idempotencyRetOp = TestAbfsClient.getRestOp(
+        DeletePath, mockClient, HTTP_METHOD_DELETE,
+        TestAbfsClient.getTestUrl(mockClient, "/NonExistingPath"),
+        TestAbfsClient.getTestRequestHeaders(mockClient));
+    idempotencyRetOp.hardSetResult(HTTP_OK);
 
     doReturn(idempotencyRetOp).when(mockClient).deleteIdempotencyCheckOp(any());
     when(mockClient.deletePath("/NonExistingPath", false,
@@ -244,6 +264,9 @@ public class ITestAzureBlobFileSystemDelete extends
         .describedAs("Idempotency check reports successful "
             + "delete. 200OK should be returned")
         .isEqualTo(idempotencyRetOp.getResult().getStatusCode());
+
+    // Call from AzureBlobFileSystemStore should not fail either
+    mockStore.delete(new Path("/NonExistingPath"), false);
   }
 
 }
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java
index 7a7992d..4facc10 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.lang.reflect.Field;
 import java.net.MalformedURLException;
 import java.net.URL;
+import java.util.List;
 import java.util.regex.Pattern;
 
 import org.junit.Test;
@@ -351,4 +352,49 @@ public final class TestAbfsClient {
     field.set(client, fieldObject);
     return client;
   }
+
+  /**
+   * Test helper method to access private createRequestUrl method.
+   * @param client test AbfsClient instance
+   * @param path path to generate the URL for
+   * @return store path URL
+   * @throws AzureBlobFileSystemException if the request URL cannot be created
+   */
+  public static URL getTestUrl(AbfsClient client, String path) throws
+      AzureBlobFileSystemException {
+    final AbfsUriQueryBuilder abfsUriQueryBuilder
+        = client.createDefaultUriQueryBuilder();
+    return client.createRequestUrl(path, abfsUriQueryBuilder.toString());
+  }
+
+  /**
+   * Test helper method to access private createDefaultHeaders method.
+   * @param client test AbfsClient instance
+   * @return List of AbfsHttpHeaders
+   */
+  public static List<AbfsHttpHeader> getTestRequestHeaders(AbfsClient client) {
+    return client.createDefaultHeaders();
+  }
+
+  /**
+   * Test helper method to create an AbfsRestOperation instance.
+   * @param type RestOpType
+   * @param client AbfsClient
+   * @param method HttpMethod
+   * @param url Test path url
+   * @param requestHeaders request headers
+   * @return instance of AbfsRestOperation
+   */
+  public static AbfsRestOperation getRestOp(AbfsRestOperationType type,
+      AbfsClient client,
+      String method,
+      URL url,
+      List<AbfsHttpHeader> requestHeaders) {
+    return new AbfsRestOperation(
+        type,
+        client,
+        method,
+        url,
+        requestHeaders);
+  }
 }
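
Together these helpers let a test construct a genuine AbfsRestOperation, rather than a Mockito stub, and hard-set its result, which is how the delete test above builds its idempotency return value. Condensed from that test (mockClient is assumed to come from getMockAbfsClient):

    AbfsRestOperation idempotencyRetOp = TestAbfsClient.getRestOp(
        AbfsRestOperationType.DeletePath,
        mockClient,
        HTTP_METHOD_DELETE,
        TestAbfsClient.getTestUrl(mockClient, "/NonExistingPath"),
        TestAbfsClient.getTestRequestHeaders(mockClient));
    idempotencyRetOp.hardSetResult(HttpURLConnection.HTTP_OK);
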
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsPerfTracker.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsPerfTracker.java
index 4f42102..191d6e7 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsPerfTracker.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsPerfTracker.java
@@ -34,6 +34,8 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
+
 import static org.assertj.core.api.Assertions.assertThat;
 
 /**
@@ -405,4 +407,15 @@ public final class TestAbfsPerfTracker {
       tracker13.registerResult(httpOperation).registerSuccess(false).registerAggregates(Instant.MIN, TEST_AGGREGATE_COUNT);
     }
   }
+
+  /**
+   * Test helper method to create an AbfsPerfTracker instance.
+   * @param abfsConfig active test abfs config
+   * @return instance of AbfsPerfTracker
+   */
+  public static AbfsPerfTracker getAPerfTrackerInstance(AbfsConfiguration abfsConfig) {
+    AbfsPerfTracker tracker = new AbfsPerfTracker("test",
+        abfsConfig.getAccountName(), abfsConfig);
+    return tracker;
+  }
 }
\ No newline at end of file
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/TestMockHelpers.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/TestMockHelpers.java
new file mode 100644
index 0000000..e25a099
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/TestMockHelpers.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.utils;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+
+/**
+ * Test Mock Helpers.
+ */
+public final class TestMockHelpers {
+
+  /**
+   * Sets a class field by reflection.
+   * @param type class declaring the field
+   * @param obj instance whose field is to be set
+   * @param fieldName name of the field
+   * @param fieldObject new value for the field
+   * @param <T> type of the instance
+   * @return the same instance with the field updated
+   * @throws Exception if the field lookup or update fails
+   */
+  public static <T> T setClassField(
+      Class<T> type,
+      final T obj,
+      final String fieldName,
+      Object fieldObject) throws Exception {
+
+    Field field = type.getDeclaredField(fieldName);
+    field.setAccessible(true);
+    Field modifiersField = Field.class.getDeclaredField("modifiers");
+    modifiersField.setAccessible(true);
+    modifiersField.setInt(field,
+        field.getModifiers() & ~Modifier.FINAL);
+    field.set(obj, fieldObject);
+
+    return obj;
+  }
+
+  private TestMockHelpers() {
+    // Never called; private constructor satisfies checkstyle rule HideUtilityClassConstructor.
+  }
+}
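
A short usage sketch, matching how the delete test above applies the helper (mockStore and mockClient are that test's Mockito mocks). One caveat: the "modifiers" field trick relies on reflective access that newer JDKs reject (Java 12+ filters the field from reflection), so this helper is tied to the Java 8/11 runtimes that branch-3.3 targets.

    AzureBlobFileSystemStore mockStore = mock(AzureBlobFileSystemStore.class);
    mockStore = TestMockHelpers.setClassField(
        AzureBlobFileSystemStore.class, mockStore, "client", mockClient);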




[hadoop] 06/06: HADOOP-17404. ABFS: Small write - Merge append and flush

Posted by st...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

stevel pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit 4865589bb4414c87d9ac02b4323ebbd485348cf7
Author: Sneha Vijayarajan <sn...@gmail.com>
AuthorDate: Thu Jan 7 00:13:37 2021 +0530

    HADOOP-17404. ABFS: Small write - Merge append and flush
    
    - Contributed by Sneha Vijayarajan
    
    (cherry picked from commit b612c310c26394aa406c99d8598c9cb7621df052)
---
 hadoop-tools/hadoop-azure/pom.xml                  |   2 +
 .../src/config/checkstyle-suppressions.xml         |   2 +
 .../hadoop/fs/azurebfs/AbfsConfiguration.java      |   8 +
 .../fs/azurebfs/AzureBlobFileSystemStore.java      |   1 +
 .../fs/azurebfs/constants/AbfsHttpConstants.java   |   1 +
 .../fs/azurebfs/constants/ConfigurationKeys.java   |   9 +
 .../constants/FileSystemConfigurations.java        |   1 +
 .../fs/azurebfs/constants/HttpQueryParams.java     |   1 +
 .../services/AppendRequestParameters.java          |  69 +++
 .../hadoop/fs/azurebfs/services/AbfsClient.java    |  47 +-
 .../fs/azurebfs/services/AbfsOutputStream.java     |  67 ++-
 .../azurebfs/services/AbfsOutputStreamContext.java |  11 +
 .../fs/azurebfs/services/AbfsRestOperation.java    |   2 +-
 .../fs/azurebfs/ITestAbfsNetworkStatistics.java    | 339 +++++++------
 .../fs/azurebfs/ITestSmallWriteOptimization.java   | 523 +++++++++++++++++++++
 .../azurebfs/services/ITestAbfsOutputStream.java   |  17 +-
 .../fs/azurebfs/services/TestAbfsOutputStream.java | 279 ++++++-----
 17 files changed, 1030 insertions(+), 349 deletions(-)

diff --git a/hadoop-tools/hadoop-azure/pom.xml b/hadoop-tools/hadoop-azure/pom.xml
index 7bfce01..6b0599d 100644
--- a/hadoop-tools/hadoop-azure/pom.xml
+++ b/hadoop-tools/hadoop-azure/pom.xml
@@ -555,6 +555,7 @@
                     <exclude>**/azurebfs/ITestAbfsReadWriteAndSeek.java</exclude>
                     <exclude>**/azurebfs/ITestAzureBlobFileSystemListStatus.java</exclude>
                     <exclude>**/azurebfs/extensions/ITestAbfsDelegationTokens.java</exclude>
+                    <exclude>**/azurebfs/ITestSmallWriteOptimization.java</exclude>
                   </excludes>
 
                 </configuration>
@@ -594,6 +595,7 @@
                     <include>**/azurebfs/ITestAbfsReadWriteAndSeek.java</include>
                     <include>**/azurebfs/ITestAzureBlobFileSystemListStatus.java</include>
                     <include>**/azurebfs/extensions/ITestAbfsDelegationTokens.java</include>
+                    <include>**/azurebfs/ITestSmallWriteOptimization.java</include>
                   </includes>
                 </configuration>
               </execution>
diff --git a/hadoop-tools/hadoop-azure/src/config/checkstyle-suppressions.xml b/hadoop-tools/hadoop-azure/src/config/checkstyle-suppressions.xml
index c502361..070c8c1 100644
--- a/hadoop-tools/hadoop-azure/src/config/checkstyle-suppressions.xml
+++ b/hadoop-tools/hadoop-azure/src/config/checkstyle-suppressions.xml
@@ -46,4 +46,6 @@
               files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]AzureBlobFileSystemStore.java"/>
     <suppress checks="ParameterNumber|MagicNumber"
               files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]utils[\\/]Base64.java"/>
+    <suppress checks="ParameterNumber|VisibilityModifier"
+              files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]ITestSmallWriteOptimization.java"/>
 </suppressions>
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
index b1c95d2..5a70323 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
@@ -100,6 +100,10 @@ public class AbfsConfiguration{
       DefaultValue = DEFAULT_WRITE_BUFFER_SIZE)
   private int writeBufferSize;
 
+  @BooleanConfigurationValidatorAnnotation(ConfigurationKey = AZURE_ENABLE_SMALL_WRITE_OPTIMIZATION,
+      DefaultValue = DEFAULT_AZURE_ENABLE_SMALL_WRITE_OPTIMIZATION)
+  private boolean enableSmallWriteOptimization;
+
   @BooleanConfigurationValidatorAnnotation(
       ConfigurationKey = AZURE_READ_SMALL_FILES_COMPLETELY,
       DefaultValue = DEFAULT_READ_SMALL_FILES_COMPLETELY)
@@ -537,6 +541,10 @@ public class AbfsConfiguration{
     return this.writeBufferSize;
   }
 
+  public boolean isSmallWriteOptimizationEnabled() {
+    return this.enableSmallWriteOptimization;
+  }
+
   public boolean readSmallFilesCompletely() {
     return this.readSmallFilesCompletely;
   }
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
index 869a6f9..c8dd518 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
@@ -578,6 +578,7 @@ public class AzureBlobFileSystemStore implements Closeable {
     return new AbfsOutputStreamContext(abfsConfiguration.getSasTokenRenewPeriodForStreamsInSeconds())
             .withWriteBufferSize(bufferSize)
             .enableFlush(abfsConfiguration.isFlushEnabled())
+            .enableSmallWriteOptimization(abfsConfiguration.isSmallWriteOptimizationEnabled())
             .disableOutputStreamFlush(abfsConfiguration.isOutputStreamFlushDisabled())
             .withStreamStatistics(new AbfsOutputStreamStatisticsImpl())
             .withAppendBlob(isAppendBlob)
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java
index 38b79c9..184657e 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java
@@ -76,6 +76,7 @@ public final class AbfsHttpConstants {
   public static final String AT = "@";
   public static final String HTTP_HEADER_PREFIX = "x-ms-";
   public static final String HASH = "#";
+  public static final String TRUE = "true";
 
   public static final String PLUS_ENCODE = "%20";
   public static final String FORWARD_SLASH_ENCODE = "%2F";
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
index 3e1ff80..cdef9c9 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
@@ -55,6 +55,15 @@ public final class ConfigurationKeys {
   public static final String AZURE_WRITE_MAX_CONCURRENT_REQUESTS = "fs.azure.write.max.concurrent.requests";
   public static final String AZURE_WRITE_MAX_REQUESTS_TO_QUEUE = "fs.azure.write.max.requests.to.queue";
   public static final String AZURE_WRITE_BUFFER_SIZE = "fs.azure.write.request.size";
+  /** If the data written by a Hadoop app is small, i.e. the data size
+   *  (a) before the first hflush/hsync call, or
+   *  (b) between two consecutive hflush/hsync calls,
+   *  is less than the write buffer size, two separate server calls are made:
+   *  one for append and another for flush.
+   *  Enabling the small write optimization merges both operations into a
+   *  single append-with-flush call, reducing the request count.
+   */
+  public static final String AZURE_ENABLE_SMALL_WRITE_OPTIMIZATION = "fs.azure.write.enableappendwithflush";
   public static final String AZURE_READ_BUFFER_SIZE = "fs.azure.read.request.size";
   public static final String AZURE_READ_SMALL_FILES_COMPLETELY = "fs.azure.read.smallfilescompletely";
   public static final String AZURE_READ_OPTIMIZE_FOOTER_READ = "fs.azure.read.optimizefooterread";
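
For reference, a hedged sketch of enabling the option programmatically; the key string is the constant defined above, and its default (off) is set in FileSystemConfigurations below.

    import org.apache.hadoop.conf.Configuration;

    Configuration conf = new Configuration();
    // AZURE_ENABLE_SMALL_WRITE_OPTIMIZATION; disabled by default.
    conf.setBoolean("fs.azure.write.enableappendwithflush", true);
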
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
index 8008206..a23dfd5 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
@@ -56,6 +56,7 @@ public final class FileSystemConfigurations {
   // Default upload and download buffer size
   public static final int DEFAULT_WRITE_BUFFER_SIZE = 8 * ONE_MB;  // 8 MB
   public static final int APPENDBLOB_MAX_WRITE_BUFFER_SIZE = 4 * ONE_MB;  // 4 MB
+  public static final boolean DEFAULT_AZURE_ENABLE_SMALL_WRITE_OPTIMIZATION = false;
   public static final int DEFAULT_READ_BUFFER_SIZE = 4 * ONE_MB;  // 4 MB
   public static final boolean DEFAULT_READ_SMALL_FILES_COMPLETELY = false;
   public static final boolean DEFAULT_OPTIMIZE_FOOTER_READ = false;
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpQueryParams.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpQueryParams.java
index 5a550ac..8a4ca90 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpQueryParams.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpQueryParams.java
@@ -36,6 +36,7 @@ public final class HttpQueryParams {
   public static final String QUERY_PARAM_POSITION = "position";
   public static final String QUERY_PARAM_TIMEOUT = "timeout";
   public static final String QUERY_PARAM_RETAIN_UNCOMMITTED_DATA = "retainUncommittedData";
+  public static final String QUERY_PARAM_FLUSH = "flush";
   public static final String QUERY_PARAM_CLOSE = "close";
   public static final String QUERY_PARAM_UPN = "upn";
   public static final String QUERY_PARAM_BLOBTYPE = "blobtype";
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AppendRequestParameters.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AppendRequestParameters.java
new file mode 100644
index 0000000..fb4d29f
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AppendRequestParameters.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.contracts.services;
+
+/**
+ * Holds the request parameters for an append operation.
+ */
+public class AppendRequestParameters {
+  public enum Mode {
+    APPEND_MODE,
+    FLUSH_MODE,
+    FLUSH_CLOSE_MODE
+  }
+
+  private final long position;
+  private final int offset;
+  private final int length;
+  private final Mode mode;
+  private final boolean isAppendBlob;
+
+  public AppendRequestParameters(final long position,
+      final int offset,
+      final int length,
+      final Mode mode,
+      final boolean isAppendBlob) {
+    this.position = position;
+    this.offset = offset;
+    this.length = length;
+    this.mode = mode;
+    this.isAppendBlob = isAppendBlob;
+  }
+
+  public long getPosition() {
+    return this.position;
+  }
+
+  public int getoffset() {
+    return this.offset;
+  }
+
+  public int getLength() {
+    return this.length;
+  }
+
+  public Mode getMode() {
+    return this.mode;
+  }
+
+  public boolean isAppendBlob() {
+    return this.isAppendBlob;
+  }
+
+}
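
A hedged construction example for this parameter object (values illustrative), matching the reworked append() signature in the AbfsClient diff that follows:

    // Append 4 KB from the start of the local buffer at remote position 0,
    // merging flush and close into the same call.
    AppendRequestParameters reqParams = new AppendRequestParameters(
        0L,                                             // position in the remote file
        0,                                              // offset into the local buffer
        4096,                                           // length to send
        AppendRequestParameters.Mode.FLUSH_CLOSE_MODE,  // flush+close with the append
        false);                                         // not an append blob
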
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
index db2f44f..bfc11a6 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.fs.azurebfs.contracts.exceptions.SASTokenProviderExcept
 import org.apache.hadoop.fs.azurebfs.extensions.ExtensionHelper;
 import org.apache.hadoop.fs.azurebfs.extensions.SASTokenProvider;
 import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
+import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters;
 import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider;
 import org.apache.hadoop.fs.azurebfs.utils.DateTimeUtils;
 import org.apache.hadoop.io.IOUtils;
@@ -396,17 +397,27 @@ public class AbfsClient implements Closeable {
     return op;
   }
 
-  public AbfsRestOperation append(final String path, final long position, final byte[] buffer, final int offset,
-                                  final int length, final String cachedSasToken, final boolean isAppendBlob) throws AzureBlobFileSystemException {
+  public AbfsRestOperation append(final String path, final byte[] buffer,
+      AppendRequestParameters reqParams, final String cachedSasToken)
+      throws AzureBlobFileSystemException {
     final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
     // JDK7 does not support PATCH, so to workaround the issue we will use
     // PUT and specify the real method in the X-Http-Method-Override header.
     requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE,
-            HTTP_METHOD_PATCH));
+        HTTP_METHOD_PATCH));
 
     final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
     abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION, APPEND_ACTION);
-    abfsUriQueryBuilder.addQuery(QUERY_PARAM_POSITION, Long.toString(position));
+    abfsUriQueryBuilder.addQuery(QUERY_PARAM_POSITION, Long.toString(reqParams.getPosition()));
+
+    if ((reqParams.getMode() == AppendRequestParameters.Mode.FLUSH_MODE) || (
+        reqParams.getMode() == AppendRequestParameters.Mode.FLUSH_CLOSE_MODE)) {
+      abfsUriQueryBuilder.addQuery(QUERY_PARAM_FLUSH, TRUE);
+      if (reqParams.getMode() == AppendRequestParameters.Mode.FLUSH_CLOSE_MODE) {
+        abfsUriQueryBuilder.addQuery(QUERY_PARAM_CLOSE, TRUE);
+      }
+    }
+
     // AbfsInputStream/AbfsOutputStream reuse SAS tokens for better performance
     String sasTokenForReuse = appendSASTokenToQuery(path, SASTokenProvider.WRITE_OPERATION,
         abfsUriQueryBuilder, cachedSasToken);
@@ -414,20 +425,30 @@ public class AbfsClient implements Closeable {
     final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
     final AbfsRestOperation op = new AbfsRestOperation(
         AbfsRestOperationType.Append,
-            this,
-            HTTP_METHOD_PUT,
-            url,
-            requestHeaders, buffer, offset, length, sasTokenForReuse);
+        this,
+        HTTP_METHOD_PUT,
+        url,
+        requestHeaders,
+        buffer,
+        reqParams.getoffset(),
+        reqParams.getLength(),
+        sasTokenForReuse);
     try {
       op.execute();
     } catch (AzureBlobFileSystemException e) {
-      if (isAppendBlob && appendSuccessCheckOp(op, path, (position + length))) {
+      if (reqParams.isAppendBlob()
+          && appendSuccessCheckOp(op, path,
+          (reqParams.getPosition() + reqParams.getLength()))) {
         final AbfsRestOperation successOp = new AbfsRestOperation(
             AbfsRestOperationType.Append,
-                this,
-                HTTP_METHOD_PUT,
-                url,
-                requestHeaders, buffer, offset, length, sasTokenForReuse);
+            this,
+            HTTP_METHOD_PUT,
+            url,
+            requestHeaders,
+            buffer,
+            reqParams.getoffset(),
+            reqParams.getLength(),
+            sasTokenForReuse);
         successOp.hardSetResult(HttpURLConnection.HTTP_OK);
         return successOp;
       }
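
On the wire, the merge shows up as extra query parameters on the append PUT, so one request does the work of two (or three). An illustrative request line for FLUSH_CLOSE_MODE; account, container, and file names are hypothetical:

    PUT https://myaccount.dfs.core.windows.net/container/file?action=append&position=0&flush=true&close=true
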
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
index 01b2fa5..402fdda 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
@@ -35,11 +35,13 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
+import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters;
 import org.apache.hadoop.fs.azurebfs.utils.CachedSASToken;
 import org.apache.hadoop.io.ElasticByteBufferPool;
 import org.apache.hadoop.fs.FileSystem.Statistics;
@@ -48,6 +50,9 @@ import org.apache.hadoop.fs.StreamCapabilities;
 import org.apache.hadoop.fs.Syncable;
 
 import static org.apache.hadoop.io.IOUtils.wrapException;
+import static org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters.Mode.APPEND_MODE;
+import static org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters.Mode.FLUSH_CLOSE_MODE;
+import static org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters.Mode.FLUSH_MODE;
 
 /**
  * The BlobFsOutputStream for Rest AbfsClient.
@@ -60,6 +65,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
   private boolean closed;
   private boolean supportFlush;
   private boolean disableOutputStreamFlush;
+  private boolean enableSmallWriteOptimization;
   private boolean isAppendBlob;
   private volatile IOException lastError;
 
@@ -69,6 +75,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
   private final int bufferSize;
   private byte[] buffer;
   private int bufferIndex;
+  private int numOfAppendsToServerSinceLastFlush;
   private final int maxConcurrentRequestCount;
   private final int maxRequestsThatCanBeQueued;
 
@@ -108,12 +115,15 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
     this.supportFlush = abfsOutputStreamContext.isEnableFlush();
     this.disableOutputStreamFlush = abfsOutputStreamContext
             .isDisableOutputStreamFlush();
+    this.enableSmallWriteOptimization
+        = abfsOutputStreamContext.isEnableSmallWriteOptimization();
     this.isAppendBlob = abfsOutputStreamContext.isAppendBlob();
     this.lastError = null;
     this.lastFlushOffset = 0;
     this.bufferSize = abfsOutputStreamContext.getWriteBufferSize();
     this.buffer = byteBufferPool.getBuffer(false, bufferSize).array();
     this.bufferIndex = 0;
+    this.numOfAppendsToServerSinceLastFlush = 0;
     this.writeOperations = new ConcurrentLinkedDeque<>();
     this.outputStreamStatistics = abfsOutputStreamContext.getStreamStatistics();
 
@@ -309,8 +319,29 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
 
   private synchronized void flushInternal(boolean isClose) throws IOException {
     maybeThrowLastError();
+
+    // If this is a flush after writing less than buffersize, send the flush parameter in the append
+    if (!isAppendBlob
+        && enableSmallWriteOptimization
+        && (numOfAppendsToServerSinceLastFlush == 0) // there are no ongoing store writes
+        && (writeOperations.size() == 0) // double checking no appends in progress
+        && (bufferIndex > 0)) { // there is some data that is pending to be written
+      smallWriteOptimizedflushInternal(isClose);
+      return;
+    }
+
     writeCurrentBufferToService();
     flushWrittenBytesToService(isClose);
+    numOfAppendsToServerSinceLastFlush = 0;
+  }
+
+  private synchronized void smallWriteOptimizedflushInternal(boolean isClose) throws IOException {
+    // writeCurrentBufferToService will increment numOfAppendsToServerSinceLastFlush
+    writeCurrentBufferToService(true, isClose);
+    waitForAppendsToComplete();
+    shrinkWriteOperationQueue();
+    maybeThrowLastError();
+    numOfAppendsToServerSinceLastFlush = 0;
   }
 
   private synchronized void flushInternalAsync() throws IOException {
@@ -335,8 +366,9 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
     AbfsPerfTracker tracker = client.getAbfsPerfTracker();
     try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker,
             "writeCurrentBufferToService", "append")) {
-      AbfsRestOperation op = client.append(path, offset, bytes, 0,
-          bytesLength, cachedSasToken.get(), this.isAppendBlob);
+      AppendRequestParameters reqParams = new AppendRequestParameters(offset, 0,
+          bytesLength, APPEND_MODE, true);
+      AbfsRestOperation op = client.append(path, bytes, reqParams, cachedSasToken.get());
       cachedSasToken.update(op.getSasToken());
       outputStreamStatistics.uploadSuccessful(bytesLength);
       perfInfo.registerResult(op.getResult());
@@ -358,6 +390,10 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
   }
 
   private synchronized void writeCurrentBufferToService() throws IOException {
+    writeCurrentBufferToService(false, false);
+  }
+
+  private synchronized void writeCurrentBufferToService(boolean isFlush, boolean isClose) throws IOException {
     if (this.isAppendBlob) {
       writeAppendBlobCurrentBufferToService();
       return;
@@ -367,6 +403,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
       return;
     }
     outputStreamStatistics.writeCurrentBuffer();
+    numOfAppendsToServerSinceLastFlush++;
 
     final byte[] bytes = buffer;
     final int bytesLength = bufferIndex;
@@ -388,8 +425,19 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
         AbfsPerfTracker tracker = client.getAbfsPerfTracker();
         try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker,
                 "writeCurrentBufferToService", "append")) {
-          AbfsRestOperation op = client.append(path, offset, bytes, 0,
-                  bytesLength, cachedSasToken.get(), false);
+          AppendRequestParameters.Mode
+              mode = APPEND_MODE;
+          if (isFlush && isClose) {
+            mode = FLUSH_CLOSE_MODE;
+          } else if (isFlush) {
+            mode = FLUSH_MODE;
+          }
+
+          AppendRequestParameters reqParams = new AppendRequestParameters(
+              offset, 0, bytesLength, mode, false);
+          AbfsRestOperation op = client.append(path, bytes, reqParams,
+              cachedSasToken.get());
+
           cachedSasToken.update(op.getSasToken());
           perfInfo.registerResult(op.getResult());
           byteBufferPool.putBuffer(ByteBuffer.wrap(bytes));
@@ -410,7 +458,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
     shrinkWriteOperationQueue();
   }
 
-  private synchronized void flushWrittenBytesToService(boolean isClose) throws IOException {
+  private synchronized void waitForAppendsToComplete() throws IOException {
     for (WriteOperation writeOperation : writeOperations) {
       try {
         writeOperation.task.get();
@@ -428,6 +476,10 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
         throw lastError;
       }
     }
+  }
+
+  private synchronized void flushWrittenBytesToService(boolean isClose) throws IOException {
+    waitForAppendsToComplete();
     flushWrittenBytesToServiceInternal(position, false, isClose);
   }
 
@@ -558,6 +610,11 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
     return maxRequestsThatCanBeQueued;
   }
 
+  @VisibleForTesting
+  Boolean isAppendBlobStream() {
+    return isAppendBlob;
+  }
+
   /**
    * Appending AbfsOutputStream statistics to base toString().
    *
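
From the caller's point of view the optimization only changes the request count: a write smaller than the buffer followed by hflush() now produces a single append-with-flush call instead of an append plus a separate flush. A hedged sketch of the triggering pattern (fs is an AzureBlobFileSystem with fs.azure.write.enableappendwithflush set to true; the path is illustrative):

    try (FSDataOutputStream out = fs.create(new Path("/demo/small-write"))) {
      out.write("small payload".getBytes());  // stays below the write buffer size
      out.hflush();  // one append?flush=true request instead of append + flush
    }
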
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java
index 2dce5dc..925cd4f 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java
@@ -27,6 +27,8 @@ public class AbfsOutputStreamContext extends AbfsStreamContext {
 
   private boolean enableFlush;
 
+  private boolean enableSmallWriteOptimization;
+
   private boolean disableOutputStreamFlush;
 
   private AbfsOutputStreamStatistics streamStatistics;
@@ -52,6 +54,11 @@ public class AbfsOutputStreamContext extends AbfsStreamContext {
     return this;
   }
 
+  public AbfsOutputStreamContext enableSmallWriteOptimization(final boolean enableSmallWriteOptimization) {
+    this.enableSmallWriteOptimization = enableSmallWriteOptimization;
+    return this;
+  }
+
   public AbfsOutputStreamContext disableOutputStreamFlush(
           final boolean disableOutputStreamFlush) {
     this.disableOutputStreamFlush = disableOutputStreamFlush;
@@ -114,4 +121,8 @@ public class AbfsOutputStreamContext extends AbfsStreamContext {
   public int getMaxWriteRequestsToQueue() {
     return this.maxWriteRequestsToQueue;
   }
+
+  public boolean isEnableSmallWriteOptimization() {
+    return this.enableSmallWriteOptimization;
+  }
 }
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
index 83c76f5..24ec292 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
@@ -290,7 +290,7 @@ public class AbfsRestOperation {
       AbfsClientThrottlingIntercept.updateMetrics(operationType, httpOperation);
     }
 
-    LOG.debug("HttpRequest: {}", httpOperation.toString());
+    LOG.debug("HttpRequest: {}: {}", operationType, httpOperation.toString());
 
     if (client.getRetryPolicy().shouldRetry(retryCount, httpOperation.getStatusCode())) {
       return false;
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsNetworkStatistics.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsNetworkStatistics.java
index c2dbe93..66b8da8 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsNetworkStatistics.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsNetworkStatistics.java
@@ -33,14 +33,16 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
 import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
 
+import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.BYTES_RECEIVED;
 import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CONNECTIONS_MADE;
+import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.GET_RESPONSES;
 import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.SEND_REQUESTS;
 
 public class ITestAbfsNetworkStatistics extends AbstractAbfsIntegrationTest {
 
   private static final Logger LOG =
       LoggerFactory.getLogger(ITestAbfsNetworkStatistics.class);
-  private static final int LARGE_OPERATIONS = 10;
+  private static final int WRITE_OPERATION_LOOP_COUNT = 10;
 
   public ITestAbfsNetworkStatistics() throws Exception {
   }
@@ -58,117 +60,126 @@ public class ITestAbfsNetworkStatistics extends AbstractAbfsIntegrationTest {
     Map<String, Long> metricMap;
     Path sendRequestPath = path(getMethodName());
     String testNetworkStatsString = "http_send";
-    long connectionsMade, requestsSent, bytesSent;
 
     metricMap = fs.getInstrumentationMap();
-    long connectionsMadeBeforeTest = metricMap
-        .get(CONNECTIONS_MADE.getStatName());
-    long requestsMadeBeforeTest = metricMap.get(SEND_REQUESTS.getStatName());
-
-    /*
-     * Creating AbfsOutputStream will result in 1 connection made and 1 send
-     * request.
-     */
+    long expectedConnectionsMade = metricMap.get(CONNECTIONS_MADE.getStatName());
+    long expectedRequestsSent = metricMap.get(SEND_REQUESTS.getStatName());
+    long expectedBytesSent = 0;
+
+    // --------------------------------------------------------------------
+     // Operation: Creating AbfsOutputStream
     try (AbfsOutputStream out = createAbfsOutputStreamWithFlushEnabled(fs,
         sendRequestPath)) {
+       // Network stats calculation: For Creating AbfsOutputStream:
+       // 1 create request = 1 connection made and 1 send request
+      expectedConnectionsMade++;
+      expectedRequestsSent++;
+      // --------------------------------------------------------------------
+
+      // Operation: Write small data
+      // Network stats calculation: No additions.
+      // Data written is less than the buffer size and hence will not
+      // trigger any append request to store
       out.write(testNetworkStatsString.getBytes());
+      // --------------------------------------------------------------------
 
-      /*
-       * Flushes all outstanding data (i.e. the current unfinished packet)
-       * from the client into the service on all DataNode replicas.
-       */
+       // Operation: HFlush
+       // Flushes all outstanding data (i.e. the current unfinished packet)
+       // from the client into the service on all DataNode replicas.
       out.hflush();
-
-      metricMap = fs.getInstrumentationMap();
-
       /*
-       * Testing the network stats with 1 write operation.
-       *
-       * connections_made : (connections made above) + 2(flush).
+       * Network stats calculation:
+       * 3 possibilities here:
+       * A. As there is pending data to be written to store, this will result in:
+       *    1 append + 1 flush = 2 connections and 2 send requests
        *
-       * send_requests : (requests sent above) + 2(flush).
+       * B. If config "fs.azure.write.enableappendwithflush" is enabled, append
+       *   and flush call will be merged for small data in buffer in this test.
+       *   In which case it will be:
+       *   1 append+flush request = 1 connection and 1 send request
        *
-       * bytes_sent : bytes wrote in AbfsOutputStream.
+       * C. If the path is configured for append Blob files to be used, hflush
+       *   is a no-op. So in this case:
+       *   1 append = 1 connection and 1 send request
        */
-      long extraCalls = 0;
-      if (!fs.getAbfsStore()
-          .isAppendBlobKey(fs.makeQualified(sendRequestPath).toString())) {
-        // no network calls are made for hflush in case of appendblob
-        extraCalls++;
+      if (fs.getAbfsStore().isAppendBlobKey(fs.makeQualified(sendRequestPath).toString())
+          || (this.getConfiguration().isSmallWriteOptimizationEnabled())) {
+        expectedConnectionsMade++;
+        expectedRequestsSent++;
+      } else {
+        expectedConnectionsMade += 2;
+        expectedRequestsSent += 2;
       }
-      long expectedConnectionsMade = connectionsMadeBeforeTest + extraCalls + 2;
-      long expectedRequestsSent = requestsMadeBeforeTest + extraCalls + 2;
-      connectionsMade = assertAbfsStatistics(CONNECTIONS_MADE,
+      expectedBytesSent += testNetworkStatsString.getBytes().length;
+      // --------------------------------------------------------------------
+
+      // Assertions
+      metricMap = fs.getInstrumentationMap();
+      assertAbfsStatistics(CONNECTIONS_MADE,
           expectedConnectionsMade, metricMap);
-      requestsSent = assertAbfsStatistics(SEND_REQUESTS, expectedRequestsSent,
+      assertAbfsStatistics(SEND_REQUESTS, expectedRequestsSent,
           metricMap);
-      bytesSent = assertAbfsStatistics(AbfsStatistic.BYTES_SENT,
-          testNetworkStatsString.getBytes().length, metricMap);
+      assertAbfsStatistics(AbfsStatistic.BYTES_SENT,
+          expectedBytesSent, metricMap);
     }
 
-    // To close the AbfsOutputStream 1 connection is made and 1 request is sent.
-    connectionsMade++;
-    requestsSent++;
-
+    // --------------------------------------------------------------------
+    // Operation: AbfsOutputStream close.
+    // Network Stats calculation: 1 flush (with close) is sent.
+    // 1 flush request = 1 connection and 1 send request
+    expectedConnectionsMade++;
+    expectedRequestsSent++;
+    // --------------------------------------------------------------------
 
+    // Operation: Re-create the file / create overwrite scenario
     try (AbfsOutputStream out = createAbfsOutputStreamWithFlushEnabled(fs,
         sendRequestPath)) {
-
-      // Is a file overwrite case
-      long createRequestCalls = 1;
-      long createTriggeredGFSForETag = 0;
+      /*
+       * Network Stats calculation: create overwrite
+       * There are 2 possibilities here.
+       * A. create overwrite results in 1 server call
+       *    create with overwrite=true = 1 connection and 1 send request
+       *
+       * B. If config "fs.azure.enable.conditional.create.overwrite" is enabled,
+       *    create overwrite=false (will fail in this case as file is indeed present)
+       *    + getFileStatus to fetch the file ETag
+       *    + create overwrite=true
+       *    = 3 connections and 2 send requests
+       */
       if (this.getConfiguration().isConditionalCreateOverwriteEnabled()) {
-        createRequestCalls += 1;
-        createTriggeredGFSForETag = 1;
+        expectedConnectionsMade += 3;
+        expectedRequestsSent += 2;
+      } else {
+        expectedConnectionsMade += 1;
+        expectedRequestsSent += 1;
       }
+      // --------------------------------------------------------------------
 
-      for (int i = 0; i < LARGE_OPERATIONS; i++) {
+      // Operation: Multiple small appends + hflush
+      for (int i = 0; i < WRITE_OPERATION_LOOP_COUNT; i++) {
         out.write(testNetworkStatsString.getBytes());
-
-        /*
-         * 1 flush call would create 2 connections and 2 send requests.
-         * when hflush() is called it will essentially trigger append() and
-         * flush() inside AbfsRestOperation. Both of which calls
-         * executeHttpOperation() method which creates a connection and sends
-         * requests.
-         */
+        // Network stats calculation: no-op; the small write is buffered client-side
         out.hflush();
+        // Network stats calculation: hflush
+        // refer to the previous comments for the hflush network stats
+        // calculation possibilities
+        if (fs.getAbfsStore().isAppendBlobKey(fs.makeQualified(sendRequestPath).toString())
+            || (this.getConfiguration().isSmallWriteOptimizationEnabled())) {
+          expectedConnectionsMade++;
+          expectedRequestsSent++;
+        } else {
+          expectedConnectionsMade += 2;
+          expectedRequestsSent += 2;
+        }
+        expectedBytesSent += testNetworkStatsString.getBytes().length;
       }
+      // --------------------------------------------------------------------
 
+      // Assertions
       metricMap = fs.getInstrumentationMap();
-
-      /*
-       * Testing the network stats with Large amount of bytes sent.
-       *
-       * connections made : connections_made(Last assertion) + 1
-       * (AbfsOutputStream) + LARGE_OPERATIONS * 2(flush).
-       *
-       * send requests : requests_sent(Last assertion) + 1(AbfsOutputStream) +
-       * LARGE_OPERATIONS * 2(flush).
-       *
-       * bytes sent : bytes_sent(Last assertion) + LARGE_OPERATIONS * (bytes
-       * wrote each time).
-       *
-       */
-
-      connectionsMade += createRequestCalls + createTriggeredGFSForETag;
-      requestsSent += createRequestCalls;
-      if (fs.getAbfsStore().isAppendBlobKey(fs.makeQualified(sendRequestPath).toString())) {
-        // no network calls are made for hflush in case of appendblob
-        assertAbfsStatistics(CONNECTIONS_MADE,
-            connectionsMade + LARGE_OPERATIONS, metricMap);
-        assertAbfsStatistics(SEND_REQUESTS,
-            requestsSent + LARGE_OPERATIONS, metricMap);
-      } else {
-        assertAbfsStatistics(CONNECTIONS_MADE,
-            connectionsMade + LARGE_OPERATIONS * 2, metricMap);
-        assertAbfsStatistics(SEND_REQUESTS,
-            requestsSent + LARGE_OPERATIONS * 2, metricMap);
-      }
-      assertAbfsStatistics(AbfsStatistic.BYTES_SENT,
-          bytesSent + LARGE_OPERATIONS * (testNetworkStatsString.getBytes().length),
-          metricMap);
-
+      assertAbfsStatistics(CONNECTIONS_MADE, expectedConnectionsMade, metricMap);
+      assertAbfsStatistics(SEND_REQUESTS, expectedRequestsSent, metricMap);
+      assertAbfsStatistics(AbfsStatistic.BYTES_SENT, expectedBytesSent, metricMap);
     }
 
   }
@@ -185,130 +196,100 @@ public class ITestAbfsNetworkStatistics extends AbstractAbfsIntegrationTest {
     Path getResponsePath = path(getMethodName());
     Map<String, Long> metricMap;
     String testResponseString = "some response";
-    long getResponses, bytesReceived;
 
     FSDataOutputStream out = null;
     FSDataInputStream in = null;
-    try {
+    long expectedConnectionsMade;
+    long expectedGetResponses;
+    long expectedBytesReceived;
 
-      /*
-       * Creating a File and writing some bytes in it.
-       *
-       * get_response : 3(getFileSystem) + 1(OutputStream creation) + 2
-       * (Writing data in Data store).
-       *
-       */
+    try {
+      // Creating a File and writing some bytes in it.
       out = fs.create(getResponsePath);
       out.write(testResponseString.getBytes());
       out.hflush();
 
+      // Set metric baseline
       metricMap = fs.getInstrumentationMap();
-      long getResponsesBeforeTest = metricMap
-          .get(CONNECTIONS_MADE.getStatName());
+      long bytesWrittenToFile = testResponseString.getBytes().length;
+      expectedConnectionsMade = metricMap.get(CONNECTIONS_MADE.getStatName());
+      expectedGetResponses = metricMap.get(GET_RESPONSES.getStatName());
+      expectedBytesReceived = metricMap.get(BYTES_RECEIVED.getStatName());
 
-      // open would require 1 get response.
+      // --------------------------------------------------------------------
+      // Operation: Create AbfsInputStream
       in = fs.open(getResponsePath);
-      // read would require 1 get response and also get the bytes received.
-      int result = in.read();
-
-      // Confirming read isn't -1.
-      LOG.info("Result of read operation : {}", result);
+      // Network stats calculation: For Creating AbfsInputStream:
+      // 1 GetFileStatus request to fetch file size = 1 connection and 1 get response
+      expectedConnectionsMade++;
+      expectedGetResponses++;
+      // --------------------------------------------------------------------
 
+      // Operation: Read
+      int result = in.read();
+      // Network stats calculation: For read:
+      // 1 read request = 1 connection and 1 get response
+      expectedConnectionsMade++;
+      expectedGetResponses++;
+      expectedBytesReceived += bytesWrittenToFile;
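+      // Note: read() returns a single byte, but the client fetches up to a
+      // full read buffer from the store; as the file is smaller than the
+      // buffer, all of its bytes are received in one response.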
+      // --------------------------------------------------------------------
+
+      // Assertions
       metricMap = fs.getInstrumentationMap();
-
-      /*
-       * Testing values of statistics after writing and reading a buffer.
-       *
-       * get_responses - (above operations) + 1(open()) + 1 (read()).;
-       *
-       * bytes_received - This should be equal to bytes sent earlier.
-       */
-      long extraCalls = 0;
-      if (!fs.getAbfsStore()
-          .isAppendBlobKey(fs.makeQualified(getResponsePath).toString())) {
-        // no network calls are made for hflush in case of appendblob
-        extraCalls++;
-      }
-      long expectedGetResponses = getResponsesBeforeTest + extraCalls + 1;
-      getResponses = assertAbfsStatistics(AbfsStatistic.GET_RESPONSES,
-          expectedGetResponses, metricMap);
-
-      // Testing that bytes received is equal to bytes sent.
-      long bytesSend = metricMap.get(AbfsStatistic.BYTES_SENT.getStatName());
-      bytesReceived = assertAbfsStatistics(AbfsStatistic.BYTES_RECEIVED,
-          bytesSend,
-          metricMap);
-
+      assertAbfsStatistics(CONNECTIONS_MADE, expectedConnectionsMade, metricMap);
+      assertAbfsStatistics(GET_RESPONSES, expectedGetResponses, metricMap);
+      assertAbfsStatistics(AbfsStatistic.BYTES_RECEIVED, expectedBytesReceived, metricMap);
     } finally {
       IOUtils.cleanupWithLogger(LOG, out, in);
     }
 
-    // To close the streams 1 response is received.
-    getResponses++;
+    // --------------------------------------------------------------------
+    // Operation: AbfsOutputStream close.
+    // Network Stats calculation: no assertions are made on this close here;
+    // the counters are re-synced to the metric baseline below anyway.
+    // --------------------------------------------------------------------
 
     try {
 
-      /*
-       * Creating a file and writing buffer into it.
-       * This is a file recreate, so it will trigger
-       * 2 extra calls if create overwrite is off by default.
-       * Also recording the buffer for future read() call.
-       * This creating outputStream and writing requires 2 *
-       * (LARGE_OPERATIONS) get requests.
-       */
+      // Recreate the file with a different file size
+      // [Create- and append-related network stats checks are done in
+      // the test method testAbfsHttpSendStatistics]
       StringBuilder largeBuffer = new StringBuilder();
       out = fs.create(getResponsePath);
 
-      long createRequestCalls = 1;
-      if (this.getConfiguration().isConditionalCreateOverwriteEnabled()) {
-        createRequestCalls += 2;
-      }
-
-      for (int i = 0; i < LARGE_OPERATIONS; i++) {
+      for (int i = 0; i < WRITE_OPERATION_LOOP_COUNT; i++) {
         out.write(testResponseString.getBytes());
         out.hflush();
         largeBuffer.append(testResponseString);
       }
 
-      // Open requires 1 get_response.
+      // Sync back to the metric baseline
+      metricMap = fs.getInstrumentationMap();
+      expectedConnectionsMade = metricMap.get(CONNECTIONS_MADE.getStatName());
+      expectedGetResponses = metricMap.get(GET_RESPONSES.getStatName());
+      // --------------------------------------------------------------------
+      // Operation: Create AbfsInputStream
       in = fs.open(getResponsePath);
-
-      /*
-       * Reading the file which was written above. This read() call would
-       * read bytes equal to the bytes that was written above.
-       * Get response would be 1 only.
-       */
-      in.read(0, largeBuffer.toString().getBytes(), 0,
-          largeBuffer.toString().getBytes().length);
-
+      // Network stats calculation: For Creating AbfsInputStream:
+      // 1 GetFileStatus for file size = 1 connection and 1 get response
+      expectedConnectionsMade++;
+      expectedGetResponses++;
+      // --------------------------------------------------------------------
+
+      // Operation: Read
+      in.read(0, largeBuffer.toString().getBytes(), 0, largeBuffer.toString().getBytes().length);
+      // Network stats calculation: the total data written is still less than
+      // one read buffer, so only a single read to the store is triggered:
+      // 1 read request = 1 connection and 1 get response
+      expectedConnectionsMade++;
+      expectedGetResponses++;
+      expectedBytesReceived += (WRITE_OPERATION_LOOP_COUNT * testResponseString.getBytes().length);
+      // --------------------------------------------------------------------
+
+      // Assertions
       metricMap = fs.getInstrumentationMap();
-
-      /*
-       * Testing the statistics values after writing and reading a large buffer.
-       *
-       * get_response : get_responses(Last assertion) + 1
-       * (OutputStream) + 2 * LARGE_OPERATIONS(Writing and flushing
-       * LARGE_OPERATIONS times) + 1(open()) + 1(read()) +
-       * 1 (createOverwriteTriggeredGetForeTag).
-       *
-       * bytes_received : bytes_received(Last assertion) + LARGE_OPERATIONS *
-       * bytes wrote each time (bytes_received is equal to bytes wrote in the
-       * File).
-       *
-       */
-      assertAbfsStatistics(AbfsStatistic.BYTES_RECEIVED,
-          bytesReceived + LARGE_OPERATIONS * (testResponseString.getBytes().length),
-          metricMap);
-      if (fs.getAbfsStore().isAppendBlobKey(fs.makeQualified(getResponsePath).toString())) {
-        // no network calls are made for hflush in case of appendblob
-        assertAbfsStatistics(AbfsStatistic.GET_RESPONSES,
-            getResponses + 3 + LARGE_OPERATIONS, metricMap);
-      } else {
-        assertAbfsStatistics(AbfsStatistic.GET_RESPONSES,
-            getResponses + 2 + createRequestCalls + 2 * LARGE_OPERATIONS,
-            metricMap);
-      }
-
+      assertAbfsStatistics(CONNECTIONS_MADE, expectedConnectionsMade, metricMap);
+      assertAbfsStatistics(GET_RESPONSES, expectedGetResponses, metricMap);
+      assertAbfsStatistics(AbfsStatistic.BYTES_RECEIVED, expectedBytesReceived, metricMap);
     } finally {
       IOUtils.cleanupWithLogger(LOG, out, in);
     }
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestSmallWriteOptimization.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestSmallWriteOptimization.java
new file mode 100644
index 0000000..fce2b68
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestSmallWriteOptimization.java
@@ -0,0 +1,523 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs;
+
+import java.util.Arrays;
+import java.util.Random;
+import java.util.UUID;
+import java.util.Map;
+import java.io.IOException;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Assume;
+import org.junit.runners.Parameterized;
+import org.junit.runner.RunWith;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys;
+
+import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.BYTES_SENT;
+import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CONNECTIONS_MADE;
+import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.SEND_REQUESTS;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_AZURE_ENABLE_SMALL_WRITE_OPTIMIZATION;
+import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_TEST_APPENDBLOB_ENABLED;
+
+/**
+ * Test combinations of small writes with flush and close operations.
+ * This test class formulates an append test flow to assert on various scenarios.
+ * Test stages:
+ * 1. Pre-create the test file of the required size. This is determined by the
+ * startingFileSize parameter. If it is 0, pre-creation is skipped.
+ *
+ * 2. Formulate an append loop or iteration. An iteration will do N writes
+ * (determined by the numOfClientWrites parameter), each writing X bytes
+ * (determined by the recurringClientWriteSize parameter).
+ *
+ * 3. Determine the total number of append iterations needed by the test.
+ * If the intention is to close the outputStream right after the appends,
+ * setting the directCloseTest parameter limits the test to 1 append iteration
+ * ending with a close.
+ * Otherwise, TEST_FLUSH_ITERATION test iterations are executed, each doing
+ * appends and hflush/hsync, followed by a final close.
+ *
+ * 4. Execute the test iterations, asserting on the number of store requests
+ * made and validating the file content.
+ */
+@RunWith(Parameterized.class)
+public class ITestSmallWriteOptimization extends AbstractAbfsScaleTest {
+  private static final int ONE_MB = 1024 * 1024;
+  private static final int TWO_MB = 2 * ONE_MB;
+  private static final int TEST_BUFFER_SIZE = TWO_MB;
+  private static final int HALF_TEST_BUFFER_SIZE = TWO_MB / 2;
+  private static final int QUARTER_TEST_BUFFER_SIZE = TWO_MB / 4;
+  private static final int TEST_FLUSH_ITERATION = 2;
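+  // TEST_BUFFER_SIZE (2 MB) is set as the write buffer size for the test, so
+  // the HALF and QUARTER sized writes are guaranteed to stay below one buffer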
+
+  @Parameterized.Parameter
+  public String testScenario;
+
+  @Parameterized.Parameter(1)
+  public boolean enableSmallWriteOptimization;
+
+  /**
+   * If true, close is initiated right after the appends. (That is, no
+   * explicit hflush or hsync calls are made from the client app.)
+   */
+  @Parameterized.Parameter(2)
+  public boolean directCloseTest;
+
+  /**
+   * If non-zero, a test file of this size is created as a prerequisite.
+   */
+  @Parameterized.Parameter(3)
+  public Integer startingFileSize;
+
+  /**
+   * Determines the size of each write issued by the client app.
+   */
+  @Parameterized.Parameter(4)
+  public Integer recurringClientWriteSize;
+
+  /**
+   * Determines the number of client writes to make.
+   */
+  @Parameterized.Parameter(5)
+  public Integer numOfClientWrites;
+
+  /**
+   * True if the small write optimization is expected to take effect in
+   * the scenario.
+   */
+  @Parameterized.Parameter(6)
+  public boolean flushExpectedToBeMergedWithAppend;
+
+  @Parameterized.Parameters(name = "{0}")
+  public static Iterable<Object[]> params() {
+    return Arrays.asList(
+        // Parameter Order :
+        // testScenario,
+        // enableSmallWriteOptimization, directCloseTest, startingFileSize,
+        // recurringClientWriteSize, numOfClientWrites, flushExpectedToBeMergedWithAppend
+        new Object[][]{
+            // Buffer Size Write tests
+            { "OptmON_FlushCloseTest_EmptyFile_BufferSizeWrite",
+                true, false, 0, TEST_BUFFER_SIZE, 1, false
+            },
+            {   "OptmON_FlushCloseTest_NonEmptyFile_BufferSizeWrite",
+                true, false, 2 * TEST_BUFFER_SIZE, TEST_BUFFER_SIZE, 1, false
+            },
+            {   "OptmON_CloseTest_EmptyFile_BufferSizeWrite",
+                true, true, 0, TEST_BUFFER_SIZE, 1, false
+            },
+            {   "OptmON_CloseTest_NonEmptyFile_BufferSizeWrite",
+                true, true, 2 * TEST_BUFFER_SIZE, TEST_BUFFER_SIZE, 1, false
+            },
+            {   "OptmOFF_FlushCloseTest_EmptyFile_BufferSizeWrite",
+                false, false, 0, TEST_BUFFER_SIZE, 1, false
+            },
+            {   "OptmOFF_FlushCloseTest_NonEmptyFile_BufferSizeWrite",
+                false, false, 2 * TEST_BUFFER_SIZE, TEST_BUFFER_SIZE, 1, false
+            },
+            {   "OptmOFF_CloseTest_EmptyFile_BufferSizeWrite",
+                false, true, 0, TEST_BUFFER_SIZE, 1, false
+            },
+            {   "OptmOFF_CloseTest_NonEmptyFile_BufferSizeWrite",
+                false, true, 2 * TEST_BUFFER_SIZE, TEST_BUFFER_SIZE, 1, false
+            },
+            // Less than buffer size write tests
+            {   "OptmON_FlushCloseTest_EmptyFile_LessThanBufferSizeWrite",
+                true, false, 0, Math.abs(HALF_TEST_BUFFER_SIZE), 1, true
+            },
+            {   "OptmON_FlushCloseTest_NonEmptyFile_LessThanBufferSizeWrite",
+                true, false, 2 * TEST_BUFFER_SIZE,
+                Math.abs(HALF_TEST_BUFFER_SIZE), 1, true
+            },
+            {   "OptmON_CloseTest_EmptyFile_LessThanBufferSizeWrite",
+                true, true, 0, Math.abs(HALF_TEST_BUFFER_SIZE), 1, true
+            },
+            {   "OptmON_CloseTest_NonEmptyFile_LessThanBufferSizeWrite",
+                true, true, 2 * TEST_BUFFER_SIZE,
+                Math.abs(HALF_TEST_BUFFER_SIZE), 1, true
+            },
+            {   "OptmOFF_FlushCloseTest_EmptyFile_LessThanBufferSizeWrite",
+                false, false, 0, Math.abs(HALF_TEST_BUFFER_SIZE), 1, false
+            },
+            {   "OptmOFF_FlushCloseTest_NonEmptyFile_LessThanBufferSizeWrite",
+                false, false, 2 * TEST_BUFFER_SIZE,
+                Math.abs(HALF_TEST_BUFFER_SIZE), 1, false
+            },
+            {   "OptmOFF_CloseTest_EmptyFile_LessThanBufferSizeWrite",
+                false, true, 0, Math.abs(HALF_TEST_BUFFER_SIZE), 1, false
+            },
+            {   "OptmOFF_CloseTest_NonEmptyFile_LessThanBufferSizeWrite",
+                false, true, 2 * TEST_BUFFER_SIZE,
+                Math.abs(HALF_TEST_BUFFER_SIZE), 1, false
+            },
+            // Multiple small writes still less than buffer size
+            {   "OptmON_FlushCloseTest_EmptyFile_MultiSmallWritesStillLessThanBufferSize",
+                true, false, 0, Math.abs(QUARTER_TEST_BUFFER_SIZE), 3, true
+            },
+            {   "OptmON_FlushCloseTest_NonEmptyFile_MultiSmallWritesStillLessThanBufferSize",
+                true, false, 2 * TEST_BUFFER_SIZE,
+                Math.abs(QUARTER_TEST_BUFFER_SIZE), 3, true
+            },
+            {   "OptmON_CloseTest_EmptyFile_MultiSmallWritesStillLessThanBufferSize",
+                true, true, 0, Math.abs(QUARTER_TEST_BUFFER_SIZE), 3, true
+            },
+            {   "OptmON_CloseTest_NonEmptyFile_MultiSmallWritesStillLessThanBufferSize",
+                true, true, 2 * TEST_BUFFER_SIZE,
+                Math.abs(QUARTER_TEST_BUFFER_SIZE), 3, true
+            },
+            {   "OptmOFF_FlushCloseTest_EmptyFile_MultiSmallWritesStillLessThanBufferSize",
+                false, false, 0, Math.abs(QUARTER_TEST_BUFFER_SIZE), 3, false
+            },
+            {   "OptmOFF_FlushCloseTest_NonEmptyFile_MultiSmallWritesStillLessThanBufferSize",
+                false, false, 2 * TEST_BUFFER_SIZE,
+                Math.abs(QUARTER_TEST_BUFFER_SIZE), 3, false
+            },
+            {   "OptmOFF_CloseTest_EmptyFile_MultiSmallWritesStillLessThanBufferSize",
+                false, true, 0, Math.abs(QUARTER_TEST_BUFFER_SIZE), 3, false
+            },
+            {   "OptmOFF_CloseTest_NonEmptyFile_MultiSmallWritesStillLessThanBufferSize",
+                false, true, 2 * TEST_BUFFER_SIZE,
+                Math.abs(QUARTER_TEST_BUFFER_SIZE), 3, false
+            },
+            // Multiple full buffer writes
+            {   "OptmON_FlushCloseTest_EmptyFile_MultiBufferSizeWrite",
+                true, false, 0, TEST_BUFFER_SIZE, 3, false
+            },
+            {   "OptmON_FlushCloseTest_NonEmptyFile_MultiBufferSizeWrite",
+                true, false, 2 * TEST_BUFFER_SIZE, TEST_BUFFER_SIZE, 3, false
+            },
+            {   "OptmON_CloseTest_EmptyFile_MultiBufferSizeWrite",
+                true, true, 0, TEST_BUFFER_SIZE, 3, false
+            },
+            {   "OptmON_CloseTest_NonEmptyFile_MultiBufferSizeWrite",
+                true, true, 2 * TEST_BUFFER_SIZE, TEST_BUFFER_SIZE, 3, false
+            },
+            {   "OptmOFF_FlushCloseTest_EmptyFile_MultiBufferSizeWrite",
+                false, false, 0, TEST_BUFFER_SIZE, 3, false
+            },
+            {   "OptmOFF_FlushCloseTest_NonEmptyFile_MultiBufferSizeWrite",
+                false, false, 2 * TEST_BUFFER_SIZE, TEST_BUFFER_SIZE, 3, false
+            },
+            {   "OptmOFF_CloseTest_EmptyFile_MultiBufferSizeWrite",
+                false, true, 0, TEST_BUFFER_SIZE, 3, false
+            },
+            {   "OptmOFF_CloseTest_NonEmptyFile_MultiBufferSizeWrite",
+                false, true, 2 * TEST_BUFFER_SIZE, TEST_BUFFER_SIZE, 3, false
+            },
+            // Multiple full buffers triggered and data less than buffer size pending
+            {   "OptmON_FlushCloseTest_EmptyFile_BufferAndExtraWrite",
+                true, false, 0,
+                TEST_BUFFER_SIZE + Math.abs(QUARTER_TEST_BUFFER_SIZE),
+                3, false
+            },
+            {   "OptmON_FlushCloseTest_NonEmptyFile_BufferAndExtraWrite",
+                true, false, 2 * TEST_BUFFER_SIZE,
+                TEST_BUFFER_SIZE + Math.abs(QUARTER_TEST_BUFFER_SIZE),
+                3, false
+            },
+            {   "OptmON_CloseTest_EmptyFile__BufferAndExtraWrite",
+                true, true, 0,
+                TEST_BUFFER_SIZE + Math.abs(QUARTER_TEST_BUFFER_SIZE),
+                3, false
+            },
+            {   "OptmON_CloseTest_NonEmptyFile_BufferAndExtraWrite",
+                true, true, 2 * TEST_BUFFER_SIZE,
+                TEST_BUFFER_SIZE + Math.abs(QUARTER_TEST_BUFFER_SIZE),
+                3, false
+            },
+            {   "OptmOFF_FlushCloseTest_EmptyFile_BufferAndExtraWrite",
+                false, false, 0,
+                TEST_BUFFER_SIZE + Math.abs(QUARTER_TEST_BUFFER_SIZE),
+                3, false
+            },
+            {   "OptmOFF_FlushCloseTest_NonEmptyFile_BufferAndExtraWrite",
+                false, false, 2 * TEST_BUFFER_SIZE,
+                TEST_BUFFER_SIZE + Math.abs(QUARTER_TEST_BUFFER_SIZE),
+                3, false
+            },
+            {   "OptmOFF_CloseTest_EmptyFile_BufferAndExtraWrite",
+                false, true, 0,
+                TEST_BUFFER_SIZE + Math.abs(QUARTER_TEST_BUFFER_SIZE),
+                3, false
+            },
+            {   "OptmOFF_CloseTest_NonEmptyFile_BufferAndExtraWrite",
+                false, true, 2 * TEST_BUFFER_SIZE,
+                TEST_BUFFER_SIZE + Math.abs(QUARTER_TEST_BUFFER_SIZE),
+                3, false
+            },
+            // 0 byte tests
+            {   "OptmON_FlushCloseTest_EmptyFile_0ByteWrite",
+                true, false, 0, 0, 1, false
+            },
+            {   "OptmON_FlushCloseTest_NonEmptyFile_0ByteWrite",
+                true, false, 2 * TEST_BUFFER_SIZE, 0, 1, false
+            },
+            {   "OptmON_CloseTest_EmptyFile_0ByteWrite",
+                true, true, 0, 0, 1, false
+            },
+            {   "OptmON_CloseTest_NonEmptyFile_0ByteWrite",
+                true, true, 2 * TEST_BUFFER_SIZE, 0, 1, false
+            },
+            {   "OptmOFF_FlushCloseTest_EmptyFile_0ByteWrite",
+                false, false, 0, 0, 1, false
+            },
+            {   "OptmOFF_FlushCloseTest_NonEmptyFile_0ByteWrite",
+                false, false, 2 * TEST_BUFFER_SIZE, 0, 1, false
+            },
+            {   "OptmOFF_CloseTest_EmptyFile_0ByteWrite",
+                false, true, 0, 0, 1, false
+            },
+            {   "OptmOFF_CloseTest_NonEmptyFile_0ByteWrite",
+                false, true, 2 * TEST_BUFFER_SIZE, 0, 1, false
+            },
+        });
+  }
+
+  public ITestSmallWriteOptimization() throws Exception {
+    super();
+  }
+
+  @Test
+  public void testSmallWriteOptimization()
+      throws IOException {
+    boolean serviceDefaultOptmSettings = DEFAULT_AZURE_ENABLE_SMALL_WRITE_OPTIMIZATION;
+    // Tests with the optimization enabled should only run if the service has
+    // the feature on by default. The default setting will be turned on once
+    // server support is available in all store prod regions.
+    if (enableSmallWriteOptimization) {
+      Assume.assumeTrue(serviceDefaultOptmSettings);
+    }
+
+    final AzureBlobFileSystem currentfs = this.getFileSystem();
+    Configuration config = currentfs.getConf();
+    boolean isAppendBlobTestSettingEnabled =
+        "true".equalsIgnoreCase(config.get(FS_AZURE_TEST_APPENDBLOB_ENABLED));
+
+    // This optimization doesn't take effect when append blob is on.
+    Assume.assumeFalse(isAppendBlobTestSettingEnabled);
+
+    config.set(ConfigurationKeys.AZURE_WRITE_BUFFER_SIZE, Integer.toString(TEST_BUFFER_SIZE));
+    config.set(ConfigurationKeys.AZURE_ENABLE_SMALL_WRITE_OPTIMIZATION, Boolean.toString(enableSmallWriteOptimization));
+    final AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.get(
+        currentfs.getUri(), config);
+
+    formulateSmallWriteTestAppendPattern(fs, startingFileSize,
+        recurringClientWriteSize, numOfClientWrites,
+        directCloseTest, flushExpectedToBeMergedWithAppend);
+  }
+
+  /**
+   * If isDirectCloseTest is true, append + close is triggered.
+   * If isDirectCloseTest is false, append + flush runs are repeated over
+   * iterations, followed by a final close.
+   * @param fs the test filesystem instance
+   * @param startingFileSize size of the file to pre-create; 0 skips pre-creation
+   * @param recurringWriteSize number of bytes written by each client write call
+   * @param numOfWrites number of client write calls per iteration
+   * @param isDirectCloseTest whether the stream is closed right after the appends
+   * @param flushExpectedToBeMergedWithAppend whether the flush is expected to
+   * be merged with the append
+   * @throws IOException on any write or close failure
+   */
+  private void formulateSmallWriteTestAppendPattern(final AzureBlobFileSystem fs,
+      int startingFileSize,
+      int recurringWriteSize,
+      int numOfWrites,
+      boolean isDirectCloseTest,
+      boolean flushExpectedToBeMergedWithAppend) throws IOException {
+
+    int totalDataToBeAppended = 0;
+    int testIteration = 0;
+    int dataWrittenPerIteration = (numOfWrites * recurringWriteSize);
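+    // e.g. in the MultiSmallWrites scenarios: 3 writes of
+    // QUARTER_TEST_BUFFER_SIZE (512 KB) = 1.5 MB per iteration, which stays
+    // below the 2 MB test write buffer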
+
+    if (isDirectCloseTest) {
+      totalDataToBeAppended = dataWrittenPerIteration;
+      testIteration = 1;
+    } else {
+      testIteration = TEST_FLUSH_ITERATION;
+      totalDataToBeAppended = testIteration * dataWrittenPerIteration;
+    }
+
+    int totalFileSize = totalDataToBeAppended + startingFileSize;
+    // A write buffer of the total file size is created. It is used as the
+    // write source and for file content validation.
+    final byte[] writeBuffer = new byte[totalFileSize];
+    new Random().nextBytes(writeBuffer);
+    int writeBufferCursor = 0;
+
+    Path testPath = new Path(getMethodName() + UUID.randomUUID().toString());
+    FSDataOutputStream opStream;
+
+    if (startingFileSize > 0) {
+      writeBufferCursor += createFileWithStartingTestSize(fs, writeBuffer, writeBufferCursor, testPath,
+          startingFileSize);
+      opStream = fs.append(testPath);
+    } else {
+      opStream = fs.create(testPath);
+    }
+
+    final int writeBufferSize = fs.getAbfsStore()
+        .getAbfsConfiguration()
+        .getWriteBufferSize();
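+    // writeBufferSize equals TEST_BUFFER_SIZE (2 MB) here, as configured in
+    // testSmallWriteOptimization()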
+    long expectedTotalRequestsMade = fs.getInstrumentationMap()
+        .get(CONNECTIONS_MADE.getStatName());
+    long expectedRequestsMadeWithData = fs.getInstrumentationMap()
+        .get(SEND_REQUESTS.getStatName());
+    long expectedBytesSent = fs.getInstrumentationMap()
+        .get(BYTES_SENT.getStatName());
+
+    while (testIteration > 0) {
+      // trigger recurringWriteSize appends over numOfWrites
+      writeBufferCursor += executeWritePattern(opStream, writeBuffer,
+          writeBufferCursor, numOfWrites, recurringWriteSize);
+
+      // integer division floors the count of full buffers written
+      int numOfBuffersWrittenToStore =
+          dataWrittenPerIteration / writeBufferSize;
+      int dataSizeWrittenToStore = numOfBuffersWrittenToStore * writeBufferSize;
+      int pendingDataToStore = dataWrittenPerIteration - dataSizeWrittenToStore;
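+      // e.g. with 1.5 MB written per iteration and a 2 MB buffer: 0 full
+      // buffers are flushed to the store and 1.5 MB stays pending in the
+      // buffer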
+
+      expectedTotalRequestsMade += numOfBuffersWrittenToStore;
+      expectedRequestsMadeWithData += numOfBuffersWrittenToStore;
+      expectedBytesSent += dataSizeWrittenToStore;
+
+      if (isDirectCloseTest) {
+        opStream.close();
+      } else {
+        opStream.hflush();
+      }
+
+      boolean wasDataPendingToBeWrittenToServer = (pendingDataToStore > 0);
+      // Small write optimization will only work if
+      // a. config for small write optimization is on
+      // b. no buffer writes have been triggered since last flush
+      // c. there is some pending data in buffer to write to store
+      final boolean smallWriteOptimizationEnabled = fs.getAbfsStore()
+          .getAbfsConfiguration()
+          .isSmallWriteOptimizationEnabled();
+      boolean flushWillBeMergedWithAppend = smallWriteOptimizationEnabled
+          && (numOfBuffersWrittenToStore == 0)
+          && (wasDataPendingToBeWrittenToServer);
+
+      Assertions.assertThat(flushWillBeMergedWithAppend)
+          .describedAs(flushExpectedToBeMergedWithAppend
+              ? "Flush was to be merged with Append"
+              : "Flush should not have been merged with Append")
+          .isEqualTo(flushExpectedToBeMergedWithAppend);
+
+      int totalAppendFlushCalls = (flushWillBeMergedWithAppend
+          ? 1 // 1 append (with flush and close param)
+          : (wasDataPendingToBeWrittenToServer)
+              ? 2 // 1 append + 1 flush (with close)
+              : 1); // 1 flush (with close)
+
+      expectedTotalRequestsMade += totalAppendFlushCalls;
+      expectedRequestsMadeWithData += totalAppendFlushCalls;
+      expectedBytesSent += wasDataPendingToBeWrittenToServer
+          ? pendingDataToStore
+          : 0;
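+      // e.g. with 1.5 MB pending: the merged case sends 1 append carrying
+      // the 1.5 MB with flush parameters set; the non-merged case sends
+      // 1 append (1.5 MB) plus 1 separate flush request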
+
+      assertOpStats(fs.getInstrumentationMap(), expectedTotalRequestsMade,
+          expectedRequestsMadeWithData, expectedBytesSent);
+
+      if (isDirectCloseTest) {
+        // stream already closed
+        validateStoreAppends(fs, testPath, totalFileSize, writeBuffer);
+        return;
+      }
+
+      testIteration--;
+    }
+
+    opStream.close();
+    expectedTotalRequestsMade += 1;
+    expectedRequestsMadeWithData += 1;
+    // no change in expectedBytesSent
+    assertOpStats(fs.getInstrumentationMap(), expectedTotalRequestsMade, expectedRequestsMadeWithData, expectedBytesSent);
+
+    validateStoreAppends(fs, testPath, totalFileSize, writeBuffer);
+  }
+
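+  /**
+   * Pre-creates the test file with startingFileSize bytes taken from
+   * writeBuffer, closes it and verifies the resulting file length.
+   * @return the number of bytes consumed from writeBuffer
+   */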
+  private int createFileWithStartingTestSize(AzureBlobFileSystem fs, byte[] writeBuffer,
+      int writeBufferCursor, Path testPath, int startingFileSize)
+      throws IOException {
+    FSDataOutputStream opStream = fs.create(testPath);
+    writeBufferCursor += executeWritePattern(opStream,
+        writeBuffer,
+        writeBufferCursor,
+        1,
+        startingFileSize);
+
+    opStream.close();
+    Assertions.assertThat(fs.getFileStatus(testPath).getLen())
+        .describedAs("File should be of size %d at the start of test.",
+            startingFileSize)
+        .isEqualTo(startingFileSize);
+
+    return writeBufferCursor;
+  }
+
+  private void validateStoreAppends(AzureBlobFileSystem fs,
+      Path testPath,
+      int totalFileSize,
+      byte[] bufferWritten)
+      throws IOException {
+    // Final validation
+    Assertions.assertThat(fs.getFileStatus(testPath).getLen())
+        .describedAs("File should be of size %d at the end of test.",
+            totalFileSize)
+        .isEqualTo(totalFileSize);
+
+    byte[] fileReadFromStore = new byte[totalFileSize];
+    // use readFully: a single read() call is not guaranteed to fill the buffer
+    fs.open(testPath).readFully(0, fileReadFromStore, 0, totalFileSize);
+
+    assertArrayEquals("Test file content incorrect", bufferWritten,
+        fileReadFromStore);
+  }
+
+  private void assertOpStats(Map<String, Long> metricMap,
+      long expectedTotalRequestsMade,
+      long expectedRequestsMadeWithData,
+      long expectedBytesSent) {
+    assertAbfsStatistics(CONNECTIONS_MADE, expectedTotalRequestsMade,
+        metricMap);
+    assertAbfsStatistics(SEND_REQUESTS, expectedRequestsMadeWithData,
+        metricMap);
+    assertAbfsStatistics(BYTES_SENT, expectedBytesSent, metricMap);
+  }
+
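+  /**
+   * Issues writeLoopCount writes of writeSize bytes each, taken from buffer
+   * starting at startOffset.
+   * @return the total number of bytes written
+   */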
+  private int executeWritePattern(FSDataOutputStream opStream,
+      byte[] buffer,
+      int startOffset,
+      int writeLoopCount,
+      int writeSize)
+      throws IOException {
+    int dataSizeWritten = startOffset;
+
+    while (writeLoopCount > 0) {
+      opStream.write(buffer, startOffset, writeSize);
+      startOffset += writeSize;
+      writeLoopCount--;
+    }
+
+    dataSizeWritten = startOffset - dataSizeWritten;
+    return dataSizeWritten;
+  }
+}
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java
index 7f91116..fff0051 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java
@@ -44,10 +44,16 @@ public class ITestAbfsOutputStream extends AbstractAbfsIntegrationTest {
     final AzureBlobFileSystem fs = getFileSystem(conf);
     try (FSDataOutputStream out = fs.create(TEST_FILE_PATH)) {
     AbfsOutputStream stream = (AbfsOutputStream) out.getWrappedStream();
+
+      int maxConcurrentRequests
+          = getConfiguration().getWriteMaxConcurrentRequestCount();
+      if (stream.isAppendBlobStream()) {
+        maxConcurrentRequests = 1;
+      }
+
     Assertions.assertThat(stream.getMaxConcurrentRequestCount()).describedAs(
-        "maxConcurrentRequests should be " + getConfiguration()
-            .getWriteMaxConcurrentRequestCount())
-        .isEqualTo(getConfiguration().getWriteMaxConcurrentRequestCount());
+        "maxConcurrentRequests should be " + maxConcurrentRequests)
+        .isEqualTo(maxConcurrentRequests);
     Assertions.assertThat(stream.getMaxRequestsThatCanBeQueued()).describedAs(
         "maxRequestsToQueue should be " + getConfiguration()
             .getMaxWriteRequestsToQueue())
@@ -67,6 +73,11 @@ public class ITestAbfsOutputStream extends AbstractAbfsIntegrationTest {
     final AzureBlobFileSystem fs = getFileSystem(conf);
     FSDataOutputStream out = fs.create(TEST_FILE_PATH);
     AbfsOutputStream stream = (AbfsOutputStream) out.getWrappedStream();
+
+    if (stream.isAppendBlobStream()) {
+      maxConcurrentRequests = 1;
+    }
+
     Assertions.assertThat(stream.getMaxConcurrentRequestCount())
         .describedAs("maxConcurrentRequests should be " + maxConcurrentRequests)
         .isEqualTo(maxConcurrentRequests);
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java
index aab0248..1e6b8ef 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.fs.azurebfs.services;
 
 import java.io.IOException;
 import java.util.Arrays;
-import java.util.HashSet;
 import java.util.Random;
 
 import org.junit.Test;
@@ -28,19 +27,22 @@ import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 
 import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
+import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters;
 import org.apache.hadoop.conf.Configuration;
 
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.refEq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.anyInt;
 import static org.mockito.Mockito.anyBoolean;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.anyString;
 import static org.mockito.Mockito.anyLong;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters.Mode.APPEND_MODE;
 
 public final class TestAbfsOutputStream {
 
@@ -83,22 +85,15 @@ public final class TestAbfsOutputStream {
     abfsConf = new AbfsConfiguration(conf, accountName1);
     AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf);
     when(client.getAbfsPerfTracker()).thenReturn(tracker);
-    when(client.append(anyString(), anyLong(), any(byte[].class), anyInt(), anyInt(), any(), anyBoolean())).thenReturn(op);
+    when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any())).thenReturn(op);
     when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any())).thenReturn(op);
 
-    AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0, populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false));
+    AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0,
+        populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false));
     final byte[] b = new byte[WRITE_SIZE];
     new Random().nextBytes(b);
     out.write(b);
     out.hsync();
-    ArgumentCaptor<String> acString = ArgumentCaptor.forClass(String.class);
-    ArgumentCaptor<Long> acLong = ArgumentCaptor.forClass(Long.class);
-    ArgumentCaptor<Integer> acBufferOffset = ArgumentCaptor.forClass(Integer.class);
-    ArgumentCaptor<Integer> acBufferLength = ArgumentCaptor.forClass(Integer.class);
-    ArgumentCaptor<byte[]> acByteArray = ArgumentCaptor.forClass(byte[].class);
-    ArgumentCaptor<Boolean> acAppendBlobAppend = ArgumentCaptor.forClass(Boolean.class);
-    ArgumentCaptor<String> acSASToken = ArgumentCaptor.forClass(String.class);
-
 
     final byte[] b1 = new byte[2*WRITE_SIZE];
     new Random().nextBytes(b1);
@@ -108,13 +103,18 @@ public final class TestAbfsOutputStream {
 
     out.hsync();
 
-    verify(client, times(2)).append(acString.capture(), acLong.capture(), acByteArray.capture(), acBufferOffset.capture(), acBufferLength.capture(),
-                                    acSASToken.capture(), acAppendBlobAppend.capture());
-    assertThat(Arrays.asList(PATH, PATH)).describedAs("Path of the requests").isEqualTo(acString.getAllValues());
-    assertThat(Arrays.asList(Long.valueOf(0), Long.valueOf(WRITE_SIZE))).describedAs("Write Position").isEqualTo(acLong.getAllValues());
-    assertThat(Arrays.asList(0, 0)).describedAs("Buffer Offset").isEqualTo(acBufferOffset.getAllValues());
-    assertThat(Arrays.asList(WRITE_SIZE, 2*WRITE_SIZE)).describedAs("Buffer length").isEqualTo(acBufferLength.getAllValues());
-
+    AppendRequestParameters firstReqParameters = new AppendRequestParameters(
+        0, 0, WRITE_SIZE, APPEND_MODE, false);
+    AppendRequestParameters secondReqParameters = new AppendRequestParameters(
+        WRITE_SIZE, 0, 2 * WRITE_SIZE, APPEND_MODE, false);
+
+    verify(client, times(1)).append(
+        eq(PATH), any(byte[].class), refEq(firstReqParameters), any());
+    verify(client, times(1)).append(
+        eq(PATH), any(byte[].class), refEq(secondReqParameters), any());
+    // confirm there were only 2 invocations in all
+    verify(client, times(2)).append(
+        eq(PATH), any(byte[].class), any(), any());
   }
 
   /**
@@ -132,10 +132,11 @@ public final class TestAbfsOutputStream {
     AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf);
 
     when(client.getAbfsPerfTracker()).thenReturn(tracker);
-    when(client.append(anyString(), anyLong(), any(byte[].class), anyInt(), anyInt(), any(), anyBoolean())).thenReturn(op);
+    when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any())).thenReturn(op);
     when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any())).thenReturn(op);
 
-    AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0, populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false));
+    AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0,
+        populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false));
     final byte[] b = new byte[WRITE_SIZE];
     new Random().nextBytes(b);
 
@@ -144,33 +145,29 @@ public final class TestAbfsOutputStream {
     }
     out.close();
 
-    ArgumentCaptor<String> acString = ArgumentCaptor.forClass(String.class);
-    ArgumentCaptor<Long> acLong = ArgumentCaptor.forClass(Long.class);
-    ArgumentCaptor<Integer> acBufferOffset = ArgumentCaptor.forClass(Integer.class);
-    ArgumentCaptor<Integer> acBufferLength = ArgumentCaptor.forClass(Integer.class);
-    ArgumentCaptor<byte[]> acByteArray = ArgumentCaptor.forClass(byte[].class);
-    ArgumentCaptor<Boolean> acAppendBlobAppend = ArgumentCaptor.forClass(Boolean.class);
-    ArgumentCaptor<String> acSASToken = ArgumentCaptor.forClass(String.class);
-
-    verify(client, times(2)).append(acString.capture(), acLong.capture(), acByteArray.capture(), acBufferOffset.capture(), acBufferLength.capture(),
-                                    acSASToken.capture(), acAppendBlobAppend.capture());
-    assertThat(Arrays.asList(PATH, PATH)).describedAs("Path").isEqualTo(acString.getAllValues());
-    assertThat(new HashSet<Long>(Arrays.asList(Long.valueOf(0), Long.valueOf(BUFFER_SIZE)))).describedAs("Position").isEqualTo(new HashSet<Long>(
-               acLong.getAllValues()));
-    assertThat(Arrays.asList(0, 0)).describedAs("Buffer Offset").isEqualTo(acBufferOffset.getAllValues());
-    assertThat(new HashSet<Integer>(Arrays.asList(BUFFER_SIZE, 5*WRITE_SIZE-BUFFER_SIZE))).describedAs("Buffer Length").isEqualTo(new HashSet<Integer>(
-               acBufferLength.getAllValues()));
-
-    ArgumentCaptor<String> acFlushString = ArgumentCaptor.forClass(String.class);
-    ArgumentCaptor<Long> acFlushLong = ArgumentCaptor.forClass(Long.class);
+    AppendRequestParameters firstReqParameters = new AppendRequestParameters(
+        0, 0, BUFFER_SIZE, APPEND_MODE, false);
+    AppendRequestParameters secondReqParameters = new AppendRequestParameters(
+        BUFFER_SIZE, 0, 5*WRITE_SIZE-BUFFER_SIZE, APPEND_MODE, false);
+
+    verify(client, times(1)).append(
+        eq(PATH), any(byte[].class), refEq(firstReqParameters), any());
+    verify(client, times(1)).append(
+        eq(PATH), any(byte[].class), refEq(secondReqParameters), any());
+    // confirm there were only 2 invocations in all
+    verify(client, times(2)).append(
+        eq(PATH), any(byte[].class), any(), any());
+
+    ArgumentCaptor<String> acFlushPath = ArgumentCaptor.forClass(String.class);
+    ArgumentCaptor<Long> acFlushPosition = ArgumentCaptor.forClass(Long.class);
     ArgumentCaptor<Boolean> acFlushRetainUnCommittedData = ArgumentCaptor.forClass(Boolean.class);
     ArgumentCaptor<Boolean> acFlushClose = ArgumentCaptor.forClass(Boolean.class);
     ArgumentCaptor<String> acFlushSASToken = ArgumentCaptor.forClass(String.class);
 
-    verify(client, times(1)).flush(acFlushString.capture(), acFlushLong.capture(), acFlushRetainUnCommittedData.capture(), acFlushClose.capture(),
-                                   acFlushSASToken.capture());
-    assertThat(Arrays.asList(PATH)).describedAs("path").isEqualTo(acFlushString.getAllValues());
-    assertThat(Arrays.asList(Long.valueOf(5*WRITE_SIZE))).describedAs("position").isEqualTo(acFlushLong.getAllValues());
+    verify(client, times(1)).flush(acFlushPath.capture(), acFlushPosition.capture(), acFlushRetainUnCommittedData.capture(), acFlushClose.capture(),
+        acFlushSASToken.capture());
+    assertThat(Arrays.asList(PATH)).describedAs("path").isEqualTo(acFlushPath.getAllValues());
+    assertThat(Arrays.asList(Long.valueOf(5*WRITE_SIZE))).describedAs("position").isEqualTo(acFlushPosition.getAllValues());
     assertThat(Arrays.asList(false)).describedAs("RetainUnCommittedData flag").isEqualTo(acFlushRetainUnCommittedData.getAllValues());
     assertThat(Arrays.asList(true)).describedAs("Close flag").isEqualTo(acFlushClose.getAllValues());
   }
@@ -191,12 +188,13 @@ public final class TestAbfsOutputStream {
     AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf);
 
     when(client.getAbfsPerfTracker()).thenReturn(tracker);
-    when(client.append(anyString(), anyLong(), any(byte[].class), anyInt(), anyInt(), any(), anyBoolean())).thenReturn(op);
+    when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any())).thenReturn(op);
     when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any())).thenReturn(op);
     when(op.getSasToken()).thenReturn("testToken");
     when(op.getResult()).thenReturn(httpOp);
 
-    AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0, populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false));
+    AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0,
+        populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false));
     final byte[] b = new byte[BUFFER_SIZE];
     new Random().nextBytes(b);
 
@@ -205,35 +203,31 @@ public final class TestAbfsOutputStream {
     }
     out.close();
 
-    ArgumentCaptor<String> acString = ArgumentCaptor.forClass(String.class);
-    ArgumentCaptor<Long> acLong = ArgumentCaptor.forClass(Long.class);
-    ArgumentCaptor<Integer> acBufferOffset = ArgumentCaptor.forClass(Integer.class);
-    ArgumentCaptor<Integer> acBufferLength = ArgumentCaptor.forClass(Integer.class);
-    ArgumentCaptor<byte[]> acByteArray = ArgumentCaptor.forClass(byte[].class);
-    ArgumentCaptor<Boolean> acAppendBlobAppend = ArgumentCaptor.forClass(Boolean.class);
-    ArgumentCaptor<String> acSASToken = ArgumentCaptor.forClass(String.class);
-
-    verify(client, times(2)).append(acString.capture(), acLong.capture(), acByteArray.capture(), acBufferOffset.capture(), acBufferLength.capture(),
-                                    acSASToken.capture(), acAppendBlobAppend.capture());
-    assertThat(Arrays.asList(PATH, PATH)).describedAs("path").isEqualTo(acString.getAllValues());
-    assertThat(new HashSet<Long>(Arrays.asList(Long.valueOf(0), Long.valueOf(BUFFER_SIZE)))).describedAs("Position").isEqualTo(new HashSet<Long>(
-               acLong.getAllValues()));
-    assertThat(Arrays.asList(0, 0)).describedAs("Buffer Offset").isEqualTo(acBufferOffset.getAllValues());
-    assertThat(Arrays.asList(BUFFER_SIZE, BUFFER_SIZE)).describedAs("Buffer Length").isEqualTo(acBufferLength.getAllValues());
-
-    ArgumentCaptor<String> acFlushString = ArgumentCaptor.forClass(String.class);
-    ArgumentCaptor<Long> acFlushLong = ArgumentCaptor.forClass(Long.class);
+    AppendRequestParameters firstReqParameters = new AppendRequestParameters(
+        0, 0, BUFFER_SIZE, APPEND_MODE, false);
+    AppendRequestParameters secondReqParameters = new AppendRequestParameters(
+        BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false);
+
+    verify(client, times(1)).append(
+        eq(PATH), any(byte[].class), refEq(firstReqParameters), any());
+    verify(client, times(1)).append(
+        eq(PATH), any(byte[].class), refEq(secondReqParameters), any());
+    // confirm there were only 2 invocations in all
+    verify(client, times(2)).append(
+        eq(PATH), any(byte[].class), any(), any());
+
+    ArgumentCaptor<String> acFlushPath = ArgumentCaptor.forClass(String.class);
+    ArgumentCaptor<Long> acFlushPosition = ArgumentCaptor.forClass(Long.class);
     ArgumentCaptor<Boolean> acFlushRetainUnCommittedData = ArgumentCaptor.forClass(Boolean.class);
     ArgumentCaptor<Boolean> acFlushClose = ArgumentCaptor.forClass(Boolean.class);
     ArgumentCaptor<String> acFlushSASToken = ArgumentCaptor.forClass(String.class);
 
-    verify(client, times(1)).flush(acFlushString.capture(), acFlushLong.capture(), acFlushRetainUnCommittedData.capture(), acFlushClose.capture(),
-                                   acFlushSASToken.capture());
-    assertThat(Arrays.asList(PATH)).describedAs("path").isEqualTo(acFlushString.getAllValues());
-    assertThat(Arrays.asList(Long.valueOf(2*BUFFER_SIZE))).describedAs("position").isEqualTo(acFlushLong.getAllValues());
+    verify(client, times(1)).flush(acFlushPath.capture(), acFlushPosition.capture(), acFlushRetainUnCommittedData.capture(), acFlushClose.capture(),
+        acFlushSASToken.capture());
+    assertThat(Arrays.asList(PATH)).describedAs("path").isEqualTo(acFlushPath.getAllValues());
+    assertThat(Arrays.asList(Long.valueOf(2*BUFFER_SIZE))).describedAs("position").isEqualTo(acFlushPosition.getAllValues());
     assertThat(Arrays.asList(false)).describedAs("RetainUnCommittedData flag").isEqualTo(acFlushRetainUnCommittedData.getAllValues());
     assertThat(Arrays.asList(true)).describedAs("Close flag").isEqualTo(acFlushClose.getAllValues());
-
   }
 
   /**
@@ -252,12 +246,13 @@ public final class TestAbfsOutputStream {
     AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf);
 
     when(client.getAbfsPerfTracker()).thenReturn(tracker);
-    when(client.append(anyString(), anyLong(), any(byte[].class), anyInt(), anyInt(), any(), anyBoolean())).thenReturn(op);
+    when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any())).thenReturn(op);
     when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any())).thenReturn(op);
     when(op.getSasToken()).thenReturn("testToken");
     when(op.getResult()).thenReturn(httpOp);
 
-    AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0, populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false));
+    AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0,
+        populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false));
     final byte[] b = new byte[BUFFER_SIZE];
     new Random().nextBytes(b);
 
@@ -266,22 +261,18 @@ public final class TestAbfsOutputStream {
     }
     Thread.sleep(1000);
 
-    ArgumentCaptor<String> acString = ArgumentCaptor.forClass(String.class);
-    ArgumentCaptor<Long> acLong = ArgumentCaptor.forClass(Long.class);
-    ArgumentCaptor<Integer> acBufferOffset = ArgumentCaptor.forClass(Integer.class);
-    ArgumentCaptor<Integer> acBufferLength = ArgumentCaptor.forClass(Integer.class);
-    ArgumentCaptor<byte[]> acByteArray = ArgumentCaptor.forClass(byte[].class);
-    ArgumentCaptor<Boolean> acAppendBlobAppend = ArgumentCaptor.forClass(Boolean.class);
-    ArgumentCaptor<String> acSASToken = ArgumentCaptor.forClass(String.class);
-
-    verify(client, times(2)).append(acString.capture(), acLong.capture(), acByteArray.capture(), acBufferOffset.capture(), acBufferLength.capture(),
-                                    acSASToken.capture(), acAppendBlobAppend.capture());
-    assertThat(Arrays.asList(PATH, PATH)).describedAs("File Path").isEqualTo(acString.getAllValues());
-    assertThat(new HashSet<Long>(Arrays.asList(Long.valueOf(0), Long.valueOf(BUFFER_SIZE)))).describedAs("Position in file").isEqualTo(
-               new HashSet<Long>(acLong.getAllValues()));
-    assertThat(Arrays.asList(0, 0)).describedAs("buffer offset").isEqualTo(acBufferOffset.getAllValues());
-    assertThat(Arrays.asList(BUFFER_SIZE, BUFFER_SIZE)).describedAs("buffer length").isEqualTo(acBufferLength.getAllValues());
-
+    AppendRequestParameters firstReqParameters = new AppendRequestParameters(
+        0, 0, BUFFER_SIZE, APPEND_MODE, false);
+    AppendRequestParameters secondReqParameters = new AppendRequestParameters(
+        BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false);
+
+    verify(client, times(1)).append(
+        eq(PATH), any(byte[].class), refEq(firstReqParameters), any());
+    verify(client, times(1)).append(
+        eq(PATH), any(byte[].class), refEq(secondReqParameters), any());
+    // confirm there were only 2 invocations in all
+    verify(client, times(2)).append(
+        eq(PATH), any(byte[].class), any(), any());
   }
 
   /**
@@ -299,10 +290,11 @@ public final class TestAbfsOutputStream {
     AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf);
 
     when(client.getAbfsPerfTracker()).thenReturn(tracker);
-    when(client.append(anyString(), anyLong(), any(byte[].class), anyInt(), anyInt(), any(), anyBoolean())).thenReturn(op);
+    when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any())).thenReturn(op);
     when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any())).thenReturn(op);
 
-    AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0, populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, true));
+    AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0,
+        populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, true));
     final byte[] b = new byte[BUFFER_SIZE];
     new Random().nextBytes(b);
 
@@ -311,22 +303,18 @@ public final class TestAbfsOutputStream {
     }
     Thread.sleep(1000);
 
-    ArgumentCaptor<String> acString = ArgumentCaptor.forClass(String.class);
-    ArgumentCaptor<Long> acLong = ArgumentCaptor.forClass(Long.class);
-    ArgumentCaptor<Integer> acBufferOffset = ArgumentCaptor.forClass(Integer.class);
-    ArgumentCaptor<Integer> acBufferLength = ArgumentCaptor.forClass(Integer.class);
-    ArgumentCaptor<byte[]> acByteArray = ArgumentCaptor.forClass(byte[].class);
-    ArgumentCaptor<Boolean> acAppendBlobAppend = ArgumentCaptor.forClass(Boolean.class);
-    ArgumentCaptor<String> acSASToken = ArgumentCaptor.forClass(String.class);
-
-    verify(client, times(2)).append(acString.capture(), acLong.capture(), acByteArray.capture(), acBufferOffset.capture(), acBufferLength.capture(),
-                                    acSASToken.capture(), acAppendBlobAppend.capture());
-    assertThat(Arrays.asList(PATH, PATH)).describedAs("File Path").isEqualTo(acString.getAllValues());
-    assertThat(Arrays.asList(Long.valueOf(0), Long.valueOf(BUFFER_SIZE))).describedAs("File Position").isEqualTo(acLong.getAllValues());
-    assertThat(Arrays.asList(0, 0)).describedAs("Buffer Offset").isEqualTo(acBufferOffset.getAllValues());
-    assertThat(Arrays.asList(BUFFER_SIZE, BUFFER_SIZE)).describedAs("Buffer Length").isEqualTo(acBufferLength.getAllValues());
-    assertThat(Arrays.asList(true, true)).describedAs("is AppendBlob Append").isEqualTo(acAppendBlobAppend.getAllValues());
-
+    AppendRequestParameters firstReqParameters = new AppendRequestParameters(
+        0, 0, BUFFER_SIZE, APPEND_MODE, true);
+    AppendRequestParameters secondReqParameters = new AppendRequestParameters(
+        BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, true);
+
+    verify(client, times(1)).append(
+        eq(PATH), any(byte[].class), refEq(firstReqParameters), any());
+    verify(client, times(1)).append(
+        eq(PATH), any(byte[].class), refEq(secondReqParameters), any());
+    // confirm there were only 2 invocations in all
+    verify(client, times(2)).append(
+        eq(PATH), any(byte[].class), any(), any());
   }
 
   /**
@@ -337,6 +325,7 @@ public final class TestAbfsOutputStream {
 
     AbfsClient client = mock(AbfsClient.class);
     AbfsRestOperation op = mock(AbfsRestOperation.class);
+    when(op.getSasToken()).thenReturn("");
     AbfsConfiguration abfsConf;
     final Configuration conf = new Configuration();
     conf.set(accountKey1, accountValue1);
@@ -344,10 +333,11 @@ public final class TestAbfsOutputStream {
     AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf);
 
     when(client.getAbfsPerfTracker()).thenReturn(tracker);
-    when(client.append(anyString(), anyLong(), any(byte[].class), anyInt(), anyInt(), any(), anyBoolean())).thenReturn(op);
+    when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any())).thenReturn(op);
     when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any())).thenReturn(op);
 
-    AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0, populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false));
+    AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0,
+        populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false));
     final byte[] b = new byte[BUFFER_SIZE];
     new Random().nextBytes(b);
 
@@ -356,35 +346,31 @@ public final class TestAbfsOutputStream {
     }
     out.hflush();
 
-    ArgumentCaptor<String> acString = ArgumentCaptor.forClass(String.class);
-    ArgumentCaptor<Long> acLong = ArgumentCaptor.forClass(Long.class);
-    ArgumentCaptor<Integer> acBufferOffset = ArgumentCaptor.forClass(Integer.class);
-    ArgumentCaptor<Integer> acBufferLength = ArgumentCaptor.forClass(Integer.class);
-    ArgumentCaptor<byte[]> acByteArray = ArgumentCaptor.forClass(byte[].class);
-    ArgumentCaptor<Boolean> acAppendBlobAppend = ArgumentCaptor.forClass(Boolean.class);
-    ArgumentCaptor<String> acSASToken = ArgumentCaptor.forClass(String.class);
-
-    verify(client, times(2)).append(acString.capture(), acLong.capture(), acByteArray.capture(), acBufferOffset.capture(), acBufferLength.capture(),
-                                    acSASToken.capture(), acAppendBlobAppend.capture());
-    assertThat(Arrays.asList(PATH, PATH)).describedAs("File Path").isEqualTo(acString.getAllValues());
-    assertThat(new HashSet<Long>(Arrays.asList(Long.valueOf(0), Long.valueOf(BUFFER_SIZE)))).describedAs("File Position").isEqualTo(
-               new HashSet<Long>(acLong.getAllValues()));
-    assertThat(Arrays.asList(0, 0)).describedAs("Buffer Offset").isEqualTo(acBufferOffset.getAllValues());
-    assertThat(Arrays.asList(BUFFER_SIZE, BUFFER_SIZE)).describedAs("Buffer Length").isEqualTo(acBufferLength.getAllValues());
-
-    ArgumentCaptor<String> acFlushString = ArgumentCaptor.forClass(String.class);
-    ArgumentCaptor<Long> acFlushLong = ArgumentCaptor.forClass(Long.class);
+    AppendRequestParameters firstReqParameters = new AppendRequestParameters(
+        0, 0, BUFFER_SIZE, APPEND_MODE, false);
+    AppendRequestParameters secondReqParameters = new AppendRequestParameters(
+        BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false);
+
+    verify(client, times(1)).append(
+        eq(PATH), any(byte[].class), refEq(firstReqParameters), any());
+    verify(client, times(1)).append(
+        eq(PATH), any(byte[].class), refEq(secondReqParameters), any());
+    // confirm there were only 2 invocations in all
+    verify(client, times(2)).append(
+        eq(PATH), any(byte[].class), any(), any());
+
+    ArgumentCaptor<String> acFlushPath = ArgumentCaptor.forClass(String.class);
+    ArgumentCaptor<Long> acFlushPosition = ArgumentCaptor.forClass(Long.class);
     ArgumentCaptor<Boolean> acFlushRetainUnCommittedData = ArgumentCaptor.forClass(Boolean.class);
     ArgumentCaptor<Boolean> acFlushClose = ArgumentCaptor.forClass(Boolean.class);
     ArgumentCaptor<String> acFlushSASToken = ArgumentCaptor.forClass(String.class);
 
-    verify(client, times(1)).flush(acFlushString.capture(), acFlushLong.capture(), acFlushRetainUnCommittedData.capture(), acFlushClose.capture(),
-                                   acFlushSASToken.capture());
-    assertThat(Arrays.asList(PATH)).describedAs("path").isEqualTo(acFlushString.getAllValues());
-    assertThat(Arrays.asList(Long.valueOf(2*BUFFER_SIZE))).describedAs("position").isEqualTo(acFlushLong.getAllValues());
+    verify(client, times(1)).flush(acFlushPath.capture(), acFlushPosition.capture(), acFlushRetainUnCommittedData.capture(), acFlushClose.capture(),
+        acFlushSASToken.capture());
+    assertThat(Arrays.asList(PATH)).describedAs("path").isEqualTo(acFlushPath.getAllValues());
+    assertThat(Arrays.asList(Long.valueOf(2*BUFFER_SIZE))).describedAs("position").isEqualTo(acFlushPosition.getAllValues());
     assertThat(Arrays.asList(false)).describedAs("RetainUnCommittedData flag").isEqualTo(acFlushRetainUnCommittedData.getAllValues());
     assertThat(Arrays.asList(false)).describedAs("Close flag").isEqualTo(acFlushClose.getAllValues());
-
   }
 
   /**
@@ -401,10 +387,11 @@ public final class TestAbfsOutputStream {
     abfsConf = new AbfsConfiguration(conf, accountName1);
     AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf);
     when(client.getAbfsPerfTracker()).thenReturn(tracker);
-    when(client.append(anyString(), anyLong(), any(byte[].class), anyInt(), anyInt(), any(), anyBoolean())).thenReturn(op);
+    when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any())).thenReturn(op);
     when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any())).thenReturn(op);
 
-    AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0, populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false));
+    AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0,
+        populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false));
     final byte[] b = new byte[BUFFER_SIZE];
     new Random().nextBytes(b);
 
@@ -415,21 +402,17 @@ public final class TestAbfsOutputStream {
     out.flush();
     Thread.sleep(1000);
 
-    ArgumentCaptor<String> acString = ArgumentCaptor.forClass(String.class);
-    ArgumentCaptor<Long> acLong = ArgumentCaptor.forClass(Long.class);
-    ArgumentCaptor<Integer> acBufferOffset = ArgumentCaptor.forClass(Integer.class);
-    ArgumentCaptor<Integer> acBufferLength = ArgumentCaptor.forClass(Integer.class);
-    ArgumentCaptor<byte[]> acByteArray = ArgumentCaptor.forClass(byte[].class);
-    ArgumentCaptor<Boolean> acAppendBlobAppend = ArgumentCaptor.forClass(Boolean.class);
-    ArgumentCaptor<String> acSASToken = ArgumentCaptor.forClass(String.class);
-
-    verify(client, times(2)).append(acString.capture(), acLong.capture(), acByteArray.capture(), acBufferOffset.capture(), acBufferLength.capture(),
-                                    acSASToken.capture(), acAppendBlobAppend.capture());
-    assertThat(Arrays.asList(PATH, PATH)).describedAs("path").isEqualTo(acString.getAllValues());
-    assertThat(new HashSet<Long>(Arrays.asList(Long.valueOf(0), Long.valueOf(BUFFER_SIZE)))).describedAs("Position").isEqualTo(
-               new HashSet<Long>(acLong.getAllValues()));
-    assertThat(Arrays.asList(0, 0)).describedAs("Buffer Offset").isEqualTo(acBufferOffset.getAllValues());
-    assertThat(Arrays.asList(BUFFER_SIZE, BUFFER_SIZE)).describedAs("Buffer Length").isEqualTo(acBufferLength.getAllValues());
-
+    AppendRequestParameters firstReqParameters = new AppendRequestParameters(
+        0, 0, BUFFER_SIZE, APPEND_MODE, false);
+    AppendRequestParameters secondReqParameters = new AppendRequestParameters(
+        BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false);
+
+    verify(client, times(1)).append(
+        eq(PATH), any(byte[].class), refEq(firstReqParameters), any());
+    verify(client, times(1)).append(
+        eq(PATH), any(byte[].class), refEq(secondReqParameters), any());
+    // confirm there were only 2 invocations in all
+    verify(client, times(2)).append(
+        eq(PATH), any(byte[].class), any(), any());
   }
 }
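
A note on the verification style above: Mockito's refEq(...) matches
arguments by reflective field-by-field equality, which lets the rewritten
tests compare against a freshly constructed AppendRequestParameters
without that class having to override equals(). A minimal sketch of the
pattern, reusing the PATH, BUFFER_SIZE and APPEND_MODE constants from
these tests:

    AppendRequestParameters expected = new AppendRequestParameters(
        0, 0, BUFFER_SIZE, APPEND_MODE, false);
    // refEq compares fields reflectively; eq(PATH) matches via equals().
    verify(client, times(1)).append(
        eq(PATH), any(byte[].class), refEq(expected), any());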


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[hadoop] 05/06: HADOOP-17347. ABFS: Read optimizations

Posted by st...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

stevel pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit cb6729224e15b89bd7fa7877fe045d28b3582f7b
Author: bilaharith <52...@users.noreply.github.com>
AuthorDate: Sun Jan 3 00:07:10 2021 +0530

    HADOOP-17347. ABFS: Read optimizations
    
    - Contributed by Bilahari T H
    
    (cherry picked from commit 1448add08fcd4a23e59eab5f75ef46fca6b1c3d1)
---
 .../hadoop/fs/azurebfs/AbfsConfiguration.java      |  28 ++
 .../fs/azurebfs/AzureBlobFileSystemStore.java      |   2 +
 .../fs/azurebfs/constants/ConfigurationKeys.java   |   2 +
 .../constants/FileSystemConfigurations.java        |   6 +-
 .../fs/azurebfs/services/AbfsInputStream.java      | 194 +++++++++--
 .../azurebfs/services/AbfsInputStreamContext.java  |  24 ++
 .../fs/azurebfs/services/ITestAbfsInputStream.java | 256 +++++++++++++++
 .../services/ITestAbfsInputStreamReadFooter.java   | 358 +++++++++++++++++++++
 .../ITestAbfsInputStreamSmallFileReads.java        | 326 +++++++++++++++++++
 9 files changed, 1175 insertions(+), 21 deletions(-)

diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
index 3d09a80..b1c95d2 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
@@ -100,6 +100,16 @@ public class AbfsConfiguration{
       DefaultValue = DEFAULT_WRITE_BUFFER_SIZE)
   private int writeBufferSize;
 
+  @BooleanConfigurationValidatorAnnotation(
+      ConfigurationKey = AZURE_READ_SMALL_FILES_COMPLETELY,
+      DefaultValue = DEFAULT_READ_SMALL_FILES_COMPLETELY)
+  private boolean readSmallFilesCompletely;
+
+  @BooleanConfigurationValidatorAnnotation(
+      ConfigurationKey = AZURE_READ_OPTIMIZE_FOOTER_READ,
+      DefaultValue = DEFAULT_OPTIMIZE_FOOTER_READ)
+  private boolean optimizeFooterRead;
+
   @IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_READ_BUFFER_SIZE,
       MinValue = MIN_BUFFER_SIZE,
       MaxValue = MAX_BUFFER_SIZE,
@@ -527,6 +537,14 @@ public class AbfsConfiguration{
     return this.writeBufferSize;
   }
 
+  public boolean readSmallFilesCompletely() {
+    return this.readSmallFilesCompletely;
+  }
+
+  public boolean optimizeFooterRead() {
+    return this.optimizeFooterRead;
+  }
+
   public int getReadBufferSize() {
     return this.readBufferSize;
   }
@@ -925,4 +943,14 @@ public class AbfsConfiguration{
     return authority;
   }
 
+  @VisibleForTesting
+  public void setReadSmallFilesCompletely(boolean readSmallFilesCompletely) {
+    this.readSmallFilesCompletely = readSmallFilesCompletely;
+  }
+
+  @VisibleForTesting
+  public void setOptimizeFooterRead(boolean optimizeFooterRead) {
+    this.optimizeFooterRead = optimizeFooterRead;
+  }
+
 }
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
index a766c62..869a6f9 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
@@ -643,6 +643,8 @@ public class AzureBlobFileSystemStore implements Closeable {
             .withReadBufferSize(abfsConfiguration.getReadBufferSize())
             .withReadAheadQueueDepth(abfsConfiguration.getReadAheadQueueDepth())
             .withTolerateOobAppends(abfsConfiguration.getTolerateOobAppends())
+            .withReadSmallFilesCompletely(abfsConfiguration.readSmallFilesCompletely())
+            .withOptimizeFooterRead(abfsConfiguration.optimizeFooterRead())
             .withStreamStatistics(new AbfsInputStreamStatisticsImpl())
             .withShouldReadBufferSizeAlways(
                 abfsConfiguration.shouldReadBufferSizeAlways())
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
index cb9c0de..3e1ff80 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
@@ -56,6 +56,8 @@ public final class ConfigurationKeys {
   public static final String AZURE_WRITE_MAX_REQUESTS_TO_QUEUE = "fs.azure.write.max.requests.to.queue";
   public static final String AZURE_WRITE_BUFFER_SIZE = "fs.azure.write.request.size";
   public static final String AZURE_READ_BUFFER_SIZE = "fs.azure.read.request.size";
+  public static final String AZURE_READ_SMALL_FILES_COMPLETELY = "fs.azure.read.smallfilescompletely";
+  public static final String AZURE_READ_OPTIMIZE_FOOTER_READ = "fs.azure.read.optimizefooterread";
   public static final String AZURE_BLOCK_SIZE_PROPERTY_NAME = "fs.azure.block.size";
   public static final String AZURE_BLOCK_LOCATION_HOST_PROPERTY_NAME = "fs.azure.block.location.impersonatedhost";
   public static final String AZURE_CONCURRENT_CONNECTION_VALUE_OUT = "fs.azure.concurrentRequestCount.out";
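
The two new keys are plain boolean switches, both off by default (see the
FileSystemConfigurations diff below). A minimal sketch of enabling them
from client code, assuming a standard Hadoop Configuration object:

    import org.apache.hadoop.conf.Configuration;

    Configuration conf = new Configuration();
    // Both read optimizations default to false in this patch.
    conf.setBoolean("fs.azure.read.smallfilescompletely", true);
    conf.setBoolean("fs.azure.read.optimizefooterread", true);
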
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
index 27dafd0..8008206 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
@@ -50,13 +50,15 @@ public final class FileSystemConfigurations {
   public static final int DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_MAX_BACKOFF_INTERVAL = SIXTY_SECONDS;
   public static final int DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_DELTA_BACKOFF = 2;
 
-  private static final int ONE_KB = 1024;
-  private static final int ONE_MB = ONE_KB * ONE_KB;
+  public static final int ONE_KB = 1024;
+  public static final int ONE_MB = ONE_KB * ONE_KB;
 
   // Default upload and download buffer size
   public static final int DEFAULT_WRITE_BUFFER_SIZE = 8 * ONE_MB;  // 8 MB
   public static final int APPENDBLOB_MAX_WRITE_BUFFER_SIZE = 4 * ONE_MB;  // 4 MB
   public static final int DEFAULT_READ_BUFFER_SIZE = 4 * ONE_MB;  // 4 MB
+  public static final boolean DEFAULT_READ_SMALL_FILES_COMPLETELY = false;
+  public static final boolean DEFAULT_OPTIMIZE_FOOTER_READ = false;
   public static final boolean DEFAULT_ALWAYS_READ_BUFFER_SIZE = false;
   public static final int DEFAULT_READ_AHEAD_BLOCK_SIZE = 4 * ONE_MB;
   public static final int MIN_BUFFER_SIZE = 16 * ONE_KB;  // 16 KB
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
index 3682bcb..1d109f4 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
@@ -38,6 +38,10 @@ import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationExcep
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
 import org.apache.hadoop.fs.azurebfs.utils.CachedSASToken;
 
+import static java.lang.Math.max;
+import static java.lang.Math.min;
+
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_KB;
 import static org.apache.hadoop.util.StringUtils.toLowerCase;
 
 /**
@@ -46,6 +50,9 @@ import static org.apache.hadoop.util.StringUtils.toLowerCase;
 public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
         StreamCapabilities {
   private static final Logger LOG = LoggerFactory.getLogger(AbfsInputStream.class);
+  //  Footer size is set to qualify for both ORC and Parquet files
+  public static final int FOOTER_SIZE = 16 * ONE_KB;
+  public static final int MAX_OPTIMIZED_READ_ATTEMPTS = 2;
 
   private int readAheadBlockSize;
   private final AbfsClient client;
@@ -59,6 +66,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
   private final boolean readAheadEnabled; // whether enable readAhead;
   private final boolean alwaysReadBufferSize;
 
+  private boolean firstRead = true;
   // SAS tokens can be re-used until they expire
   private CachedSASToken cachedSasToken;
   private byte[] buffer = null;            // will be initialized on first use
@@ -70,11 +78,21 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
   //                                                      of valid bytes in buffer)
   private boolean closed = false;
 
+  //  Optimisations modify the pointer fields.
+  //  For better resilience the following fields are used to save the
+  //  existing state before optimization flows.
+  private int limitBkp;
+  private int bCursorBkp;
+  private long fCursorBkp;
+  private long fCursorAfterLastReadBkp;
+
   /** Stream statistics. */
   private final AbfsInputStreamStatistics streamStatistics;
   private long bytesFromReadAhead; // bytes read from readAhead; for testing
   private long bytesFromRemoteRead; // bytes read remotely; for testing
 
+  private final AbfsInputStreamContext context;
+
   public AbfsInputStream(
           final AbfsClient client,
           final Statistics statistics,
@@ -96,6 +114,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
     this.cachedSasToken = new CachedSASToken(
         abfsInputStreamContext.getSasTokenRenewPeriodForStreamsInSeconds());
     this.streamStatistics = abfsInputStreamContext.getStreamStatistics();
+    this.context = abfsInputStreamContext;
     readAheadBlockSize = abfsInputStreamContext.getReadAheadBlockSize();
 
     // Propagate the config values to ReadBufferManager so that the first instance
@@ -137,7 +156,13 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
     }
     incrementReadOps();
     do {
-      lastReadBytes = readOneBlock(b, currentOff, currentLen);
+      if (shouldReadFully()) {
+        lastReadBytes = readFileCompletely(b, currentOff, currentLen);
+      } else if (shouldReadLastBlock()) {
+        lastReadBytes = readLastBlock(b, currentOff, currentLen);
+      } else {
+        lastReadBytes = readOneBlock(b, currentOff, currentLen);
+      }
       if (lastReadBytes > 0) {
         currentOff += lastReadBytes;
         currentLen -= lastReadBytes;
@@ -150,27 +175,24 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
     return totalReadBytes > 0 ? totalReadBytes : lastReadBytes;
   }
 
-  private int readOneBlock(final byte[] b, final int off, final int len) throws IOException {
-    if (closed) {
-      throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
-    }
+  private boolean shouldReadFully() {
+    return this.firstRead && this.context.readSmallFilesCompletely()
+        && this.contentLength <= this.bufferSize;
+  }
 
-    Preconditions.checkNotNull(b);
-    LOG.debug("read one block requested b.length = {} off {} len {}", b.length,
-        off, len);
+  private boolean shouldReadLastBlock() {
+    long footerStart = max(0, this.contentLength - FOOTER_SIZE);
+    return this.firstRead && this.context.optimizeFooterRead()
+        && this.fCursor >= footerStart;
+  }
 
+  private int readOneBlock(final byte[] b, final int off, final int len) throws IOException {
     if (len == 0) {
       return 0;
     }
-
-    if (this.available() == 0) {
+    if (!validate(b, off, len)) {
       return -1;
     }
-
-    if (off < 0 || len < 0 || len > b.length - off) {
-      throw new IndexOutOfBoundsException();
-    }
-
     //If buffer is empty, then fill the buffer.
     if (bCursor == limit) {
       //If EOF, then return -1
@@ -197,6 +219,9 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
           bytesRead = readInternal(fCursor, buffer, 0, b.length, true);
         }
       }
+      if (firstRead) {
+        firstRead = false;
+      }
 
       if (bytesRead == -1) {
         return -1;
@@ -206,11 +231,123 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
       fCursor += bytesRead;
       fCursorAfterLastRead = fCursor;
     }
+    return copyToUserBuffer(b, off, len);
+  }
+
+  private int readFileCompletely(final byte[] b, final int off, final int len)
+      throws IOException {
+    if (len == 0) {
+      return 0;
+    }
+    if (!validate(b, off, len)) {
+      return -1;
+    }
+    savePointerState();
+    // data needs to be copied to the user buffer from index bCursor;
+    // bCursor has to be the current fCursor
+    bCursor = (int) fCursor;
+    return optimisedRead(b, off, len, 0, contentLength);
+  }
+
+  private int readLastBlock(final byte[] b, final int off, final int len)
+      throws IOException {
+    if (len == 0) {
+      return 0;
+    }
+    if (!validate(b, off, len)) {
+      return -1;
+    }
+    savePointerState();
+    // data needs to be copied to the user buffer from index bCursor; the
+    // AbfsInputStream buffer is going to contain data from the last block
+    // start, so bCursor will be set to fCursor - lastBlockStart
+    long lastBlockStart = max(0, contentLength - bufferSize);
+    bCursor = (int) (fCursor - lastBlockStart);
+    // lastBlockStart is 0 when contentLength < bufferSize
+    long actualLenToRead = min(bufferSize, contentLength);
+    return optimisedRead(b, off, len, lastBlockStart, actualLenToRead);
+  }
+
+  private int optimisedRead(final byte[] b, final int off, final int len,
+      final long readFrom, final long actualLen) throws IOException {
+    fCursor = readFrom;
+    int totalBytesRead = 0;
+    int lastBytesRead = 0;
+    try {
+      buffer = new byte[bufferSize];
+      for (int i = 0;
+           i < MAX_OPTIMIZED_READ_ATTEMPTS && fCursor < contentLength; i++) {
+        lastBytesRead = readInternal(fCursor, buffer, limit,
+            (int) actualLen - limit, true);
+        if (lastBytesRead > 0) {
+          totalBytesRead += lastBytesRead;
+          limit += lastBytesRead;
+          fCursor += lastBytesRead;
+          fCursorAfterLastRead = fCursor;
+        }
+      }
+    } catch (IOException e) {
+      LOG.debug("Optimized read failed. Defaulting to readOneBlock {}", e);
+      restorePointerState();
+      return readOneBlock(b, off, len);
+    } finally {
+      firstRead = false;
+    }
+    if (totalBytesRead < 1) {
+      restorePointerState();
+      return -1;
+    }
+    //  If the read was partial and the user-requested part of the data has
+    //  not been read, then fall back to readOneBlock. When limit is smaller
+    //  than bCursor, the user-requested data has not been read.
+    if (fCursor < contentLength && bCursor > limit) {
+      restorePointerState();
+      return readOneBlock(b, off, len);
+    }
+    return copyToUserBuffer(b, off, len);
+  }
+
+  private void savePointerState() {
+    //  Saving the current state for fallback in case optimization fails
+    this.limitBkp = this.limit;
+    this.fCursorBkp = this.fCursor;
+    this.fCursorAfterLastReadBkp = this.fCursorAfterLastRead;
+    this.bCursorBkp = this.bCursor;
+  }
+
+  private void restorePointerState() {
+    //  Restoring the saved state in case optimization fails
+    this.limit = this.limitBkp;
+    this.fCursor = this.fCursorBkp;
+    this.fCursorAfterLastRead = this.fCursorAfterLastReadBkp;
+    this.bCursor = this.bCursorBkp;
+  }
+
+  private boolean validate(final byte[] b, final int off, final int len)
+      throws IOException {
+    if (closed) {
+      throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
+    }
+
+    Preconditions.checkNotNull(b);
+    LOG.debug("read one block requested b.length = {} off {} len {}", b.length,
+        off, len);
 
+    if (this.available() == 0) {
+      return false;
+    }
+
+    if (off < 0 || len < 0 || len > b.length - off) {
+      throw new IndexOutOfBoundsException();
+    }
+    return true;
+  }
+
+  private int copyToUserBuffer(byte[] b, int off, int len){
     //If there is anything in the buffer, then return lesser of (requested bytes) and (bytes in buffer)
     //(bytes returned may be less than requested)
     int bytesRemaining = limit - bCursor;
-    int bytesToRead = Math.min(len, bytesRemaining);
+    int bytesToRead = min(len, bytesRemaining);
     System.arraycopy(buffer, bCursor, b, off, bytesToRead);
     bCursor += bytesToRead;
     if (statistics != null) {
@@ -224,7 +361,6 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
     return bytesToRead;
   }
 
-
   private int readInternal(final long position, final byte[] b, final int offset, final int length,
                            final boolean bypassReadAhead) throws IOException {
     if (readAheadEnabled && !bypassReadAhead) {
@@ -239,7 +375,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
       long nextOffset = position;
       // First read to queue needs to be of readBufferSize and later
       // of readAhead Block size
-      long nextSize = Math.min((long) bufferSize, contentLength - nextOffset);
+      long nextSize = min((long) bufferSize, contentLength - nextOffset);
       LOG.debug("read ahead enabled issuing readheads num = {}", numReadAheads);
       while (numReadAheads > 0 && nextOffset < contentLength) {
         LOG.debug("issuing read ahead requestedOffset = {} requested size {}",
@@ -248,7 +384,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
         nextOffset = nextOffset + nextSize;
         numReadAheads--;
         // From next round onwards should be of readahead block size.
-        nextSize = Math.min((long) readAheadBlockSize, contentLength - nextOffset);
+        nextSize = min((long) readAheadBlockSize, contentLength - nextOffset);
       }
 
       // try reading from buffers first
@@ -572,4 +708,24 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
     }
     return sb.toString();
   }
+
+  @VisibleForTesting
+  int getBCursor() {
+    return this.bCursor;
+  }
+
+  @VisibleForTesting
+  long getFCursor() {
+    return this.fCursor;
+  }
+
+  @VisibleForTesting
+  long getFCursorAfterLastRead() {
+    return this.fCursorAfterLastRead;
+  }
+
+  @VisibleForTesting
+  long getLimit() {
+    return this.limit;
+  }
 }
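
Taken together, the new logic only dispatches away from readOneBlock on
the first read of a stream: a small file (contentLength <= bufferSize) is
fetched completely, a read starting within the 16 KB footer buffers the
whole last block, and anything else takes the pre-existing path; on any
IOException the saved cursors are restored and the read falls back to
readOneBlock. A standalone sketch of that dispatch, with names mirroring
the fields above (an illustration, not the shipped code):

    static final int FOOTER_SIZE = 16 * 1024;  // as in AbfsInputStream

    static String chooseReadPath(boolean firstRead,
        boolean readSmallFilesCompletely, boolean optimizeFooterRead,
        long contentLength, int bufferSize, long fCursor) {
      if (firstRead && readSmallFilesCompletely
          && contentLength <= bufferSize) {
        return "readFileCompletely";  // whole file fits in one buffer
      }
      long footerStart = Math.max(0, contentLength - FOOTER_SIZE);
      if (firstRead && optimizeFooterRead && fCursor >= footerStart) {
        return "readLastBlock";       // buffer the entire last block
      }
      return "readOneBlock";          // existing block-at-a-time path
    }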
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java
index ade0583..ab3d3b0 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java
@@ -40,6 +40,10 @@ public class AbfsInputStreamContext extends AbfsStreamContext {
 
   private AbfsInputStreamStatistics streamStatistics;
 
+  private boolean readSmallFilesCompletely;
+
+  private boolean optimizeFooterRead;
+
   public AbfsInputStreamContext(final long sasTokenRenewPeriodForStreamsInSeconds) {
     super(sasTokenRenewPeriodForStreamsInSeconds);
   }
@@ -69,6 +73,18 @@ public class AbfsInputStreamContext extends AbfsStreamContext {
     return this;
   }
 
+  public AbfsInputStreamContext withReadSmallFilesCompletely(
+      final boolean readSmallFilesCompletely) {
+    this.readSmallFilesCompletely = readSmallFilesCompletely;
+    return this;
+  }
+
+  public AbfsInputStreamContext withOptimizeFooterRead(
+      final boolean optimizeFooterRead) {
+    this.optimizeFooterRead = optimizeFooterRead;
+    return this;
+  }
+
   public AbfsInputStreamContext withShouldReadBufferSizeAlways(
       final boolean alwaysReadBufferSize) {
     this.alwaysReadBufferSize = alwaysReadBufferSize;
@@ -110,6 +126,14 @@ public class AbfsInputStreamContext extends AbfsStreamContext {
     return streamStatistics;
   }
 
+  public boolean readSmallFilesCompletely() {
+    return this.readSmallFilesCompletely;
+  }
+
+  public boolean optimizeFooterRead() {
+    return this.optimizeFooterRead;
+  }
+
   public boolean shouldReadBufferSizeAlways() {
     return alwaysReadBufferSize;
   }
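
For orientation, these builder methods are wired up from
AzureBlobFileSystemStore (see its diff earlier in this commit); a
condensed sketch of that fluent chain, assuming the surrounding
abfsConfiguration variable and the renewal period taken by the
constructor above:

    AbfsInputStreamContext context = new AbfsInputStreamContext(
        sasTokenRenewPeriodForStreamsInSeconds)
        .withReadBufferSize(abfsConfiguration.getReadBufferSize())
        .withReadSmallFilesCompletely(
            abfsConfiguration.readSmallFilesCompletely())
        .withOptimizeFooterRead(abfsConfiguration.optimizeFooterRead());
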
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStream.java
new file mode 100644
index 0000000..44b0a36
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStream.java
@@ -0,0 +1,256 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
+import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
+import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
+import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore;
+import org.junit.Test;
+
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_MB;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.spy;
+
+public class ITestAbfsInputStream extends AbstractAbfsIntegrationTest {
+
+  protected static final int HUNDRED = 100;
+
+  public ITestAbfsInputStream() throws Exception {
+  }
+
+  @Test
+  public void testWithNoOptimization() throws Exception {
+    for (int i = 2; i <= 7; i++) {
+      int fileSize = i * ONE_MB;
+      final AzureBlobFileSystem fs = getFileSystem(false, false, fileSize);
+      String fileName = methodName.getMethodName() + i;
+      byte[] fileContent = getRandomBytesArray(fileSize);
+      Path testFilePath = createFileWithContent(fs, fileName, fileContent);
+      testWithNoOptimization(fs, testFilePath, HUNDRED, fileContent);
+    }
+  }
+
+  protected void testWithNoOptimization(final FileSystem fs,
+      final Path testFilePath, final int seekPos, final byte[] fileContent)
+      throws IOException {
+    FSDataInputStream iStream = fs.open(testFilePath);
+    try {
+      AbfsInputStream abfsInputStream = (AbfsInputStream) iStream
+          .getWrappedStream();
+
+      iStream = new FSDataInputStream(abfsInputStream);
+      seek(iStream, seekPos);
+      long totalBytesRead = 0;
+      int length = HUNDRED * HUNDRED;
+      do {
+        byte[] buffer = new byte[length];
+        int bytesRead = iStream.read(buffer, 0, length);
+        totalBytesRead += bytesRead;
+        if ((totalBytesRead + seekPos) >= fileContent.length) {
+          length = (fileContent.length - seekPos) % length;
+        }
+        assertEquals(length, bytesRead);
+        assertContentReadCorrectly(fileContent,
+            (int) (seekPos + totalBytesRead - length), length, buffer);
+
+        assertTrue(abfsInputStream.getFCursor() >= seekPos + totalBytesRead);
+        assertTrue(abfsInputStream.getFCursorAfterLastRead() >= seekPos + totalBytesRead);
+        assertTrue(abfsInputStream.getBCursor() >= totalBytesRead % abfsInputStream.getBufferSize());
+        assertTrue(abfsInputStream.getLimit() >= totalBytesRead % abfsInputStream.getBufferSize());
+      } while (totalBytesRead + seekPos < fileContent.length);
+    } finally {
+      iStream.close();
+    }
+  }
+
+  @Test
+  public void testExceptionInOptimization() throws Exception {
+    for (int i = 2; i <= 7; i++) {
+      int fileSize = i * ONE_MB;
+      final AzureBlobFileSystem fs = getFileSystem(true, true, fileSize);
+      String fileName = methodName.getMethodName() + i;
+      byte[] fileContent = getRandomBytesArray(fileSize);
+      Path testFilePath = createFileWithContent(fs, fileName, fileContent);
+      testExceptionInOptimization(fs, testFilePath, fileSize - HUNDRED,
+          fileSize / 4, fileContent);
+    }
+  }
+
+  private void testExceptionInOptimization(final FileSystem fs,
+      final Path testFilePath,
+      final int seekPos, final int length, final byte[] fileContent)
+      throws IOException {
+
+    FSDataInputStream iStream = fs.open(testFilePath);
+    try {
+      AbfsInputStream abfsInputStream = (AbfsInputStream) iStream
+          .getWrappedStream();
+      abfsInputStream = spy(abfsInputStream);
+      doThrow(new IOException())
+          .doCallRealMethod()
+          .when(abfsInputStream)
+          .readRemote(anyLong(), any(), anyInt(), anyInt());
+
+      iStream = new FSDataInputStream(abfsInputStream);
+      verifyBeforeSeek(abfsInputStream);
+      seek(iStream, seekPos);
+      byte[] buffer = new byte[length];
+      int bytesRead = iStream.read(buffer, 0, length);
+      long actualLength = length;
+      if (seekPos + length > fileContent.length) {
+        long delta = seekPos + length - fileContent.length;
+        actualLength = length - delta;
+      }
+      assertEquals(actualLength, bytesRead);
+      assertContentReadCorrectly(fileContent, seekPos, (int) actualLength, buffer);
+      assertEquals(fileContent.length, abfsInputStream.getFCursor());
+      assertEquals(fileContent.length, abfsInputStream.getFCursorAfterLastRead());
+      assertEquals(actualLength, abfsInputStream.getBCursor());
+      assertTrue(abfsInputStream.getLimit() >= actualLength);
+    } finally {
+      iStream.close();
+    }
+  }
+
+  protected AzureBlobFileSystem getFileSystem(boolean readSmallFilesCompletely)
+      throws IOException {
+    final AzureBlobFileSystem fs = getFileSystem();
+    getAbfsStore(fs).getAbfsConfiguration()
+        .setReadSmallFilesCompletely(readSmallFilesCompletely);
+    return fs;
+  }
+
+  private AzureBlobFileSystem getFileSystem(boolean optimizeFooterRead,
+      boolean readSmallFileCompletely, int fileSize) throws IOException {
+    final AzureBlobFileSystem fs = getFileSystem();
+    getAbfsStore(fs).getAbfsConfiguration()
+        .setOptimizeFooterRead(optimizeFooterRead);
+    if (fileSize <= getAbfsStore(fs).getAbfsConfiguration()
+        .getReadBufferSize()) {
+      getAbfsStore(fs).getAbfsConfiguration()
+          .setReadSmallFilesCompletely(readSmallFileCompletely);
+    }
+    return fs;
+  }
+
+  protected byte[] getRandomBytesArray(int length) {
+    final byte[] b = new byte[length];
+    new Random().nextBytes(b);
+    return b;
+  }
+
+  protected Path createFileWithContent(FileSystem fs, String fileName,
+      byte[] fileContent) throws IOException {
+    Path testFilePath = path(fileName);
+    try (FSDataOutputStream oStream = fs.create(testFilePath)) {
+      oStream.write(fileContent);
+      oStream.flush();
+    }
+    return testFilePath;
+  }
+
+  protected AzureBlobFileSystemStore getAbfsStore(FileSystem fs)
+      throws NoSuchFieldException, IllegalAccessException {
+    AzureBlobFileSystem abfs = (AzureBlobFileSystem) fs;
+    Field abfsStoreField = AzureBlobFileSystem.class
+        .getDeclaredField("abfsStore");
+    abfsStoreField.setAccessible(true);
+    return (AzureBlobFileSystemStore) abfsStoreField.get(abfs);
+  }
+
+  protected Map<String, Long> getInstrumentationMap(FileSystem fs)
+      throws NoSuchFieldException, IllegalAccessException {
+    AzureBlobFileSystem abfs = (AzureBlobFileSystem) fs;
+    Field abfsCountersField = AzureBlobFileSystem.class
+        .getDeclaredField("abfsCounters");
+    abfsCountersField.setAccessible(true);
+    AbfsCounters abfsCounters = (AbfsCounters) abfsCountersField.get(abfs);
+    return abfsCounters.toMap();
+  }
+
+  protected void assertContentReadCorrectly(byte[] actualFileContent, int from,
+      int len, byte[] contentRead) {
+    for (int i = 0; i < len; i++) {
+      assertEquals(contentRead[i], actualFileContent[i + from]);
+    }
+  }
+
+  protected void assertBuffersAreNotEqual(byte[] actualContent,
+      byte[] contentRead, AbfsConfiguration conf) {
+    assertBufferEquality(actualContent, contentRead, conf, false);
+  }
+
+  protected void assertBuffersAreEqual(byte[] actualContent, byte[] contentRead,
+      AbfsConfiguration conf) {
+    assertBufferEquality(actualContent, contentRead, conf, true);
+  }
+
+  private void assertBufferEquality(byte[] actualContent, byte[] contentRead,
+      AbfsConfiguration conf, boolean assertEqual) {
+    int bufferSize = conf.getReadBufferSize();
+    int actualContentSize = actualContent.length;
+    int n = (actualContentSize < bufferSize) ? actualContentSize : bufferSize;
+    int matches = 0;
+    for (int i = 0; i < n; i++) {
+      if (actualContent[i] == contentRead[i]) {
+        matches++;
+      }
+    }
+    if (assertEqual) {
+      assertEquals(n, matches);
+    } else {
+      assertNotEquals(n, matches);
+    }
+  }
+
+  protected void seek(FSDataInputStream iStream, long seekPos)
+      throws IOException {
+    AbfsInputStream abfsInputStream = (AbfsInputStream) iStream.getWrappedStream();
+    verifyBeforeSeek(abfsInputStream);
+    iStream.seek(seekPos);
+    verifyAfterSeek(abfsInputStream, seekPos);
+  }
+
+  private void verifyBeforeSeek(AbfsInputStream abfsInputStream){
+    assertEquals(0, abfsInputStream.getFCursor());
+    assertEquals(-1, abfsInputStream.getFCursorAfterLastRead());
+    assertEquals(0, abfsInputStream.getLimit());
+    assertEquals(0, abfsInputStream.getBCursor());
+  }
+
+  private void verifyAfterSeek(AbfsInputStream abfsInputStream, long seekPos){
+    assertEquals(seekPos, abfsInputStream.getFCursor());
+    assertEquals(-1, abfsInputStream.getFCursorAfterLastRead());
+    assertEquals(0, abfsInputStream.getLimit());
+    assertEquals(0, abfsInputStream.getBCursor());
+  }
+}
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStreamReadFooter.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStreamReadFooter.java
new file mode 100644
index 0000000..09a810c
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStreamReadFooter.java
@@ -0,0 +1,358 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.junit.Test;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
+import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
+
+import static java.lang.Math.max;
+import static java.lang.Math.min;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+
+import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CONNECTIONS_MADE;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_KB;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_MB;
+
+public class ITestAbfsInputStreamReadFooter extends ITestAbfsInputStream {
+
+  private static final int TEN = 10;
+  private static final int TWENTY = 20;
+
+  public ITestAbfsInputStreamReadFooter() throws Exception {
+  }
+
+  @Test
+  public void testOnlyOneServerCallIsMadeWhenTheConfIsTrue() throws Exception {
+    testNumBackendCalls(true);
+  }
+
+  @Test
+  public void testMultipleServerCallsAreMadeWhenTheConfIsFalse()
+      throws Exception {
+    testNumBackendCalls(false);
+  }
+
+  private void testNumBackendCalls(boolean optimizeFooterRead)
+      throws Exception {
+    for (int i = 1; i <= 4; i++) {
+      int fileSize = i * ONE_MB;
+      final AzureBlobFileSystem fs = getFileSystem(optimizeFooterRead,
+          fileSize);
+      String fileName = methodName.getMethodName() + i;
+      byte[] fileContent = getRandomBytesArray(fileSize);
+      Path testFilePath = createFileWithContent(fs, fileName, fileContent);
+      int length = AbfsInputStream.FOOTER_SIZE;
+      try (FSDataInputStream iStream = fs.open(testFilePath)) {
+        byte[] buffer = new byte[length];
+
+        Map<String, Long> metricMap = getInstrumentationMap(fs);
+        long requestsMadeBeforeTest = metricMap
+            .get(CONNECTIONS_MADE.getStatName());
+
+        iStream.seek(fileSize - 8);
+        iStream.read(buffer, 0, length);
+
+        iStream.seek(fileSize - (TEN * ONE_KB));
+        iStream.read(buffer, 0, length);
+
+        iStream.seek(fileSize - (TWENTY * ONE_KB));
+        iStream.read(buffer, 0, length);
+
+        metricMap = getInstrumentationMap(fs);
+        long requestsMadeAfterTest = metricMap
+            .get(CONNECTIONS_MADE.getStatName());
+
+        if (optimizeFooterRead) {
+          assertEquals(1, requestsMadeAfterTest - requestsMadeBeforeTest);
+        } else {
+          assertEquals(3, requestsMadeAfterTest - requestsMadeBeforeTest);
+        }
+      }
+    }
+  }
+
+  @Test
+  public void testSeekToBeginAndReadWithConfTrue() throws Exception {
+    testSeekAndReadWithConf(true, SeekTo.BEGIN);
+  }
+
+  @Test
+  public void testSeekToBeginAndReadWithConfFalse() throws Exception {
+    testSeekAndReadWithConf(false, SeekTo.BEGIN);
+  }
+
+  @Test
+  public void testSeekToBeforeFooterAndReadWithConfTrue() throws Exception {
+    testSeekAndReadWithConf(true, SeekTo.BEFORE_FOOTER_START);
+  }
+
+  @Test
+  public void testSeekToBeforeFooterAndReadWithConfFalse() throws Exception {
+    testSeekAndReadWithConf(false, SeekTo.BEFORE_FOOTER_START);
+  }
+
+  @Test
+  public void testSeekToFooterAndReadWithConfTrue() throws Exception {
+    testSeekAndReadWithConf(true, SeekTo.AT_FOOTER_START);
+  }
+
+  @Test
+  public void testSeekToFooterAndReadWithConfFalse() throws Exception {
+    testSeekAndReadWithConf(false, SeekTo.AT_FOOTER_START);
+  }
+
+  @Test
+  public void testSeekToAfterFooterAndReadWithConfTrue() throws Exception {
+    testSeekAndReadWithConf(true, SeekTo.AFTER_FOOTER_START);
+  }
+
+  @Test
+  public void testSeekToAfterFooterAndReadWithConfFalse() throws Exception {
+    testSeekAndReadWithConf(false, SeekTo.AFTER_FOOTER_START);
+  }
+
+  @Test
+  public void testSeekToEndAndReadWithConfTrue() throws Exception {
+    testSeekAndReadWithConf(true, SeekTo.END);
+  }
+
+  @Test
+  public void testSeekToEndAndReadWithConfFalse() throws Exception {
+    testSeekAndReadWithConf(false, SeekTo.END);
+  }
+
+  private void testSeekAndReadWithConf(boolean optimizeFooterRead,
+      SeekTo seekTo) throws Exception {
+    for (int i = 2; i <= 6; i++) {
+      int fileSize = i * ONE_MB;
+      final AzureBlobFileSystem fs = getFileSystem(optimizeFooterRead,
+          fileSize);
+      String fileName = methodName.getMethodName() + i;
+      byte[] fileContent = getRandomBytesArray(fileSize);
+      Path testFilePath = createFileWithContent(fs, fileName, fileContent);
+      seekReadAndTest(fs, testFilePath, seekPos(seekTo, fileSize), HUNDRED,
+          fileContent);
+    }
+  }
+
+  private int seekPos(SeekTo seekTo, int fileSize) {
+    if (seekTo == SeekTo.BEGIN) {
+      return 0;
+    }
+    if (seekTo == SeekTo.BEFORE_FOOTER_START) {
+      return fileSize - AbfsInputStream.FOOTER_SIZE - 1;
+    }
+    if (seekTo == SeekTo.AT_FOOTER_START) {
+      return fileSize - AbfsInputStream.FOOTER_SIZE;
+    }
+    if (seekTo == SeekTo.END) {
+      return fileSize - 1;
+    }
+    //seekTo == SeekTo.AFTER_FOOTER_START
+    return fileSize - AbfsInputStream.FOOTER_SIZE + 1;
+  }
+
+  private void seekReadAndTest(final FileSystem fs, final Path testFilePath,
+      final int seekPos, final int length, final byte[] fileContent)
+      throws IOException, NoSuchFieldException, IllegalAccessException {
+    AbfsConfiguration conf = getAbfsStore(fs).getAbfsConfiguration();
+    long actualContentLength = fileContent.length;
+    try (FSDataInputStream iStream = fs.open(testFilePath)) {
+      AbfsInputStream abfsInputStream = (AbfsInputStream) iStream
+          .getWrappedStream();
+      long bufferSize = abfsInputStream.getBufferSize();
+      seek(iStream, seekPos);
+      byte[] buffer = new byte[length];
+      long bytesRead = iStream.read(buffer, 0, length);
+
+      long footerStart = max(0,
+          actualContentLength - AbfsInputStream.FOOTER_SIZE);
+      boolean optimizationOn =
+          conf.optimizeFooterRead() && seekPos >= footerStart;
+
+      long actualLength = length;
+      if (seekPos + length > actualContentLength) {
+        long delta = seekPos + length - actualContentLength;
+        actualLength = length - delta;
+      }
+      long expectedLimit;
+      long expectedBCursor;
+      long expectedFCursor;
+      if (optimizationOn) {
+        if (actualContentLength <= bufferSize) {
+          expectedLimit = actualContentLength;
+          expectedBCursor = seekPos + actualLength;
+        } else {
+          expectedLimit = bufferSize;
+          long lastBlockStart = max(0, actualContentLength - bufferSize);
+          expectedBCursor = seekPos - lastBlockStart + actualLength;
+        }
+        expectedFCursor = actualContentLength;
+      } else {
+        if (seekPos + bufferSize < actualContentLength) {
+          expectedLimit = bufferSize;
+          expectedFCursor = bufferSize;
+        } else {
+          expectedLimit = actualContentLength - seekPos;
+          expectedFCursor = min(seekPos + bufferSize, actualContentLength);
+        }
+        expectedBCursor = actualLength;
+      }
+
+      assertEquals(expectedFCursor, abfsInputStream.getFCursor());
+      assertEquals(expectedFCursor, abfsInputStream.getFCursorAfterLastRead());
+      assertEquals(expectedLimit, abfsInputStream.getLimit());
+      assertEquals(expectedBCursor, abfsInputStream.getBCursor());
+      assertEquals(actualLength, bytesRead);
+      //  Verify user-content read
+      assertContentReadCorrectly(fileContent, seekPos, (int) actualLength, buffer);
+      //  Verify data read to AbfsInputStream buffer
+      int from = seekPos;
+      if (optimizationOn) {
+        from = (int) max(0, actualContentLength - bufferSize);
+      }
+      assertContentReadCorrectly(fileContent, from, (int) abfsInputStream.getLimit(),
+          abfsInputStream.getBuffer());
+    }
+  }
+
+  @Test
+  public void testPartialReadWithNoData()
+      throws Exception {
+    for (int i = 2; i <= 6; i++) {
+      int fileSize = i * ONE_MB;
+      final AzureBlobFileSystem fs = getFileSystem(true, fileSize);
+      String fileName = methodName.getMethodName() + i;
+      byte[] fileContent = getRandomBytesArray(fileSize);
+      Path testFilePath = createFileWithContent(fs, fileName, fileContent);
+      testPartialReadWithNoData(fs, testFilePath,
+          fileSize - AbfsInputStream.FOOTER_SIZE, AbfsInputStream.FOOTER_SIZE,
+          fileContent);
+    }
+  }
+
+  private void testPartialReadWithNoData(final FileSystem fs,
+      final Path testFilePath, final int seekPos, final int length,
+      final byte[] fileContent)
+      throws IOException, NoSuchFieldException, IllegalAccessException {
+    FSDataInputStream iStream = fs.open(testFilePath);
+    try {
+      AbfsInputStream abfsInputStream = (AbfsInputStream) iStream
+          .getWrappedStream();
+      abfsInputStream = spy(abfsInputStream);
+      doReturn(10).doReturn(10).doCallRealMethod().when(abfsInputStream)
+          .readRemote(anyLong(), any(), anyInt(), anyInt());
+
+      iStream = new FSDataInputStream(abfsInputStream);
+      seek(iStream, seekPos);
+
+      byte[] buffer = new byte[length];
+      int bytesRead = iStream.read(buffer, 0, length);
+      assertEquals(length, bytesRead);
+      assertContentReadCorrectly(fileContent, seekPos, length, buffer);
+      assertEquals(fileContent.length, abfsInputStream.getFCursor());
+      assertEquals(length, abfsInputStream.getBCursor());
+      assertTrue(abfsInputStream.getLimit() >= length);
+    } finally {
+      iStream.close();
+    }
+  }
+
+  @Test
+  public void testPartialReadWithSomeData()
+      throws Exception {
+    for (int i = 3; i <= 6; i++) {
+      int fileSize = i * ONE_MB;
+      final AzureBlobFileSystem fs = getFileSystem(true, fileSize);
+      String fileName = methodName.getMethodName() + i;
+      byte[] fileContent = getRandomBytesArray(fileSize);
+      Path testFilePath = createFileWithContent(fs, fileName, fileContent);
+      testPartialReadWithSomeData(fs, testFilePath,
+          fileSize - AbfsInputStream.FOOTER_SIZE, AbfsInputStream.FOOTER_SIZE,
+          fileContent);
+    }
+  }
+
+  private void testPartialReadWithSomeData(final FileSystem fs,
+      final Path testFilePath, final int seekPos, final int length,
+      final byte[] fileContent)
+      throws IOException, NoSuchFieldException, IllegalAccessException {
+    FSDataInputStream iStream = fs.open(testFilePath);
+    try {
+      AbfsInputStream abfsInputStream = (AbfsInputStream) iStream
+          .getWrappedStream();
+      abfsInputStream = spy(abfsInputStream);
+      //  the first readRemote will return the first 10 bytes;
+      //  the second readRemote returns data till the last 2 bytes
+      int someDataLength = 2;
+      int secondReturnSize =
+          min(fileContent.length, abfsInputStream.getBufferSize()) - 10
+              - someDataLength;
+      doReturn(10).doReturn(secondReturnSize).doCallRealMethod()
+          .when(abfsInputStream)
+          .readRemote(anyLong(), any(), anyInt(), anyInt());
+
+      iStream = new FSDataInputStream(abfsInputStream);
+      seek(iStream, seekPos);
+
+      byte[] buffer = new byte[length];
+      int bytesRead = iStream.read(buffer, 0, length);
+      assertEquals(length, bytesRead);
+      assertEquals(fileContent.length, abfsInputStream.getFCursor());
+      //  someDataLength(2): in the do-while loop in read, the 2nd iteration
+      //  goes to readOneBlock, which resets bCursor to 0 because
+      //  bCursor == limit; finally, when the last 2 bytes are read, bCursor
+      //  and limit will both be at someDataLength(2)
+      assertEquals(someDataLength, abfsInputStream.getBCursor());
+      assertEquals(someDataLength, abfsInputStream.getLimit());
+    } finally {
+      iStream.close();
+    }
+  }
+
+  private AzureBlobFileSystem getFileSystem(boolean optimizeFooterRead,
+      int fileSize) throws IOException {
+    final AzureBlobFileSystem fs = getFileSystem();
+    getAbfsStore(fs).getAbfsConfiguration()
+        .setOptimizeFooterRead(optimizeFooterRead);
+    if (fileSize <= getAbfsStore(fs).getAbfsConfiguration()
+        .getReadBufferSize()) {
+      getAbfsStore(fs).getAbfsConfiguration()
+          .setReadSmallFilesCompletely(false);
+    }
+    return fs;
+  }
+
+  private enum SeekTo {
+    BEGIN, AT_FOOTER_START, BEFORE_FOOTER_START, AFTER_FOOTER_START, END
+  }
+}
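
As a worked check of the cursor arithmetic in seekReadAndTest above
(numbers chosen for illustration, not taken from the patch): for a 2 MB
file, a 4 MB read buffer, footer read enabled, seekPos = fileSize -
FOOTER_SIZE and length = 100, the seek lands exactly at footerStart, so
the optimization fires. Since actualContentLength (2 MB) <= bufferSize,
the whole file is buffered: expectedLimit = 2 MB, expectedFCursor = 2 MB,
and expectedBCursor = seekPos + 100.
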
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStreamSmallFileReads.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStreamSmallFileReads.java
new file mode 100644
index 0000000..ff03c0e
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStreamSmallFileReads.java
@@ -0,0 +1,326 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.junit.Test;
+
+import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+
+import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CONNECTIONS_MADE;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_KB;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_MB;
+
+public class ITestAbfsInputStreamSmallFileReads extends ITestAbfsInputStream {
+
+  public ITestAbfsInputStreamSmallFileReads() throws Exception {
+  }
+
+  @Test
+  public void testOnlyOneServerCallIsMadeWhenTheConfIsTrue() throws Exception {
+    testNumBackendCalls(true);
+  }
+
+  @Test
+  public void testMultipleServerCallsAreMadeWhenTheConfIsFalse()
+      throws Exception {
+    testNumBackendCalls(false);
+  }
+
+  private void testNumBackendCalls(boolean readSmallFilesCompletely)
+      throws Exception {
+    final AzureBlobFileSystem fs = getFileSystem(readSmallFilesCompletely);
+    for (int i = 1; i <= 4; i++) {
+      String fileName = methodName.getMethodName() + i;
+      int fileSize = i * ONE_MB;
+      byte[] fileContent = getRandomBytesArray(fileSize);
+      Path testFilePath = createFileWithContent(fs, fileName, fileContent);
+      int length = ONE_KB;
+      try (FSDataInputStream iStream = fs.open(testFilePath)) {
+        byte[] buffer = new byte[length];
+
+        Map<String, Long> metricMap = getInstrumentationMap(fs);
+        long requestsMadeBeforeTest = metricMap
+            .get(CONNECTIONS_MADE.getStatName());
+
+        iStream.seek(seekPos(SeekTo.END, fileSize, length));
+        iStream.read(buffer, 0, length);
+
+        iStream.seek(seekPos(SeekTo.MIDDLE, fileSize, length));
+        iStream.read(buffer, 0, length);
+
+        iStream.seek(seekPos(SeekTo.BEGIN, fileSize, length));
+        iStream.read(buffer, 0, length);
+
+        metricMap = getInstrumentationMap(fs);
+        long requestsMadeAfterTest = metricMap
+            .get(CONNECTIONS_MADE.getStatName());
+
+        if (readSmallFilesCompletely) {
+          assertEquals(1, requestsMadeAfterTest - requestsMadeBeforeTest);
+        } else {
+          assertEquals(3, requestsMadeAfterTest - requestsMadeBeforeTest);
+        }
+      }
+    }
+  }
+
+  @Test
+  public void testSeekToBeginningAndReadSmallFileWithConfTrue()
+      throws Exception {
+    testSeekAndReadWithConf(SeekTo.BEGIN, 2, 4, true);
+  }
+
+  @Test
+  public void testSeekToBeginningAndReadSmallFileWithConfFalse()
+      throws Exception {
+    testSeekAndReadWithConf(SeekTo.BEGIN, 2, 4, false);
+  }
+
+  @Test
+  public void testSeekToBeginningAndReadBigFileWithConfTrue() throws Exception {
+    testSeekAndReadWithConf(SeekTo.BEGIN, 5, 6, true);
+  }
+
+  @Test
+  public void testSeekToBeginningAndReadBigFileWithConfFalse() throws Exception {
+    testSeekAndReadWithConf(SeekTo.BEGIN, 5, 6, false);
+  }
+
+  @Test
+  public void testSeekToEndAndReadSmallFileWithConfTrue() throws Exception {
+    testSeekAndReadWithConf(SeekTo.END, 2, 4, true);
+  }
+
+  @Test
+  public void testSeekToEndAndReadSmallFileWithConfFalse() throws Exception {
+    testSeekAndReadWithConf(SeekTo.END, 2, 4, false);
+  }
+
+  @Test
+  public void testSeekToEndAndReadBigFileWithConfTrue() throws Exception {
+    testSeekAndReadWithConf(SeekTo.END, 5, 6, true);
+  }
+
+  @Test
+  public void testSeekToEndAndReadBigFileWithConfFalse() throws Exception {
+    testSeekAndReadWithConf(SeekTo.END, 5, 6, false);
+  }
+
+  @Test
+  public void testSeekToMiddleAndReadSmallFileWithConfTrue() throws Exception {
+    testSeekAndReadWithConf(SeekTo.MIDDLE, 2, 4, true);
+  }
+
+  @Test
+  public void testSeekToMiddleAndReadSmallFileWithConfFalse() throws Exception {
+    testSeekAndReadWithConf(SeekTo.MIDDLE, 2, 4, false);
+  }
+
+  @Test
+  public void testSeekToMiddleAndReadBigFileWithConfTrue() throws Exception {
+    testSeekAndReadWithConf(SeekTo.MIDDLE, 5, 6, true);
+  }
+
+  @Test
+  public void testSeekToMiddleAndReadBigFileWithConfFalse() throws Exception {
+    testSeekAndReadWithConf(SeekTo.MIDDLE, 5, 6, false);
+  }
+
+  private void testSeekAndReadWithConf(SeekTo seekTo, int startFileSizeInMB,
+      int endFileSizeInMB, boolean readSmallFilesCompletely) throws Exception {
+    final AzureBlobFileSystem fs = getFileSystem(readSmallFilesCompletely);
+    for (int i = startFileSizeInMB; i <= endFileSizeInMB; i++) {
+      String fileName = methodName.getMethodName() + i;
+      int fileSize = i * ONE_MB;
+      byte[] fileContent = getRandomBytesArray(fileSize);
+      Path testFilePath = createFileWithContent(fs, fileName, fileContent);
+      int length = ONE_KB;
+      int seekPos = seekPos(seekTo, fileSize, length);
+      seekReadAndTest(fs, testFilePath, seekPos, length, fileContent);
+    }
+  }
+
+  private int seekPos(SeekTo seekTo, int fileSize, int length) {
+    if (seekTo == SeekTo.BEGIN) {
+      return 0;
+    }
+    if (seekTo == SeekTo.END) {
+      return fileSize - length;
+    }
+    return fileSize / 2;
+  }
+
+  private void seekReadAndTest(FileSystem fs, Path testFilePath, int seekPos,
+      int length, byte[] fileContent)
+      throws IOException, NoSuchFieldException, IllegalAccessException {
+    AbfsConfiguration conf = getAbfsStore(fs).getAbfsConfiguration();
+    try (FSDataInputStream iStream = fs.open(testFilePath)) {
+      seek(iStream, seekPos);
+      byte[] buffer = new byte[length];
+      int bytesRead = iStream.read(buffer, 0, length);
+      assertEquals(length, bytesRead);
+      assertContentReadCorrectly(fileContent, seekPos, length, buffer);
+      AbfsInputStream abfsInputStream = (AbfsInputStream) iStream
+          .getWrappedStream();
+
+      final int readBufferSize = conf.getReadBufferSize();
+      final int fileContentLength = fileContent.length;
+      final boolean smallFile = fileContentLength <= readBufferSize;
+      int expectedLimit, expectedFCursor;
+      int expectedBCursor;
+      if (conf.readSmallFilesCompletely() && smallFile) {
+        assertBuffersAreEqual(fileContent, abfsInputStream.getBuffer(), conf);
+        expectedFCursor = fileContentLength;
+        expectedLimit = fileContentLength;
+        expectedBCursor = seekPos + length;
+      } else {
+        if (seekPos == 0) {
+          assertBuffersAreEqual(fileContent, abfsInputStream.getBuffer(), conf);
+        } else {
+          assertBuffersAreNotEqual(fileContent, abfsInputStream.getBuffer(),
+              conf);
+        }
+        expectedBCursor = length;
+        expectedFCursor = (fileContentLength < (seekPos + readBufferSize))
+            ? fileContentLength
+            : (seekPos + readBufferSize);
+        expectedLimit = (fileContentLength < (seekPos + readBufferSize))
+            ? (fileContentLength - seekPos)
+            : readBufferSize;
+      }
+      assertEquals(expectedFCursor, abfsInputStream.getFCursor());
+      assertEquals(expectedFCursor, abfsInputStream.getFCursorAfterLastRead());
+      assertEquals(expectedBCursor, abfsInputStream.getBCursor());
+      assertEquals(expectedLimit, abfsInputStream.getLimit());
+    }
+  }
+
+  @Test
+  public void testPartialReadWithNoData() throws Exception {
+    for (int i = 2; i <= 4; i++) {
+      int fileSize = i * ONE_MB;
+      final AzureBlobFileSystem fs = getFileSystem(true);
+      String fileName = methodName.getMethodName() + i;
+      byte[] fileContent = getRandomBytesArray(fileSize);
+      Path testFilePath = createFileWithContent(fs, fileName, fileContent);
+      partialReadWithNoData(fs, testFilePath, fileSize / 2, fileSize / 4,
+          fileContent);
+    }
+  }
+
+  private void partialReadWithNoData(final FileSystem fs,
+      final Path testFilePath,
+      final int seekPos, final int length, final byte[] fileContent)
+      throws IOException {
+
+    FSDataInputStream iStream = fs.open(testFilePath);
+    try {
+      AbfsInputStream abfsInputStream = (AbfsInputStream) iStream
+          .getWrappedStream();
+      abfsInputStream = spy(abfsInputStream);
+      doReturn(10)
+          .doReturn(10)
+          .doCallRealMethod()
+          .when(abfsInputStream)
+          .readRemote(anyLong(), any(), anyInt(), anyInt());
+
+      iStream = new FSDataInputStream(abfsInputStream);
+      seek(iStream, seekPos);
+      byte[] buffer = new byte[length];
+      int bytesRead = iStream.read(buffer, 0, length);
+      assertEquals(length, bytesRead);
+      assertContentReadCorrectly(fileContent, seekPos, length, buffer);
+      assertEquals(fileContent.length, abfsInputStream.getFCursor());
+      assertEquals(fileContent.length,
+          abfsInputStream.getFCursorAfterLastRead());
+      assertEquals(length, abfsInputStream.getBCursor());
+      assertTrue(abfsInputStream.getLimit() >= length);
+    } finally {
+      iStream.close();
+    }
+  }
+
+  @Test
+  public void testPartialReadWithSomeData() throws Exception {
+    for (int i = 2; i <= 4; i++) {
+      int fileSize = i * ONE_MB;
+      final AzureBlobFileSystem fs = getFileSystem(true);
+      String fileName = methodName.getMethodName() + i;
+      byte[] fileContent = getRandomBytesArray(fileSize);
+      Path testFilePath = createFileWithContent(fs, fileName, fileContent);
+      partialReadWithSomeData(fs, testFilePath, fileSize / 2,
+          fileSize / 4, fileContent);
+    }
+  }
+
+  private void partialReadWithSomeData(final FileSystem fs,
+      final Path testFilePath,
+      final int seekPos, final int length, final byte[] fileContent)
+      throws IOException, NoSuchFieldException, IllegalAccessException {
+    FSDataInputStream iStream = fs.open(testFilePath);
+    try {
+      AbfsInputStream abfsInputStream = (AbfsInputStream) iStream
+          .getWrappedStream();
+      abfsInputStream = spy(abfsInputStream);
+      //  The first mocked readRemote call returns 10 bytes. The second
+      //  returns secondReturnSize = (seekPos - 10) + someDataLength bytes:
+      //  the (seekPos - 10) part brings the read up to seekPos (10 bytes
+      //  were already read by the first call), plus someDataLength more.
+      int someDataLength = 10;
+      int secondReturnSize = seekPos - 10 + someDataLength;
+      doReturn(10)
+          .doReturn(secondReturnSize)
+          .doCallRealMethod()
+          .when(abfsInputStream)
+          .readRemote(anyLong(), any(), anyInt(), anyInt());
+
+      iStream = new FSDataInputStream(abfsInputStream);
+      seek(iStream, seekPos);
+
+      byte[] buffer = new byte[length];
+      int bytesRead = iStream.read(buffer, 0, length);
+      assertEquals(length, bytesRead);
+      assertTrue(abfsInputStream.getFCursor() > seekPos + length);
+      assertTrue(abfsInputStream.getFCursorAfterLastRead() > seekPos + length);
+      //  The optimized read did not complete, but it did fetch some of the
+      //  user-requested data from the server, so the buffer contains more
+      //  data than seekPos + length.
+      assertEquals(length - someDataLength, abfsInputStream.getBCursor());
+      assertTrue(abfsInputStream.getLimit() > length - someDataLength);
+    } finally {
+      iStream.close();
+    }
+  }
+
+  private enum SeekTo {BEGIN, MIDDLE, END}
+
+}
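
A note for readers who want to try the optimization these tests cover: the
behaviour is driven purely by client-side configuration set before the
filesystem instance is created. The sketch below is illustrative only; it
assumes the configuration key `fs.azure.read.smallfilescompletely` from the
HADOOP-17347 patch series (the key definition is not shown in this excerpt)
and uses a placeholder account URI.

    // Minimal sketch: enable the ABFS small-file read optimization.
    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class SmallFileReadToggle {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Assumed key name; verify against ConfigurationKeys.java.
        conf.setBoolean("fs.azure.read.smallfilescompletely", true);
        try (FileSystem fs = FileSystem.newInstance(
            new URI("abfs://container@account.dfs.core.windows.net"), conf)) {
          // A file no larger than fs.azure.read.request.size should now be
          // fetched with a single remote call, as the tests above assert.
          try (FSDataInputStream in = fs.open(new Path("/data/small.bin"))) {
            in.read(new byte[1024], 0, 1024);
          }
        }
      }
    }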


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[hadoop] 03/06: HADOOP-17422: ABFS: Set default ListMaxResults to max server limit (#2535) Contributed by Sumangala Patki

Posted by st...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

stevel pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit 5f312a0d854f8d2c84099bb44783f07d84602625
Author: Sumangala <f2...@alumni.bits-pilani.ac.in>
AuthorDate: Wed Dec 9 15:35:03 2020 +0530

    HADOOP-17422: ABFS: Set default ListMaxResults to max server limit (#2535)
    Contributed by Sumangala Patki
    
    TEST RESULTS:
    
    namespace.enabled=true
    auth.type=SharedKey
    -------------------
    $mvn -T 1C -Dparallel-tests=abfs -Dscale -DtestsThreadCount=8 clean verify
    Tests run: 90, Failures: 0, Errors: 0, Skipped: 0
    Tests run: 462, Failures: 0, Errors: 0, Skipped: 24
    Tests run: 208, Failures: 0, Errors: 0, Skipped: 24
    
    namespace.enabled=true
    auth.type=OAuth
    -------------------
    $mvn -T 1C -Dparallel-tests=abfs -Dscale -DtestsThreadCount=8 clean verify
    Tests run: 90, Failures: 0, Errors: 0, Skipped: 0
    Tests run: 462, Failures: 0, Errors: 0, Skipped: 70
    Tests run: 208, Failures: 0, Errors: 0, Skipped: 141
    
    (cherry picked from commit a35fc3871b01d8a3a375f3ae0e330b55a1d9009f)
---
 .../hadoop/fs/azurebfs/constants/FileSystemConfigurations.java    | 2 +-
 hadoop-tools/hadoop-azure/src/site/markdown/abfs.md               | 2 +-
 .../hadoop/fs/azurebfs/ITestAzureBlobFileSystemListStatus.java    | 8 +++++++-
 3 files changed, 9 insertions(+), 3 deletions(-)

diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
index 49fc58b..27dafd0 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
@@ -63,7 +63,7 @@ public final class FileSystemConfigurations {
   public static final int MAX_BUFFER_SIZE = 100 * ONE_MB;  // 100 MB
   public static final long MAX_AZURE_BLOCK_SIZE = 256 * 1024 * 1024L; // changing default abfs blocksize to 256MB
   public static final String AZURE_BLOCK_LOCATION_HOST_DEFAULT = "localhost";
-  public static final int DEFAULT_AZURE_LIST_MAX_RESULTS = 500;
+  public static final int DEFAULT_AZURE_LIST_MAX_RESULTS = 5000;
 
   public static final int MAX_CONCURRENT_READ_THREADS = 12;
   public static final int MAX_CONCURRENT_WRITE_THREADS = 8;
diff --git a/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md b/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md
index a418811..0777f9b 100644
--- a/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md
+++ b/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md
@@ -848,7 +848,7 @@ Please refer the following links for further information.
 listStatus API fetches the FileStatus information from server in a page by page
 manner. The config `fs.azure.list.max.results` used to set the maxResults URI
  param which sets the pagesize(maximum results per call). The value should
- be >  0. By default this will be 500. Server has a maximum value for this
+ be >  0. By default this will be 5000. Server has a maximum value for this
  parameter as 5000. So even if the config is above 5000 the response will only
 contain 5000 entries. Please refer the following link for further information.
 https://docs.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/list
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemListStatus.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemListStatus.java
index 25a1567..31f92d2 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemListStatus.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemListStatus.java
@@ -29,12 +29,15 @@ import java.util.concurrent.Future;
 
 import org.junit.Test;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.contract.ContractTestUtils;
 
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_LIST_MAX_RESULTS;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.assertMkdirs;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.assertPathExists;
@@ -55,7 +58,10 @@ public class ITestAzureBlobFileSystemListStatus extends
 
   @Test
   public void testListPath() throws Exception {
-    final AzureBlobFileSystem fs = getFileSystem();
+    Configuration config = new Configuration(this.getRawConfiguration());
+    config.set(AZURE_LIST_MAX_RESULTS, "5000");
+    final AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem
+        .newInstance(getFileSystem().getUri(), config);
     final List<Future<Void>> tasks = new ArrayList<>();
 
     ExecutorService es = Executors.newFixedThreadPool(10);
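
Outside the test, the same override is a plain configuration setting; the
key `fs.azure.list.max.results` is the one documented in abfs.md above. A
minimal sketch with a placeholder account URI (values above the server cap
of 5000 are accepted, but each response page is still truncated to 5000
entries):

    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class ListPageSize {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.azure.list.max.results", "5000");
        try (FileSystem fs = FileSystem.newInstance(
            new URI("abfs://container@account.dfs.core.windows.net"), conf)) {
          // Each backend list call now requests up to 5000 entries per page.
          System.out.println(fs.listStatus(new Path("/")).length);
        }
      }
    }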


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[hadoop] 01/06: HADOOP-17296. ABFS: Force reads to be always of buffer size.

Posted by st...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

stevel pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit a44890eb63f5320a542d5160f140e11e82256932
Author: Sneha Vijayarajan <sn...@gmail.com>
AuthorDate: Fri Nov 27 19:52:34 2020 +0530

    HADOOP-17296. ABFS: Force reads to be always of buffer size.
    
    Contributed by Sneha Vijayarajan.
    
    (cherry picked from commit 142941b96e221fc1b4524476ce445714d7f6eec3)
---
 .../hadoop/fs/azurebfs/AbfsConfiguration.java      |  18 ++
 .../fs/azurebfs/AzureBlobFileSystemStore.java      |   3 +
 .../fs/azurebfs/constants/ConfigurationKeys.java   |   2 +
 .../constants/FileSystemConfigurations.java        |   3 +
 .../fs/azurebfs/services/AbfsInputStream.java      |  41 +++-
 .../azurebfs/services/AbfsInputStreamContext.java  |  38 ++++
 .../fs/azurebfs/services/ReadBufferManager.java    | 105 ++++++++-
 .../hadoop-azure/src/site/markdown/abfs.md         |  16 ++
 .../fs/azurebfs/AbstractAbfsIntegrationTest.java   |   8 +
 .../ITestAzureBlobFileSystemRandomRead.java        | 244 ++++++++++++++++-----
 .../fs/azurebfs/services/TestAbfsInputStream.java  | 223 ++++++++++++++++++-
 11 files changed, 634 insertions(+), 67 deletions(-)

diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
index c4a2b67..3d09a80 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
@@ -201,6 +201,16 @@ public class AbfsConfiguration{
       DefaultValue = DEFAULT_READ_AHEAD_QUEUE_DEPTH)
   private int readAheadQueueDepth;
 
+  @IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_READ_AHEAD_BLOCK_SIZE,
+      MinValue = MIN_BUFFER_SIZE,
+      MaxValue = MAX_BUFFER_SIZE,
+      DefaultValue = DEFAULT_READ_AHEAD_BLOCK_SIZE)
+  private int readAheadBlockSize;
+
+  @BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ALWAYS_READ_BUFFER_SIZE,
+      DefaultValue = DEFAULT_ALWAYS_READ_BUFFER_SIZE)
+  private boolean alwaysReadBufferSize;
+
   @BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ENABLE_FLUSH,
       DefaultValue = DEFAULT_ENABLE_FLUSH)
   private boolean enableFlush;
@@ -599,6 +609,14 @@ public class AbfsConfiguration{
     return this.readAheadQueueDepth;
   }
 
+  public int getReadAheadBlockSize() {
+    return this.readAheadBlockSize;
+  }
+
+  public boolean shouldReadBufferSizeAlways() {
+    return this.alwaysReadBufferSize;
+  }
+
   public boolean isFlushEnabled() {
     return this.enableFlush;
   }
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
index e8f355f..a766c62 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
@@ -644,6 +644,9 @@ public class AzureBlobFileSystemStore implements Closeable {
             .withReadAheadQueueDepth(abfsConfiguration.getReadAheadQueueDepth())
             .withTolerateOobAppends(abfsConfiguration.getTolerateOobAppends())
             .withStreamStatistics(new AbfsInputStreamStatisticsImpl())
+            .withShouldReadBufferSizeAlways(
+                abfsConfiguration.shouldReadBufferSizeAlways())
+            .withReadAheadBlockSize(abfsConfiguration.getReadAheadBlockSize())
             .build();
   }
 
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
index c15c470..cb9c0de 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
@@ -75,6 +75,8 @@ public final class ConfigurationKeys {
    *  Default is empty. **/
   public static final String FS_AZURE_APPEND_BLOB_KEY = "fs.azure.appendblob.directories";
   public static final String FS_AZURE_READ_AHEAD_QUEUE_DEPTH = "fs.azure.readaheadqueue.depth";
+  public static final String FS_AZURE_ALWAYS_READ_BUFFER_SIZE = "fs.azure.read.alwaysReadBufferSize";
+  public static final String FS_AZURE_READ_AHEAD_BLOCK_SIZE = "fs.azure.read.readahead.blocksize";
   /** Provides a config control to enable or disable ABFS Flush operations -
    *  HFlush and HSync. Default is true. **/
   public static final String FS_AZURE_ENABLE_FLUSH = "fs.azure.enable.flush";
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
index fa0ee6a..49fc58b 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
@@ -57,6 +57,8 @@ public final class FileSystemConfigurations {
   public static final int DEFAULT_WRITE_BUFFER_SIZE = 8 * ONE_MB;  // 8 MB
   public static final int APPENDBLOB_MAX_WRITE_BUFFER_SIZE = 4 * ONE_MB;  // 4 MB
   public static final int DEFAULT_READ_BUFFER_SIZE = 4 * ONE_MB;  // 4 MB
+  public static final boolean DEFAULT_ALWAYS_READ_BUFFER_SIZE = false;
+  public static final int DEFAULT_READ_AHEAD_BLOCK_SIZE = 4 * ONE_MB;
   public static final int MIN_BUFFER_SIZE = 16 * ONE_KB;  // 16 KB
   public static final int MAX_BUFFER_SIZE = 100 * ONE_MB;  // 100 MB
   public static final long MAX_AZURE_BLOCK_SIZE = 256 * 1024 * 1024L; // changing default abfs blocksize to 256MB
@@ -74,6 +76,7 @@ public final class FileSystemConfigurations {
   public static final String DEFAULT_FS_AZURE_APPEND_BLOB_DIRECTORIES = "";
 
   public static final int DEFAULT_READ_AHEAD_QUEUE_DEPTH = -1;
+
   public static final boolean DEFAULT_ENABLE_FLUSH = true;
   public static final boolean DEFAULT_DISABLE_OUTPUTSTREAM_FLUSH = true;
   public static final boolean DEFAULT_ENABLE_AUTOTHROTTLING = true;
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
index c43f910..3682bcb 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
@@ -47,6 +47,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
         StreamCapabilities {
   private static final Logger LOG = LoggerFactory.getLogger(AbfsInputStream.class);
 
+  private int readAheadBlockSize;
   private final AbfsClient client;
   private final Statistics statistics;
   private final String path;
@@ -56,6 +57,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
   private final String eTag;                  // eTag of the path when InputStream are created
   private final boolean tolerateOobAppends; // whether tolerate Oob Appends
   private final boolean readAheadEnabled; // whether enable readAhead;
+  private final boolean alwaysReadBufferSize;
 
   // SAS tokens can be re-used until they expire
   private CachedSASToken cachedSasToken;
@@ -89,9 +91,16 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
     this.tolerateOobAppends = abfsInputStreamContext.isTolerateOobAppends();
     this.eTag = eTag;
     this.readAheadEnabled = true;
+    this.alwaysReadBufferSize
+        = abfsInputStreamContext.shouldReadBufferSizeAlways();
     this.cachedSasToken = new CachedSASToken(
         abfsInputStreamContext.getSasTokenRenewPeriodForStreamsInSeconds());
     this.streamStatistics = abfsInputStreamContext.getStreamStatistics();
+    readAheadBlockSize = abfsInputStreamContext.getReadAheadBlockSize();
+
+    // Propagate the config values to ReadBufferManager so that the first instance
+    // to initialize can set the readAheadBlockSize
+    ReadBufferManager.setReadBufferManagerConfigs(readAheadBlockSize);
   }
 
   public String getPath() {
@@ -178,11 +187,15 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
         buffer = new byte[bufferSize];
       }
 
-      // Enable readAhead when reading sequentially
-      if (-1 == fCursorAfterLastRead || fCursorAfterLastRead == fCursor || b.length >= bufferSize) {
+      if (alwaysReadBufferSize) {
         bytesRead = readInternal(fCursor, buffer, 0, bufferSize, false);
       } else {
-        bytesRead = readInternal(fCursor, buffer, 0, b.length, true);
+        // Enable readAhead when reading sequentially
+        if (-1 == fCursorAfterLastRead || fCursorAfterLastRead == fCursor || b.length >= bufferSize) {
+          bytesRead = readInternal(fCursor, buffer, 0, bufferSize, false);
+        } else {
+          bytesRead = readInternal(fCursor, buffer, 0, b.length, true);
+        }
       }
 
       if (bytesRead == -1) {
@@ -223,16 +236,19 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
 
       // queue read-aheads
       int numReadAheads = this.readAheadQueueDepth;
-      long nextSize;
       long nextOffset = position;
+      // The first read queued needs to be of readBufferSize; subsequent
+      // reads are of readAheadBlockSize
+      long nextSize = Math.min((long) bufferSize, contentLength - nextOffset);
       LOG.debug("read ahead enabled issuing readheads num = {}", numReadAheads);
       while (numReadAheads > 0 && nextOffset < contentLength) {
-        nextSize = Math.min((long) bufferSize, contentLength - nextOffset);
         LOG.debug("issuing read ahead requestedOffset = {} requested size {}",
             nextOffset, nextSize);
         ReadBufferManager.getBufferManager().queueReadAhead(this, nextOffset, (int) nextSize);
         nextOffset = nextOffset + nextSize;
         numReadAheads--;
+        // From next round onwards should be of readahead block size.
+        nextSize = Math.min((long) readAheadBlockSize, contentLength - nextOffset);
       }
 
       // try reading from buffers first
@@ -527,6 +543,21 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
     return bytesFromRemoteRead;
   }
 
+  @VisibleForTesting
+  public int getBufferSize() {
+    return bufferSize;
+  }
+
+  @VisibleForTesting
+  public int getReadAheadQueueDepth() {
+    return readAheadQueueDepth;
+  }
+
+  @VisibleForTesting
+  public boolean shouldAlwaysReadBufferSize() {
+    return alwaysReadBufferSize;
+  }
+
   /**
    * Get the statistics of the stream.
    * @return a string value.
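
To make the new queueing arithmetic above concrete, here is a standalone
sketch of the (offset, size) sequence the loop produces. The numbers are
illustrative, not taken from the patch:

    public class ReadAheadPlan {
      public static void main(String[] args) {
        long contentLength = 10L * 1024 * 1024;   // a 10 MB file
        int bufferSize = 4 * 1024 * 1024;         // fs.azure.read.request.size
        int readAheadBlockSize = 4 * 1024 * 1024; // fs.azure.read.readahead.blocksize
        int numReadAheads = 3;                    // effective queue depth
        long nextOffset = 0;
        long nextSize = Math.min((long) bufferSize, contentLength - nextOffset);
        while (numReadAheads > 0 && nextOffset < contentLength) {
          System.out.printf("queue read-ahead: offset=%d size=%d%n",
              nextOffset, nextSize);
          nextOffset = nextOffset + nextSize;
          numReadAheads--;
          nextSize = Math.min((long) readAheadBlockSize,
              contentLength - nextOffset);
        }
        // Prints (0, 4 MB), (4 MB, 4 MB), (8 MB, 2 MB): the first block uses
        // the read buffer size, later blocks the read-ahead block size.
      }
    }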
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java
index f8d3b2a..ade0583 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java
@@ -18,10 +18,15 @@
 
 package org.apache.hadoop.fs.azurebfs.services;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
  * Class to hold extra input stream configs.
  */
 public class AbfsInputStreamContext extends AbfsStreamContext {
+  // Retaining logger of AbfsInputStream
+  private static final Logger LOG = LoggerFactory.getLogger(AbfsInputStream.class);
 
   private int readBufferSize;
 
@@ -29,6 +34,10 @@ public class AbfsInputStreamContext extends AbfsStreamContext {
 
   private boolean tolerateOobAppends;
 
+  private boolean alwaysReadBufferSize;
+
+  private int readAheadBlockSize;
+
   private AbfsInputStreamStatistics streamStatistics;
 
   public AbfsInputStreamContext(final long sasTokenRenewPeriodForStreamsInSeconds) {
@@ -60,7 +69,27 @@ public class AbfsInputStreamContext extends AbfsStreamContext {
     return this;
   }
 
+  public AbfsInputStreamContext withShouldReadBufferSizeAlways(
+      final boolean alwaysReadBufferSize) {
+    this.alwaysReadBufferSize = alwaysReadBufferSize;
+    return this;
+  }
+
+  public AbfsInputStreamContext withReadAheadBlockSize(
+      final int readAheadBlockSize) {
+    this.readAheadBlockSize = readAheadBlockSize;
+    return this;
+  }
+
   public AbfsInputStreamContext build() {
+    if (readBufferSize > readAheadBlockSize) {
+      LOG.debug(
+          "fs.azure.read.request.size[={}] is configured for higher size than "
+              + "fs.azure.read.readahead.blocksize[={}]. Auto-align "
+              + "readAhead block size to be same as readRequestSize.",
+          readBufferSize, readAheadBlockSize);
+      readAheadBlockSize = readBufferSize;
+    }
     // Validation of parameters to be done here.
     return this;
   }
@@ -80,4 +109,13 @@ public class AbfsInputStreamContext extends AbfsStreamContext {
   public AbfsInputStreamStatistics getStreamStatistics() {
     return streamStatistics;
   }
+
+  public boolean shouldReadBufferSizeAlways() {
+    return alwaysReadBufferSize;
+  }
+
+  public int getReadAheadBlockSize() {
+    return readAheadBlockSize;
+  }
+
 }
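
The auto-alignment in build() is easiest to see in a direct usage sketch.
This assumes the builder is callable from code that can see the
org.apache.hadoop.fs.azurebfs.services package, as the tests in this patch
do:

    import org.apache.hadoop.fs.azurebfs.services.AbfsInputStreamContext;

    public class ContextAlignDemo {
      public static void main(String[] args) {
        AbfsInputStreamContext context = new AbfsInputStreamContext(-1)
            .withReadBufferSize(8 * 1024 * 1024)      // 8 MB read buffer
            .withReadAheadBlockSize(4 * 1024 * 1024)  // 4 MB read-ahead block
            .build();
        // Prints 8388608: build() aligned the block size up to the buffer.
        System.out.println(context.getReadAheadBlockSize());
      }
    }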
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java
index f5c6393..f330d79 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java
@@ -28,6 +28,7 @@ import java.util.LinkedList;
 import java.util.Queue;
 import java.util.Stack;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
 
@@ -36,12 +37,14 @@ import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTest
  */
 final class ReadBufferManager {
   private static final Logger LOGGER = LoggerFactory.getLogger(ReadBufferManager.class);
+  private static final int ONE_KB = 1024;
+  private static final int ONE_MB = ONE_KB * ONE_KB;
 
   private static final int NUM_BUFFERS = 16;
-  private static final int BLOCK_SIZE = 4 * 1024 * 1024;
   private static final int NUM_THREADS = 8;
   private static final int DEFAULT_THRESHOLD_AGE_MILLISECONDS = 3000; // have to see if 3 seconds is a good threshold
 
+  private static int blockSize = 4 * ONE_MB;
   private static int thresholdAgeMilliseconds = DEFAULT_THRESHOLD_AGE_MILLISECONDS;
   private Thread[] threads = new Thread[NUM_THREADS];
   private byte[][] buffers;    // array of byte[] buffers, to hold the data that is read
@@ -50,21 +53,37 @@ final class ReadBufferManager {
   private Queue<ReadBuffer> readAheadQueue = new LinkedList<>(); // queue of requests that are not picked up by any worker thread yet
   private LinkedList<ReadBuffer> inProgressList = new LinkedList<>(); // requests being processed by worker threads
   private LinkedList<ReadBuffer> completedReadList = new LinkedList<>(); // buffers available for reading
-  private static final ReadBufferManager BUFFER_MANAGER; // singleton, initialized in static initialization block
+  private static ReadBufferManager bufferManager; // singleton, lazily initialized in getBufferManager()
+  private static final ReentrantLock LOCK = new ReentrantLock();
 
-  static {
-    BUFFER_MANAGER = new ReadBufferManager();
-    BUFFER_MANAGER.init();
+  static ReadBufferManager getBufferManager() {
+    if (bufferManager == null) {
+      LOCK.lock();
+      try {
+        if (bufferManager == null) {
+          bufferManager = new ReadBufferManager();
+          bufferManager.init();
+        }
+      } finally {
+        LOCK.unlock();
+      }
+    }
+    return bufferManager;
   }
 
-  static ReadBufferManager getBufferManager() {
-    return BUFFER_MANAGER;
+  static void setReadBufferManagerConfigs(int readAheadBlockSize) {
+    if (bufferManager == null) {
+      LOGGER.debug(
+          "ReadBufferManager not initialized yet. Overriding readAheadBlockSize as {}",
+          readAheadBlockSize);
+      blockSize = readAheadBlockSize;
+    }
   }
 
   private void init() {
     buffers = new byte[NUM_BUFFERS][];
     for (int i = 0; i < NUM_BUFFERS; i++) {
-      buffers[i] = new byte[BLOCK_SIZE];  // same buffers are reused. The byte array never goes back to GC
+      buffers[i] = new byte[blockSize];  // same buffers are reused. The byte array never goes back to GC
       freeList.add(i);
     }
     for (int i = 0; i < NUM_THREADS; i++) {
@@ -124,10 +143,10 @@ final class ReadBufferManager {
       buffer.setBufferindex(bufferIndex);
       readAheadQueue.add(buffer);
       notifyAll();
-    }
-    if (LOGGER.isTraceEnabled()) {
-      LOGGER.trace("Done q-ing readAhead for file {} offset {} buffer idx {}",
-          stream.getPath(), requestedOffset, buffer.getBufferindex());
+      if (LOGGER.isTraceEnabled()) {
+        LOGGER.trace("Done q-ing readAhead for file {} offset {} buffer idx {}",
+            stream.getPath(), requestedOffset, buffer.getBufferindex());
+      }
     }
   }
 
@@ -272,6 +291,7 @@ final class ReadBufferManager {
       return evict(nodeToEvict);
     }
 
+    LOGGER.trace("No buffer eligible for eviction");
     // nothing can be evicted
     return false;
   }
@@ -484,6 +504,67 @@ final class ReadBufferManager {
   }
 
   /**
+   * Test method that can clean up the current state of readAhead buffers and
+   * the lists. Will also trigger a fresh init.
+   */
+  @VisibleForTesting
+  void testResetReadBufferManager() {
+    synchronized (this) {
+      ArrayList<ReadBuffer> completedBuffers = new ArrayList<>();
+      for (ReadBuffer buf : completedReadList) {
+        if (buf != null) {
+          completedBuffers.add(buf);
+        }
+      }
+
+      for (ReadBuffer buf : completedBuffers) {
+        evict(buf);
+      }
+
+      readAheadQueue.clear();
+      inProgressList.clear();
+      completedReadList.clear();
+      freeList.clear();
+      for (int i = 0; i < NUM_BUFFERS; i++) {
+        buffers[i] = null;
+      }
+      buffers = null;
+      resetBufferManager();
+    }
+  }
+
+  /**
+   * Reset buffer manager to null.
+   */
+  @VisibleForTesting
+  static void resetBufferManager() {
+    bufferManager = null;
+  }
+
+  /**
+   * Reset readAhead buffer to needed readAhead block size and
+   * thresholdAgeMilliseconds.
+   * @param readAheadBlockSize the new readAhead block size
+   * @param thresholdAgeMilliseconds the new threshold age, in milliseconds
+   */
+  @VisibleForTesting
+  void testResetReadBufferManager(int readAheadBlockSize, int thresholdAgeMilliseconds) {
+    setBlockSize(readAheadBlockSize);
+    setThresholdAgeMilliseconds(thresholdAgeMilliseconds);
+    testResetReadBufferManager();
+  }
+
+  @VisibleForTesting
+  static void setBlockSize(int readAheadBlockSize) {
+    blockSize = readAheadBlockSize;
+  }
+
+  @VisibleForTesting
+  int getReadAheadBlockSize() {
+    return blockSize;
+  }
+
+  /**
    * Test method that can mimic no free buffers scenario and also add a ReadBuffer
    * into completedReadList. This readBuffer will get picked up by TryEvict()
    * next time a new queue request comes in.
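
One reviewer-level note on the lazy singleton introduced above: in Java, the
double-checked locking idiom is only guaranteed safe when the checked field
is volatile (or every read of it happens under the lock); `bufferManager`
here is a plain static field. A minimal sketch of the conventional shape,
using hypothetical names:

    final class LazyManager {
      // volatile is what makes the unsynchronized first check safe
      private static volatile LazyManager instance;
      private static final Object LOCK = new Object();

      static LazyManager getInstance() {
        LazyManager local = instance;
        if (local == null) {
          synchronized (LOCK) {
            local = instance;
            if (local == null) {
              // any init() work runs here, before other threads can
              // observe a non-null instance
              local = new LazyManager();
              instance = local;
            }
          }
        }
        return local;
      }
    }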
diff --git a/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md b/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md
index 79b897b..a418811 100644
--- a/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md
+++ b/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md
@@ -789,6 +789,17 @@ to 100 MB). The default value will be 8388608 (8 MB).
 bytes. The value should be between 16384 to 104857600 both inclusive (16 KB to
 100 MB). The default value will be 4194304 (4 MB).
 
+`fs.azure.read.alwaysReadBufferSize`: The read request size configured via
+`fs.azure.read.request.size` is honoured only when reads follow a sequential
+pattern. When the read pattern is detected to be random, the read size is
+the same as the buffer length provided by the calling process.
+Setting this config to true forces random reads to use the same request
+size as sequential reads. This mirrors the behaviour of ADLS Gen1, which
+does not differentiate between read patterns and always reads by the
+configured read request size. The default value is false, in which case
+only the provided buffer length is read when a random read pattern is
+detected.
+
 `fs.azure.readaheadqueue.depth`: Sets the readahead queue depth in
 AbfsInputStream. In case the set value is negative the read ahead queue depth
 will be set as Runtime.getRuntime().availableProcessors(). By default the value
@@ -796,6 +807,11 @@ will be -1. To disable readaheads, set this value to 0. If your workload is
  doing only random reads (non-sequential) or you are seeing throttling, you
   may try setting this value to 0.
 
+`fs.azure.read.readahead.blocksize`: Sets the read buffer size for the
+read-aheads. Specify the value in bytes. The value should be between 16384
+and 104857600, both inclusive (16 KB to 100 MB). The default value is
+4194304 (4 MB).
+
 To run under limited memory situations configure the following. Especially
 when there are too many writes from the same process. 
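
Taken together, the two keys documented above are set on the configuration
before the filesystem is created. A minimal sketch with a placeholder URI
and values inside the documented 16 KB to 100 MB bounds:

    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;

    public class Gen1StyleReads {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Force random reads to fetch full request-size blocks (Gen1 style).
        conf.setBoolean("fs.azure.read.alwaysReadBufferSize", true);
        // Keep read-ahead blocks at the 4 MB default, stated explicitly.
        conf.setLong("fs.azure.read.readahead.blocksize", 4 * 1024 * 1024);
        try (FileSystem fs = FileSystem.newInstance(
            new URI("abfs://container@account.dfs.core.windows.net"), conf)) {
          System.out.println("ready: " + fs.getUri());
        }
      }
    }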
 
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java
index 34b3615..7b3b5c1 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java
@@ -392,6 +392,14 @@ public abstract class AbstractAbfsIntegrationTest extends
     return path;
   }
 
+  public AzureBlobFileSystemStore getAbfsStore(final AzureBlobFileSystem fs) {
+    return fs.getAbfsStore();
+  }
+
+  public Path makeQualified(Path path) throws java.io.IOException {
+    return getFileSystem().makeQualified(path);
+  }
+
   /**
    * Create a path under the test path provided by
    * {@link #getTestPath()}.
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRandomRead.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRandomRead.java
index f582763..ef531ac 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRandomRead.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRandomRead.java
@@ -21,6 +21,7 @@ import java.io.EOFException;
 import java.io.IOException;
 import java.util.Random;
 import java.util.concurrent.Callable;
+import java.util.UUID;
 
 import org.junit.Assume;
 import org.junit.Ignore;
@@ -28,6 +29,7 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FSExceptionMessages;
@@ -37,30 +39,43 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.azure.NativeAzureFileSystem;
 import org.apache.hadoop.fs.contract.ContractTestUtils;
 
+import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream;
+import org.apache.hadoop.fs.azurebfs.services.TestAbfsInputStream;
+
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.BYTES_RECEIVED;
+import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.GET_RESPONSES;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.ETAG;
 
 /**
  * Test random read operation.
  */
 public class ITestAzureBlobFileSystemRandomRead extends
     AbstractAbfsScaleTest {
+  private static final int BYTE = 1;
+  private static final int THREE_BYTES = 3;
+  private static final int FIVE_BYTES = 5;
+  private static final int TWENTY_BYTES = 20;
+  private static final int THIRTY_BYTES = 30;
   private static final int KILOBYTE = 1024;
   private static final int MEGABYTE = KILOBYTE * KILOBYTE;
+  private static final int FOUR_MB = 4 * MEGABYTE;
+  private static final int NINE_MB = 9 * MEGABYTE;
   private static final long TEST_FILE_SIZE = 8 * MEGABYTE;
   private static final int MAX_ELAPSEDTIMEMS = 20;
   private static final int SEQUENTIAL_READ_BUFFER_SIZE = 16 * KILOBYTE;
-  private static final int CREATE_BUFFER_SIZE = 26 * KILOBYTE;
 
   private static final int SEEK_POSITION_ONE = 2* KILOBYTE;
   private static final int SEEK_POSITION_TWO = 5 * KILOBYTE;
   private static final int SEEK_POSITION_THREE = 10 * KILOBYTE;
   private static final int SEEK_POSITION_FOUR = 4100 * KILOBYTE;
 
-  private static final Path TEST_FILE_PATH = new Path(
-            "/TestRandomRead.txt");
+  private static final int ALWAYS_READ_BUFFER_SIZE_TEST_FILE_SIZE = 16 * MEGABYTE;
+  private static final int DISABLED_READAHEAD_DEPTH = 0;
+
+  private static final String TEST_FILE_PREFIX = "/TestRandomRead";
   private static final String WASB = "WASB";
   private static final String ABFS = "ABFS";
-  private static long testFileLength = 0;
 
   private static final Logger LOG =
       LoggerFactory.getLogger(ITestAzureBlobFileSystemRandomRead.class);
@@ -71,9 +86,10 @@ public class ITestAzureBlobFileSystemRandomRead extends
 
   @Test
   public void testBasicRead() throws Exception {
-    assumeHugeFileExists();
+    Path testPath = new Path(TEST_FILE_PREFIX + "_testBasicRead");
+    assumeHugeFileExists(testPath);
 
-    try (FSDataInputStream inputStream = this.getFileSystem().open(TEST_FILE_PATH)) {
+    try (FSDataInputStream inputStream = this.getFileSystem().open(testPath)) {
       byte[] buffer = new byte[3 * MEGABYTE];
 
       // forward seek and read a kilobyte into first kilobyte of bufferV2
@@ -99,12 +115,14 @@ public class ITestAzureBlobFileSystemRandomRead extends
   public void testRandomRead() throws Exception {
     Assume.assumeFalse("This test does not support namespace enabled account",
             this.getFileSystem().getIsNamespaceEnabled());
-    assumeHugeFileExists();
+    Path testPath = new Path(TEST_FILE_PREFIX + "_testRandomRead");
+    assumeHugeFileExists(testPath);
+
     try (
             FSDataInputStream inputStreamV1
-                    = this.getFileSystem().open(TEST_FILE_PATH);
+                    = this.getFileSystem().open(testPath);
             FSDataInputStream inputStreamV2
-                    = this.getWasbFileSystem().open(TEST_FILE_PATH);
+                    = this.getWasbFileSystem().open(testPath);
     ) {
       final int bufferSize = 4 * KILOBYTE;
       byte[] bufferV1 = new byte[bufferSize];
@@ -156,8 +174,10 @@ public class ITestAzureBlobFileSystemRandomRead extends
    */
   @Test
   public void testSeekToNewSource() throws Exception {
-    assumeHugeFileExists();
-    try (FSDataInputStream inputStream = this.getFileSystem().open(TEST_FILE_PATH)) {
+    Path testPath = new Path(TEST_FILE_PREFIX + "_testSeekToNewSource");
+    assumeHugeFileExists(testPath);
+
+    try (FSDataInputStream inputStream = this.getFileSystem().open(testPath)) {
       assertFalse(inputStream.seekToNewSource(0));
     }
   }
@@ -169,8 +189,10 @@ public class ITestAzureBlobFileSystemRandomRead extends
    */
   @Test
   public void testSkipBounds() throws Exception {
-    assumeHugeFileExists();
-    try (FSDataInputStream inputStream = this.getFileSystem().open(TEST_FILE_PATH)) {
+    Path testPath = new Path(TEST_FILE_PREFIX + "_testSkipBounds");
+    long testFileLength = assumeHugeFileExists(testPath);
+
+    try (FSDataInputStream inputStream = this.getFileSystem().open(testPath)) {
       ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
 
       long skipped = inputStream.skip(-1);
@@ -208,8 +230,10 @@ public class ITestAzureBlobFileSystemRandomRead extends
    */
   @Test
   public void testValidateSeekBounds() throws Exception {
-    assumeHugeFileExists();
-    try (FSDataInputStream inputStream = this.getFileSystem().open(TEST_FILE_PATH)) {
+    Path testPath = new Path(TEST_FILE_PREFIX + "_testValidateSeekBounds");
+    long testFileLength = assumeHugeFileExists(testPath);
+
+    try (FSDataInputStream inputStream = this.getFileSystem().open(testPath)) {
       ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
 
       inputStream.seek(0);
@@ -257,8 +281,10 @@ public class ITestAzureBlobFileSystemRandomRead extends
    */
   @Test
   public void testSeekAndAvailableAndPosition() throws Exception {
-    assumeHugeFileExists();
-    try (FSDataInputStream inputStream = this.getFileSystem().open(TEST_FILE_PATH)) {
+    Path testPath = new Path(TEST_FILE_PREFIX + "_testSeekAndAvailableAndPosition");
+    long testFileLength = assumeHugeFileExists(testPath);
+
+    try (FSDataInputStream inputStream = this.getFileSystem().open(testPath)) {
       byte[] expected1 = {(byte) 'a', (byte) 'b', (byte) 'c'};
       byte[] expected2 = {(byte) 'd', (byte) 'e', (byte) 'f'};
       byte[] expected3 = {(byte) 'b', (byte) 'c', (byte) 'd'};
@@ -321,8 +347,10 @@ public class ITestAzureBlobFileSystemRandomRead extends
    */
   @Test
   public void testSkipAndAvailableAndPosition() throws Exception {
-    assumeHugeFileExists();
-    try (FSDataInputStream inputStream = this.getFileSystem().open(TEST_FILE_PATH)) {
+    Path testPath = new Path(TEST_FILE_PREFIX + "_testSkipAndAvailableAndPosition");
+    long testFileLength = assumeHugeFileExists(testPath);
+
+    try (FSDataInputStream inputStream = this.getFileSystem().open(testPath)) {
       byte[] expected1 = {(byte) 'a', (byte) 'b', (byte) 'c'};
       byte[] expected2 = {(byte) 'd', (byte) 'e', (byte) 'f'};
       byte[] expected3 = {(byte) 'b', (byte) 'c', (byte) 'd'};
@@ -385,15 +413,16 @@ public class ITestAzureBlobFileSystemRandomRead extends
   @Test
   public void testSequentialReadAfterReverseSeekPerformance()
           throws Exception {
-    assumeHugeFileExists();
+    Path testPath = new Path(TEST_FILE_PREFIX + "_testSequentialReadAfterReverseSeekPerformance");
+    assumeHugeFileExists(testPath);
     final int maxAttempts = 10;
     final double maxAcceptableRatio = 1.01;
     double beforeSeekElapsedMs = 0, afterSeekElapsedMs = 0;
     double ratio = Double.MAX_VALUE;
     for (int i = 0; i < maxAttempts && ratio >= maxAcceptableRatio; i++) {
-      beforeSeekElapsedMs = sequentialRead(ABFS,
+      beforeSeekElapsedMs = sequentialRead(ABFS, testPath,
               this.getFileSystem(), false);
-      afterSeekElapsedMs = sequentialRead(ABFS,
+      afterSeekElapsedMs = sequentialRead(ABFS, testPath,
               this.getFileSystem(), true);
       ratio = afterSeekElapsedMs / beforeSeekElapsedMs;
       LOG.info((String.format(
@@ -417,8 +446,8 @@ public class ITestAzureBlobFileSystemRandomRead extends
   public void testRandomReadPerformance() throws Exception {
     Assume.assumeFalse("This test does not support namespace enabled account",
             this.getFileSystem().getIsNamespaceEnabled());
-    createTestFile();
-    assumeHugeFileExists();
+    Path testPath = new Path(TEST_FILE_PREFIX + "_testRandomReadPerformance");
+    assumeHugeFileExists(testPath);
 
     final AzureBlobFileSystem abFs = this.getFileSystem();
     final NativeAzureFileSystem wasbFs = this.getWasbFileSystem();
@@ -428,8 +457,8 @@ public class ITestAzureBlobFileSystemRandomRead extends
     double v1ElapsedMs = 0, v2ElapsedMs = 0;
     double ratio = Double.MAX_VALUE;
     for (int i = 0; i < maxAttempts && ratio >= maxAcceptableRatio; i++) {
-      v1ElapsedMs = randomRead(1, wasbFs);
-      v2ElapsedMs = randomRead(2, abFs);
+      v1ElapsedMs = randomRead(1, testPath, wasbFs);
+      v2ElapsedMs = randomRead(2, testPath, abFs);
 
       ratio = v2ElapsedMs / v1ElapsedMs;
 
@@ -448,15 +477,112 @@ public class ITestAzureBlobFileSystemRandomRead extends
             ratio < maxAcceptableRatio);
   }
 
+  /**
+   * With this test we should see a full buffer-size read triggered when
+   * alwaysReadBufferSize is on, and a read of only the requested length
+   * when it is off. Hence a seek a few bytes past the last read position
+   * triggers a network read when alwaysReadBufferSize is off, whereas the
+   * data is served from the internal buffer when it is on.
+   * Reading a full buffer size is the Gen1 behaviour.
+   * @throws Throwable
+   */
+  @Test
+  public void testAlwaysReadBufferSizeConfig() throws Throwable {
+    testAlwaysReadBufferSizeConfig(false);
+    testAlwaysReadBufferSizeConfig(true);
+  }
+
+  public void testAlwaysReadBufferSizeConfig(boolean alwaysReadBufferSizeConfigValue)
+      throws Throwable {
+    final AzureBlobFileSystem currentFs = getFileSystem();
+    Configuration config = new Configuration(this.getRawConfiguration());
+    config.set("fs.azure.readaheadqueue.depth", "0");
+    config.set("fs.azure.read.alwaysReadBufferSize",
+        Boolean.toString(alwaysReadBufferSizeConfigValue));
+
+    final Path testFile = new Path("/FileName_"
+        + UUID.randomUUID().toString());
+
+    final AzureBlobFileSystem fs = createTestFile(testFile, 16 * MEGABYTE,
+        1 * MEGABYTE, config);
+    String eTag = fs.getAbfsClient()
+        .getPathStatus(testFile.toUri().getPath(), false)
+        .getResult()
+        .getResponseHeader(ETAG);
+
+    TestAbfsInputStream testInputStream = new TestAbfsInputStream();
+
+    AbfsInputStream inputStream = testInputStream.getAbfsInputStream(
+        fs.getAbfsClient(),
+        testFile.getName(), ALWAYS_READ_BUFFER_SIZE_TEST_FILE_SIZE, eTag,
+        DISABLED_READAHEAD_DEPTH, FOUR_MB,
+        alwaysReadBufferSizeConfigValue, FOUR_MB);
+
+    long connectionsAtStart = fs.getInstrumentationMap()
+        .get(GET_RESPONSES.getStatName());
+
+    long dataSizeReadStatAtStart = fs.getInstrumentationMap()
+        .get(BYTES_RECEIVED.getStatName());
+
+    long newReqCount = 0;
+    long newDataSizeRead = 0;
+
+    byte[] buffer20b = new byte[TWENTY_BYTES];
+    byte[] buffer30b = new byte[THIRTY_BYTES];
+    byte[] byteBuffer5 = new byte[FIVE_BYTES];
+
+    // first read
+    // if alwaysReadBufferSize is off, this is a sequential read
+    inputStream.read(byteBuffer5, 0, FIVE_BYTES);
+    newReqCount++;
+    newDataSizeRead += FOUR_MB;
+
+    assertAbfsStatistics(GET_RESPONSES, connectionsAtStart + newReqCount,
+        fs.getInstrumentationMap());
+    assertAbfsStatistics(BYTES_RECEIVED,
+        dataSizeReadStatAtStart + newDataSizeRead, fs.getInstrumentationMap());
+
+    // second read beyond that the buffer holds
+    // if alwaysReadBufferSize is off, this is a random read. Reads only
+    // incoming buffer size
+    // else, reads a buffer size
+    inputStream.seek(NINE_MB);
+    inputStream.read(buffer20b, 0, BYTE);
+    newReqCount++;
+    if (alwaysReadBufferSizeConfigValue) {
+      newDataSizeRead += FOUR_MB;
+    } else {
+      newDataSizeRead += TWENTY_BYTES;
+    }
+
+    assertAbfsStatistics(GET_RESPONSES, connectionsAtStart + newReqCount, fs.getInstrumentationMap());
+    assertAbfsStatistics(BYTES_RECEIVED,
+        dataSizeReadStatAtStart + newDataSizeRead, fs.getInstrumentationMap());
+
+    // third read adjacent to second but not exactly sequential.
+    // if alwaysReadBufferSize is off, this is another random read
+    // else second read would have read this too.
+    inputStream.seek(NINE_MB + TWENTY_BYTES + THREE_BYTES);
+    inputStream.read(buffer30b, 0, THREE_BYTES);
+    if (!alwaysReadBufferSizeConfigValue) {
+      newReqCount++;
+      newDataSizeRead += THIRTY_BYTES;
+    }
+
+    assertAbfsStatistics(GET_RESPONSES, connectionsAtStart + newReqCount, fs.getInstrumentationMap());
+    assertAbfsStatistics(BYTES_RECEIVED, dataSizeReadStatAtStart + newDataSizeRead, fs.getInstrumentationMap());
+  }
 
   private long sequentialRead(String version,
+                              Path testPath,
                               FileSystem fs,
                               boolean afterReverseSeek) throws IOException {
     byte[] buffer = new byte[SEQUENTIAL_READ_BUFFER_SIZE];
     long totalBytesRead = 0;
     long bytesRead = 0;
 
-    try(FSDataInputStream inputStream = fs.open(TEST_FILE_PATH)) {
+    long testFileLength = fs.getFileStatus(testPath).getLen();
+    try(FSDataInputStream inputStream = fs.open(testPath)) {
       if (afterReverseSeek) {
         while (bytesRead > 0 && totalBytesRead < 4 * MEGABYTE) {
           bytesRead = inputStream.read(buffer);
@@ -487,14 +613,14 @@ public class ITestAzureBlobFileSystemRandomRead extends
     }
   }
 
-  private long randomRead(int version, FileSystem fs) throws Exception {
-    assumeHugeFileExists();
+  private long randomRead(int version, Path testPath, FileSystem fs) throws Exception {
+    assumeHugeFileExists(testPath);
     final long minBytesToRead = 2 * MEGABYTE;
     Random random = new Random();
     byte[] buffer = new byte[8 * KILOBYTE];
     long totalBytesRead = 0;
     long bytesRead = 0;
-    try(FSDataInputStream inputStream = fs.open(TEST_FILE_PATH)) {
+    try(FSDataInputStream inputStream = fs.open(testPath)) {
       ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
       do {
         bytesRead = inputStream.read(buffer);
@@ -526,28 +652,48 @@ public class ITestAzureBlobFileSystemRandomRead extends
     return bytes / 1000.0 * 8 / milliseconds;
   }
 
-  private void createTestFile() throws Exception {
-    final AzureBlobFileSystem fs = this.getFileSystem();
-    if (fs.exists(TEST_FILE_PATH)) {
-      FileStatus status = fs.getFileStatus(TEST_FILE_PATH);
-      if (status.getLen() >= TEST_FILE_SIZE) {
-        return;
+  private long createTestFile(Path testPath) throws Exception {
+    createTestFile(testPath,
+        TEST_FILE_SIZE,
+        MEGABYTE,
+        null);
+
+    return TEST_FILE_SIZE;
+  }
+
+  private AzureBlobFileSystem createTestFile(Path testFilePath, long testFileSize,
+      int createBufferSize, Configuration config) throws Exception {
+    AzureBlobFileSystem fs;
+
+    if (config == null) {
+      config = this.getRawConfiguration();
+    }
+
+    final AzureBlobFileSystem currentFs = getFileSystem();
+    fs = (AzureBlobFileSystem) FileSystem.newInstance(currentFs.getUri(),
+        config);
+
+    if (fs.exists(testFilePath)) {
+      FileStatus status = fs.getFileStatus(testFilePath);
+      if (status.getLen() == testFileSize) {
+        return fs;
       }
     }
 
-    byte[] buffer = new byte[CREATE_BUFFER_SIZE];
+    byte[] buffer = new byte[createBufferSize];
     char character = 'a';
     for (int i = 0; i < buffer.length; i++) {
       buffer[i] = (byte) character;
       character = (character == 'z') ? 'a' : (char) ((int) character + 1);
     }
 
-    LOG.info(String.format("Creating test file %s of size: %d ", TEST_FILE_PATH, TEST_FILE_SIZE));
+    LOG.info(String.format("Creating test file %s of size: %d ", testFilePath, testFileSize));
     ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
 
-    try (FSDataOutputStream outputStream = fs.create(TEST_FILE_PATH)) {
+    try (FSDataOutputStream outputStream = fs.create(testFilePath)) {
+      String bufferContents = new String(buffer);
       int bytesWritten = 0;
-      while (bytesWritten < TEST_FILE_SIZE) {
+      while (bytesWritten < testFileSize) {
         outputStream.write(buffer);
         bytesWritten += buffer.length;
       }
@@ -557,18 +703,18 @@ public class ITestAzureBlobFileSystemRandomRead extends
       outputStream.close();
       closeTimer.end("time to close() output stream");
     }
-    timer.end("time to write %d KB", TEST_FILE_SIZE / 1024);
-    testFileLength = fs.getFileStatus(TEST_FILE_PATH).getLen();
-
+    timer.end("time to write %d KB", testFileSize / 1024);
+    return fs;
   }
 
-  private void assumeHugeFileExists() throws Exception{
-    createTestFile();
+  private long assumeHugeFileExists(Path testPath) throws Exception {
+    long fileSize = createTestFile(testPath);
     FileSystem fs = this.getFileSystem();
-    ContractTestUtils.assertPathExists(this.getFileSystem(), "huge file not created", TEST_FILE_PATH);
-    FileStatus status = fs.getFileStatus(TEST_FILE_PATH);
-    ContractTestUtils.assertIsFile(TEST_FILE_PATH, status);
-    assertTrue("File " + TEST_FILE_PATH + " is empty", status.getLen() > 0);
+    ContractTestUtils.assertPathExists(this.getFileSystem(), "huge file not created", testPath);
+    FileStatus status = fs.getFileStatus(testPath);
+    ContractTestUtils.assertIsFile(testPath, status);
+    assertTrue("File " + testPath + " is not of expected size " + fileSize + ":actual=" + status.getLen(), status.getLen() == fileSize);
+    return fileSize;
   }
 
   private void verifyConsistentReads(FSDataInputStream inputStreamV1,
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java
index ae72c5a..cbf3d6a 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java
@@ -22,10 +22,17 @@ import java.io.IOException;
 
 import org.junit.Assert;
 import org.junit.Test;
+import java.util.Arrays;
 
 import org.assertj.core.api.Assertions;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
+import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.TimeoutException;
 import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus;
@@ -51,9 +58,17 @@ public class TestAbfsInputStream extends
   private static final int ONE_KB = 1 * 1024;
   private static final int TWO_KB = 2 * 1024;
   private static final int THREE_KB = 3 * 1024;
+  private static final int SIXTEEN_KB = 16 * ONE_KB;
+  private static final int FORTY_EIGHT_KB = 48 * ONE_KB;
+  private static final int ONE_MB = 1 * 1024 * 1024;
+  private static final int FOUR_MB = 4 * ONE_MB;
+  private static final int EIGHT_MB = 8 * ONE_MB;
+  private static final int TEST_READAHEAD_DEPTH_2 = 2;
+  private static final int TEST_READAHEAD_DEPTH_4 = 4;
   private static final int REDUCED_READ_BUFFER_AGE_THRESHOLD = 3000; // 3 sec
   private static final int INCREASED_READ_BUFFER_AGE_THRESHOLD =
       REDUCED_READ_BUFFER_AGE_THRESHOLD * 10; // 30 sec
+  private static final int ALWAYS_READ_BUFFER_SIZE_TEST_FILE_SIZE = 16 * ONE_MB;
 
   private AbfsRestOperation getMockRestOp() {
     AbfsRestOperation op = mock(AbfsRestOperation.class);
@@ -84,7 +99,7 @@ public class TestAbfsInputStream extends
         null,
         FORWARD_SLASH + fileName,
         THREE_KB,
-        inputStreamContext.withReadBufferSize(ONE_KB).withReadAheadQueueDepth(10),
+        inputStreamContext.withReadBufferSize(ONE_KB)
+            .withReadAheadQueueDepth(10)
+            .withReadAheadBlockSize(ONE_KB),
         "eTag");
 
     inputStream.setCachedSasToken(
@@ -93,6 +108,33 @@ public class TestAbfsInputStream extends
     return inputStream;
   }
 
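+  /**
+   * Variant of getAbfsInputStream that takes explicit read buffer size,
+   * queue depth, alwaysReadBufferSize and readahead block size settings,
+   * and pre-caches a test SAS token on the stream.
+   */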
+  public AbfsInputStream getAbfsInputStream(AbfsClient abfsClient,
+      String fileName,
+      int fileSize,
+      String eTag,
+      int readAheadQueueDepth,
+      int readBufferSize,
+      boolean alwaysReadBufferSize,
+      int readAheadBlockSize) {
+    AbfsInputStreamContext inputStreamContext = new AbfsInputStreamContext(-1);
+    // Create AbfsInputStream with the client instance
+    AbfsInputStream inputStream = new AbfsInputStream(
+        abfsClient,
+        null,
+        FORWARD_SLASH + fileName,
+        fileSize,
+        inputStreamContext.withReadBufferSize(readBufferSize)
+            .withReadAheadQueueDepth(readAheadQueueDepth)
+            .withShouldReadBufferSizeAlways(alwaysReadBufferSize)
+            .withReadAheadBlockSize(readAheadBlockSize),
+        eTag);
+
+    inputStream.setCachedSasToken(
+        TestCachedSASToken.getTestCachedSASTokenInstance());
+
+    return inputStream;
+  }
+
   private void queueReadAheads(AbfsInputStream inputStream) {
     // Mimic AbfsInputStream readAhead queue requests
     ReadBufferManager.getBufferManager()
@@ -496,4 +538,183 @@ public class TestAbfsInputStream extends
     checkEvictedStatus(inputStream, 0, true);
   }
 
+  /**
+   * Test readahead with different config settings for read request size and
+   * readAhead block size.
+   * @throws Exception if test file creation or read validation fails
+   */
+  @Test
+  public void testDiffReadRequestSizeAndRAHBlockSize() throws Exception {
+    // Set readRequestSize = 4MB and readAheadBlockSize = 8MB
+    resetReadBufferManager(FOUR_MB, INCREASED_READ_BUFFER_AGE_THRESHOLD);
+    testReadAheadConfigs(FOUR_MB, TEST_READAHEAD_DEPTH_4, false, EIGHT_MB);
+
+    // Test for readRequestSize = 16KB and readAheadBlockSize = 16KB
+    resetReadBufferManager(SIXTEEN_KB, INCREASED_READ_BUFFER_AGE_THRESHOLD);
+    AbfsInputStream inputStream = testReadAheadConfigs(SIXTEEN_KB,
+        TEST_READAHEAD_DEPTH_2, true, SIXTEEN_KB);
+    testReadAheads(inputStream, SIXTEEN_KB, SIXTEEN_KB);
+
+    // Test for readRequestSize = 16KB and readAheadBlockSize = 48KB
+    resetReadBufferManager(FORTY_EIGHT_KB, INCREASED_READ_BUFFER_AGE_THRESHOLD);
+    inputStream = testReadAheadConfigs(SIXTEEN_KB, TEST_READAHEAD_DEPTH_2, true,
+        FORTY_EIGHT_KB);
+    testReadAheads(inputStream, SIXTEEN_KB, FORTY_EIGHT_KB);
+
+    // Test for readRequestSize = 48KB and readAheadBlockSize = 16KB
+    resetReadBufferManager(FORTY_EIGHT_KB, INCREASED_READ_BUFFER_AGE_THRESHOLD);
+    inputStream = testReadAheadConfigs(FORTY_EIGHT_KB, TEST_READAHEAD_DEPTH_2,
+        true, SIXTEEN_KB);
+    testReadAheads(inputStream, FORTY_EIGHT_KB, SIXTEEN_KB);
+  }
+
+  private void testReadAheads(AbfsInputStream inputStream,
+      int readRequestSize,
+      int readAheadRequestSize)
+      throws Exception {
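+    // A readahead block is never smaller than the read request size (the
+    // expected block size is clamped up to match in testReadAheadConfigs),
+    // so align the expected size of the second read accordingly.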
+    if (readRequestSize > readAheadRequestSize) {
+      readAheadRequestSize = readRequestSize;
+    }
+
+    byte[] firstReadBuffer = new byte[readRequestSize];
+    byte[] secondReadBuffer = new byte[readAheadRequestSize];
+
+    // get the expected bytes to compare
+    byte[] expectedFirstReadAheadBufferContents = new byte[readRequestSize];
+    byte[] expectedSecondReadAheadBufferContents = new byte[readAheadRequestSize];
+    getExpectedBufferData(0, readRequestSize, expectedFirstReadAheadBufferContents);
+    getExpectedBufferData(readRequestSize, readAheadRequestSize,
+        expectedSecondReadAheadBufferContents);
+
+    Assertions.assertThat(inputStream.read(firstReadBuffer, 0, readRequestSize))
+        .describedAs("Read should be of exact requested size")
+        .isEqualTo(readRequestSize);
+
+    assertTrue("Data mismatch found in RAH1",
+        Arrays.equals(firstReadBuffer,
+            expectedFirstReadAheadBufferContents));
+
+    Assertions.assertThat(inputStream.read(secondReadBuffer, 0, readAheadRequestSize))
+        .describedAs("Read should be of exact requested size")
+        .isEqualTo(readAheadRequestSize);
+
+    assertTrue("Data mismatch found in RAH2",
+        Arrays.equals(secondReadBuffer,
+            expectedSecondReadAheadBufferContents));
+  }
+
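+  /**
+   * Helper rather than a test case: builds a filesystem with the given read
+   * settings, creates the test file, opens it for read and asserts that the
+   * stream and the ReadBufferManager picked up the expected configuration.
+   */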
+  public AbfsInputStream testReadAheadConfigs(int readRequestSize,
+      int readAheadQueueDepth,
+      boolean alwaysReadBufferSizeEnabled,
+      int readAheadBlockSize) throws Exception {
+    Configuration config = new Configuration(this.getRawConfiguration());
+    config.set("fs.azure.read.request.size", Integer.toString(readRequestSize));
+    config.set("fs.azure.readaheadqueue.depth",
+        Integer.toString(readAheadQueueDepth));
+    config.set("fs.azure.read.alwaysReadBufferSize",
+        Boolean.toString(alwaysReadBufferSizeEnabled));
+    config.set("fs.azure.read.readahead.blocksize",
+        Integer.toString(readAheadBlockSize));
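+    // The expected readahead block size is clamped up to the read request
+    // size before asserting, since the driver is expected to enforce
+    // readAheadBlockSize >= readRequestSize.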
+    if (readRequestSize > readAheadBlockSize) {
+      readAheadBlockSize = readRequestSize;
+    }
+
+    Path testPath = new Path("/testReadAheadConfigs");
+    final AzureBlobFileSystem fs = createTestFile(testPath,
+        ALWAYS_READ_BUFFER_SIZE_TEST_FILE_SIZE, config);
+    AbfsInputStream inputStream = this.getAbfsStore(fs)
+        .openFileForRead(testPath, null);
+
+    Assertions.assertThat(inputStream.getBufferSize())
+        .describedAs("Unexpected AbfsInputStream buffer size")
+        .isEqualTo(readRequestSize);
+
+    Assertions.assertThat(inputStream.getReadAheadQueueDepth())
+        .describedAs("Unexpected ReadAhead queue depth")
+        .isEqualTo(readAheadQueueDepth);
+
+    Assertions.assertThat(inputStream.shouldAlwaysReadBufferSize())
+        .describedAs("Unexpected AlwaysReadBufferSize settings")
+        .isEqualTo(alwaysReadBufferSizeEnabled);
+
+    Assertions.assertThat(ReadBufferManager.getBufferManager().getReadAheadBlockSize())
+        .describedAs("Unexpected readAhead block size")
+        .isEqualTo(readAheadBlockSize);
+
+    return inputStream;
+  }
+
+  private void getExpectedBufferData(int offset, int length, byte[] b) {
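+    // Replay the cyclic 'a'..'z' generator from file offset 0 and copy only
+    // the [offset, offset + length) window into b.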
+    boolean startFillingIn = false;
+    int indexIntoBuffer = 0;
+    char character = 'a';
+
+    for (int i = 0; i < (offset + length); i++) {
+      if (i == offset) {
+        startFillingIn = true;
+      }
+
+      if ((startFillingIn) && (indexIntoBuffer < length)) {
+        b[indexIntoBuffer] = (byte) character;
+        indexIntoBuffer++;
+      }
+
+      character = (character == 'z') ? 'a' : (char) ((int) character + 1);
+    }
+  }
+
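+  /**
+   * Creates the test file on a filesystem built from the supplied config
+   * (null means the shared test filesystem), reusing an existing file if it
+   * is already at least the requested length.
+   */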
+  private AzureBlobFileSystem createTestFile(Path testFilePath, long testFileSize,
+      Configuration config) throws Exception {
+    AzureBlobFileSystem fs;
+
+    if (config == null) {
+      fs = this.getFileSystem();
+    } else {
+      final AzureBlobFileSystem currentFs = getFileSystem();
+      fs = (AzureBlobFileSystem) FileSystem.newInstance(currentFs.getUri(),
+          config);
+    }
+
+    if (fs.exists(testFilePath)) {
+      FileStatus status = fs.getFileStatus(testFilePath);
+      if (status.getLen() >= testFileSize) {
+        return fs;
+      }
+    }
+
+    byte[] buffer = new byte[EIGHT_MB];
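+    // Same repeating 'a'..'z' fill as the random-read tests, written in 8MB
+    // chunks; the 16MB test file size is an exact multiple of the chunk size,
+    // so the exact-length assertion below holds.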
+    char character = 'a';
+    for (int i = 0; i < buffer.length; i++) {
+      buffer[i] = (byte) character;
+      character = (character == 'z') ? 'a' : (char) ((int) character + 1);
+    }
+
+    try (FSDataOutputStream outputStream = fs.create(testFilePath)) {
+      int bytesWritten = 0;
+      while (bytesWritten < testFileSize) {
+        outputStream.write(buffer);
+        bytesWritten += buffer.length;
+      }
+    }
+
+    Assertions.assertThat(fs.getFileStatus(testFilePath).getLen())
+        .describedAs("File not created of expected size")
+        .isEqualTo(testFileSize);
+
+    return fs;
+  }
+
+  private void resetReadBufferManager(int bufferSize, int threshold) {
+    ReadBufferManager.getBufferManager()
+        .testResetReadBufferManager(bufferSize, threshold);
+    // Trigger GC as aggressive recreation of ReadBufferManager buffers
+    // by successive tests can lead to OOM based on the dev VM/machine capacity.
+    System.gc();
+  }
 }
\ No newline at end of file

