You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by tm...@apache.org on 2020/09/19 01:35:21 UTC

[hadoop] branch trunk updated: HADOOP-17215: Support for conditional overwrite.

This is an automated email from the ASF dual-hosted git repository.

tmarquardt pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e31a636  HADOOP-17215: Support for conditional overwrite.
e31a636 is described below

commit e31a636e922a8fdbe0aa7cca53f6de7175e97254
Author: Sneha Vijayarajan <sn...@gmail.com>
AuthorDate: Wed Aug 26 00:31:35 2020 +0530

    HADOOP-17215: Support for conditional overwrite.
    
    Contributed by Sneha Vijayarajan
    
    DETAILS:
    
        This change adds config key "fs.azure.enable.conditional.create.overwrite" with
        a default of true.  When enabled, if create(path, overwrite: true) is invoked
        and the file exists, the ABFS driver will first obtain its etag and then attempt
        to overwrite the file on the condition that the etag matches. The purpose of this
        is to mitigate the non-idempotency of this method.  Specifically, in the event of
        a network error or similar, the client will retry and this can result in the file
        being created more than once which may result in data loss.  In essence this is
        like a poor man's file handle, and will be addressed more thoroughly in the future
        when support for lease is added to ABFS.
    
    TEST RESULTS:
    
        namespace.enabled=true
        auth.type=SharedKey
        -------------------
        $mvn -T 1C -Dparallel-tests=abfs -Dscale -DtestsThreadCount=8 clean verify
        Tests run: 87, Failures: 0, Errors: 0, Skipped: 0
        Tests run: 457, Failures: 0, Errors: 0, Skipped: 42
        Tests run: 207, Failures: 0, Errors: 0, Skipped: 24
    
        namespace.enabled=true
        auth.type=OAuth
        -------------------
        $mvn -T 1C -Dparallel-tests=abfs -Dscale -DtestsThreadCount=8 clean verify
        Tests run: 87, Failures: 0, Errors: 0, Skipped: 0
        Tests run: 457, Failures: 0, Errors: 0, Skipped: 74
        Tests run: 207, Failures: 0, Errors: 0, Skipped: 140
---
 .../hadoop/fs/azurebfs/AbfsConfiguration.java      |   8 +
 .../fs/azurebfs/AzureBlobFileSystemStore.java      | 101 +++++++-
 .../fs/azurebfs/constants/ConfigurationKeys.java   |   4 +
 .../constants/FileSystemConfigurations.java        |   1 +
 ...ConcurrentWriteOperationDetectedException.java} |  32 +--
 .../hadoop/fs/azurebfs/services/AbfsClient.java    |   6 +-
 .../fs/azurebfs/ITestAbfsNetworkStatistics.java    |  40 ++-
 .../azurebfs/ITestAzureBlobFileSystemCreate.java   | 272 +++++++++++++++++++++
 .../fs/azurebfs/ITestAzureBlobFileSystemMkDir.java |  60 +++++
 .../fs/azurebfs/services/TestAbfsClient.java       |  62 +++--
 10 files changed, 515 insertions(+), 71 deletions(-)

diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
index 66d4853..72a8a43 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
@@ -181,6 +181,10 @@ public class AbfsConfiguration{
       DefaultValue = DEFAULT_FS_AZURE_ATOMIC_RENAME_DIRECTORIES)
   private String azureAtomicDirs;
 
+  @BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ENABLE_CONDITIONAL_CREATE_OVERWRITE,
+      DefaultValue = DEFAULT_FS_AZURE_ENABLE_CONDITIONAL_CREATE_OVERWRITE)
+  private boolean enableConditionalCreateOverwrite;
+
   @StringConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_APPEND_BLOB_KEY,
       DefaultValue = DEFAULT_FS_AZURE_APPEND_BLOB_DIRECTORIES)
   private String azureAppendBlobDirs;
@@ -573,6 +577,10 @@ public class AbfsConfiguration{
     return this.azureAtomicDirs;
   }
 
+  public boolean isConditionalCreateOverwriteEnabled() {
+    return this.enableConditionalCreateOverwrite;
+  }
+
   public String getAppendBlobDirs() {
     return this.azureAppendBlobDirs;
   }
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
index 23d2b5a..d2a1d53 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
@@ -66,6 +66,7 @@ import org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations;
 import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.ConcurrentWriteOperationDetectedException;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.FileSystemOperationUnhandledException;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidAbfsRestOperationException;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidFileSystemPropertyException;
@@ -464,10 +465,32 @@ public class AzureBlobFileSystemStore implements Closeable {
         isAppendBlob = true;
       }
 
