You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by tm...@apache.org on 2020/07/25 13:32:50 UTC

[hadoop] 08/09: HADOOP-17058. ABFS: Support for AppendBlob in Hadoop ABFS Driver

This is an automated email from the ASF dual-hosted git repository.

tmarquardt pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit f24e2ec487b43110bab5909d9fd15b08d59f08f4
Author: ishaniahuja <50...@users.noreply.github.com>
AuthorDate: Sun Jul 5 01:55:14 2020 +0530

    HADOOP-17058. ABFS: Support for AppendBlob in Hadoop ABFS Driver
    
    - Contributed by Ishani Ahuja
---
 .../hadoop/fs/azurebfs/AbfsConfiguration.java      |   8 +
 .../fs/azurebfs/AzureBlobFileSystemStore.java      |  50 ++-
 .../fs/azurebfs/constants/AbfsHttpConstants.java   |   1 +
 .../fs/azurebfs/constants/ConfigurationKeys.java   |   3 +
 .../constants/FileSystemConfigurations.java        |   2 +
 .../fs/azurebfs/constants/HttpQueryParams.java     |   1 +
 .../hadoop/fs/azurebfs/services/AbfsClient.java    |  46 ++-
 .../fs/azurebfs/services/AbfsHttpOperation.java    |  10 +
 .../fs/azurebfs/services/AbfsOutputStream.java     |  63 ++-
 .../azurebfs/services/AbfsOutputStreamContext.java |  12 +
 .../fs/azurebfs/AbstractAbfsIntegrationTest.java   |   4 +
 .../fs/azurebfs/ITestAbfsNetworkStatistics.java    |  52 ++-
 .../azurebfs/ITestAbfsOutputStreamStatistics.java  |   4 +
 .../fs/azurebfs/ITestAbfsReadWriteAndSeek.java     |   3 +
 .../fs/azurebfs/ITestAbfsStreamStatistics.java     |  11 +-
 .../azurebfs/ITestAzureBlobFileSystemCreate.java   |  20 +-
 .../fs/azurebfs/ITestAzureBlobFileSystemE2E.java   |   3 +
 .../fs/azurebfs/ITestAzureBlobFileSystemFlush.java |  30 +-
 .../TestAbfsConfigurationFieldsValidation.java     |   2 +-
 .../azurebfs/constants/TestConfigurationKeys.java  |   1 +
 .../fs/azurebfs/services/TestAbfsOutputStream.java | 430 +++++++++++++++++++++
 21 files changed, 714 insertions(+), 42 deletions(-)

diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
index 091b1c7..85bd37a 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
@@ -173,6 +173,10 @@ public class AbfsConfiguration{
       DefaultValue = DEFAULT_FS_AZURE_ATOMIC_RENAME_DIRECTORIES)
   private String azureAtomicDirs;
 
+  @StringConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_APPEND_BLOB_KEY,
+      DefaultValue = DEFAULT_FS_AZURE_APPEND_BLOB_DIRECTORIES)
+  private String azureAppendBlobDirs;
+
   @BooleanConfigurationValidatorAnnotation(ConfigurationKey = AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION,
       DefaultValue = DEFAULT_AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION)
   private boolean createRemoteFileSystemDuringInitialization;
@@ -561,6 +565,10 @@ public class AbfsConfiguration{
     return this.azureAtomicDirs;
   }
 