-      final AbfsRestOperation op = client.createPath(relativePath, true, overwrite,
-              isNamespaceEnabled ? getOctalNotation(permission) : null,
-              isNamespaceEnabled ? getOctalNotation(umask) : null,
-              isAppendBlob);
+      // if "fs.azure.enable.conditional.create.overwrite" is enabled and
+      // is a create request with overwrite=true, create will follow different
+      // flow.
+      boolean triggerConditionalCreateOverwrite = false;
+      if (overwrite
+          && abfsConfiguration.isConditionalCreateOverwriteEnabled()) {
+        triggerConditionalCreateOverwrite = true;
+      }
+
+      AbfsRestOperation op;
+      if (triggerConditionalCreateOverwrite) {
+        op = conditionalCreateOverwriteFile(relativePath,
+            statistics,
+            isNamespaceEnabled ? getOctalNotation(permission) : null,
+            isNamespaceEnabled ? getOctalNotation(umask) : null,
+            isAppendBlob
+        );
+
+      } else {
+        op = client.createPath(relativePath, true,
+            overwrite,
+            isNamespaceEnabled ? getOctalNotation(permission) : null,
+            isNamespaceEnabled ? getOctalNotation(umask) : null,
+            isAppendBlob,
+            null);
+      }
       perfInfo.registerResult(op.getResult()).registerSuccess(true);
 
       return new AbfsOutputStream(
@@ -479,6 +502,74 @@ public class AzureBlobFileSystemStore implements Closeable {
     }
   }
 
+  /**
+   * Conditional create overwrite flow ensures that an existing file is
+   * overwritten only if its eTag still matches the one fetched just before.
+   * @param relativePath path of the file to create
+   * @param statistics file system statistics; not referenced in this method
+   * @param permission permission passed on to createPath, may be null
+   * @param umask umask passed on to createPath, may be null
+   * @param isAppendBlob append blob flag passed on to createPath
+   * @return the rest operation of the createPath call that succeeded
+   * @throws AzureBlobFileSystemException on failure or parallel access
+   */
+  private AbfsRestOperation conditionalCreateOverwriteFile(final String relativePath,
+      final FileSystem.Statistics statistics,
+      final String permission,
+      final String umask,
+      final boolean isAppendBlob) throws AzureBlobFileSystemException {
+    AbfsRestOperation op;
+
+    try {
+      // Trigger a create with overwrite=false first so that eTag fetch can be
+      // avoided for cases when no pre-existing file is present (major portion
+      // of create file traffic falls into the case of no pre-existing file).
+      op = client.createPath(relativePath, true,
+          false, permission, umask, isAppendBlob, null);
+    } catch (AbfsRestOperationException e) {
+      if (e.getStatusCode() == HttpURLConnection.HTTP_CONFLICT) {
+        // File pre-exists, fetch eTag
+        try {
+          op = client.getPathStatus(relativePath, false);
+        } catch (AbfsRestOperationException ex) {
+          if (ex.getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) {
+            // Is a parallel access case, as file which was found to be
+            // present went missing by this request.
+            throw new ConcurrentWriteOperationDetectedException(
+                "Parallel access to the create path detected. Failing request "
+                    + "to honor single writer semantics");
+          } else {
+            throw ex;
+          }
+        }
+
+        String eTag = op.getResult()
+            .getResponseHeader(HttpHeaderConfigurations.ETAG);
+
+        try {
+          // overwrite only if eTag matches with the file properties fetched before
+          op = client.createPath(relativePath, true,
+              true, permission, umask, isAppendBlob, eTag);
+        } catch (AbfsRestOperationException ex) {
+          if (ex.getStatusCode() == HttpURLConnection.HTTP_PRECON_FAILED) {
+            // Is a parallel access case, as file with eTag was just queried
+            // and precondition failure can happen only when another file with
+            // different etag got created.
+            throw new ConcurrentWriteOperationDetectedException(
+                "Parallel access to the create path detected. Failing request "
+                    + "to honor single writer semantics");
+          } else {
+            throw ex;
+          }
+        }
+      } else {
+        throw e;
+      }
+    }
+
+    return op;
+  }
+
   private AbfsOutputStreamContext populateAbfsOutputStreamContext(boolean isAppendBlob) {
     int bufferSize = abfsConfiguration.getWriteBufferSize();
     if (isAppendBlob && bufferSize > FileSystemConfigurations.APPENDBLOB_MAX_WRITE_BUFFER_SIZE) {
@@ -508,7 +599,7 @@ public class AzureBlobFileSystemStore implements Closeable {
 
       final AbfsRestOperation op = client.createPath(getRelativePath(path), false, true,
               isNamespaceEnabled ? getOctalNotation(permission) : null,
-              isNamespaceEnabled ? getOctalNotation(umask) : null, false);
+              isNamespaceEnabled ? getOctalNotation(umask) : null, false, null);
       perfInfo.registerResult(op.getResult()).registerSuccess(true);
     }
   }
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
index 681390c..c15c470 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
@@ -67,6 +67,10 @@ public final class ConfigurationKeys {
   public static final String FS_AZURE_ENABLE_AUTOTHROTTLING = "fs.azure.enable.autothrottling";
   public static final String FS_AZURE_ALWAYS_USE_HTTPS = "fs.azure.always.use.https";
   public static final String FS_AZURE_ATOMIC_RENAME_KEY = "fs.azure.atomic.rename.key";
+  /** This config ensures that during create overwrite an existing file will be
+   *  overwritten only if there is a match on the eTag of existing file.
+   */
+  public static final String FS_AZURE_ENABLE_CONDITIONAL_CREATE_OVERWRITE = "fs.azure.enable.conditional.create.overwrite";
   /** Provides a config to provide comma separated path prefixes on which Appendblob based files are created
    *  Default is empty. **/
   public static final String FS_AZURE_APPEND_BLOB_KEY = "fs.azure.appendblob.directories";
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
index 260c496..2b50a5d 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
@@ -70,6 +70,7 @@ public final class FileSystemConfigurations {
   public static final boolean DEFAULT_AZURE_SKIP_USER_GROUP_METADATA_DURING_INITIALIZATION = false;
 
   public static final String DEFAULT_FS_AZURE_ATOMIC_RENAME_DIRECTORIES = "/hbase";
+  public static final boolean DEFAULT_FS_AZURE_ENABLE_CONDITIONAL_CREATE_OVERWRITE = true;
   public static final String DEFAULT_FS_AZURE_APPEND_BLOB_DIRECTORIES = "";
 
   public static final int DEFAULT_READ_AHEAD_QUEUE_DEPTH = -1;
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemMkDir.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/ConcurrentWriteOperationDetectedException.java
similarity index 54%
copy from hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemMkDir.java
copy to hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/ConcurrentWriteOperationDetectedException.java
index 382d396..79813dd 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemMkDir.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/ConcurrentWriteOperationDetectedException.java
@@ -16,33 +16,17 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.fs.azurebfs;
-
-import org.junit.Test;
-
-import org.apache.hadoop.fs.Path;
-
-import static org.apache.hadoop.fs.contract.ContractTestUtils.assertMkdirs;
+package org.apache.hadoop.fs.azurebfs.contracts.exceptions;
 
 /**
- * Test mkdir operation.
+ * Thrown when a concurrent write operation is detected.
  */
-public class ITestAzureBlobFileSystemMkDir extends AbstractAbfsIntegrationTest {
-
-  public ITestAzureBlobFileSystemMkDir() throws Exception {
-    super();
-  }
-
-  @Test
-  public void testCreateDirWithExistingDir() throws Exception {
-    final AzureBlobFileSystem fs = getFileSystem();
-    Path path = new Path("testFolder");
-    assertMkdirs(fs, path);
-    assertMkdirs(fs, path);
-  }
+@org.apache.hadoop.classification.InterfaceAudience.Public
+@org.apache.hadoop.classification.InterfaceStability.Evolving
+public class ConcurrentWriteOperationDetectedException
+    extends AzureBlobFileSystemException {
 
-  @Test
-  public void testCreateRoot() throws Exception {
-    assertMkdirs(getFileSystem(), new Path("/"));
+  public ConcurrentWriteOperationDetectedException(String message) {
+    super(message);
   }
 }
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
index 45c1948..0415857 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
@@ -265,7 +265,7 @@ public class AbfsClient implements Closeable {
 
   public AbfsRestOperation createPath(final String path, final boolean isFile, final boolean overwrite,
                                       final String permission, final String umask,
-                                      final boolean isAppendBlob) throws AzureBlobFileSystemException {
+                                      final boolean isAppendBlob, final String eTag) throws AzureBlobFileSystemException {
     final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
     if (!overwrite) {
       requestHeaders.add(new AbfsHttpHeader(IF_NONE_MATCH, AbfsHttpConstants.STAR));
@@ -279,6 +279,10 @@ public class AbfsClient implements Closeable {
       requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_UMASK, umask));
     }
 
+    if (eTag != null && !eTag.isEmpty()) {
+      requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.IF_MATCH, eTag));
+    }
+
     final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
     abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESOURCE, isFile ? FILE : DIRECTORY);
     if (isAppendBlob) {
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsNetworkStatistics.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsNetworkStatistics.java
index f6ee7a9..c2dbe93 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsNetworkStatistics.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsNetworkStatistics.java
@@ -110,9 +110,18 @@ public class ITestAbfsNetworkStatistics extends AbstractAbfsIntegrationTest {
     connectionsMade++;
     requestsSent++;
 
+
     try (AbfsOutputStream out = createAbfsOutputStreamWithFlushEnabled(fs,
         sendRequestPath)) {
 
+      // Is a file overwrite case
+      long createRequestCalls = 1;
+      long createTriggeredGFSForETag = 0;
+      if (this.getConfiguration().isConditionalCreateOverwriteEnabled()) {
+        createRequestCalls += 1;
+        createTriggeredGFSForETag = 1;
+      }
+
       for (int i = 0; i < LARGE_OPERATIONS; i++) {
         out.write(testNetworkStatsString.getBytes());
 
@@ -141,17 +150,20 @@ public class ITestAbfsNetworkStatistics extends AbstractAbfsIntegrationTest {
        * wrote each time).
        *
        */
+
+      connectionsMade += createRequestCalls + createTriggeredGFSForETag;
+      requestsSent += createRequestCalls;
       if (fs.getAbfsStore().isAppendBlobKey(fs.makeQualified(sendRequestPath).toString())) {
         // no network calls are made for hflush in case of appendblob
         assertAbfsStatistics(CONNECTIONS_MADE,
-            connectionsMade + 1 + LARGE_OPERATIONS, metricMap);
+            connectionsMade + LARGE_OPERATIONS, metricMap);
         assertAbfsStatistics(SEND_REQUESTS,
-            requestsSent + 1 + LARGE_OPERATIONS, metricMap);
+            requestsSent + LARGE_OPERATIONS, metricMap);
       } else {
         assertAbfsStatistics(CONNECTIONS_MADE,
-            connectionsMade + 1 + LARGE_OPERATIONS * 2, metricMap);
+            connectionsMade + LARGE_OPERATIONS * 2, metricMap);
         assertAbfsStatistics(SEND_REQUESTS,
-            requestsSent + 1 + LARGE_OPERATIONS * 2, metricMap);
+            requestsSent + LARGE_OPERATIONS * 2, metricMap);
       }
       assertAbfsStatistics(AbfsStatistic.BYTES_SENT,
           bytesSent + LARGE_OPERATIONS * (testNetworkStatsString.getBytes().length),
@@ -237,13 +249,21 @@ public class ITestAbfsNetworkStatistics extends AbstractAbfsIntegrationTest {
     try {
 
       /*
-       * Creating a file and writing buffer into it. Also recording the
-       * buffer for future read() call.
+       * Creating a file and writing buffer into it.
+       * This is a file recreate, so it will trigger
+       * 2 extra calls if conditional create overwrite is enabled.
+       * Also recording the buffer for future read() call.
        * This creating outputStream and writing requires 2 *
        * (LARGE_OPERATIONS) get requests.
        */
       StringBuilder largeBuffer = new StringBuilder();
       out = fs.create(getResponsePath);
+
+      long createRequestCalls = 1;
+      if (this.getConfiguration().isConditionalCreateOverwriteEnabled()) {
+        createRequestCalls += 2;
+      }
+
       for (int i = 0; i < LARGE_OPERATIONS; i++) {
         out.write(testResponseString.getBytes());
         out.hflush();
@@ -268,7 +288,8 @@ public class ITestAbfsNetworkStatistics extends AbstractAbfsIntegrationTest {
        *
        * get_response : get_responses(Last assertion) + 1
        * (OutputStream) + 2 * LARGE_OPERATIONS(Writing and flushing
-       * LARGE_OPERATIONS times) + 1(open()) + 1(read()).
+       * LARGE_OPERATIONS times) + 1(open()) + 1(read()) +
+       * 1 (createOverwriteTriggeredGetForeTag).
        *
        * bytes_received : bytes_received(Last assertion) + LARGE_OPERATIONS *
        * bytes wrote each time (bytes_received is equal to bytes wrote in the
@@ -284,7 +305,8 @@ public class ITestAbfsNetworkStatistics extends AbstractAbfsIntegrationTest {
             getResponses + 3 + LARGE_OPERATIONS, metricMap);
       } else {
         assertAbfsStatistics(AbfsStatistic.GET_RESPONSES,
-            getResponses + 3 + 2 * LARGE_OPERATIONS, metricMap);
+            getResponses + 2 + createRequestCalls + 2 * LARGE_OPERATIONS,
+            metricMap);
       }
 
     } finally {
@@ -319,4 +341,4 @@ public class ITestAbfsNetworkStatistics extends AbstractAbfsIntegrationTest {
       IOUtils.cleanupWithLogger(LOG, out);
     }
   }
-}
+}
\ No newline at end of file
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java
index 4b8f071..981ed25 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java
@@ -21,18 +21,44 @@ package org.apache.hadoop.fs.azurebfs;
 import java.io.FileNotFoundException;
 import java.io.FilterOutputStream;
 import java.io.IOException;
+import java.lang.reflect.Field;
 import java.util.EnumSet;
+import java.util.UUID;
 
 import org.junit.Test;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.test.GenericTestUtils;
 
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.ConcurrentWriteOperationDetectedException;
+import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
+import org.apache.hadoop.fs.azurebfs.services.TestAbfsClient;
+import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation;
+import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
+
+import static java.net.HttpURLConnection.HTTP_CONFLICT;
+import static java.net.HttpURLConnection.HTTP_INTERNAL_ERROR;
+import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
+import static java.net.HttpURLConnection.HTTP_OK;
+import static java.net.HttpURLConnection.HTTP_PRECON_FAILED;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
 import static org.apache.hadoop.fs.contract.ContractTestUtils.assertIsFile;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CONNECTIONS_MADE;
 
 /**
  * Test create operation.
@@ -188,4 +214,250 @@ public class ITestAzureBlobFileSystemCreate extends
         });
   }
 
+  /**
+   * Tests if the number of connections made for:
+   * 1. create overwrite=false of a file that doesn't pre-exist
+   * 2. create overwrite=false of a file that pre-exists
+   * 3. create overwrite=true of a file that doesn't pre-exist
+   * 4. create overwrite=true of a file that pre-exists
+   * matches the expectation when run against both combinations of
+   * fs.azure.enable.conditional.create.overwrite=true and
+   * fs.azure.enable.conditional.create.overwrite=false
+   * @throws Throwable if either run fails
+   */
+  @Test
+  public void testDefaultCreateOverwriteFileTest() throws Throwable {
+    testCreateFileOverwrite(true);
+    testCreateFileOverwrite(false);
+  }
+
+  public void testCreateFileOverwrite(boolean enableConditionalCreateOverwrite)
+      throws Throwable {
+    final AzureBlobFileSystem currentFs = getFileSystem();
+    Configuration config = new Configuration(this.getRawConfiguration());
+    config.set("fs.azure.enable.conditional.create.overwrite",
+        Boolean.toString(enableConditionalCreateOverwrite));
+
+    final AzureBlobFileSystem fs =
+        (AzureBlobFileSystem) FileSystem.newInstance(currentFs.getUri(),
+            config);
+
+    long totalConnectionMadeBeforeTest = fs.getInstrumentationMap()
+        .get(CONNECTIONS_MADE.getStatName());
+
+    int createRequestCount = 0;
+    final Path nonOverwriteFile = new Path("/NonOverwriteTest_FileName_"
+        + UUID.randomUUID().toString());
+
+    // Case 1: Not Overwrite - File does not pre-exist
+    // create should be successful
+    fs.create(nonOverwriteFile, false);
+
+    // One request to server to create path should be issued
+    createRequestCount++;
+
+    assertAbfsStatistics(
+        CONNECTIONS_MADE,
+        totalConnectionMadeBeforeTest + createRequestCount,
+        fs.getInstrumentationMap());
+
+    // Case 2: Not Overwrite - File pre-exists
+    intercept(FileAlreadyExistsException.class,
+        () -> fs.create(nonOverwriteFile, false));
+
+    // One request to server to create path should be issued
+    createRequestCount++;
+
+    assertAbfsStatistics(
+        CONNECTIONS_MADE,
+        totalConnectionMadeBeforeTest + createRequestCount,
+        fs.getInstrumentationMap());
+
+    final Path overwriteFilePath = new Path("/OverwriteTest_FileName_"
+        + UUID.randomUUID().toString());
+
+    // Case 3: Overwrite - File does not pre-exist
+    // create should be successful
+    fs.create(overwriteFilePath, true);
+
+    // One request to server to create path should be issued
+    createRequestCount++;
+
+    assertAbfsStatistics(
+        CONNECTIONS_MADE,
+        totalConnectionMadeBeforeTest + createRequestCount,
+        fs.getInstrumentationMap());
+
+    // Case 4: Overwrite - File pre-exists
+    fs.create(overwriteFilePath, true);
+
+    if (enableConditionalCreateOverwrite) {
+      // Three requests will be sent to server to create path,
+      // 1. create without overwrite
+      // 2. GetFileStatus to get eTag
+      // 3. create with overwrite
+      createRequestCount += 3;
+    } else {
+      createRequestCount++;
+    }
+
+    assertAbfsStatistics(
+        CONNECTIONS_MADE,
+        totalConnectionMadeBeforeTest + createRequestCount,
+        fs.getInstrumentationMap());
+  }
+
+  /**
+   * Test negative scenarios with conditional create overwrite enabled,
+   * where create overwrite=true on an existing file ends in 3 calls:
+   * A. Create overwrite=false
+   * B. GFS
+   * C. Create overwrite=true
+   *
+   * Scn1: A fails with HTTP409, leading to B which fails with HTTP404,
+   *        detect parallel access
+   * Scn2: A fails with HTTP409, leading to B which fails with HTTP500,
+   *        fail create with HTTP500
+   * Scn3: A fails with HTTP409, leading to B and then C,
+   *        which fails with HTTP412, detect parallel access
+   * Scn4: A fails with HTTP409, leading to B and then C,
+   *        which fails with HTTP500, fail create with HTTP500
+   * Scn5: A fails with HTTP500, fail create with HTTP500
+   */
+  @Test
+  public void testNegativeScenariosForCreateOverwriteDisabled()
+      throws Throwable {
+
+    final AzureBlobFileSystem currentFs = getFileSystem();
+    Configuration config = new Configuration(this.getRawConfiguration());
+    config.set("fs.azure.enable.conditional.create.overwrite",
+        Boolean.toString(true));
+
+    final AzureBlobFileSystem fs =
+        (AzureBlobFileSystem) FileSystem.newInstance(currentFs.getUri(),
+            config);
+
+    // Get mock AbfsClient with current config
+    AbfsClient
+        mockClient
+        = TestAbfsClient.getMockAbfsClient(
+        fs.getAbfsStore().getClient(),
+        fs.getAbfsStore().getAbfsConfiguration());
+
+    AzureBlobFileSystemStore abfsStore = fs.getAbfsStore();
+    abfsStore = setAzureBlobSystemStoreField(abfsStore, "client", mockClient);
+
+    AbfsRestOperation successOp = mock(
+        AbfsRestOperation.class);
+    AbfsHttpOperation http200Op = mock(
+        AbfsHttpOperation.class);
+    when(http200Op.getStatusCode()).thenReturn(HTTP_OK);
+    when(successOp.getResult()).thenReturn(http200Op);
+
+    AbfsRestOperationException conflictResponseEx
+        = getMockAbfsRestOperationException(HTTP_CONFLICT);
+    AbfsRestOperationException serverErrorResponseEx
+        = getMockAbfsRestOperationException(HTTP_INTERNAL_ERROR);
+    AbfsRestOperationException fileNotFoundResponseEx
+        = getMockAbfsRestOperationException(HTTP_NOT_FOUND);
+    AbfsRestOperationException preConditionResponseEx
+        = getMockAbfsRestOperationException(HTTP_PRECON_FAILED);
+
+    doThrow(conflictResponseEx) // Scn1: GFS fails with Http404
+        .doThrow(conflictResponseEx) // Scn2: GFS fails with Http500
+        .doThrow(
+            conflictResponseEx) // Scn3: create overwrite=true fails with Http412
+        .doThrow(
+            conflictResponseEx) // Scn4: create overwrite=true fails with Http500
+        .doThrow(
+            serverErrorResponseEx) // Scn5: create overwrite=false fails with Http500
+        .when(mockClient)
+        .createPath(any(String.class), eq(true), eq(false), any(String.class),
+            any(String.class), any(boolean.class), eq(null));
+
+    doThrow(fileNotFoundResponseEx) // Scn1: GFS fails with Http404
+        .doThrow(serverErrorResponseEx) // Scn2: GFS fails with Http500
+        .doReturn(successOp) // Scn3: create overwrite=true fails with Http412
+        .doReturn(successOp) // Scn4: create overwrite=true fails with Http500
+        .when(mockClient)
+        .getPathStatus(any(String.class), eq(false));
+
+    doThrow(
+        preConditionResponseEx) // Scn3: create overwrite=true fails with Http412
+        .doThrow(
+            serverErrorResponseEx) // Scn4: create overwrite=true fails with Http500
+        .when(mockClient)
+        .createPath(any(String.class), eq(true), eq(true), any(String.class),
+            any(String.class), any(boolean.class), eq(null));
+
+    // Scn1: GFS fails with Http404
+    // Sequence of events expected:
+    // 1. create overwrite=false - fail with conflict
+    // 2. GFS - fail with File Not found
+    // Create will fail with ConcurrentWriteOperationDetectedException
+    validateCreateFileException(ConcurrentWriteOperationDetectedException.class,
+        abfsStore);
+
+    // Scn2: GFS fails with Http500
+    // Sequence of events expected:
+    // 1. create overwrite=false - fail with conflict
+    // 2. GFS - fail with Server error
+    // Create will fail with 500
+    validateCreateFileException(AbfsRestOperationException.class, abfsStore);
+
+    // Scn3: create overwrite=true fails with Http412
+    // Sequence of events expected:
+    // 1. create overwrite=false - fail with conflict
+    // 2. GFS - pass
+    // 3. create overwrite=true - fail with Pre-Condition
+    // Create will fail with ConcurrentWriteOperationDetectedException
+    validateCreateFileException(ConcurrentWriteOperationDetectedException.class,
+        abfsStore);
+
+    // Scn4: create overwrite=true fails with Http500
+    // Sequence of events expected:
+    // 1. create overwrite=false - fail with conflict
+    // 2. GFS - pass
+    // 3. create overwrite=true - fail with Server error
+    // Create will fail with 500
+    validateCreateFileException(AbfsRestOperationException.class, abfsStore);
+
+    // Scn5: create overwrite=false fails with Http500
+    // Sequence of events expected:
+    // 1. create overwrite=false - fail with server error
+    // Create will fail with 500
+    validateCreateFileException(AbfsRestOperationException.class, abfsStore);
+  }
+
+  // Test helper: reflectively sets the named field of abfsStore; returns it.
+  private AzureBlobFileSystemStore setAzureBlobSystemStoreField(
+      final AzureBlobFileSystemStore abfsStore,
+      final String fieldName,
+      Object fieldObject) throws Exception {
+    Field abfsClientField = AzureBlobFileSystemStore.class.getDeclaredField(
+        fieldName);
+    // setAccessible(true) is enough to write a non-static field through
+    // reflection, even a final one. Clearing FINAL through Field's private
+    // "modifiers" field is not needed for instance fields and fails with
+    // NoSuchFieldException on JDK 12+, where that field is hidden.
+    abfsClientField.setAccessible(true);
+    abfsClientField.set(abfsStore, fieldObject);
+    return abfsStore;
+  }
+
+  private <E extends Throwable> void validateCreateFileException(final Class<E> exceptionClass, final AzureBlobFileSystemStore abfsStore)
+      throws Exception {
+    FsPermission permission = new FsPermission(FsAction.ALL, FsAction.ALL,
+        FsAction.ALL);
+    FsPermission umask = new FsPermission(FsAction.NONE, FsAction.NONE,
+        FsAction.NONE);
+    Path testPath = new Path("testFile");
+    intercept(
+        exceptionClass,
+        () -> abfsStore.createFile(testPath, null, true, permission, umask));
+  }
+
+  private AbfsRestOperationException getMockAbfsRestOperationException(int status) {
+    return new AbfsRestOperationException(status, "", "", new Exception());
+  }
 }
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemMkDir.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemMkDir.java
index 382d396..de476a6 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemMkDir.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemMkDir.java
@@ -18,12 +18,18 @@
 
 package org.apache.hadoop.fs.azurebfs;
 
+import java.util.UUID;
+
 import org.junit.Test;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
 import static org.apache.hadoop.fs.contract.ContractTestUtils.assertMkdirs;
 
+import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CONNECTIONS_MADE;
+
 /**
  * Test mkdir operation.
  */
@@ -45,4 +51,58 @@ public class ITestAzureBlobFileSystemMkDir extends AbstractAbfsIntegrationTest {
   public void testCreateRoot() throws Exception {
     assertMkdirs(getFileSystem(), new Path("/"));
   }
+
+  /**
+   * Test mkdir for both values of fs.azure.enable.conditional.create.overwrite
+   * @throws Throwable if either run of testCreateDirOverwrite fails
+   */
+  @Test
+  public void testDefaultCreateOverwriteDirTest() throws Throwable {
+    // the config fs.azure.enable.conditional.create.overwrite should have
+    // no effect on mkdirs
+    testCreateDirOverwrite(true);
+    testCreateDirOverwrite(false);
+  }
+
+  public void testCreateDirOverwrite(boolean enableConditionalCreateOverwrite)
+      throws Throwable {
+    final AzureBlobFileSystem currentFs = getFileSystem();
+    Configuration config = new Configuration(this.getRawConfiguration());
+    config.set("fs.azure.enable.conditional.create.overwrite",
+        Boolean.toString(enableConditionalCreateOverwrite));
+    // NOTE(review): the new FileSystem below is never closed in this method
+    final AzureBlobFileSystem fs =
+        (AzureBlobFileSystem) FileSystem.newInstance(currentFs.getUri(),
+            config);
+    // Baseline connection count; the assertions below check deltas from it
+    long totalConnectionMadeBeforeTest = fs.getInstrumentationMap()
+        .get(CONNECTIONS_MADE.getStatName());
+
+    int mkdirRequestCount = 0;
+    final Path dirPath = new Path("/DirPath_"
+        + UUID.randomUUID().toString());
+
+    // Case 1: Dir does not pre-exist
+    fs.mkdirs(dirPath);
+
+    // Expect exactly one request to the server for this mkdirs call
+    mkdirRequestCount++;
+
+    assertAbfsStatistics(
+        CONNECTIONS_MADE,
+        totalConnectionMadeBeforeTest + mkdirRequestCount,
+        fs.getInstrumentationMap());
+
+    // Case 2: Dir pre-exists
+    // Mkdir on existing Dir path will not lead to failure
+    fs.mkdirs(dirPath);
+
+    // Again expect exactly one more request to the server
+    mkdirRequestCount++;
+
+    assertAbfsStatistics(
+        CONNECTIONS_MADE,
+        totalConnectionMadeBeforeTest + mkdirRequestCount,
+        fs.getInstrumentationMap());
+  }
 }
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java
index 0d904c8..7a7992d 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java
@@ -283,8 +283,7 @@ public final class TestAbfsClient {
   }
 
   public static AbfsClient getMockAbfsClient(AbfsClient baseAbfsClientInstance,
-      AbfsConfiguration abfsConfig)
-      throws IOException, NoSuchFieldException, IllegalAccessException {
+      AbfsConfiguration abfsConfig) throws Exception {
     AuthType currentAuthType = abfsConfig.getAuthType(
         abfsConfig.getAccountName());
 
@@ -310,47 +309,46 @@ public final class TestAbfsClient {
     when(client.createDefaultHeaders()).thenCallRealMethod();
 
     // override baseurl
-    Field baseUrlField = AbfsClient.class.getDeclaredField("baseUrl");
-    baseUrlField.setAccessible(true);
-    Field modifiersField = Field.class.getDeclaredField("modifiers");
-    modifiersField.setAccessible(true);
-    modifiersField.setInt(baseUrlField, baseUrlField.getModifiers() & ~java.lang.reflect.Modifier.FINAL);
-    baseUrlField.set(client, baseAbfsClientInstance.getBaseUrl());
+    client = TestAbfsClient.setAbfsClientField(client, "abfsConfiguration",
+        abfsConfig);
+
+    // override baseurl
+    client = TestAbfsClient.setAbfsClientField(client, "baseUrl",
+        baseAbfsClientInstance.getBaseUrl());
 
     // override auth provider
     if (currentAuthType == AuthType.SharedKey) {
-      Field sharedKeyCredsField = AbfsClient.class.getDeclaredField(
-          "sharedKeyCredentials");
-      sharedKeyCredsField.setAccessible(true);
-      modifiersField.setInt(sharedKeyCredsField,
-          sharedKeyCredsField.getModifiers()
-              & ~java.lang.reflect.Modifier.FINAL);
-      sharedKeyCredsField.set(client, new SharedKeyCredentials(
-          abfsConfig.getAccountName().substring(0,
-              abfsConfig.getAccountName().indexOf(DOT)),
-          abfsConfig.getStorageAccountKey()));
+      client = TestAbfsClient.setAbfsClientField(client, "sharedKeyCredentials",
+          new SharedKeyCredentials(
+              abfsConfig.getAccountName().substring(0,
+                  abfsConfig.getAccountName().indexOf(DOT)),
+              abfsConfig.getStorageAccountKey()));
     } else {
-      Field tokenProviderField = AbfsClient.class.getDeclaredField(
-          "tokenProvider");
-      tokenProviderField.setAccessible(true);
-      modifiersField.setInt(tokenProviderField,
-          tokenProviderField.getModifiers()
-              & ~java.lang.reflect.Modifier.FINAL);
-      tokenProviderField.set(client, abfsConfig.getTokenProvider());
+      client = TestAbfsClient.setAbfsClientField(client, "tokenProvider",
+          abfsConfig.getTokenProvider());
     }
 
     // override user agent
     String userAgent = "APN/1.0 Azure Blob FS/3.4.0-SNAPSHOT (PrivateBuild "
         + "JavaJRE 1.8.0_252; Linux 5.3.0-59-generic/amd64; openssl-1.0; "
         + "UNKNOWN/UNKNOWN) MSFT";
-    Field userAgentField = AbfsClient.class.getDeclaredField(
-        "userAgent");
-    userAgentField.setAccessible(true);
-    modifiersField.setInt(userAgentField,
-        userAgentField.getModifiers()
-            & ~java.lang.reflect.Modifier.FINAL);
-    userAgentField.set(client, userAgent);
+    client = TestAbfsClient.setAbfsClientField(client, "userAgent", userAgent);
 
     return client;
   }
+
+  private static AbfsClient setAbfsClientField(
+      final AbfsClient client,
+      final String fieldName,
+      Object fieldObject) throws Exception {
+    // Reflectively replace the named AbfsClient field, clearing FINAL first.
+    Field field = AbfsClient.class.getDeclaredField(fieldName);
+    field.setAccessible(true);
+    Field modifiersField = Field.class.getDeclaredField("modifiers");
+    modifiersField.setAccessible(true);
+    modifiersField.setInt(field,
+        field.getModifiers() & ~java.lang.reflect.Modifier.FINAL);
+    field.set(client, fieldObject);
+    return client;
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org