+  public String getAppendBlobDirs() {
+    return this.azureAppendBlobDirs;
+  }
+
   public boolean getCreateRemoteFileSystemDuringInitialization() {
     // we do not support creating the filesystem when AuthType is SAS
     return this.createRemoteFileSystemDuringInitialization
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
index 9aba59b..59c2e26 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
@@ -62,6 +62,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
 import org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes;
+import org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations;
 import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
@@ -146,6 +147,11 @@ public class AzureBlobFileSystemStore implements Closeable {
   private final IdentityTransformerInterface identityTransformer;
   private final AbfsPerfTracker abfsPerfTracker;
 
+  /**
+   * The set of directories where we should store files as append blobs.
+   */
+  private Set<String> appendBlobDirSet;
+
   public AzureBlobFileSystemStore(URI uri, boolean isSecureScheme,
                                   Configuration configuration,
                                   AbfsCounters abfsCounters) throws IOException {
@@ -197,6 +203,23 @@ public class AzureBlobFileSystemStore implements Closeable {
       throw new IOException(e);
     }
     LOG.trace("IdentityTransformer init complete");
+
+    // Extract the directories that should contain append blobs
+    String appendBlobDirs = abfsConfiguration.getAppendBlobDirs();
+    if (appendBlobDirs.trim().isEmpty()) {
+      this.appendBlobDirSet = new HashSet<String>();
+    } else {
+      this.appendBlobDirSet = new HashSet<>(Arrays.asList(
+          abfsConfiguration.getAppendBlobDirs().split(AbfsHttpConstants.COMMA)));
+    }
+  }
+
+  /**
+   * Checks if the given key in Azure Storage should be stored as a page
+   * blob instead of block blob.
+   */
+  public boolean isAppendBlobKey(String key) {
+    return isKeyForDirectorySet(key, appendBlobDirSet);
   }
 
   /**
@@ -432,10 +455,15 @@ public class AzureBlobFileSystemStore implements Closeable {
               isNamespaceEnabled);
 
       String relativePath = getRelativePath(path);
+      boolean isAppendBlob = false;
+      if (isAppendBlobKey(path.toString())) {
+        isAppendBlob = true;
+      }
 
       final AbfsRestOperation op = client.createPath(relativePath, true, overwrite,
               isNamespaceEnabled ? getOctalNotation(permission) : null,
-              isNamespaceEnabled ? getOctalNotation(umask) : null);
+              isNamespaceEnabled ? getOctalNotation(umask) : null,
+              isAppendBlob);
       perfInfo.registerResult(op.getResult()).registerSuccess(true);
 
       return new AbfsOutputStream(
@@ -443,16 +471,21 @@ public class AzureBlobFileSystemStore implements Closeable {
           statistics,
           relativePath,
           0,
-          populateAbfsOutputStreamContext());
+          populateAbfsOutputStreamContext(isAppendBlob));
     }
   }
 
-  private AbfsOutputStreamContext populateAbfsOutputStreamContext() {
+  private AbfsOutputStreamContext populateAbfsOutputStreamContext(boolean isAppendBlob) {
+    int bufferSize = abfsConfiguration.getWriteBufferSize();
+    if (isAppendBlob && bufferSize > FileSystemConfigurations.APPENDBLOB_MAX_WRITE_BUFFER_SIZE) {
+      bufferSize = FileSystemConfigurations.APPENDBLOB_MAX_WRITE_BUFFER_SIZE;
+    }
     return new AbfsOutputStreamContext(abfsConfiguration.getSasTokenRenewPeriodForStreamsInSeconds())
-            .withWriteBufferSize(abfsConfiguration.getWriteBufferSize())
+            .withWriteBufferSize(bufferSize)
             .enableFlush(abfsConfiguration.isFlushEnabled())
             .disableOutputStreamFlush(abfsConfiguration.isOutputStreamFlushDisabled())
             .withStreamStatistics(new AbfsOutputStreamStatisticsImpl())
+            .withAppendBlob(isAppendBlob)
             .build();
   }
 
@@ -469,7 +502,7 @@ public class AzureBlobFileSystemStore implements Closeable {
 
       final AbfsRestOperation op = client.createPath(getRelativePath(path), false, true,
               isNamespaceEnabled ? getOctalNotation(permission) : null,
-              isNamespaceEnabled ? getOctalNotation(umask) : null);
+              isNamespaceEnabled ? getOctalNotation(umask) : null, false);
       perfInfo.registerResult(op.getResult()).registerSuccess(true);
     }
   }
@@ -545,12 +578,17 @@ public class AzureBlobFileSystemStore implements Closeable {
 
       perfInfo.registerSuccess(true);
 
+      boolean isAppendBlob = false;
+      if (isAppendBlobKey(path.toString())) {
+        isAppendBlob = true;
+      }
+
       return new AbfsOutputStream(
           client,
           statistics,
           relativePath,
           offset,
-          populateAbfsOutputStreamContext());
+          populateAbfsOutputStreamContext(isAppendBlob));
     }
   }
 
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java
index 8d45513..38b79c9 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java
@@ -40,6 +40,7 @@ public final class AbfsHttpConstants {
   public static final String CHECK_ACCESS = "checkAccess";
   public static final String GET_STATUS = "getStatus";
   public static final String DEFAULT_TIMEOUT = "90";
+  public static final String APPEND_BLOB_TYPE = "appendblob";
   public static final String TOKEN_VERSION = "2";
 
   public static final String JAVA_VENDOR = "java.vendor";
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
index e325470..5f1ad31 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
@@ -65,6 +65,9 @@ public final class ConfigurationKeys {
   public static final String FS_AZURE_ENABLE_AUTOTHROTTLING = "fs.azure.enable.autothrottling";
   public static final String FS_AZURE_ALWAYS_USE_HTTPS = "fs.azure.always.use.https";
   public static final String FS_AZURE_ATOMIC_RENAME_KEY = "fs.azure.atomic.rename.key";
+  /** Provides a config to provide comma separated path prefixes on which Appendblob based files are created
+   *  Default is empty. **/
+  public static final String FS_AZURE_APPEND_BLOB_KEY = "fs.azure.appendblob.directories";
   public static final String FS_AZURE_READ_AHEAD_QUEUE_DEPTH = "fs.azure.readaheadqueue.depth";
   /** Provides a config control to enable or disable ABFS Flush operations -
    *  HFlush and HSync. Default is true. **/
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
index 482d5d9..260c496 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
@@ -55,6 +55,7 @@ public final class FileSystemConfigurations {
 
   // Default upload and download buffer size
   public static final int DEFAULT_WRITE_BUFFER_SIZE = 8 * ONE_MB;  // 8 MB
+  public static final int APPENDBLOB_MAX_WRITE_BUFFER_SIZE = 4 * ONE_MB;  // 4 MB
   public static final int DEFAULT_READ_BUFFER_SIZE = 4 * ONE_MB;  // 4 MB
   public static final int MIN_BUFFER_SIZE = 16 * ONE_KB;  // 16 KB
   public static final int MAX_BUFFER_SIZE = 100 * ONE_MB;  // 100 MB
@@ -69,6 +70,7 @@ public final class FileSystemConfigurations {
   public static final boolean DEFAULT_AZURE_SKIP_USER_GROUP_METADATA_DURING_INITIALIZATION = false;
 
   public static final String DEFAULT_FS_AZURE_ATOMIC_RENAME_DIRECTORIES = "/hbase";
+  public static final String DEFAULT_FS_AZURE_APPEND_BLOB_DIRECTORIES = "";
 
   public static final int DEFAULT_READ_AHEAD_QUEUE_DEPTH = -1;
   public static final boolean DEFAULT_ENABLE_FLUSH = true;
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpQueryParams.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpQueryParams.java
index 9f735f7..5a550ac 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpQueryParams.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpQueryParams.java
@@ -38,6 +38,7 @@ public final class HttpQueryParams {
   public static final String QUERY_PARAM_RETAIN_UNCOMMITTED_DATA = "retainUncommittedData";
   public static final String QUERY_PARAM_CLOSE = "close";
   public static final String QUERY_PARAM_UPN = "upn";
+  public static final String QUERY_PARAM_BLOBTYPE = "blobtype";
 
   private HttpQueryParams() {}
 }
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
index f614bbd..f747bd0 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
@@ -272,7 +272,8 @@ public class AbfsClient implements Closeable {
   }
 
   public AbfsRestOperation createPath(final String path, final boolean isFile, final boolean overwrite,
-                                      final String permission, final String umask) throws AzureBlobFileSystemException {
+                                      final String permission, final String umask,
+                                      final boolean isAppendBlob) throws AzureBlobFileSystemException {
     final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
     if (!overwrite) {
       requestHeaders.add(new AbfsHttpHeader(IF_NONE_MATCH, AbfsHttpConstants.STAR));
@@ -288,6 +289,9 @@ public class AbfsClient implements Closeable {
 
     final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
     abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESOURCE, isFile ? FILE : DIRECTORY);
+    if (isAppendBlob) {
+      abfsUriQueryBuilder.addQuery(QUERY_PARAM_BLOBTYPE, APPEND_BLOB_TYPE);
+    }
 
     String operation = isFile
         ? SASTokenProvider.CREATE_FILE_OPERATION
@@ -380,7 +384,7 @@ public class AbfsClient implements Closeable {
   }
 
   public AbfsRestOperation append(final String path, final long position, final byte[] buffer, final int offset,
-                                  final int length, final String cachedSasToken) throws AzureBlobFileSystemException {
+                                  final int length, final String cachedSasToken, final boolean isAppendBlob) throws AzureBlobFileSystemException {
     final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
     // JDK7 does not support PATCH, so to workaround the issue we will use
     // PUT and specify the real method in the X-Http-Method-Override header.
@@ -401,10 +405,46 @@ public class AbfsClient implements Closeable {
             HTTP_METHOD_PUT,
             url,
             requestHeaders, buffer, offset, length, sasTokenForReuse);
-    op.execute();
+    try {
+      op.execute();
+    } catch (AzureBlobFileSystemException e) {
+      if (isAppendBlob && appendSuccessCheckOp(op, path, (position + length))) {
+        final AbfsRestOperation successOp = new AbfsRestOperation(
+            AbfsRestOperationType.Append,
+                this,
+                HTTP_METHOD_PUT,
+                url,
+                requestHeaders, buffer, offset, length, sasTokenForReuse);
+        successOp.hardSetResult(HttpURLConnection.HTTP_OK);
+        return successOp;
+      }
+      throw e;
+    }
+
     return op;
   }
 
+  // For AppendBlob its possible that the append succeeded in the backend but the request failed.
+  // However a retry would fail with an InvalidQueryParameterValue
+  // (as the current offset would be unacceptable).
+  // Hence, we pass/succeed the appendblob append call
+  // in case we are doing a retry after checking the length of the file
+  public boolean appendSuccessCheckOp(AbfsRestOperation op, final String path,
+                                       final long length) throws AzureBlobFileSystemException {
+    if ((op.isARetriedRequest())
+        && (op.getResult().getStatusCode() == HttpURLConnection.HTTP_BAD_REQUEST)) {
+      final AbfsRestOperation destStatusOp = getPathStatus(path, false);
+      if (destStatusOp.getResult().getStatusCode() == HttpURLConnection.HTTP_OK) {
+        String fileLength = destStatusOp.getResult().getResponseHeader(
+            HttpHeaderConfigurations.CONTENT_LENGTH);
+        if (length <= Long.parseLong(fileLength)) {
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+
   public AbfsRestOperation flush(final String path, final long position, boolean retainUncommittedData,
                                  boolean isClose, final String cachedSasToken)
       throws AzureBlobFileSystemException {
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java
index 5dc4a89..a63c982 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java
@@ -69,6 +69,7 @@ public class AbfsHttpOperation implements AbfsPerfLoggable {
   private String storageErrorMessage  = "";
   private String clientRequestId = "";
   private String requestId  = "";
+  private String expectedAppendPos = "";
   private ListResultSchema listResultSchema = null;
 
   // metrics
@@ -126,6 +127,10 @@ public class AbfsHttpOperation implements AbfsPerfLoggable {
     return clientRequestId;
   }
 
+  public String getExpectedAppendPos() {
+    return expectedAppendPos;
+  }
+
   public String getRequestId() {
     return requestId;
   }
@@ -154,6 +159,8 @@ public class AbfsHttpOperation implements AbfsPerfLoggable {
     sb.append(statusCode);
     sb.append(",");
     sb.append(storageErrorCode);
+    sb.append(",");
+    sb.append(expectedAppendPos);
     sb.append(",cid=");
     sb.append(clientRequestId);
     sb.append(",rid=");
@@ -449,6 +456,9 @@ public class AbfsHttpOperation implements AbfsPerfLoggable {
               case "message":
                 storageErrorMessage = fieldValue;
                 break;
+              case "ExpectedAppendPos":
+                expectedAppendPos = fieldValue;
+                break;
               default:
                 break;
             }
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
index 89afca4..6c1e177 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
@@ -60,6 +60,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
   private boolean closed;
   private boolean supportFlush;
   private boolean disableOutputStreamFlush;
+  private boolean isAppendBlob;
   private volatile IOException lastError;
 
   private long lastFlushOffset;
@@ -106,6 +107,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
     this.supportFlush = abfsOutputStreamContext.isEnableFlush();
     this.disableOutputStreamFlush = abfsOutputStreamContext
             .isDisableOutputStreamFlush();
+    this.isAppendBlob = abfsOutputStreamContext.isAppendBlob();
     this.lastError = null;
     this.lastFlushOffset = 0;
     this.bufferSize = abfsOutputStreamContext.getWriteBufferSize();
@@ -114,8 +116,11 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
     this.writeOperations = new ConcurrentLinkedDeque<>();
     this.outputStreamStatistics = abfsOutputStreamContext.getStreamStatistics();
 
-    this.maxConcurrentRequestCount = 4 * Runtime.getRuntime().availableProcessors();
-
+    if (this.isAppendBlob) {
+      this.maxConcurrentRequestCount = 1;
+    } else {
+      this.maxConcurrentRequestCount = 4 * Runtime.getRuntime().availableProcessors();
+    }
     this.threadExecutor
         = new ThreadPoolExecutor(maxConcurrentRequestCount,
         maxConcurrentRequestCount,
@@ -309,7 +314,50 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
     flushWrittenBytesToServiceAsync();
   }
 
+  private void writeAppendBlobCurrentBufferToService() throws IOException {
+    if (bufferIndex == 0) {
+      return;
+    }
+    outputStreamStatistics.writeCurrentBuffer();
+
+    final byte[] bytes = buffer;
+    final int bytesLength = bufferIndex;
+    outputStreamStatistics.bytesToUpload(bytesLength);
+    buffer = byteBufferPool.getBuffer(false, bufferSize).array();
+    bufferIndex = 0;
+    final long offset = position;
+    position += bytesLength;
+    AbfsPerfTracker tracker = client.getAbfsPerfTracker();
+    try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker,
+            "writeCurrentBufferToService", "append")) {
+      AbfsRestOperation op = client.append(path, offset, bytes, 0,
+          bytesLength, cachedSasToken.get(), this.isAppendBlob);
+      cachedSasToken.update(op.getSasToken());
+      outputStreamStatistics.uploadSuccessful(bytesLength);
+      perfInfo.registerResult(op.getResult());
+      byteBufferPool.putBuffer(ByteBuffer.wrap(bytes));
+      perfInfo.registerSuccess(true);
+      return;
+    } catch (Exception ex) {
+      if (ex instanceof AbfsRestOperationException) {
+        if (((AbfsRestOperationException) ex).getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) {
+          throw new FileNotFoundException(ex.getMessage());
+        }
+      }
+      if (ex instanceof AzureBlobFileSystemException) {
+        ex = (AzureBlobFileSystemException) ex;
+      }
+      lastError = new IOException(ex);
+      throw lastError;
+    }
+  }
+
   private synchronized void writeCurrentBufferToService() throws IOException {
+    if (this.isAppendBlob) {
+      writeAppendBlobCurrentBufferToService();
+      return;
+    }
+
     if (bufferIndex == 0) {
       return;
     }
@@ -336,7 +384,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
         try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker,
                 "writeCurrentBufferToService", "append")) {
           AbfsRestOperation op = client.append(path, offset, bytes, 0,
-                  bytesLength, cachedSasToken.get());
+                  bytesLength, cachedSasToken.get(), false);
           cachedSasToken.update(op.getSasToken());
           perfInfo.registerResult(op.getResult());
           byteBufferPool.putBuffer(ByteBuffer.wrap(bytes));
@@ -389,6 +437,11 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
 
   private synchronized void flushWrittenBytesToServiceInternal(final long offset,
       final boolean retainUncommitedData, final boolean isClose) throws IOException {
+    // flush is called for appendblob only on close
+    if (this.isAppendBlob && !isClose) {
+      return;
+    }
+
     AbfsPerfTracker tracker = client.getAbfsPerfTracker();
     try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker,
             "flushWrittenBytesToServiceInternal", "flush")) {
@@ -434,6 +487,10 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
     for (completed = false; completionService.poll() != null; completed = true) {
       // keep polling until there is no data
     }
+    // for AppendBLob, jobs are not submitted to completion service
+    if (isAppendBlob) {
+      completed = true;
+    }
 
     if (!completed) {
       try {
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java
index dcd6c45..03e4aba 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java
@@ -31,6 +31,8 @@ public class AbfsOutputStreamContext extends AbfsStreamContext {
 
   private AbfsOutputStreamStatistics streamStatistics;
 
+  private boolean isAppendBlob;
+
   public AbfsOutputStreamContext(final long sasTokenRenewPeriodForStreamsInSeconds) {
     super(sasTokenRenewPeriodForStreamsInSeconds);
   }
@@ -58,6 +60,12 @@ public class AbfsOutputStreamContext extends AbfsStreamContext {
     return this;
   }
 
+  public AbfsOutputStreamContext withAppendBlob(
+          final boolean isAppendBlob) {
+    this.isAppendBlob = isAppendBlob;
+    return this;
+  }
+
   public AbfsOutputStreamContext build() {
     // Validation of parameters to be done here.
     return this;
@@ -78,4 +86,8 @@ public class AbfsOutputStreamContext extends AbfsStreamContext {
   public AbfsOutputStreamStatistics getStreamStatistics() {
     return streamStatistics;
   }
+
+  public boolean isAppendBlob() {
+    return isAppendBlob;
+  }
 }
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java
index a80bee6..34b3615 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java
@@ -122,6 +122,10 @@ public abstract class AbstractAbfsIntegrationTest extends
     this.testUrl = defaultUri.toString();
     abfsConfig.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, defaultUri.toString());
     abfsConfig.setBoolean(AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION, true);
+    if (abfsConfig.get(FS_AZURE_TEST_APPENDBLOB_ENABLED) == "true") {
+      String appendblobDirs = this.testUrl + "," + abfsConfig.get(FS_AZURE_CONTRACT_TEST_URI);
+      rawConfig.set(FS_AZURE_APPEND_BLOB_KEY, appendblobDirs);
+    }
     // For testing purposes, an IP address and port may be provided to override
     // the host specified in the FileSystem URI.  Also note that the format of
     // the Azure Storage Service URI changes from
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsNetworkStatistics.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsNetworkStatistics.java
index 904fdf3..b2e1301 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsNetworkStatistics.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsNetworkStatistics.java
@@ -81,10 +81,18 @@ public class ITestAbfsNetworkStatistics extends AbstractAbfsIntegrationTest {
        *
        * bytes_sent : bytes wrote in AbfsOutputStream.
        */
-      connectionsMade = assertAbfsStatistics(AbfsStatistic.CONNECTIONS_MADE,
-          6, metricMap);
-      requestsSent = assertAbfsStatistics(AbfsStatistic.SEND_REQUESTS, 4,
-          metricMap);
+      if (fs.getAbfsStore().isAppendBlobKey(fs.makeQualified(sendRequestPath).toString())) {
+        // no network calls are made for hflush in case of appendblob
+        connectionsMade = assertAbfsStatistics(AbfsStatistic.CONNECTIONS_MADE,
+            5, metricMap);
+        requestsSent = assertAbfsStatistics(AbfsStatistic.SEND_REQUESTS, 3,
+            metricMap);
+      } else {
+        connectionsMade = assertAbfsStatistics(AbfsStatistic.CONNECTIONS_MADE,
+            6, metricMap);
+        requestsSent = assertAbfsStatistics(AbfsStatistic.SEND_REQUESTS, 4,
+            metricMap);
+      }
       bytesSent = assertAbfsStatistics(AbfsStatistic.BYTES_SENT,
           testNetworkStatsString.getBytes().length, metricMap);
 
@@ -125,10 +133,18 @@ public class ITestAbfsNetworkStatistics extends AbstractAbfsIntegrationTest {
        * wrote each time).
        *
        */
-      assertAbfsStatistics(AbfsStatistic.CONNECTIONS_MADE,
-          connectionsMade + 1 + LARGE_OPERATIONS * 2, metricMap);
-      assertAbfsStatistics(AbfsStatistic.SEND_REQUESTS,
-          requestsSent + 1 + LARGE_OPERATIONS * 2, metricMap);
+      if (fs.getAbfsStore().isAppendBlobKey(fs.makeQualified(sendRequestPath).toString())) {
+        // no network calls are made for hflush in case of appendblob
+        assertAbfsStatistics(AbfsStatistic.CONNECTIONS_MADE,
+            connectionsMade + 1 + LARGE_OPERATIONS, metricMap);
+        assertAbfsStatistics(AbfsStatistic.SEND_REQUESTS,
+            requestsSent + 1 + LARGE_OPERATIONS, metricMap);
+      } else {
+        assertAbfsStatistics(AbfsStatistic.CONNECTIONS_MADE,
+            connectionsMade + 1 + LARGE_OPERATIONS * 2, metricMap);
+        assertAbfsStatistics(AbfsStatistic.SEND_REQUESTS,
+            requestsSent + 1 + LARGE_OPERATIONS * 2, metricMap);
+      }
       assertAbfsStatistics(AbfsStatistic.BYTES_SENT,
           bytesSent + LARGE_OPERATIONS * (testNetworkStatsString.getBytes().length),
           metricMap);
@@ -183,8 +199,14 @@ public class ITestAbfsNetworkStatistics extends AbstractAbfsIntegrationTest {
        *
        * bytes_received - This should be equal to bytes sent earlier.
        */
-      getResponses = assertAbfsStatistics(AbfsStatistic.GET_RESPONSES, 8,
-          metricMap);
+      if (fs.getAbfsStore().isAppendBlobKey(fs.makeQualified(getResponsePath).toString())) {
+        //for appendBlob hflush is a no-op
+        getResponses = assertAbfsStatistics(AbfsStatistic.GET_RESPONSES, 7,
+            metricMap);
+      } else {
+        getResponses = assertAbfsStatistics(AbfsStatistic.GET_RESPONSES, 8,
+            metricMap);
+      }
       // Testing that bytes received is equal to bytes sent.
       long bytesSend = metricMap.get(AbfsStatistic.BYTES_SENT.getStatName());
       bytesReceived = assertAbfsStatistics(AbfsStatistic.BYTES_RECEIVED,
@@ -242,8 +264,14 @@ public class ITestAbfsNetworkStatistics extends AbstractAbfsIntegrationTest {
       assertAbfsStatistics(AbfsStatistic.BYTES_RECEIVED,
           bytesReceived + LARGE_OPERATIONS * (testResponseString.getBytes().length),
           metricMap);
-      assertAbfsStatistics(AbfsStatistic.GET_RESPONSES,
-          getResponses + 3 + 2 * LARGE_OPERATIONS, metricMap);
+      if (fs.getAbfsStore().isAppendBlobKey(fs.makeQualified(getResponsePath).toString())) {
+        // no network calls are made for hflush in case of appendblob
+        assertAbfsStatistics(AbfsStatistic.GET_RESPONSES,
+            getResponses + 3 + LARGE_OPERATIONS, metricMap);
+      } else {
+        assertAbfsStatistics(AbfsStatistic.GET_RESPONSES,
+            getResponses + 3 + 2 * LARGE_OPERATIONS, metricMap);
+      }
 
     } finally {
       IOUtils.cleanupWithLogger(LOG, out, in);
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsOutputStreamStatistics.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsOutputStreamStatistics.java
index 09cbfde..c8640dd 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsOutputStreamStatistics.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsOutputStreamStatistics.java
@@ -113,6 +113,10 @@ public class ITestAbfsOutputStreamStatistics
     final AzureBlobFileSystem fs = getFileSystem();
     Path queueShrinkFilePath = path(getMethodName());
     String testQueueShrink = "testQueue";
+    if (fs.getAbfsStore().isAppendBlobKey(fs.makeQualified(queueShrinkFilePath).toString())) {
+      // writeOperationsQueue is not used for appendBlob, hence queueShrink is 0
+      return;
+    }
 
     try (AbfsOutputStream outForOneOp = createAbfsOutputStreamWithFlushEnabled(
         fs, queueShrinkFilePath)) {
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsReadWriteAndSeek.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsReadWriteAndSeek.java
index a270a00..52abb09 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsReadWriteAndSeek.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsReadWriteAndSeek.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
 
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.APPENDBLOB_MAX_WRITE_BUFFER_SIZE;
 import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_READ_BUFFER_SIZE;
 import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MAX_BUFFER_SIZE;
 import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MIN_BUFFER_SIZE;
@@ -46,6 +47,7 @@ public class ITestAbfsReadWriteAndSeek extends AbstractAbfsScaleTest {
   public static Iterable<Object[]> sizes() {
     return Arrays.asList(new Object[][]{{MIN_BUFFER_SIZE},
         {DEFAULT_READ_BUFFER_SIZE},
+        {APPENDBLOB_MAX_WRITE_BUFFER_SIZE},
         {MAX_BUFFER_SIZE}});
   }
 
@@ -70,6 +72,7 @@ public class ITestAbfsReadWriteAndSeek extends AbstractAbfsScaleTest {
 
     final byte[] b = new byte[2 * bufferSize];
     new Random().nextBytes(b);
+
     try (FSDataOutputStream stream = fs.create(TEST_PATH)) {
       stream.write(b);
     }
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsStreamStatistics.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsStreamStatistics.java
index 51531f6..395a456 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsStreamStatistics.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsStreamStatistics.java
@@ -136,8 +136,15 @@ public class ITestAbfsStreamStatistics extends AbstractAbfsIntegrationTest {
                 testReadWriteOps.getBytes().length);
       }
 
-      //Test for 1000000 read operations
-      assertReadWriteOps("read", largeValue, statistics.getReadOps());
+      if (fs.getAbfsStore().isAppendBlobKey(fs.makeQualified(largeOperationsFile).toString())) {
+        // for appendblob data is already flushed, so there is more data to read.
+        assertTrue(String.format("The actual value of %d was not equal to the "
+              + "expected value", statistics.getReadOps()),
+          statistics.getReadOps() == (largeValue + 3) || statistics.getReadOps() == (largeValue + 4));
+      } else {
+        //Test for 1000000 read operations
+        assertReadWriteOps("read", largeValue, statistics.getReadOps());
+      }
 
     } finally {
       IOUtils.cleanupWithLogger(LOG, inForLargeOperations,
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java
index 94368a4..4b8f071 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java
@@ -145,15 +145,19 @@ public class ITestAzureBlobFileSystemCreate extends
       out.hsync();
       fail("Expected a failure");
     } catch (FileNotFoundException fnfe) {
-      // the exception raised in close() must be in the caught exception's
-      // suppressed list
-      Throwable[] suppressed = fnfe.getSuppressed();
-      assertEquals("suppressed count", 1, suppressed.length);
-      Throwable inner = suppressed[0];
-      if (!(inner instanceof IOException)) {
-        throw inner;
+      //appendblob outputStream does not generate suppressed exception on close as it is
+      //single threaded code
+      if (!fs.getAbfsStore().isAppendBlobKey(fs.makeQualified(testPath).toString())) {
+        // the exception raised in close() must be in the caught exception's
+        // suppressed list
+        Throwable[] suppressed = fnfe.getSuppressed();
+        assertEquals("suppressed count", 1, suppressed.length);
+        Throwable inner = suppressed[0];
+        if (!(inner instanceof IOException)) {
+          throw inner;
+        }
+        GenericTestUtils.assertExceptionContains(fnfe.getMessage(), inner);
       }
-      GenericTestUtils.assertExceptionContains(fnfe.getMessage(), inner);
     }
   }
 
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemE2E.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemE2E.java
index ebc9c07..05c3855 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemE2E.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemE2E.java
@@ -203,6 +203,9 @@ public class ITestAzureBlobFileSystemE2E extends AbstractAbfsIntegrationTest {
   public void testFlushWithFileNotFoundException() throws Exception {
     final AzureBlobFileSystem fs = getFileSystem();
     final Path testFilePath = new Path(methodName.getMethodName());
+    if (fs.getAbfsStore().isAppendBlobKey(fs.makeQualified(testFilePath).toString())) {
+      return;
+    }
 
     FSDataOutputStream stream = fs.create(testFilePath);
     assertTrue(fs.exists(testFilePath));
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFlush.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFlush.java
index 60f7f7d..92aa552 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFlush.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFlush.java
@@ -49,7 +49,8 @@ import org.apache.hadoop.fs.Path;
 public class ITestAzureBlobFileSystemFlush extends AbstractAbfsScaleTest {
   private static final int BASE_SIZE = 1024;
   private static final int ONE_THOUSAND = 1000;
-  private static final int TEST_BUFFER_SIZE = 5 * ONE_THOUSAND * BASE_SIZE;
+ //3000 KB to support appenblob too
+  private static final int TEST_BUFFER_SIZE = 3 * ONE_THOUSAND * BASE_SIZE;
   private static final int ONE_MB = 1024 * 1024;
   private static final int FLUSH_TIMES = 200;
   private static final int THREAD_SLEEP_TIME = 1000;
@@ -226,11 +227,15 @@ public class ITestAzureBlobFileSystemFlush extends AbstractAbfsScaleTest {
 
     final Path testFilePath = path(methodName.getMethodName());
     byte[] buffer = getRandomBytesArray();
-
     // The test case must write "fs.azure.write.request.size" bytes
     // to the stream in order for the data to be uploaded to storage.
-    assertEquals(fs.getAbfsStore().getAbfsConfiguration().getWriteBufferSize(),
-        buffer.length);
+    assertTrue(fs.getAbfsStore().getAbfsConfiguration().getWriteBufferSize()
+        <= buffer.length);
+
+    boolean isAppendBlob = true;
+    if (!fs.getAbfsStore().isAppendBlobKey(fs.makeQualified(testFilePath).toString())) {
+      isAppendBlob = false;
+    }
 
     try (FSDataOutputStream stream = fs.create(testFilePath)) {
       stream.write(buffer);
@@ -245,7 +250,8 @@ public class ITestAzureBlobFileSystemFlush extends AbstractAbfsScaleTest {
 
       // Verify that the data can be read if disableOutputStreamFlush is
       // false; and otherwise cannot be read.
-      validate(fs.open(testFilePath), buffer, !disableOutputStreamFlush);
+      /* For Appendlob flush is not needed to update data on server */
+      validate(fs.open(testFilePath), buffer, !disableOutputStreamFlush || isAppendBlob);
     }
   }
 
@@ -267,10 +273,15 @@ public class ITestAzureBlobFileSystemFlush extends AbstractAbfsScaleTest {
     final AzureBlobFileSystem fs = this.getFileSystem();
     byte[] buffer = getRandomBytesArray();
     final Path testFilePath = path(methodName.getMethodName());
+    boolean isAppendBlob = false;
+    if (fs.getAbfsStore().isAppendBlobKey(fs.makeQualified(testFilePath).toString())) {
+      isAppendBlob = true;
+    }
 
     try (FSDataOutputStream stream = getStreamAfterWrite(fs, testFilePath, buffer, false)) {
       stream.hflush();
-      validate(fs, testFilePath, buffer, false);
+      /* For Appendlob flush is not needed to update data on server */
+      validate(fs, testFilePath, buffer, isAppendBlob);
     }
   }
 
@@ -322,9 +333,14 @@ public class ITestAzureBlobFileSystemFlush extends AbstractAbfsScaleTest {
     final AzureBlobFileSystem fs = this.getFileSystem();
     byte[] buffer = getRandomBytesArray();
     final Path testFilePath = path(methodName.getMethodName());
+    boolean isAppendBlob = false;
+    if (fs.getAbfsStore().isAppendBlobKey(fs.makeQualified(testFilePath).toString())) {
+      isAppendBlob = true;
+    }
     try (FSDataOutputStream stream = getStreamAfterWrite(fs, testFilePath, buffer, false)) {
       stream.hsync();
-      validate(fs, testFilePath, buffer, false);
+      /* For Appendlob flush is not needed to update data on server */
+      validate(fs, testFilePath, buffer, isAppendBlob);
     }
   }
 
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsConfigurationFieldsValidation.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsConfigurationFieldsValidation.java
index 23dda0f..bda845b 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsConfigurationFieldsValidation.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsConfigurationFieldsValidation.java
@@ -189,4 +189,4 @@ public class TestAbfsConfigurationFieldsValidation {
     abfsConfig.setMaxBackoffIntervalMilliseconds(backoffTime);
     return abfsConfig;
   }
-}
\ No newline at end of file
+}
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/constants/TestConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/constants/TestConfigurationKeys.java
index 16a3f57..72ea766 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/constants/TestConfigurationKeys.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/constants/TestConfigurationKeys.java
@@ -27,6 +27,7 @@ public final class TestConfigurationKeys {
   public static final String FS_AZURE_ACCOUNT_KEY = "fs.azure.account.key";
   public static final String FS_AZURE_CONTRACT_TEST_URI = "fs.contract.test.fs.abfs";
   public static final String FS_AZURE_TEST_NAMESPACE_ENABLED_ACCOUNT = "fs.azure.test.namespace.enabled";
+  public static final String FS_AZURE_TEST_APPENDBLOB_ENABLED = "fs.azure.test.appendblob.enabled";
 
   public static final String FS_AZURE_BLOB_DATA_CONTRIBUTOR_CLIENT_ID = "fs.azure.account.oauth2.contributor.client.id";
   public static final String FS_AZURE_BLOB_DATA_CONTRIBUTOR_CLIENT_SECRET = "fs.azure.account.oauth2.contributor.client.secret";
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java
new file mode 100644
index 0000000..4105aa1
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java
@@ -0,0 +1,430 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Random;
+
+import org.junit.Test;
+
+import org.mockito.ArgumentCaptor;
+
+import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
+import org.apache.hadoop.conf.Configuration;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.anyInt;
+import static org.mockito.Mockito.anyBoolean;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyString;
+import static org.mockito.Mockito.anyLong;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public final class TestAbfsOutputStream {
+
+  private static final int BUFFER_SIZE = 4096;
+  private static final int WRITE_SIZE = 1000;
+  private static final String PATH = "~/testpath";
+  private final String globalKey = "fs.azure.configuration";
+  private final String accountName1 = "account1";
+  private final String accountKey1 = globalKey + "." + accountName1;
+  private final String accountValue1 = "one";
+
+  private AbfsOutputStreamContext populateAbfsOutputStreamContext(int writeBufferSize,
+            boolean isFlushEnabled,
+            boolean disableOutputStreamFlush,
+            boolean isAppendBlob) {
+    return new AbfsOutputStreamContext(2)
+            .withWriteBufferSize(writeBufferSize)
+            .enableFlush(isFlushEnabled)
+            .disableOutputStreamFlush(disableOutputStreamFlush)
+            .withStreamStatistics(new AbfsOutputStreamStatisticsImpl())
+            .withAppendBlob(isAppendBlob)
+            .build();
+  }
+
+  /**
+   * The test verifies OutputStream shortwrite case(2000bytes write followed by flush, hflush, hsync) is making correct HTTP calls to the server
+   */
+  @Test
+  public void verifyShortWriteRequest() throws Exception {
+
+    AbfsClient client = mock(AbfsClient.class);
+    AbfsRestOperation op = mock(AbfsRestOperation.class);
+    AbfsConfiguration abfsConf;
+    final Configuration conf = new Configuration();
+    conf.set(accountKey1, accountValue1);
+    abfsConf = new AbfsConfiguration(conf, accountName1);
+    AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf);
+    when(client.getAbfsPerfTracker()).thenReturn(tracker);
+    when(client.append(anyString(), anyLong(), any(byte[].class), anyInt(), anyInt(), any(), anyBoolean())).thenReturn(op);
+    when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any())).thenReturn(op);
+
+    AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0, populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false));
+    final byte[] b = new byte[WRITE_SIZE];
+    new Random().nextBytes(b);
+    out.write(b);
+    out.hsync();
+    ArgumentCaptor<String> acString = ArgumentCaptor.forClass(String.class);
+    ArgumentCaptor<Long> acLong = ArgumentCaptor.forClass(Long.class);
+    ArgumentCaptor<Integer> acBufferOffset = ArgumentCaptor.forClass(Integer.class);
+    ArgumentCaptor<Integer> acBufferLength = ArgumentCaptor.forClass(Integer.class);
+    ArgumentCaptor<byte[]> acByteArray = ArgumentCaptor.forClass(byte[].class);
+    ArgumentCaptor<Boolean> acAppendBlobAppend = ArgumentCaptor.forClass(Boolean.class);
+    ArgumentCaptor<String> acSASToken = ArgumentCaptor.forClass(String.class);
+
+
+    final byte[] b1 = new byte[2*WRITE_SIZE];
+    new Random().nextBytes(b1);
+    out.write(b1);
+    out.flush();
+    out.hflush();
+
+    out.hsync();
+
+    verify(client, times(2)).append(acString.capture(), acLong.capture(), acByteArray.capture(), acBufferOffset.capture(), acBufferLength.capture(),
+                                    acSASToken.capture(), acAppendBlobAppend.capture());
+    assertThat(Arrays.asList(PATH, PATH)).describedAs("Path of the requests").isEqualTo(acString.getAllValues());
+    assertThat(Arrays.asList(Long.valueOf(0), Long.valueOf(WRITE_SIZE))).describedAs("Write Position").isEqualTo(acLong.getAllValues());
+    assertThat(Arrays.asList(0, 0)).describedAs("Buffer Offset").isEqualTo(acBufferOffset.getAllValues());
+    assertThat(Arrays.asList(WRITE_SIZE, 2*WRITE_SIZE)).describedAs("Buffer length").isEqualTo(acBufferLength.getAllValues());
+
+  }
+
+  /**
+   * The test verifies OutputStream Write of WRITE_SIZE(1000 bytes) followed by a close is making correct HTTP calls to the server
+   */
+  @Test
+  public void verifyWriteRequest() throws Exception {
+
+    AbfsClient client = mock(AbfsClient.class);
+    AbfsRestOperation op = mock(AbfsRestOperation.class);
+    AbfsConfiguration abfsConf;
+    final Configuration conf = new Configuration();
+    conf.set(accountKey1, accountValue1);
+    abfsConf = new AbfsConfiguration(conf, accountName1);
+    AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf);
+
+    when(client.getAbfsPerfTracker()).thenReturn(tracker);
+    when(client.append(anyString(), anyLong(), any(byte[].class), anyInt(), anyInt(), any(), anyBoolean())).thenReturn(op);
+    when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any())).thenReturn(op);
+
+    AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0, populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false));
+    final byte[] b = new byte[WRITE_SIZE];
+    new Random().nextBytes(b);
+
+    for (int i = 0; i < 5; i++) {
+      out.write(b);
+    }
+    out.close();
+
+    ArgumentCaptor<String> acString = ArgumentCaptor.forClass(String.class);
+    ArgumentCaptor<Long> acLong = ArgumentCaptor.forClass(Long.class);
+    ArgumentCaptor<Integer> acBufferOffset = ArgumentCaptor.forClass(Integer.class);
+    ArgumentCaptor<Integer> acBufferLength = ArgumentCaptor.forClass(Integer.class);
+    ArgumentCaptor<byte[]> acByteArray = ArgumentCaptor.forClass(byte[].class);
+    ArgumentCaptor<Boolean> acAppendBlobAppend = ArgumentCaptor.forClass(Boolean.class);
+    ArgumentCaptor<String> acSASToken = ArgumentCaptor.forClass(String.class);
+
+    verify(client, times(2)).append(acString.capture(), acLong.capture(), acByteArray.capture(), acBufferOffset.capture(), acBufferLength.capture(),
+                                    acSASToken.capture(), acAppendBlobAppend.capture());
+    assertThat(Arrays.asList(PATH, PATH)).describedAs("Path").isEqualTo(acString.getAllValues());
+    assertThat(new HashSet<Long>(Arrays.asList(Long.valueOf(0), Long.valueOf(BUFFER_SIZE)))).describedAs("Position").isEqualTo(new HashSet<Long>(
+               acLong.getAllValues()));
+    assertThat(Arrays.asList(0, 0)).describedAs("Buffer Offset").isEqualTo(acBufferOffset.getAllValues());
+    assertThat(new HashSet<Integer>(Arrays.asList(BUFFER_SIZE, 5*WRITE_SIZE-BUFFER_SIZE))).describedAs("Buffer Length").isEqualTo(new HashSet<Integer>(
+               acBufferLength.getAllValues()));
+
+    ArgumentCaptor<String> acFlushString = ArgumentCaptor.forClass(String.class);
+    ArgumentCaptor<Long> acFlushLong = ArgumentCaptor.forClass(Long.class);
+    ArgumentCaptor<Boolean> acFlushRetainUnCommittedData = ArgumentCaptor.forClass(Boolean.class);
+    ArgumentCaptor<Boolean> acFlushClose = ArgumentCaptor.forClass(Boolean.class);
+    ArgumentCaptor<String> acFlushSASToken = ArgumentCaptor.forClass(String.class);
+
+    verify(client, times(1)).flush(acFlushString.capture(), acFlushLong.capture(), acFlushRetainUnCommittedData.capture(), acFlushClose.capture(),
+                                   acFlushSASToken.capture());
+    assertThat(Arrays.asList(PATH)).describedAs("path").isEqualTo(acFlushString.getAllValues());
+    assertThat(Arrays.asList(Long.valueOf(5*WRITE_SIZE))).describedAs("position").isEqualTo(acFlushLong.getAllValues());
+    assertThat(Arrays.asList(false)).describedAs("RetainUnCommittedData flag").isEqualTo(acFlushRetainUnCommittedData.getAllValues());
+    assertThat(Arrays.asList(true)).describedAs("Close flag").isEqualTo(acFlushClose.getAllValues());
+  }
+
+  /**
+   * The test verifies OutputStream Write of BUFFER_SIZE(4KB) followed by a close is making correct HTTP calls to the server
+   */
+  @Test
+  public void verifyWriteRequestOfBufferSizeAndClose() throws Exception {
+
+    AbfsClient client = mock(AbfsClient.class);
+    AbfsRestOperation op = mock(AbfsRestOperation.class);
+    AbfsHttpOperation httpOp = mock(AbfsHttpOperation.class);
+    AbfsConfiguration abfsConf;
+    final Configuration conf = new Configuration();
+    conf.set(accountKey1, accountValue1);
+    abfsConf = new AbfsConfiguration(conf, accountName1);
+    AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf);
+
+    when(client.getAbfsPerfTracker()).thenReturn(tracker);
+    when(client.append(anyString(), anyLong(), any(byte[].class), anyInt(), anyInt(), any(), anyBoolean())).thenReturn(op);
+    when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any())).thenReturn(op);
+    when(op.getSasToken()).thenReturn("testToken");
+    when(op.getResult()).thenReturn(httpOp);
+
+    AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0, populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false));
+    final byte[] b = new byte[BUFFER_SIZE];
+    new Random().nextBytes(b);
+
+    for (int i = 0; i < 2; i++) {
+      out.write(b);
+    }
+    out.close();
+
+    ArgumentCaptor<String> acString = ArgumentCaptor.forClass(String.class);
+    ArgumentCaptor<Long> acLong = ArgumentCaptor.forClass(Long.class);
+    ArgumentCaptor<Integer> acBufferOffset = ArgumentCaptor.forClass(Integer.class);
+    ArgumentCaptor<Integer> acBufferLength = ArgumentCaptor.forClass(Integer.class);
+    ArgumentCaptor<byte[]> acByteArray = ArgumentCaptor.forClass(byte[].class);
+    ArgumentCaptor<Boolean> acAppendBlobAppend = ArgumentCaptor.forClass(Boolean.class);
+    ArgumentCaptor<String> acSASToken = ArgumentCaptor.forClass(String.class);
+
+    verify(client, times(2)).append(acString.capture(), acLong.capture(), acByteArray.capture(), acBufferOffset.capture(), acBufferLength.capture(),
+                                    acSASToken.capture(), acAppendBlobAppend.capture());
+    assertThat(Arrays.asList(PATH, PATH)).describedAs("path").isEqualTo(acString.getAllValues());
+    assertThat(new HashSet<Long>(Arrays.asList(Long.valueOf(0), Long.valueOf(BUFFER_SIZE)))).describedAs("Position").isEqualTo(new HashSet<Long>(
+               acLong.getAllValues()));
+    assertThat(Arrays.asList(0, 0)).describedAs("Buffer Offset").isEqualTo(acBufferOffset.getAllValues());
+    assertThat(Arrays.asList(BUFFER_SIZE, BUFFER_SIZE)).describedAs("Buffer Length").isEqualTo(acBufferLength.getAllValues());
+
+    ArgumentCaptor<String> acFlushString = ArgumentCaptor.forClass(String.class);
+    ArgumentCaptor<Long> acFlushLong = ArgumentCaptor.forClass(Long.class);
+    ArgumentCaptor<Boolean> acFlushRetainUnCommittedData = ArgumentCaptor.forClass(Boolean.class);
+    ArgumentCaptor<Boolean> acFlushClose = ArgumentCaptor.forClass(Boolean.class);
+    ArgumentCaptor<String> acFlushSASToken = ArgumentCaptor.forClass(String.class);
+
+    verify(client, times(1)).flush(acFlushString.capture(), acFlushLong.capture(), acFlushRetainUnCommittedData.capture(), acFlushClose.capture(),
+                                   acFlushSASToken.capture());
+    assertThat(Arrays.asList(PATH)).describedAs("path").isEqualTo(acFlushString.getAllValues());
+    assertThat(Arrays.asList(Long.valueOf(2*BUFFER_SIZE))).describedAs("position").isEqualTo(acFlushLong.getAllValues());
+    assertThat(Arrays.asList(false)).describedAs("RetainUnCommittedData flag").isEqualTo(acFlushRetainUnCommittedData.getAllValues());
+    assertThat(Arrays.asList(true)).describedAs("Close flag").isEqualTo(acFlushClose.getAllValues());
+
+  }
+
+  /**
+   * The test verifies OutputStream Write of BUFFER_SIZE(4KB) is making correct HTTP calls to the server
+   */
+  @Test
+  public void verifyWriteRequestOfBufferSize() throws Exception {
+
+    AbfsClient client = mock(AbfsClient.class);
+    AbfsRestOperation op = mock(AbfsRestOperation.class);
+    AbfsHttpOperation httpOp = mock(AbfsHttpOperation.class);
+    AbfsConfiguration abfsConf;
+    final Configuration conf = new Configuration();
+    conf.set(accountKey1, accountValue1);
+    abfsConf = new AbfsConfiguration(conf, accountName1);
+    AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf);
+
+    when(client.getAbfsPerfTracker()).thenReturn(tracker);
+    when(client.append(anyString(), anyLong(), any(byte[].class), anyInt(), anyInt(), any(), anyBoolean())).thenReturn(op);
+    when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any())).thenReturn(op);
+    when(op.getSasToken()).thenReturn("testToken");
+    when(op.getResult()).thenReturn(httpOp);
+
+    AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0, populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false));
+    final byte[] b = new byte[BUFFER_SIZE];
+    new Random().nextBytes(b);
+
+    for (int i = 0; i < 2; i++) {
+      out.write(b);
+    }
+    Thread.sleep(1000);
+
+    ArgumentCaptor<String> acString = ArgumentCaptor.forClass(String.class);
+    ArgumentCaptor<Long> acLong = ArgumentCaptor.forClass(Long.class);
+    ArgumentCaptor<Integer> acBufferOffset = ArgumentCaptor.forClass(Integer.class);
+    ArgumentCaptor<Integer> acBufferLength = ArgumentCaptor.forClass(Integer.class);
+    ArgumentCaptor<byte[]> acByteArray = ArgumentCaptor.forClass(byte[].class);
+    ArgumentCaptor<Boolean> acAppendBlobAppend = ArgumentCaptor.forClass(Boolean.class);
+    ArgumentCaptor<String> acSASToken = ArgumentCaptor.forClass(String.class);
+
+    verify(client, times(2)).append(acString.capture(), acLong.capture(), acByteArray.capture(), acBufferOffset.capture(), acBufferLength.capture(),
+                                    acSASToken.capture(), acAppendBlobAppend.capture());
+    assertThat(Arrays.asList(PATH, PATH)).describedAs("File Path").isEqualTo(acString.getAllValues());
+    assertThat(new HashSet<Long>(Arrays.asList(Long.valueOf(0), Long.valueOf(BUFFER_SIZE)))).describedAs("Position in file").isEqualTo(
+               new HashSet<Long>(acLong.getAllValues()));
+    assertThat(Arrays.asList(0, 0)).describedAs("buffer offset").isEqualTo(acBufferOffset.getAllValues());
+    assertThat(Arrays.asList(BUFFER_SIZE, BUFFER_SIZE)).describedAs("buffer length").isEqualTo(acBufferLength.getAllValues());
+
+  }
+
+  /**
+   * The test verifies OutputStream Write of BUFFER_SIZE(4KB) on a AppendBlob based stream is making correct HTTP calls to the server
+   */
+  @Test
+  public void verifyWriteRequestOfBufferSizeWithAppendBlob() throws Exception {
+
+    AbfsClient client = mock(AbfsClient.class);
+    AbfsRestOperation op = mock(AbfsRestOperation.class);
+    AbfsConfiguration abfsConf;
+    final Configuration conf = new Configuration();
+    conf.set(accountKey1, accountValue1);
+    abfsConf = new AbfsConfiguration(conf, accountName1);
+    AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf);
+
+    when(client.getAbfsPerfTracker()).thenReturn(tracker);
+    when(client.append(anyString(), anyLong(), any(byte[].class), anyInt(), anyInt(), any(), anyBoolean())).thenReturn(op);
+    when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any())).thenReturn(op);
+
+    AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0, populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, true));
+    final byte[] b = new byte[BUFFER_SIZE];
+    new Random().nextBytes(b);
+
+    for (int i = 0; i < 2; i++) {
+      out.write(b);
+    }
+    Thread.sleep(1000);
+
+    ArgumentCaptor<String> acString = ArgumentCaptor.forClass(String.class);
+    ArgumentCaptor<Long> acLong = ArgumentCaptor.forClass(Long.class);
+    ArgumentCaptor<Integer> acBufferOffset = ArgumentCaptor.forClass(Integer.class);
+    ArgumentCaptor<Integer> acBufferLength = ArgumentCaptor.forClass(Integer.class);
+    ArgumentCaptor<byte[]> acByteArray = ArgumentCaptor.forClass(byte[].class);
+    ArgumentCaptor<Boolean> acAppendBlobAppend = ArgumentCaptor.forClass(Boolean.class);
+    ArgumentCaptor<String> acSASToken = ArgumentCaptor.forClass(String.class);
+
+    verify(client, times(2)).append(acString.capture(), acLong.capture(), acByteArray.capture(), acBufferOffset.capture(), acBufferLength.capture(),
+                                    acSASToken.capture(), acAppendBlobAppend.capture());
+    assertThat(Arrays.asList(PATH, PATH)).describedAs("File Path").isEqualTo(acString.getAllValues());
+    assertThat(Arrays.asList(Long.valueOf(0), Long.valueOf(BUFFER_SIZE))).describedAs("File Position").isEqualTo(acLong.getAllValues());
+    assertThat(Arrays.asList(0, 0)).describedAs("Buffer Offset").isEqualTo(acBufferOffset.getAllValues());
+    assertThat(Arrays.asList(BUFFER_SIZE, BUFFER_SIZE)).describedAs("Buffer Length").isEqualTo(acBufferLength.getAllValues());
+    assertThat(Arrays.asList(true, true)).describedAs("is AppendBlob Append").isEqualTo(acAppendBlobAppend.getAllValues());
+
+  }
+
+  /**
+   * The test verifies OutputStream Write of BUFFER_SIZE(4KB)  followed by a hflush call is making correct HTTP calls to the server
+   */
+  @Test
+  public void verifyWriteRequestOfBufferSizeAndHFlush() throws Exception {
+
+    AbfsClient client = mock(AbfsClient.class);
+    AbfsRestOperation op = mock(AbfsRestOperation.class);
+    AbfsConfiguration abfsConf;
+    final Configuration conf = new Configuration();
+    conf.set(accountKey1, accountValue1);
+    abfsConf = new AbfsConfiguration(conf, accountName1);
+    AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf);
+
+    when(client.getAbfsPerfTracker()).thenReturn(tracker);
+    when(client.append(anyString(), anyLong(), any(byte[].class), anyInt(), anyInt(), any(), anyBoolean())).thenReturn(op);
+    when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any())).thenReturn(op);
+
+    AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0, populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false));
+    final byte[] b = new byte[BUFFER_SIZE];
+    new Random().nextBytes(b);
+
+    for (int i = 0; i < 2; i++) {
+      out.write(b);
+    }
+    out.hflush();
+
+    ArgumentCaptor<String> acString = ArgumentCaptor.forClass(String.class);
+    ArgumentCaptor<Long> acLong = ArgumentCaptor.forClass(Long.class);
+    ArgumentCaptor<Integer> acBufferOffset = ArgumentCaptor.forClass(Integer.class);
+    ArgumentCaptor<Integer> acBufferLength = ArgumentCaptor.forClass(Integer.class);
+    ArgumentCaptor<byte[]> acByteArray = ArgumentCaptor.forClass(byte[].class);
+    ArgumentCaptor<Boolean> acAppendBlobAppend = ArgumentCaptor.forClass(Boolean.class);
+    ArgumentCaptor<String> acSASToken = ArgumentCaptor.forClass(String.class);
+
+    verify(client, times(2)).append(acString.capture(), acLong.capture(), acByteArray.capture(), acBufferOffset.capture(), acBufferLength.capture(),
+                                    acSASToken.capture(), acAppendBlobAppend.capture());
+    assertThat(Arrays.asList(PATH, PATH)).describedAs("File Path").isEqualTo(acString.getAllValues());
+    assertThat(new HashSet<Long>(Arrays.asList(Long.valueOf(0), Long.valueOf(BUFFER_SIZE)))).describedAs("File Position").isEqualTo(
+               new HashSet<Long>(acLong.getAllValues()));
+    assertThat(Arrays.asList(0, 0)).describedAs("Buffer Offset").isEqualTo(acBufferOffset.getAllValues());
+    assertThat(Arrays.asList(BUFFER_SIZE, BUFFER_SIZE)).describedAs("Buffer Length").isEqualTo(acBufferLength.getAllValues());
+
+    ArgumentCaptor<String> acFlushString = ArgumentCaptor.forClass(String.class);
+    ArgumentCaptor<Long> acFlushLong = ArgumentCaptor.forClass(Long.class);
+    ArgumentCaptor<Boolean> acFlushRetainUnCommittedData = ArgumentCaptor.forClass(Boolean.class);
+    ArgumentCaptor<Boolean> acFlushClose = ArgumentCaptor.forClass(Boolean.class);
+    ArgumentCaptor<String> acFlushSASToken = ArgumentCaptor.forClass(String.class);
+
+    verify(client, times(1)).flush(acFlushString.capture(), acFlushLong.capture(), acFlushRetainUnCommittedData.capture(), acFlushClose.capture(),
+                                   acFlushSASToken.capture());
+    assertThat(Arrays.asList(PATH)).describedAs("path").isEqualTo(acFlushString.getAllValues());
+    assertThat(Arrays.asList(Long.valueOf(2*BUFFER_SIZE))).describedAs("position").isEqualTo(acFlushLong.getAllValues());
+    assertThat(Arrays.asList(false)).describedAs("RetainUnCommittedData flag").isEqualTo(acFlushRetainUnCommittedData.getAllValues());
+    assertThat(Arrays.asList(false)).describedAs("Close flag").isEqualTo(acFlushClose.getAllValues());
+
+  }
+
+  /**
+   * The test verifies OutputStream Write of BUFFER_SIZE(4KB)  followed by a flush call is making correct HTTP calls to the server
+   */
+  @Test
+  public void verifyWriteRequestOfBufferSizeAndFlush() throws Exception {
+
+    AbfsClient client = mock(AbfsClient.class);
+    AbfsRestOperation op = mock(AbfsRestOperation.class);
+    AbfsConfiguration abfsConf;
+    final Configuration conf = new Configuration();
+    conf.set(accountKey1, accountValue1);
+    abfsConf = new AbfsConfiguration(conf, accountName1);
+    AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf);
+    when(client.getAbfsPerfTracker()).thenReturn(tracker);
+    when(client.append(anyString(), anyLong(), any(byte[].class), anyInt(), anyInt(), any(), anyBoolean())).thenReturn(op);
+    when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any())).thenReturn(op);
+
+    AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0, populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false));
+    final byte[] b = new byte[BUFFER_SIZE];
+    new Random().nextBytes(b);
+
+    for (int i = 0; i < 2; i++) {
+      out.write(b);
+    }
+    Thread.sleep(1000);
+    out.flush();
+    Thread.sleep(1000);
+
+    ArgumentCaptor<String> acString = ArgumentCaptor.forClass(String.class);
+    ArgumentCaptor<Long> acLong = ArgumentCaptor.forClass(Long.class);
+    ArgumentCaptor<Integer> acBufferOffset = ArgumentCaptor.forClass(Integer.class);
+    ArgumentCaptor<Integer> acBufferLength = ArgumentCaptor.forClass(Integer.class);
+    ArgumentCaptor<byte[]> acByteArray = ArgumentCaptor.forClass(byte[].class);
+    ArgumentCaptor<Boolean> acAppendBlobAppend = ArgumentCaptor.forClass(Boolean.class);
+    ArgumentCaptor<String> acSASToken = ArgumentCaptor.forClass(String.class);
+
+    verify(client, times(2)).append(acString.capture(), acLong.capture(), acByteArray.capture(), acBufferOffset.capture(), acBufferLength.capture(),
+                                    acSASToken.capture(), acAppendBlobAppend.capture());
+    assertThat(Arrays.asList(PATH, PATH)).describedAs("path").isEqualTo(acString.getAllValues());
+    assertThat(new HashSet<Long>(Arrays.asList(Long.valueOf(0), Long.valueOf(BUFFER_SIZE)))).describedAs("Position").isEqualTo(
+               new HashSet<Long>(acLong.getAllValues()));
+    assertThat(Arrays.asList(0, 0)).describedAs("Buffer Offset").isEqualTo(acBufferOffset.getAllValues());
+    assertThat(Arrays.asList(BUFFER_SIZE, BUFFER_SIZE)).describedAs("Buffer Length").isEqualTo(acBufferLength.getAllValues());
+
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org