Posted to common-commits@hadoop.apache.org by tm...@apache.org on 2020/10/14 22:39:26 UTC

[hadoop] branch branch-3.3 updated (41a3c9b -> d5b4d04)

This is an automated email from the ASF dual-hosted git repository.

tmarquardt pushed a change to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git.


    from 41a3c9b  HDFS-15628. HttpFS server throws NPE if a file is a symlink. Contributed by Ahmed Hussein.
     new fbf151e  HADOOP-17137. ABFS: Makes the test cases in ITestAbfsNetworkStatistics agnostic
     new f73c90f  HADOOP-17163. ABFS: Adding debug log for rename failures
     new e481d01  HADOOP-17149. ABFS: Fixing the testcase ITestGetNameSpaceEnabled
     new 4072323  Upgrade store REST API version to 2019-12-12
     new cc73503  HADOOP-16915. ABFS: Ignoring the test ITestAzureBlobFileSystemRandomRead.testRandomReadPerformance
     new f208da2  HADOOP-17166. ABFS: configure output stream thread pool (#2179)
     new d166420  HADOOP-17215: Support for conditional overwrite.
     new da5db6a  HADOOP-17279: ABFS: testNegativeScenariosForCreateOverwriteDisabled fails for non-HNS account.
     new d5b4d04  HADOOP-17301. ABFS: read-ahead error reporting breaks buffer management (#2369)

The 9 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.


Summary of changes:
 .../hadoop/fs/azurebfs/AbfsConfiguration.java      |  30 +++
 .../hadoop/fs/azurebfs/AzureBlobFileSystem.java    |   1 +
 .../fs/azurebfs/AzureBlobFileSystemStore.java      | 103 +++++++-
 .../fs/azurebfs/constants/ConfigurationKeys.java   |   6 +
 .../constants/FileSystemConfigurations.java        |   1 +
 ...ConcurrentWriteOperationDetectedException.java} |  17 +-
 .../hadoop/fs/azurebfs/services/AbfsClient.java    |   8 +-
 .../fs/azurebfs/services/AbfsOutputStream.java     |  18 +-
 .../azurebfs/services/AbfsOutputStreamContext.java |  24 ++
 .../fs/azurebfs/services/ReadBufferManager.java    |  36 ++-
 .../hadoop-azure/src/site/markdown/abfs.md         |  13 +
 .../fs/azurebfs/ITestAbfsNetworkStatistics.java    | 103 +++++---
 .../azurebfs/ITestAzureBlobFileSystemCreate.java   | 279 +++++++++++++++++++++
 .../fs/azurebfs/ITestAzureBlobFileSystemMkDir.java |  60 +++++
 .../ITestAzureBlobFileSystemRandomRead.java        |   2 +
 .../fs/azurebfs/ITestGetNameSpaceEnabled.java      |  23 --
 .../hadoop/fs/azurebfs/ITestSharedKeyAuth.java     |  61 +++++
 .../azurebfs/services/ITestAbfsOutputStream.java   |  78 ++++++
 .../fs/azurebfs/services/TestAbfsClient.java       |  62 +++--
 .../fs/azurebfs/services/TestAbfsInputStream.java  |  49 ++++
 .../fs/azurebfs/services/TestAbfsOutputStream.java |   7 +-
 21 files changed, 870 insertions(+), 111 deletions(-)
 copy hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/{TimeoutException.java => ConcurrentWriteOperationDetectedException.java} (72%)
 create mode 100644 hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestSharedKeyAuth.java
 create mode 100644 hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java




[hadoop] 05/09: HADOOP-16915. ABFS: Ignoring the test ITestAzureBlobFileSystemRandomRead.testRandomReadPerformance

Posted by tm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tmarquardt pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit cc7350302f4bde082d8dd5a01bf7d71b195089a2
Author: bilaharith <52...@users.noreply.github.com>
AuthorDate: Tue Aug 25 00:30:55 2020 +0530

    HADOOP-16915. ABFS: Ignoring the test ITestAzureBlobFileSystemRandomRead.testRandomReadPerformance
    
    - Contributed by Bilahari T H
---
 .../apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRandomRead.java   | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRandomRead.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRandomRead.java
index e5f64b5..f582763 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRandomRead.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRandomRead.java
@@ -23,6 +23,7 @@ import java.util.Random;
 import java.util.concurrent.Callable;
 
 import org.junit.Assume;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -412,6 +413,7 @@ public class ITestAzureBlobFileSystemRandomRead extends
   }
 
   @Test
+  @Ignore("HADOOP-16915")
   public void testRandomReadPerformance() throws Exception {
     Assume.assumeFalse("This test does not support namespace enabled account",
             this.getFileSystem().getIsNamespaceEnabled());




[hadoop] 01/09: HADOOP-17137. ABFS: Makes the test cases in ITestAbfsNetworkStatistics agnostic

Posted by tm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tmarquardt pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit fbf151ef6f608e5acc0c478325434c88359724da
Author: bilaharith <52...@users.noreply.github.com>
AuthorDate: Sat Aug 1 00:57:57 2020 +0530

    HADOOP-17137. ABFS: Makes the test cases in ITestAbfsNetworkStatistics agnostic
    
    - Contributed by Bilahari T H
---
 .../fs/azurebfs/ITestAbfsNetworkStatistics.java    | 63 +++++++++++++---------
 1 file changed, 38 insertions(+), 25 deletions(-)

diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsNetworkStatistics.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsNetworkStatistics.java
index e3a97b3..f6ee7a9 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsNetworkStatistics.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsNetworkStatistics.java
@@ -33,6 +33,9 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
 import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
 
+import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CONNECTIONS_MADE;
+import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.SEND_REQUESTS;
+
 public class ITestAbfsNetworkStatistics extends AbstractAbfsIntegrationTest {
 
   private static final Logger LOG =
@@ -57,6 +60,11 @@ public class ITestAbfsNetworkStatistics extends AbstractAbfsIntegrationTest {
     String testNetworkStatsString = "http_send";
     long connectionsMade, requestsSent, bytesSent;
 
+    metricMap = fs.getInstrumentationMap();
+    long connectionsMadeBeforeTest = metricMap
+        .get(CONNECTIONS_MADE.getStatName());
+    long requestsMadeBeforeTest = metricMap.get(SEND_REQUESTS.getStatName());
+
     /*
      * Creating AbfsOutputStream will result in 1 connection made and 1 send
      * request.
@@ -76,27 +84,26 @@ public class ITestAbfsNetworkStatistics extends AbstractAbfsIntegrationTest {
       /*
        * Testing the network stats with 1 write operation.
        *
-       * connections_made : 3(getFileSystem()) + 1(AbfsOutputStream) + 2(flush).
+       * connections_made : (connections made above) + 2(flush).
        *
-       * send_requests : 1(getFileSystem()) + 1(AbfsOutputStream) + 2(flush).
+       * send_requests : (requests sent above) + 2(flush).
        *
        * bytes_sent : bytes wrote in AbfsOutputStream.
        */
-      if (fs.getAbfsStore().isAppendBlobKey(fs.makeQualified(sendRequestPath).toString())) {
+      long extraCalls = 0;
+      if (!fs.getAbfsStore()
+          .isAppendBlobKey(fs.makeQualified(sendRequestPath).toString())) {
         // no network calls are made for hflush in case of appendblob
-        connectionsMade = assertAbfsStatistics(AbfsStatistic.CONNECTIONS_MADE,
-            5, metricMap);
-        requestsSent = assertAbfsStatistics(AbfsStatistic.SEND_REQUESTS, 3,
-            metricMap);
-      } else {
-        connectionsMade = assertAbfsStatistics(AbfsStatistic.CONNECTIONS_MADE,
-            6, metricMap);
-        requestsSent = assertAbfsStatistics(AbfsStatistic.SEND_REQUESTS, 4,
-            metricMap);
+        extraCalls++;
       }
+      long expectedConnectionsMade = connectionsMadeBeforeTest + extraCalls + 2;
+      long expectedRequestsSent = requestsMadeBeforeTest + extraCalls + 2;
+      connectionsMade = assertAbfsStatistics(CONNECTIONS_MADE,
+          expectedConnectionsMade, metricMap);
+      requestsSent = assertAbfsStatistics(SEND_REQUESTS, expectedRequestsSent,
+          metricMap);
       bytesSent = assertAbfsStatistics(AbfsStatistic.BYTES_SENT,
           testNetworkStatsString.getBytes().length, metricMap);
-
     }
 
     // To close the AbfsOutputStream 1 connection is made and 1 request is sent.
@@ -136,14 +143,14 @@ public class ITestAbfsNetworkStatistics extends AbstractAbfsIntegrationTest {
        */
       if (fs.getAbfsStore().isAppendBlobKey(fs.makeQualified(sendRequestPath).toString())) {
         // no network calls are made for hflush in case of appendblob
-        assertAbfsStatistics(AbfsStatistic.CONNECTIONS_MADE,
+        assertAbfsStatistics(CONNECTIONS_MADE,
             connectionsMade + 1 + LARGE_OPERATIONS, metricMap);
-        assertAbfsStatistics(AbfsStatistic.SEND_REQUESTS,
+        assertAbfsStatistics(SEND_REQUESTS,
             requestsSent + 1 + LARGE_OPERATIONS, metricMap);
       } else {
-        assertAbfsStatistics(AbfsStatistic.CONNECTIONS_MADE,
+        assertAbfsStatistics(CONNECTIONS_MADE,
             connectionsMade + 1 + LARGE_OPERATIONS * 2, metricMap);
-        assertAbfsStatistics(AbfsStatistic.SEND_REQUESTS,
+        assertAbfsStatistics(SEND_REQUESTS,
             requestsSent + 1 + LARGE_OPERATIONS * 2, metricMap);
       }
       assertAbfsStatistics(AbfsStatistic.BYTES_SENT,
@@ -183,6 +190,10 @@ public class ITestAbfsNetworkStatistics extends AbstractAbfsIntegrationTest {
       out.write(testResponseString.getBytes());
       out.hflush();
 
+      metricMap = fs.getInstrumentationMap();
+      long getResponsesBeforeTest = metricMap
+          .get(AbfsStatistic.GET_RESPONSES.getStatName());
+
       // open would require 1 get response.
       in = fs.open(getResponsePath);
       // read would require 1 get response and also get the bytes received.
@@ -196,18 +207,20 @@ public class ITestAbfsNetworkStatistics extends AbstractAbfsIntegrationTest {
       /*
        * Testing values of statistics after writing and reading a buffer.
        *
-       * get_responses - 6(above operations) + 1(open()) + 1 (read()).
+       * get_responses - (above operations) + 1(open()) + 1(read()).
        *
        * bytes_received - This should be equal to bytes sent earlier.
        */
-      if (fs.getAbfsStore().isAppendBlobKey(fs.makeQualified(getResponsePath).toString())) {
-        //for appendBlob hflush is a no-op
-        getResponses = assertAbfsStatistics(AbfsStatistic.GET_RESPONSES, 7,
-            metricMap);
-      } else {
-        getResponses = assertAbfsStatistics(AbfsStatistic.GET_RESPONSES, 8,
-            metricMap);
+      long extraCalls = 0;
+      if (!fs.getAbfsStore()
+          .isAppendBlobKey(fs.makeQualified(getResponsePath).toString())) {
+        // no network calls are made for hflush in case of appendblob
+        extraCalls++;
       }
+      long expectedGetResponses = getResponsesBeforeTest + extraCalls + 1;
+      getResponses = assertAbfsStatistics(AbfsStatistic.GET_RESPONSES,
+          expectedGetResponses, metricMap);
+
       // Testing that bytes received is equal to bytes sent.
       long bytesSend = metricMap.get(AbfsStatistic.BYTES_SENT.getStatName());
       bytesReceived = assertAbfsStatistics(AbfsStatistic.BYTES_RECEIVED,
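
The pattern that makes these assertions account-agnostic, in minimal form (a
sketch reusing names from the patch above; expectedNewConnections and
expectedNewRequests are placeholders, not part of the patch):

    // snapshot the counters first, then assert only the delta this test produced
    Map<String, Long> metricMap = fs.getInstrumentationMap();
    long connectionsBefore = metricMap.get(CONNECTIONS_MADE.getStatName());
    long requestsBefore = metricMap.get(SEND_REQUESTS.getStatName());
    // ... perform a known number of operations against the store ...
    assertAbfsStatistics(CONNECTIONS_MADE,
        connectionsBefore + expectedNewConnections, fs.getInstrumentationMap());
    assertAbfsStatistics(SEND_REQUESTS,
        requestsBefore + expectedNewRequests, fs.getInstrumentationMap());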




[hadoop] 08/09: HADOOP-17279: ABFS: testNegativeScenariosForCreateOverwriteDisabled fails for non-HNS account.

Posted by tm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tmarquardt pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit da5db6a5a66d13fd8ba71b127bd6f16a1e3dade8
Author: Sneha Vijayarajan <sn...@microsoft.com>
AuthorDate: Tue Sep 22 20:58:12 2020 +0000

    HADOOP-17279: ABFS: testNegativeScenariosForCreateOverwriteDisabled fails for non-HNS account.
    
    Contributed by Sneha Vijayarajan
    
    Testing:
    
    namespace.enabled=false
    auth.type=SharedKey
    $mvn -T 1C -Dparallel-tests=abfs -Dscale -DtestsThreadCount=8 clean verify
    
    Tests run: 87, Failures: 0, Errors: 0, Skipped: 0
    Tests run: 457, Failures: 0, Errors: 0, Skipped: 246
    Tests run: 207, Failures: 0, Errors: 0, Skipped: 24
    
    namespace.enabled=true
    auth.type=SharedKey
    $mvn -T 1C -Dparallel-tests=abfs -Dscale -DtestsThreadCount=8 clean verify
    
    Tests run: 87, Failures: 0, Errors: 0, Skipped: 0
    Tests run: 457, Failures: 0, Errors: 0, Skipped: 33
    Tests run: 207, Failures: 0, Errors: 0, Skipped: 24
    
    namespace.enabled=true
    auth.type=OAuth
    $mvn -T 1C -Dparallel-tests=abfs -Dscale -DtestsThreadCount=8 clean verify
    
    Tests run: 87, Failures: 0, Errors: 0, Skipped: 0
    Tests run: 457, Failures: 0, Errors: 0, Skipped: 74
    Tests run: 207, Failures: 0, Errors: 0, Skipped: 140
---
 .../fs/azurebfs/ITestAzureBlobFileSystemCreate.java       | 15 +++++++++++----
 1 file changed, 11 insertions(+), 4 deletions(-)

diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java
index 981ed25..09304d1 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java
@@ -346,6 +346,7 @@ public class ITestAzureBlobFileSystemCreate extends
 
     AzureBlobFileSystemStore abfsStore = fs.getAbfsStore();
     abfsStore = setAzureBlobSystemStoreField(abfsStore, "client", mockClient);
+    boolean isNamespaceEnabled = abfsStore.getIsNamespaceEnabled();
 
     AbfsRestOperation successOp = mock(
         AbfsRestOperation.class);
@@ -363,6 +364,7 @@ public class ITestAzureBlobFileSystemCreate extends
     AbfsRestOperationException preConditionResponseEx
         = getMockAbfsRestOperationException(HTTP_PRECON_FAILED);
 
+    // mock for overwrite=false
     doThrow(conflictResponseEx) // Scn1: GFS fails with Http404
         .doThrow(conflictResponseEx) // Scn2: GFS fails with Http500
         .doThrow(
@@ -372,8 +374,10 @@ public class ITestAzureBlobFileSystemCreate extends
         .doThrow(
             serverErrorResponseEx) // Scn5: create overwrite=false fails with Http500
         .when(mockClient)
-        .createPath(any(String.class), eq(true), eq(false), any(String.class),
-            any(String.class), any(boolean.class), eq(null));
+        .createPath(any(String.class), eq(true), eq(false),
+            isNamespaceEnabled ? any(String.class) : eq(null),
+            isNamespaceEnabled ? any(String.class) : eq(null),
+            any(boolean.class), eq(null));
 
     doThrow(fileNotFoundResponseEx) // Scn1: GFS fails with Http404
         .doThrow(serverErrorResponseEx) // Scn2: GFS fails with Http500
@@ -382,13 +386,16 @@ public class ITestAzureBlobFileSystemCreate extends
         .when(mockClient)
         .getPathStatus(any(String.class), eq(false));
 
+    // mock for overwrite=true
     doThrow(
         preConditionResponseEx) // Scn3: create overwrite=true fails with Http412
         .doThrow(
             serverErrorResponseEx) // Scn4: create overwrite=true fails with Http500
         .when(mockClient)
-        .createPath(any(String.class), eq(true), eq(true), any(String.class),
-            any(String.class), any(boolean.class), eq(null));
+        .createPath(any(String.class), eq(true), eq(true),
+            isNamespaceEnabled ? any(String.class) : eq(null),
+            isNamespaceEnabled ? any(String.class) : eq(null),
+            any(boolean.class), eq(null));
 
     // Scn1: GFS fails with Http404
     // Sequence of events expected:
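
Why the matcher change is needed, in minimal form (an illustration assuming
Mockito 2+ semantics, where any(Class) does not match null; PathClient below is
a hypothetical stand-in, not the real AbfsClient API): for non-HNS accounts
createPath receives null permission and umask, so stubs written with
any(String.class) never fire and the mocked call silently succeeds instead of
throwing.

    import static org.mockito.ArgumentMatchers.any;
    import static org.mockito.ArgumentMatchers.eq;
    import static org.mockito.Mockito.doThrow;
    import static org.mockito.Mockito.mock;

    interface PathClient {               // hypothetical, for illustration only
      void createPath(String path, String permission);
    }

    public class MatcherDemo {
      public static void main(String[] args) {
        PathClient client = mock(PathClient.class);
        doThrow(new IllegalStateException("stub"))
            .when(client).createPath(any(String.class), any(String.class));
        client.createPath("/p", null);   // no throw: any(String.class) rejects null
        doThrow(new IllegalStateException("stub"))
            .when(client).createPath(any(String.class), eq(null));
        client.createPath("/p", null);   // throws: eq(null) matches the null argument
      }
    }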




[hadoop] 02/09: HADOOP-17163. ABFS: Adding debug log for rename failures

Posted by tm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tmarquardt pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit f73c90f0b07991c2871ab33fedfa16f0d4c88c74
Author: bilaharith <52...@users.noreply.github.com>
AuthorDate: Wed Aug 5 22:08:13 2020 +0530

    HADOOP-17163. ABFS: Adding debug log for rename failures
    
    - Contributed by Bilahari T H
---
 .../src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java | 1 +
 1 file changed, 1 insertion(+)

diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
index 84d6068..513150a 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
@@ -323,6 +323,7 @@ public class AzureBlobFileSystem extends FileSystem {
       abfsStore.rename(qualifiedSrcPath, qualifiedDstPath);
       return true;
     } catch(AzureBlobFileSystemException ex) {
+      LOG.debug("Rename operation failed. ", ex);
       checkException(
               src,
               ex,




[hadoop] 07/09: HADOOP-17215: Support for conditional overwrite.

Posted by tm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tmarquardt pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit d1664203026c0bb862bb405f3f48c602ef699a2f
Author: Sneha Vijayarajan <sn...@gmail.com>
AuthorDate: Wed Aug 26 00:31:35 2020 +0530

    HADOOP-17215: Support for conditional overwrite.
    
    Contributed by Sneha Vijayarajan
    
    DETAILS:
    
        This change adds config key "fs.azure.enable.conditional.create.overwrite" with
        a default of true.  When enabled, if create(path, overwrite: true) is invoked
        and the file exists, the ABFS driver will first obtain its etag and then attempt
        to overwrite the file on the condition that the etag matches. The purpose of this
        is to mitigate the non-idempotency of this method.  Specifically, in the event of
        a network error or similar, the client will retry and this can result in the file
        being created more than once, which may result in data loss.  In essence this is
        like a poor man's file handle, and will be addressed more thoroughly in the future
        when support for leases is added to ABFS.
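
        In minimal form, the request sequence this produces is (a sketch; the
        header names appear in the patch below, the numbering is illustrative):

            fs.create(path, true);   // overwrite=true on an ABFS FileSystem
            // 1. create with If-None-Match: "*"        (overwrite=false attempt)
            // 2. on 409 Conflict: getPathStatus(path)  (fetch the current eTag)
            // 3. create with If-Match: <etag>          (conditional overwrite)
            // 4. on 412 Precondition Failed:
            //    throw ConcurrentWriteOperationDetectedException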
    
    TEST RESULTS:
    
        namespace.enabled=true
        auth.type=SharedKey
        -------------------
        $mvn -T 1C -Dparallel-tests=abfs -Dscale -DtestsThreadCount=8 clean verify
        Tests run: 87, Failures: 0, Errors: 0, Skipped: 0
        Tests run: 457, Failures: 0, Errors: 0, Skipped: 42
        Tests run: 207, Failures: 0, Errors: 0, Skipped: 24
    
        namespace.enabled=true
        auth.type=OAuth
        -------------------
        $mvn -T 1C -Dparallel-tests=abfs -Dscale -DtestsThreadCount=8 clean verify
        Tests run: 87, Failures: 0, Errors: 0, Skipped: 0
        Tests run: 457, Failures: 0, Errors: 0, Skipped: 74
        Tests run: 207, Failures: 0, Errors: 0, Skipped: 140
---
 .../hadoop/fs/azurebfs/AbfsConfiguration.java      |   8 +
 .../fs/azurebfs/AzureBlobFileSystemStore.java      | 101 +++++++-
 .../fs/azurebfs/constants/ConfigurationKeys.java   |   4 +
 .../constants/FileSystemConfigurations.java        |   1 +
 ...ConcurrentWriteOperationDetectedException.java} |  32 +--
 .../hadoop/fs/azurebfs/services/AbfsClient.java    |   6 +-
 .../fs/azurebfs/ITestAbfsNetworkStatistics.java    |  40 ++-
 .../azurebfs/ITestAzureBlobFileSystemCreate.java   | 272 +++++++++++++++++++++
 .../fs/azurebfs/ITestAzureBlobFileSystemMkDir.java |  60 +++++
 .../fs/azurebfs/services/TestAbfsClient.java       |  62 +++--
 10 files changed, 515 insertions(+), 71 deletions(-)

diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
index 66d4853..72a8a43 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
@@ -181,6 +181,10 @@ public class AbfsConfiguration{
       DefaultValue = DEFAULT_FS_AZURE_ATOMIC_RENAME_DIRECTORIES)
   private String azureAtomicDirs;
 
+  @BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ENABLE_CONDITIONAL_CREATE_OVERWRITE,
+      DefaultValue = DEFAULT_FS_AZURE_ENABLE_CONDITIONAL_CREATE_OVERWRITE)
+  private boolean enableConditionalCreateOverwrite;
+
   @StringConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_APPEND_BLOB_KEY,
       DefaultValue = DEFAULT_FS_AZURE_APPEND_BLOB_DIRECTORIES)
   private String azureAppendBlobDirs;
@@ -573,6 +577,10 @@ public class AbfsConfiguration{
     return this.azureAtomicDirs;
   }
 
+  public boolean isConditionalCreateOverwriteEnabled() {
+    return this.enableConditionalCreateOverwrite;
+  }
+
   public String getAppendBlobDirs() {
     return this.azureAppendBlobDirs;
   }
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
index 23d2b5a..d2a1d53 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
@@ -66,6 +66,7 @@ import org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations;
 import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.ConcurrentWriteOperationDetectedException;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.FileSystemOperationUnhandledException;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidAbfsRestOperationException;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidFileSystemPropertyException;
@@ -464,10 +465,32 @@ public class AzureBlobFileSystemStore implements Closeable {
         isAppendBlob = true;
       }
 
-      final AbfsRestOperation op = client.createPath(relativePath, true, overwrite,
-              isNamespaceEnabled ? getOctalNotation(permission) : null,
-              isNamespaceEnabled ? getOctalNotation(umask) : null,
-              isAppendBlob);
+      // if "fs.azure.enable.conditional.create.overwrite" is enabled and
+      // is a create request with overwrite=true, create will follow different
+      // flow.
+      boolean triggerConditionalCreateOverwrite = false;
+      if (overwrite
+          && abfsConfiguration.isConditionalCreateOverwriteEnabled()) {
+        triggerConditionalCreateOverwrite = true;
+      }
+
+      AbfsRestOperation op;
+      if (triggerConditionalCreateOverwrite) {
+        op = conditionalCreateOverwriteFile(relativePath,
+            statistics,
+            isNamespaceEnabled ? getOctalNotation(permission) : null,
+            isNamespaceEnabled ? getOctalNotation(umask) : null,
+            isAppendBlob
+        );
+
+      } else {
+        op = client.createPath(relativePath, true,
+            overwrite,
+            isNamespaceEnabled ? getOctalNotation(permission) : null,
+            isNamespaceEnabled ? getOctalNotation(umask) : null,
+            isAppendBlob,
+            null);
+      }
       perfInfo.registerResult(op.getResult()).registerSuccess(true);
 
       return new AbfsOutputStream(
@@ -479,6 +502,74 @@ public class AzureBlobFileSystemStore implements Closeable {
     }
   }
 
+  /**
+   * Conditional create overwrite flow ensures that a create with overwrite is
+   * done only if there is a match on the eTag of the existing file.
+   * @param relativePath relative path of the file to create
+   * @param statistics filesystem statistics to update
+   * @param permission permission in octal notation, or null if namespace is disabled
+   * @param umask umask in octal notation, or null if namespace is disabled
+   * @param isAppendBlob whether the file is to be created as an append blob
+   * @return the successful create operation
+   * @throws AzureBlobFileSystemException if the create fails
+   */
+  private AbfsRestOperation conditionalCreateOverwriteFile(final String relativePath,
+      final FileSystem.Statistics statistics,
+      final String permission,
+      final String umask,
+      final boolean isAppendBlob) throws AzureBlobFileSystemException {
+    AbfsRestOperation op;
+
+    try {
+      // Trigger a create with overwrite=false first so that eTag fetch can be
+      // avoided for cases when no pre-existing file is present (major portion
+      // of create file traffic falls into the case of no pre-existing file).
+      op = client.createPath(relativePath, true,
+          false, permission, umask, isAppendBlob, null);
+    } catch (AbfsRestOperationException e) {
+      if (e.getStatusCode() == HttpURLConnection.HTTP_CONFLICT) {
+        // File pre-exists, fetch eTag
+        try {
+          op = client.getPathStatus(relativePath, false);
+        } catch (AbfsRestOperationException ex) {
+          if (ex.getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) {
+            // Is a parallel access case: the file that was just found to be
+            // present has gone missing by the time of this request.
+            throw new ConcurrentWriteOperationDetectedException(
+                "Parallel access to the create path detected. Failing request "
+                    + "to honor single writer semantics");
+          } else {
+            throw ex;
+          }
+        }
+
+        String eTag = op.getResult()
+            .getResponseHeader(HttpHeaderConfigurations.ETAG);
+
+        try {
+          // overwrite only if the eTag matches the file properties fetched before
+          op = client.createPath(relativePath, true,
+              true, permission, umask, isAppendBlob, eTag);
+        } catch (AbfsRestOperationException ex) {
+          if (ex.getStatusCode() == HttpURLConnection.HTTP_PRECON_FAILED) {
+            // Is a parallel access case: the eTag was fetched just above, and
+            // a precondition failure can happen only if a file with a
+            // different eTag was created concurrently.
+            throw new ConcurrentWriteOperationDetectedException(
+                "Parallel access to the create path detected. Failing request "
+                    + "to honor single writer semantics");
+          } else {
+            throw ex;
+          }
+        }
+      } else {
+        throw e;
+      }
+    }
+
+    return op;
+  }
+
   private AbfsOutputStreamContext populateAbfsOutputStreamContext(boolean isAppendBlob) {
     int bufferSize = abfsConfiguration.getWriteBufferSize();
     if (isAppendBlob && bufferSize > FileSystemConfigurations.APPENDBLOB_MAX_WRITE_BUFFER_SIZE) {
@@ -508,7 +599,7 @@ public class AzureBlobFileSystemStore implements Closeable {
 
       final AbfsRestOperation op = client.createPath(getRelativePath(path), false, true,
               isNamespaceEnabled ? getOctalNotation(permission) : null,
-              isNamespaceEnabled ? getOctalNotation(umask) : null, false);
+              isNamespaceEnabled ? getOctalNotation(umask) : null, false, null);
       perfInfo.registerResult(op.getResult()).registerSuccess(true);
     }
   }
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
index 681390c..c15c470 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
@@ -67,6 +67,10 @@ public final class ConfigurationKeys {
   public static final String FS_AZURE_ENABLE_AUTOTHROTTLING = "fs.azure.enable.autothrottling";
   public static final String FS_AZURE_ALWAYS_USE_HTTPS = "fs.azure.always.use.https";
   public static final String FS_AZURE_ATOMIC_RENAME_KEY = "fs.azure.atomic.rename.key";
+  /** This config ensures that, during a create with overwrite, an existing
+   *  file is overwritten only if there is a match on the eTag of the
+   *  existing file.
+   */
+  public static final String FS_AZURE_ENABLE_CONDITIONAL_CREATE_OVERWRITE = "fs.azure.enable.conditional.create.overwrite";
   /** Provides a config to provide comma separated path prefixes on which Appendblob based files are created
    *  Default is empty. **/
   public static final String FS_AZURE_APPEND_BLOB_KEY = "fs.azure.appendblob.directories";
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
index f70d102..fa0ee6a 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
@@ -70,6 +70,7 @@ public final class FileSystemConfigurations {
   public static final boolean DEFAULT_AZURE_SKIP_USER_GROUP_METADATA_DURING_INITIALIZATION = false;
 
   public static final String DEFAULT_FS_AZURE_ATOMIC_RENAME_DIRECTORIES = "/hbase";
+  public static final boolean DEFAULT_FS_AZURE_ENABLE_CONDITIONAL_CREATE_OVERWRITE = true;
   public static final String DEFAULT_FS_AZURE_APPEND_BLOB_DIRECTORIES = "";
 
   public static final int DEFAULT_READ_AHEAD_QUEUE_DEPTH = -1;
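
For reference, a client can toggle the new key per instance (a sketch using
standard Hadoop Configuration APIs; the key name and default come from the
patch above, the account URI is a placeholder):

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;

    Configuration conf = new Configuration();
    // default is true (DEFAULT_FS_AZURE_ENABLE_CONDITIONAL_CREATE_OVERWRITE)
    conf.setBoolean("fs.azure.enable.conditional.create.overwrite", false);
    FileSystem fs = FileSystem.newInstance(
        URI.create("abfs://container@account.dfs.core.windows.net/"), conf);
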
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemMkDir.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/ConcurrentWriteOperationDetectedException.java
similarity index 54%
copy from hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemMkDir.java
copy to hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/ConcurrentWriteOperationDetectedException.java
index 382d396..79813dd 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemMkDir.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/ConcurrentWriteOperationDetectedException.java
@@ -16,33 +16,17 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.fs.azurebfs;
-
-import org.junit.Test;
-
-import org.apache.hadoop.fs.Path;
-
-import static org.apache.hadoop.fs.contract.ContractTestUtils.assertMkdirs;
+package org.apache.hadoop.fs.azurebfs.contracts.exceptions;
 
 /**
- * Test mkdir operation.
+ * Thrown when a concurrent write operation is detected.
  */
-public class ITestAzureBlobFileSystemMkDir extends AbstractAbfsIntegrationTest {
-
-  public ITestAzureBlobFileSystemMkDir() throws Exception {
-    super();
-  }
-
-  @Test
-  public void testCreateDirWithExistingDir() throws Exception {
-    final AzureBlobFileSystem fs = getFileSystem();
-    Path path = new Path("testFolder");
-    assertMkdirs(fs, path);
-    assertMkdirs(fs, path);
-  }
+@org.apache.hadoop.classification.InterfaceAudience.Public
+@org.apache.hadoop.classification.InterfaceStability.Evolving
+public class ConcurrentWriteOperationDetectedException
+    extends AzureBlobFileSystemException {
 
-  @Test
-  public void testCreateRoot() throws Exception {
-    assertMkdirs(getFileSystem(), new Path("/"));
+  public ConcurrentWriteOperationDetectedException(String message) {
+    super(message);
   }
 }
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
index 45c1948..0415857 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
@@ -265,7 +265,7 @@ public class AbfsClient implements Closeable {
 
   public AbfsRestOperation createPath(final String path, final boolean isFile, final boolean overwrite,
                                       final String permission, final String umask,
-                                      final boolean isAppendBlob) throws AzureBlobFileSystemException {
+                                      final boolean isAppendBlob, final String eTag) throws AzureBlobFileSystemException {
     final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
     if (!overwrite) {
       requestHeaders.add(new AbfsHttpHeader(IF_NONE_MATCH, AbfsHttpConstants.STAR));
@@ -279,6 +279,10 @@ public class AbfsClient implements Closeable {
       requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_UMASK, umask));
     }
 
+    if (eTag != null && !eTag.isEmpty()) {
+      requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.IF_MATCH, eTag));
+    }
+
     final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
     abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESOURCE, isFile ? FILE : DIRECTORY);
     if (isAppendBlob) {
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsNetworkStatistics.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsNetworkStatistics.java
index f6ee7a9..c2dbe93 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsNetworkStatistics.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsNetworkStatistics.java
@@ -110,9 +110,18 @@ public class ITestAbfsNetworkStatistics extends AbstractAbfsIntegrationTest {
     connectionsMade++;
     requestsSent++;
 
+
     try (AbfsOutputStream out = createAbfsOutputStreamWithFlushEnabled(fs,
         sendRequestPath)) {
 
+      // Is a file overwrite case
+      long createRequestCalls = 1;
+      long createTriggeredGFSForETag = 0;
+      if (this.getConfiguration().isConditionalCreateOverwriteEnabled()) {
+        createRequestCalls += 1;
+        createTriggeredGFSForETag = 1;
+      }
+
       for (int i = 0; i < LARGE_OPERATIONS; i++) {
         out.write(testNetworkStatsString.getBytes());
 
@@ -141,17 +150,20 @@ public class ITestAbfsNetworkStatistics extends AbstractAbfsIntegrationTest {
        * wrote each time).
        *
        */
+
+      connectionsMade += createRequestCalls + createTriggeredGFSForETag;
+      requestsSent += createRequestCalls;
       if (fs.getAbfsStore().isAppendBlobKey(fs.makeQualified(sendRequestPath).toString())) {
         // no network calls are made for hflush in case of appendblob
         assertAbfsStatistics(CONNECTIONS_MADE,
-            connectionsMade + 1 + LARGE_OPERATIONS, metricMap);
+            connectionsMade + LARGE_OPERATIONS, metricMap);
         assertAbfsStatistics(SEND_REQUESTS,
-            requestsSent + 1 + LARGE_OPERATIONS, metricMap);
+            requestsSent + LARGE_OPERATIONS, metricMap);
       } else {
         assertAbfsStatistics(CONNECTIONS_MADE,
-            connectionsMade + 1 + LARGE_OPERATIONS * 2, metricMap);
+            connectionsMade + LARGE_OPERATIONS * 2, metricMap);
         assertAbfsStatistics(SEND_REQUESTS,
-            requestsSent + 1 + LARGE_OPERATIONS * 2, metricMap);
+            requestsSent + LARGE_OPERATIONS * 2, metricMap);
       }
       assertAbfsStatistics(AbfsStatistic.BYTES_SENT,
           bytesSent + LARGE_OPERATIONS * (testNetworkStatsString.getBytes().length),
@@ -237,13 +249,21 @@ public class ITestAbfsNetworkStatistics extends AbstractAbfsIntegrationTest {
     try {
 
       /*
-       * Creating a file and writing buffer into it. Also recording the
-       * buffer for future read() call.
+       * Creating a file and writing buffer into it.
+       * This is a file recreate, so it will trigger
+       * 2 extra calls when conditional create overwrite is enabled (the default).
+       * Also recording the buffer for future read() call.
        * This creating outputStream and writing requires 2 *
        * (LARGE_OPERATIONS) get requests.
        */
       StringBuilder largeBuffer = new StringBuilder();
       out = fs.create(getResponsePath);
+
+      long createRequestCalls = 1;
+      if (this.getConfiguration().isConditionalCreateOverwriteEnabled()) {
+        createRequestCalls += 2;
+      }
+
       for (int i = 0; i < LARGE_OPERATIONS; i++) {
         out.write(testResponseString.getBytes());
         out.hflush();
@@ -268,7 +288,8 @@ public class ITestAbfsNetworkStatistics extends AbstractAbfsIntegrationTest {
        *
        * get_response : get_responses(Last assertion) + 1
        * (OutputStream) + 2 * LARGE_OPERATIONS(Writing and flushing
-       * LARGE_OPERATIONS times) + 1(open()) + 1(read()).
+       * LARGE_OPERATIONS times) + 1(open()) + 1(read()) +
+       * 1 (createOverwriteTriggeredGetForETag).
        *
        * bytes_received : bytes_received(Last assertion) + LARGE_OPERATIONS *
        * bytes wrote each time (bytes_received is equal to bytes wrote in the
@@ -284,7 +305,8 @@ public class ITestAbfsNetworkStatistics extends AbstractAbfsIntegrationTest {
             getResponses + 3 + LARGE_OPERATIONS, metricMap);
       } else {
         assertAbfsStatistics(AbfsStatistic.GET_RESPONSES,
-            getResponses + 3 + 2 * LARGE_OPERATIONS, metricMap);
+            getResponses + 2 + createRequestCalls + 2 * LARGE_OPERATIONS,
+            metricMap);
       }
 
     } finally {
@@ -319,4 +341,4 @@ public class ITestAbfsNetworkStatistics extends AbstractAbfsIntegrationTest {
       IOUtils.cleanupWithLogger(LOG, out);
     }
   }
-}
+}
\ No newline at end of file
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java
index 4b8f071..981ed25 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java
@@ -21,18 +21,44 @@ package org.apache.hadoop.fs.azurebfs;
 import java.io.FileNotFoundException;
 import java.io.FilterOutputStream;
 import java.io.IOException;
+import java.lang.reflect.Field;
 import java.util.EnumSet;
+import java.util.UUID;
 
 import org.junit.Test;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.test.GenericTestUtils;
 
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.ConcurrentWriteOperationDetectedException;
+import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
+import org.apache.hadoop.fs.azurebfs.services.TestAbfsClient;
+import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation;
+import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
+
+import static java.net.HttpURLConnection.HTTP_CONFLICT;
+import static java.net.HttpURLConnection.HTTP_INTERNAL_ERROR;
+import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
+import static java.net.HttpURLConnection.HTTP_OK;
+import static java.net.HttpURLConnection.HTTP_PRECON_FAILED;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
 import static org.apache.hadoop.fs.contract.ContractTestUtils.assertIsFile;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CONNECTIONS_MADE;
 
 /**
  * Test create operation.
@@ -188,4 +214,250 @@ public class ITestAzureBlobFileSystemCreate extends
         });
   }
 
+  /**
+   * Tests if the number of connections made for:
+   * 1. create overwrite=false of a file that doesn't pre-exist
+   * 2. create overwrite=false of a file that pre-exists
+   * 3. create overwrite=true of a file that doesn't pre-exist
+   * 4. create overwrite=true of a file that pre-exists
+   * matches the expectation when run against both combinations of
+   * fs.azure.enable.conditional.create.overwrite=true and
+   * fs.azure.enable.conditional.create.overwrite=false
+   * @throws Throwable
+   */
+  @Test
+  public void testDefaultCreateOverwriteFileTest() throws Throwable {
+    testCreateFileOverwrite(true);
+    testCreateFileOverwrite(false);
+  }
+
+  public void testCreateFileOverwrite(boolean enableConditionalCreateOverwrite)
+      throws Throwable {
+    final AzureBlobFileSystem currentFs = getFileSystem();
+    Configuration config = new Configuration(this.getRawConfiguration());
+    config.set("fs.azure.enable.conditional.create.overwrite",
+        Boolean.toString(enableConditionalCreateOverwrite));
+
+    final AzureBlobFileSystem fs =
+        (AzureBlobFileSystem) FileSystem.newInstance(currentFs.getUri(),
+            config);
+
+    long totalConnectionMadeBeforeTest = fs.getInstrumentationMap()
+        .get(CONNECTIONS_MADE.getStatName());
+
+    int createRequestCount = 0;
+    final Path nonOverwriteFile = new Path("/NonOverwriteTest_FileName_"
+        + UUID.randomUUID().toString());
+
+    // Case 1: Not Overwrite - File does not pre-exist
+    // create should be successful
+    fs.create(nonOverwriteFile, false);
+
+    // One request to server to create path should be issued
+    createRequestCount++;
+
+    assertAbfsStatistics(
+        CONNECTIONS_MADE,
+        totalConnectionMadeBeforeTest + createRequestCount,
+        fs.getInstrumentationMap());
+
+    // Case 2: Not Overwrite - File pre-exists
+    intercept(FileAlreadyExistsException.class,
+        () -> fs.create(nonOverwriteFile, false));
+
+    // One request to server to create path should be issued
+    createRequestCount++;
+
+    assertAbfsStatistics(
+        CONNECTIONS_MADE,
+        totalConnectionMadeBeforeTest + createRequestCount,
+        fs.getInstrumentationMap());
+
+    final Path overwriteFilePath = new Path("/OverwriteTest_FileName_"
+        + UUID.randomUUID().toString());
+
+    // Case 3: Overwrite - File does not pre-exist
+    // create should be successful
+    fs.create(overwriteFilePath, true);
+
+    // One request to server to create path should be issued
+    createRequestCount++;
+
+    assertAbfsStatistics(
+        CONNECTIONS_MADE,
+        totalConnectionMadeBeforeTest + createRequestCount,
+        fs.getInstrumentationMap());
+
+    // Case 4: Overwrite - File pre-exists
+    fs.create(overwriteFilePath, true);
+
+    if (enableConditionalCreateOverwrite) {
+      // Three requests will be sent to server to create path,
+      // 1. create without overwrite
+      // 2. GetFileStatus to get eTag
+      // 3. create with overwrite
+      createRequestCount += 3;
+    } else {
+      createRequestCount++;
+    }
+
+    assertAbfsStatistics(
+        CONNECTIONS_MADE,
+        totalConnectionMadeBeforeTest + createRequestCount,
+        fs.getInstrumentationMap());
+  }
+
+  /**
+   * Test negative scenarios with Create overwrite=false as default
+   * With create overwrite=true ending in 3 calls:
+   * A. Create overwrite=false
+   * B. GFS
+   * C. Create overwrite=true
+   *
+   * Scn1: A fails with HTTP409, leading to B which fails with HTTP404,
+   *        detect parallel access
+   * Scn2: A fails with HTTP409, leading to B which fails with HTTP500,
+   *        fail create with HTTP500
+   * Scn3: A fails with HTTP409, leading to B and then C,
+   *        which fails with HTTP412, detect parallel access
+   * Scn4: A fails with HTTP409, leading to B and then C,
+   *        which fails with HTTP500, fail create with HTTP500
+   * Scn5: A fails with HTTP500, fail create with HTTP500
+   */
+  @Test
+  public void testNegativeScenariosForCreateOverwriteDisabled()
+      throws Throwable {
+
+    final AzureBlobFileSystem currentFs = getFileSystem();
+    Configuration config = new Configuration(this.getRawConfiguration());
+    config.set("fs.azure.enable.conditional.create.overwrite",
+        Boolean.toString(true));
+
+    final AzureBlobFileSystem fs =
+        (AzureBlobFileSystem) FileSystem.newInstance(currentFs.getUri(),
+            config);
+
+    // Get mock AbfsClient with current config
+    AbfsClient
+        mockClient
+        = TestAbfsClient.getMockAbfsClient(
+        fs.getAbfsStore().getClient(),
+        fs.getAbfsStore().getAbfsConfiguration());
+
+    AzureBlobFileSystemStore abfsStore = fs.getAbfsStore();
+    abfsStore = setAzureBlobSystemStoreField(abfsStore, "client", mockClient);
+
+    AbfsRestOperation successOp = mock(
+        AbfsRestOperation.class);
+    AbfsHttpOperation http200Op = mock(
+        AbfsHttpOperation.class);
+    when(http200Op.getStatusCode()).thenReturn(HTTP_OK);
+    when(successOp.getResult()).thenReturn(http200Op);
+
+    AbfsRestOperationException conflictResponseEx
+        = getMockAbfsRestOperationException(HTTP_CONFLICT);
+    AbfsRestOperationException serverErrorResponseEx
+        = getMockAbfsRestOperationException(HTTP_INTERNAL_ERROR);
+    AbfsRestOperationException fileNotFoundResponseEx
+        = getMockAbfsRestOperationException(HTTP_NOT_FOUND);
+    AbfsRestOperationException preConditionResponseEx
+        = getMockAbfsRestOperationException(HTTP_PRECON_FAILED);
+
+    doThrow(conflictResponseEx) // Scn1: GFS fails with Http404
+        .doThrow(conflictResponseEx) // Scn2: GFS fails with Http500
+        .doThrow(
+            conflictResponseEx) // Scn3: create overwrite=true fails with Http412
+        .doThrow(
+            conflictResponseEx) // Scn4: create overwrite=true fails with Http500
+        .doThrow(
+            serverErrorResponseEx) // Scn5: create overwrite=false fails with Http500
+        .when(mockClient)
+        .createPath(any(String.class), eq(true), eq(false), any(String.class),
+            any(String.class), any(boolean.class), eq(null));
+
+    doThrow(fileNotFoundResponseEx) // Scn1: GFS fails with Http404
+        .doThrow(serverErrorResponseEx) // Scn2: GFS fails with Http500
+        .doReturn(successOp) // Scn3: create overwrite=true fails with Http412
+        .doReturn(successOp) // Scn4: create overwrite=true fails with Http500
+        .when(mockClient)
+        .getPathStatus(any(String.class), eq(false));
+
+    doThrow(
+        preConditionResponseEx) // Scn3: create overwrite=true fails with Http412
+        .doThrow(
+            serverErrorResponseEx) // Scn4: create overwrite=true fails with Http500
+        .when(mockClient)
+        .createPath(any(String.class), eq(true), eq(true), any(String.class),
+            any(String.class), any(boolean.class), eq(null));
+
+    // Scn1: GFS fails with Http404
+    // Sequence of events expected:
+    // 1. create overwrite=false - fail with conflict
+    // 2. GFS - fail with File Not found
+    // Create will fail with ConcurrentWriteOperationDetectedException
+    validateCreateFileException(ConcurrentWriteOperationDetectedException.class,
+        abfsStore);
+
+    // Scn2: GFS fails with Http500
+    // Sequence of events expected:
+    // 1. create overwrite=false - fail with conflict
+    // 2. GFS - fail with Server error
+    // Create will fail with 500
+    validateCreateFileException(AbfsRestOperationException.class, abfsStore);
+
+    // Scn3: create overwrite=true fails with Http412
+    // Sequence of events expected:
+    // 1. create overwrite=false - fail with conflict
+    // 2. GFS - pass
+    // 3. create overwrite=true - fail with Pre-Condition
+    // Create will fail with ConcurrentWriteOperationDetectedException
+    validateCreateFileException(ConcurrentWriteOperationDetectedException.class,
+        abfsStore);
+
+    // Scn4: create overwrite=true fails with Http500
+    // Sequence of events expected:
+    // 1. create overwrite=false - fail with conflict
+    // 2. GFS - pass
+    // 3. create overwrite=true - fail with Server error
+    // Create will fail with 500
+    validateCreateFileException(AbfsRestOperationException.class, abfsStore);
+
+    // Scn5: create overwrite=false fails with Http500
+    // Sequence of events expected:
+    // 1. create overwrite=false - fail with server error
+    // Create will fail with 500
+    validateCreateFileException(AbfsRestOperationException.class, abfsStore);
+  }
+
+  private AzureBlobFileSystemStore setAzureBlobSystemStoreField(
+      final AzureBlobFileSystemStore abfsStore,
+      final String fieldName,
+      Object fieldObject) throws Exception {
+
+    Field abfsClientField = AzureBlobFileSystemStore.class.getDeclaredField(
+        fieldName);
+    abfsClientField.setAccessible(true);
+    Field modifiersField = Field.class.getDeclaredField("modifiers");
+    modifiersField.setAccessible(true);
+    modifiersField.setInt(abfsClientField,
+        abfsClientField.getModifiers() & ~java.lang.reflect.Modifier.FINAL);
+    abfsClientField.set(abfsStore, fieldObject);
+    return abfsStore;
+  }
+
+  private <E extends Throwable> void validateCreateFileException(final Class<E> exceptionClass, final AzureBlobFileSystemStore abfsStore)
+      throws Exception {
+    FsPermission permission = new FsPermission(FsAction.ALL, FsAction.ALL,
+        FsAction.ALL);
+    FsPermission umask = new FsPermission(FsAction.NONE, FsAction.NONE,
+        FsAction.NONE);
+    Path testPath = new Path("testFile");
+    intercept(
+        exceptionClass,
+        () -> abfsStore.createFile(testPath, null, true, permission, umask));
+  }
+
+  private AbfsRestOperationException getMockAbfsRestOperationException(int status) {
+    return new AbfsRestOperationException(status, "", "", new Exception());
+  }
 }
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemMkDir.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemMkDir.java
index 382d396..de476a6 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemMkDir.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemMkDir.java
@@ -18,12 +18,18 @@
 
 package org.apache.hadoop.fs.azurebfs;
 
+import java.util.UUID;
+
 import org.junit.Test;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
 import static org.apache.hadoop.fs.contract.ContractTestUtils.assertMkdirs;
 
+import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CONNECTIONS_MADE;
+
 /**
  * Test mkdir operation.
  */
@@ -45,4 +51,58 @@ public class ITestAzureBlobFileSystemMkDir extends AbstractAbfsIntegrationTest {
   public void testCreateRoot() throws Exception {
     assertMkdirs(getFileSystem(), new Path("/"));
   }
+
+  /**
+   * Test mkdir for both settings of fs.azure.enable.conditional.create.overwrite.
+   * @throws Throwable
+   */
+  @Test
+  public void testDefaultCreateOverwriteDirTest() throws Throwable {
+    // the config fs.azure.enable.conditional.create.overwrite should have no
+    // effect on mkdirs
+    testCreateDirOverwrite(true);
+    testCreateDirOverwrite(false);
+  }
+
+  public void testCreateDirOverwrite(boolean enableConditionalCreateOverwrite)
+      throws Throwable {
+    final AzureBlobFileSystem currentFs = getFileSystem();
+    Configuration config = new Configuration(this.getRawConfiguration());
+    config.set("fs.azure.enable.conditional.create.overwrite",
+        Boolean.toString(enableConditionalCreateOverwrite));
+
+    final AzureBlobFileSystem fs =
+        (AzureBlobFileSystem) FileSystem.newInstance(currentFs.getUri(),
+            config);
+
+    long totalConnectionMadeBeforeTest = fs.getInstrumentationMap()
+        .get(CONNECTIONS_MADE.getStatName());
+
+    int mkdirRequestCount = 0;
+    final Path dirPath = new Path("/DirPath_"
+        + UUID.randomUUID().toString());
+
+    // Case 1: Dir does not pre-exist
+    fs.mkdirs(dirPath);
+
+    // One request to server
+    mkdirRequestCount++;
+
+    assertAbfsStatistics(
+        CONNECTIONS_MADE,
+        totalConnectionMadeBeforeTest + mkdirRequestCount,
+        fs.getInstrumentationMap());
+
+    // Case 2: Dir pre-exists
+    // Mkdir on existing Dir path will not lead to failure
+    fs.mkdirs(dirPath);
+
+    // One request to server
+    mkdirRequestCount++;
+
+    assertAbfsStatistics(
+        CONNECTIONS_MADE,
+        totalConnectionMadeBeforeTest + mkdirRequestCount,
+        fs.getInstrumentationMap());
+  }
 }
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java
index 0d904c8..7a7992d 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java
@@ -283,8 +283,7 @@ public final class TestAbfsClient {
   }
 
   public static AbfsClient getMockAbfsClient(AbfsClient baseAbfsClientInstance,
-      AbfsConfiguration abfsConfig)
-      throws IOException, NoSuchFieldException, IllegalAccessException {
+      AbfsConfiguration abfsConfig) throws Exception {
     AuthType currentAuthType = abfsConfig.getAuthType(
         abfsConfig.getAccountName());
 
@@ -310,47 +309,46 @@ public final class TestAbfsClient {
     when(client.createDefaultHeaders()).thenCallRealMethod();
 
     // override abfsConfiguration
-    Field baseUrlField = AbfsClient.class.getDeclaredField("baseUrl");
-    baseUrlField.setAccessible(true);
-    Field modifiersField = Field.class.getDeclaredField("modifiers");
-    modifiersField.setAccessible(true);
-    modifiersField.setInt(baseUrlField, baseUrlField.getModifiers() & ~java.lang.reflect.Modifier.FINAL);
-    baseUrlField.set(client, baseAbfsClientInstance.getBaseUrl());
+    client = TestAbfsClient.setAbfsClientField(client, "abfsConfiguration",
+        abfsConfig);
+
+    // override baseurl
+    client = TestAbfsClient.setAbfsClientField(client, "baseUrl",
+        baseAbfsClientInstance.getBaseUrl());
 
     // override auth provider
     if (currentAuthType == AuthType.SharedKey) {
-      Field sharedKeyCredsField = AbfsClient.class.getDeclaredField(
-          "sharedKeyCredentials");
-      sharedKeyCredsField.setAccessible(true);
-      modifiersField.setInt(sharedKeyCredsField,
-          sharedKeyCredsField.getModifiers()
-              & ~java.lang.reflect.Modifier.FINAL);
-      sharedKeyCredsField.set(client, new SharedKeyCredentials(
-          abfsConfig.getAccountName().substring(0,
-              abfsConfig.getAccountName().indexOf(DOT)),
-          abfsConfig.getStorageAccountKey()));
+      client = TestAbfsClient.setAbfsClientField(client, "sharedKeyCredentials",
+          new SharedKeyCredentials(
+              abfsConfig.getAccountName().substring(0,
+                  abfsConfig.getAccountName().indexOf(DOT)),
+              abfsConfig.getStorageAccountKey()));
     } else {
-      Field tokenProviderField = AbfsClient.class.getDeclaredField(
-          "tokenProvider");
-      tokenProviderField.setAccessible(true);
-      modifiersField.setInt(tokenProviderField,
-          tokenProviderField.getModifiers()
-              & ~java.lang.reflect.Modifier.FINAL);
-      tokenProviderField.set(client, abfsConfig.getTokenProvider());
+      client = TestAbfsClient.setAbfsClientField(client, "tokenProvider",
+          abfsConfig.getTokenProvider());
     }
 
     // override user agent
     String userAgent = "APN/1.0 Azure Blob FS/3.4.0-SNAPSHOT (PrivateBuild "
         + "JavaJRE 1.8.0_252; Linux 5.3.0-59-generic/amd64; openssl-1.0; "
         + "UNKNOWN/UNKNOWN) MSFT";
-    Field userAgentField = AbfsClient.class.getDeclaredField(
-        "userAgent");
-    userAgentField.setAccessible(true);
-    modifiersField.setInt(userAgentField,
-        userAgentField.getModifiers()
-            & ~java.lang.reflect.Modifier.FINAL);
-    userAgentField.set(client, userAgent);
+    client = TestAbfsClient.setAbfsClientField(client, "userAgent", userAgent);
 
     return client;
   }
+
+  private static AbfsClient setAbfsClientField(
+      final AbfsClient client,
+      final String fieldName,
+      Object fieldObject) throws Exception {
+
+    Field field = AbfsClient.class.getDeclaredField(fieldName);
+    field.setAccessible(true);
+    Field modifiersField = Field.class.getDeclaredField("modifiers");
+    modifiersField.setAccessible(true);
+    modifiersField.setInt(field,
+        field.getModifiers() & ~java.lang.reflect.Modifier.FINAL);
+    field.set(client, fieldObject);
+    return client;
+  }
 }
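
The setAbfsClientField helper above collapses the repeated reflection
boilerplate into one place. The pattern generalizes to any class; a sketch
under the assumption of a JDK 8-era runtime (the Field#modifiers trick it
depends on is filtered out by JDK 12 and later):

    import java.lang.reflect.Field;
    import java.lang.reflect.Modifier;

    final class FinalFieldSetter {
      // Overwrites an instance field, including final ones, via reflection.
      static void set(Class<?> declaringClass, Object target,
          String fieldName, Object value) throws Exception {
        Field field = declaringClass.getDeclaredField(fieldName);
        field.setAccessible(true);
        Field modifiers = Field.class.getDeclaredField("modifiers");
        modifiers.setAccessible(true);
        modifiers.setInt(field, field.getModifiers() & ~Modifier.FINAL);
        field.set(target, value);
      }
    }

Taking the declaring class explicitly matters here: the client is a Mockito
mock, and a mock's runtime class is a generated subclass that does not itself
declare fields such as baseUrl or userAgent.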




[hadoop] 09/09: HADOOP-17301. ABFS: read-ahead error reporting breaks buffer management (#2369)


commit d5b4d04b0df9661e23904bb30f369552cc0ec95d
Author: Sneha Vijayarajan <sn...@gmail.com>
AuthorDate: Tue Oct 13 21:00:34 2020 +0530

    HADOOP-17301. ABFS: read-ahead error reporting breaks buffer management (#2369)
    
    
    Fixes read-ahead buffer management issues introduced by HADOOP-16852,
     "ABFS: Send error back to client for Read Ahead request failure".
    
    Contributed by Sneha Vijayarajan
---
 .../fs/azurebfs/services/ReadBufferManager.java    | 36 ++++++++++++++--
 .../fs/azurebfs/services/TestAbfsInputStream.java  | 49 ++++++++++++++++++++++
 2 files changed, 82 insertions(+), 3 deletions(-)

diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java
index 73c23b0..d7e031b 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java
@@ -22,6 +22,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.LinkedList;
 import java.util.Queue;
@@ -218,6 +219,8 @@ final class ReadBufferManager {
       return false;  // there are no evict-able buffers
     }
 
+    long currentTimeInMs = currentTimeMillis();
+
     // first, try buffers where all bytes have been consumed (approximated as first and last bytes consumed)
     for (ReadBuffer buf : completedReadList) {
       if (buf.isFirstByteConsumed() && buf.isLastByteConsumed()) {
@@ -242,14 +245,30 @@ final class ReadBufferManager {
     }
 
     // next, try any old nodes that have not been consumed
+    // Failed read buffers (with buffer index=-1) that are older than
+    // thresholdAge should be cleaned up, but at the same time should not
+    // report successful eviction.
+    // Queue logic expects that a buffer is freed up for read ahead when
+    // eviction is successful, whereas a failed ReadBuffer would have released
+    // its buffer when its status was set to READ_FAILED.
     long earliestBirthday = Long.MAX_VALUE;
+    ArrayList<ReadBuffer> oldFailedBuffers = new ArrayList<>();
     for (ReadBuffer buf : completedReadList) {
-      if (buf.getTimeStamp() < earliestBirthday) {
+      if ((buf.getBufferindex() != -1)
+          && (buf.getTimeStamp() < earliestBirthday)) {
         nodeToEvict = buf;
         earliestBirthday = buf.getTimeStamp();
+      } else if ((buf.getBufferindex() == -1)
+          && (currentTimeInMs - buf.getTimeStamp()) > thresholdAgeMilliseconds) {
+        oldFailedBuffers.add(buf);
       }
     }
-    if ((currentTimeMillis() - earliestBirthday > thresholdAgeMilliseconds) && (nodeToEvict != null)) {
+
+    for (ReadBuffer buf : oldFailedBuffers) {
+      evict(buf);
+    }
+
+    if ((currentTimeInMs - earliestBirthday > thresholdAgeMilliseconds) && (nodeToEvict != null)) {
       return evict(nodeToEvict);
     }
 
@@ -417,7 +436,6 @@ final class ReadBufferManager {
       if (result == ReadBufferStatus.AVAILABLE && bytesActuallyRead > 0) {
         buffer.setStatus(ReadBufferStatus.AVAILABLE);
         buffer.setLength(bytesActuallyRead);
-        completedReadList.add(buffer);
       } else {
         freeList.push(buffer.getBufferindex());
         // buffer will be deleted as per the eviction policy.
@@ -464,4 +482,16 @@ final class ReadBufferManager {
   void callTryEvict() {
     tryEvict();
   }
+
+  /**
+   * Test method that can mimic the no-free-buffers scenario and also add a
+   * ReadBuffer into completedReadList. This ReadBuffer will get picked up by
+   * tryEvict() the next time a new queue request comes in.
+   * @param buf the buffer that needs to be added to completedReadList
+   */
+  @VisibleForTesting
+  void testMimicFullUseAndAddFailedBuffer(ReadBuffer buf) {
+    freeList.clear();
+    completedReadList.add(buf);
+  }
 }
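
Condensed, the rule the hunk above introduces: a failed ReadBuffer
(bufferindex == -1) past the age threshold is purged from completedReadList
but never reported as a successful eviction, because it already released its
pooled buffer when its status became READ_FAILED. A self-contained toy model
of that pass (the types and threshold are simplified stand-ins, not the
connector's classes):

    import java.util.ArrayList;
    import java.util.List;

    final class EvictionSketch {
      static final long THRESHOLD_AGE_MS = 30_000L;

      static final class Buf {
        final int bufferIndex;  // -1 means the read failed; buffer already released
        final long timeStamp;
        Buf(int bufferIndex, long timeStamp) {
          this.bufferIndex = bufferIndex;
          this.timeStamp = timeStamp;
        }
      }

      // Returns true only when a pooled buffer was really freed for readahead.
      static boolean tryEvictOldest(List<Buf> completed, long nowMs) {
        Buf oldestHealthy = null;
        List<Buf> expiredFailed = new ArrayList<>();
        for (Buf b : completed) {
          if (b.bufferIndex == -1) {
            if (nowMs - b.timeStamp > THRESHOLD_AGE_MS) {
              expiredFailed.add(b);  // cleaned up, but not counted as an eviction
            }
          } else if (oldestHealthy == null || b.timeStamp < oldestHealthy.timeStamp) {
            oldestHealthy = b;
          }
        }
        completed.removeAll(expiredFailed);
        if (oldestHealthy != null && nowMs - oldestHealthy.timeStamp > THRESHOLD_AGE_MS) {
          completed.remove(oldestHealthy);
          return true;  // queue logic may now hand out a free buffer
        }
        return false;
      }
    }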
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java
index c9dacd6..ae72c5a 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java
@@ -23,9 +23,12 @@ import java.io.IOException;
 import org.junit.Assert;
 import org.junit.Test;
 
+import org.assertj.core.api.Assertions;
+
 import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.TimeoutException;
+import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus;
 import org.apache.hadoop.fs.azurebfs.utils.TestCachedSASToken;
 
 import static org.mockito.ArgumentMatchers.any;
@@ -49,6 +52,8 @@ public class TestAbfsInputStream extends
   private static final int TWO_KB = 2 * 1024;
   private static final int THREE_KB = 3 * 1024;
   private static final int REDUCED_READ_BUFFER_AGE_THRESHOLD = 3000; // 3 sec
+  private static final int INCREASED_READ_BUFFER_AGE_THRESHOLD =
+      REDUCED_READ_BUFFER_AGE_THRESHOLD * 10; // 30 sec
 
   private AbfsRestOperation getMockRestOp() {
     AbfsRestOperation op = mock(AbfsRestOperation.class);
@@ -182,7 +187,38 @@ public class TestAbfsInputStream extends
     checkEvictedStatus(inputStream, 0, false);
   }
 
+  @Test
+  public void testFailedReadAheadEviction() throws Exception {
+    AbfsClient client = getMockAbfsClient();
+    AbfsRestOperation successOp = getMockRestOp();
+    ReadBufferManager.setThresholdAgeMilliseconds(INCREASED_READ_BUFFER_AGE_THRESHOLD);
+    // Stub:
+    // A read request leads to 3 readahead calls; fail all 3 readahead
+    // client.read() calls so the actual read request fails with the failure
+    // seen in the readahead thread.
+    doThrow(new TimeoutException("Internal Server error"))
+        .when(client)
+        .read(any(String.class), any(Long.class), any(byte[].class),
+            any(Integer.class), any(Integer.class), any(String.class),
+            any(String.class));
+
+    AbfsInputStream inputStream = getAbfsInputStream(client, "testFailedReadAheadEviction.txt");
+
+    // Add a failed buffer to the completed list and leave no free buffers for readahead.
+    ReadBuffer buff = new ReadBuffer();
+    buff.setStatus(ReadBufferStatus.READ_FAILED);
+    ReadBufferManager.getBufferManager().testMimicFullUseAndAddFailedBuffer(buff);
+
+    // If a failed read buffer's eviction were tagged as a valid eviction, the
+    // queue logic would wrongly assume a buffer had been freed up, which can
+    // lead to:
+    // java.util.EmptyStackException
+    // at java.util.Stack.peek(Stack.java:102)
+    // at java.util.Stack.pop(Stack.java:84)
+    // at org.apache.hadoop.fs.azurebfs.services.ReadBufferManager.queueReadAhead
+    ReadBufferManager.getBufferManager().queueReadAhead(inputStream, 0, ONE_KB);
+  }
+
   /**
+   *
    * The test expects AbfsInputStream to initiate a remote read request for
    * the request offset and length when previous read ahead on the offset had failed.
    * Also checks that the ReadBuffers are evicted as per the ReadBufferManager
@@ -264,12 +300,25 @@ public class TestAbfsInputStream extends
             any(String.class));
 
     AbfsInputStream inputStream = getAbfsInputStream(client, "testSuccessfulReadAhead.txt");
+    int beforeReadCompletedListSize = ReadBufferManager.getBufferManager().getCompletedReadListSize();
 
     // First read request that triggers readAheads.
     inputStream.read(new byte[ONE_KB]);
 
     // Only the 3 readAhead threads should have triggered client.read
     verifyReadCallCount(client, 3);
+    int newAdditionsToCompletedRead =
+        ReadBufferManager.getBufferManager().getCompletedReadListSize()
+            - beforeReadCompletedListSize;
+    // A read buffer may be dropped if ReadBufferManager.getBlock() ran before
+    // the buffer was picked up for reading from the readahead queue, so that
+    // the input stream can proceed with its read instead of blocking on
+    // readahead thread availability. The count of buffers added to
+    // completedReadList for the stream can therefore be equal to or less than
+    // the number of readaheads queued.
+    Assertions.assertThat(newAdditionsToCompletedRead)
+        .describedAs(
+            "New additions to completed reads should be less than or equal to the number of readaheads")
+        .isLessThanOrEqualTo(3);
 
     // Another read request whose requested data is already read ahead.
     inputStream.read(ONE_KB, new byte[ONE_KB], 0, ONE_KB);




[hadoop] 04/09: Upgrade store REST API version to 2019-12-12


commit 4072323de407e7693ac43f4f8370c51f417ed874
Author: Sneha Vijayarajan <sn...@gmail.com>
AuthorDate: Mon Aug 17 22:47:18 2020 +0530

    Upgrade store REST API version to 2019-12-12
    
    - Contributed by Sneha Vijayarajan
---
 .../main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java    | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
index b4447b9..45c1948 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
@@ -62,7 +62,7 @@ public class AbfsClient implements Closeable {
   public static final Logger LOG = LoggerFactory.getLogger(AbfsClient.class);
   private final URL baseUrl;
   private final SharedKeyCredentials sharedKeyCredentials;
-  private final String xMsVersion = "2018-11-09";
+  private final String xMsVersion = "2019-12-12";
   private final ExponentialRetryPolicy retryPolicy;
   private final String filesystem;
   private final AbfsConfiguration abfsConfiguration;
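
For context, xMsVersion is the value sent as the x-ms-version header on every
request, so bumping it opts the connector into the 2019-12-12 service feature
set. A minimal sketch of that pinning at the raw HTTP level (the account and
filesystem names are placeholders, and auth headers are omitted, so a real
call would get 401/403):

    import java.net.HttpURLConnection;
    import java.net.URL;

    final class VersionHeaderSketch {
      // Illustrative only: shows the header the xMsVersion constant feeds.
      static int headFilesystemStatus() throws Exception {
        URL url = new URL(
            "https://myaccount.dfs.core.windows.net/myfs?resource=filesystem");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("HEAD");
        conn.setRequestProperty("x-ms-version", "2019-12-12");
        return conn.getResponseCode();
      }
    }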




[hadoop] 06/09: HADOOP-17166. ABFS: configure output stream thread pool (#2179)


commit f208da286cddebe594c240ed6e4c8c4850f1faeb
Author: bilaharith <52...@users.noreply.github.com>
AuthorDate: Wed Sep 9 21:11:36 2020 +0530

    HADOOP-17166. ABFS: configure output stream thread pool (#2179)
    
    
    Adds the options to control the size of the per-output-stream threadpool
    when writing data through the abfs connector
    
    * fs.azure.write.max.concurrent.requests
    * fs.azure.write.max.requests.to.queue
    
    Contributed by Bilahari T H
---
 .../hadoop/fs/azurebfs/AbfsConfiguration.java      | 22 ++++++
 .../fs/azurebfs/AzureBlobFileSystemStore.java      |  2 +
 .../fs/azurebfs/constants/ConfigurationKeys.java   |  2 +
 .../fs/azurebfs/services/AbfsOutputStream.java     | 18 ++++-
 .../azurebfs/services/AbfsOutputStreamContext.java | 24 +++++++
 .../hadoop-azure/src/site/markdown/abfs.md         | 13 ++++
 .../azurebfs/services/ITestAbfsOutputStream.java   | 78 ++++++++++++++++++++++
 .../fs/azurebfs/services/TestAbfsOutputStream.java |  7 +-
 8 files changed, 163 insertions(+), 3 deletions(-)

diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
index 85bd37a..66d4853 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
@@ -86,6 +86,14 @@ public class AbfsConfiguration{
       DefaultValue = DEFAULT_FS_AZURE_ACCOUNT_IS_HNS_ENABLED)
   private String isNamespaceEnabledAccount;
 
+  @IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_WRITE_MAX_CONCURRENT_REQUESTS,
+      DefaultValue = -1)
+  private int writeMaxConcurrentRequestCount;
+
+  @IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_WRITE_MAX_REQUESTS_TO_QUEUE,
+      DefaultValue = -1)
+  private int maxWriteRequestsToQueue;
+
   @IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_WRITE_BUFFER_SIZE,
       MinValue = MIN_BUFFER_SIZE,
       MaxValue = MAX_BUFFER_SIZE,
@@ -822,6 +830,20 @@ public class AbfsConfiguration{
         oauthTokenFetchRetryDeltaBackoff);
   }
 
+  public int getWriteMaxConcurrentRequestCount() {
+    if (this.writeMaxConcurrentRequestCount < 1) {
+      return 4 * Runtime.getRuntime().availableProcessors();
+    }
+    return this.writeMaxConcurrentRequestCount;
+  }
+
+  public int getMaxWriteRequestsToQueue() {
+    if (this.maxWriteRequestsToQueue < 1) {
+      return 2 * getWriteMaxConcurrentRequestCount();
+    }
+    return this.maxWriteRequestsToQueue;
+  }
+
   @VisibleForTesting
   void setReadBufferSize(int bufferSize) {
     this.readBufferSize = bufferSize;
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
index 9861e3a..23d2b5a 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
@@ -490,6 +490,8 @@ public class AzureBlobFileSystemStore implements Closeable {
             .disableOutputStreamFlush(abfsConfiguration.isOutputStreamFlushDisabled())
             .withStreamStatistics(new AbfsOutputStreamStatisticsImpl())
             .withAppendBlob(isAppendBlob)
+            .withWriteMaxConcurrentRequestCount(abfsConfiguration.getWriteMaxConcurrentRequestCount())
+            .withMaxWriteRequestsToQueue(abfsConfiguration.getMaxWriteRequestsToQueue())
             .build();
   }
 
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
index 5f1ad31..681390c 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
@@ -52,6 +52,8 @@ public final class ConfigurationKeys {
   public static final String AZURE_OAUTH_TOKEN_FETCH_RETRY_DELTA_BACKOFF = "fs.azure.oauth.token.fetch.retry.delta.backoff";
 
   // Read and write buffer sizes defined by the user
+  public static final String AZURE_WRITE_MAX_CONCURRENT_REQUESTS = "fs.azure.write.max.concurrent.requests";
+  public static final String AZURE_WRITE_MAX_REQUESTS_TO_QUEUE = "fs.azure.write.max.requests.to.queue";
   public static final String AZURE_WRITE_BUFFER_SIZE = "fs.azure.write.request.size";
   public static final String AZURE_READ_BUFFER_SIZE = "fs.azure.read.request.size";
   public static final String AZURE_BLOCK_SIZE_PROPERTY_NAME = "fs.azure.block.size";
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
index 6c1e177..1991638 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
@@ -70,6 +70,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
   private byte[] buffer;
   private int bufferIndex;
   private final int maxConcurrentRequestCount;
+  private final int maxRequestsThatCanBeQueued;
 
   private ConcurrentLinkedDeque<WriteOperation> writeOperations;
   private final ThreadPoolExecutor threadExecutor;
@@ -119,8 +120,11 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
     if (this.isAppendBlob) {
       this.maxConcurrentRequestCount = 1;
     } else {
-      this.maxConcurrentRequestCount = 4 * Runtime.getRuntime().availableProcessors();
+      this.maxConcurrentRequestCount = abfsOutputStreamContext
+          .getWriteMaxConcurrentRequestCount();
     }
+    this.maxRequestsThatCanBeQueued = abfsOutputStreamContext
+        .getMaxWriteRequestsToQueue();
     this.threadExecutor
         = new ThreadPoolExecutor(maxConcurrentRequestCount,
         maxConcurrentRequestCount,
@@ -371,7 +375,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
     final long offset = position;
     position += bytesLength;
 
-    if (threadExecutor.getQueue().size() >= maxConcurrentRequestCount * 2) {
+    if (threadExecutor.getQueue().size() >= maxRequestsThatCanBeQueued) {
       long start = System.currentTimeMillis();
       waitForTaskToComplete();
       outputStreamStatistics.timeSpentTaskWait(start, System.currentTimeMillis());
@@ -543,6 +547,16 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
     return writeOperations.size();
   }
 
+  @VisibleForTesting
+  int getMaxConcurrentRequestCount() {
+    return this.maxConcurrentRequestCount;
+  }
+
+  @VisibleForTesting
+  int getMaxRequestsThatCanBeQueued() {
+    return maxRequestsThatCanBeQueued;
+  }
+
   /**
    * Appending AbfsOutputStream statistics to base toString().
    *
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java
index 03e4aba..2dce5dc 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java
@@ -33,6 +33,10 @@ public class AbfsOutputStreamContext extends AbfsStreamContext {
 
   private boolean isAppendBlob;
 
+  private int writeMaxConcurrentRequestCount;
+
+  private int maxWriteRequestsToQueue;
+
   public AbfsOutputStreamContext(final long sasTokenRenewPeriodForStreamsInSeconds) {
     super(sasTokenRenewPeriodForStreamsInSeconds);
   }
@@ -71,6 +75,18 @@ public class AbfsOutputStreamContext extends AbfsStreamContext {
     return this;
   }
 
+  public AbfsOutputStreamContext withWriteMaxConcurrentRequestCount(
+      final int writeMaxConcurrentRequestCount) {
+    this.writeMaxConcurrentRequestCount = writeMaxConcurrentRequestCount;
+    return this;
+  }
+
+  public AbfsOutputStreamContext withMaxWriteRequestsToQueue(
+      final int maxWriteRequestsToQueue) {
+    this.maxWriteRequestsToQueue = maxWriteRequestsToQueue;
+    return this;
+  }
+
   public int getWriteBufferSize() {
     return writeBufferSize;
   }
@@ -90,4 +106,12 @@ public class AbfsOutputStreamContext extends AbfsStreamContext {
   public boolean isAppendBlob() {
     return isAppendBlob;
   }
+
+  public int getWriteMaxConcurrentRequestCount() {
+    return this.writeMaxConcurrentRequestCount;
+  }
+
+  public int getMaxWriteRequestsToQueue() {
+    return this.maxWriteRequestsToQueue;
+  }
 }
diff --git a/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md b/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md
index 4640bab..79b897b 100644
--- a/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md
+++ b/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md
@@ -796,6 +796,19 @@ will be -1. To disable readaheads, set this value to 0. If your workload is
  doing only random reads (non-sequential) or you are seeing throttling, you
   may try setting this value to 0.
 
+To run under limited memory situations, for example when many writes are
+issued from the same process, configure the following options.
+
+`fs.azure.write.max.concurrent.requests`: Sets the maximum number of
+ concurrent write requests issued by an AbfsOutputStream instance to the
+ server at any point of time. Effectively this is the thread pool size
+ within the AbfsOutputStream instance. Set a value between 1 and 8, both
+ inclusive.
+
+`fs.azure.write.max.requests.to.queue`: Sets the maximum number of write
+ requests that can be queued. Since each queued request holds a buffer,
+ this config bounds the memory consumption of an AbfsOutputStream instance.
+ Set the value to 3 or 4 times the value set for
+ `fs.azure.write.max.concurrent.requests`.
+
 ### <a name="securityconfigoptions"></a> Security Options
 `fs.azure.always.use.https`: Enforces to use HTTPS instead of HTTP when the flag
 is made true. Irrespective of the flag, AbfsClient will use HTTPS if the secure
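
A short sketch of wiring the two write-tuning options documented above from
code instead of core-site.xml (the account and container names are
placeholders; the queue depth follows the 3-4x guidance):

    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;

    final class AbfsWriteTuning {
      static FileSystem tunedFs() throws Exception {
        Configuration conf = new Configuration();
        // Per-stream upload parallelism, and the queue of buffers feeding it.
        conf.set("fs.azure.write.max.concurrent.requests", "4");
        conf.set("fs.azure.write.max.requests.to.queue", "16");
        // Placeholder endpoint; substitute a real account and container.
        return FileSystem.newInstance(
            URI.create("abfs://container@account.dfs.core.windows.net/"), conf);
      }
    }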
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java
new file mode 100644
index 0000000..7f91116
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
+import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
+import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys;
+
+/**
+ * Tests AbfsOutputStream thread pool and queue configuration.
+ */
+public class ITestAbfsOutputStream extends AbstractAbfsIntegrationTest {
+  private static final Path TEST_FILE_PATH = new Path("testfile");
+
+  public ITestAbfsOutputStream() throws Exception {
+    super();
+  }
+
+  @Test
+  public void testMaxRequestsAndQueueCapacityDefaults() throws Exception {
+    Configuration conf = getRawConfiguration();
+    final AzureBlobFileSystem fs = getFileSystem(conf);
+    try (FSDataOutputStream out = fs.create(TEST_FILE_PATH)) {
+      AbfsOutputStream stream = (AbfsOutputStream) out.getWrappedStream();
+      Assertions.assertThat(stream.getMaxConcurrentRequestCount()).describedAs(
+          "maxConcurrentRequests should be " + getConfiguration()
+              .getWriteMaxConcurrentRequestCount())
+          .isEqualTo(getConfiguration().getWriteMaxConcurrentRequestCount());
+      Assertions.assertThat(stream.getMaxRequestsThatCanBeQueued()).describedAs(
+          "maxRequestsToQueue should be " + getConfiguration()
+              .getMaxWriteRequestsToQueue())
+          .isEqualTo(getConfiguration().getMaxWriteRequestsToQueue());
+    }
+  }
+
+  @Test
+  public void testMaxRequestsAndQueueCapacity() throws Exception {
+    Configuration conf = getRawConfiguration();
+    int maxConcurrentRequests = 6;
+    int maxRequestsToQueue = 10;
+    conf.set(ConfigurationKeys.AZURE_WRITE_MAX_CONCURRENT_REQUESTS,
+        "" + maxConcurrentRequests);
+    conf.set(ConfigurationKeys.AZURE_WRITE_MAX_REQUESTS_TO_QUEUE,
+        "" + maxRequestsToQueue);
+    final AzureBlobFileSystem fs = getFileSystem(conf);
+    FSDataOutputStream out = fs.create(TEST_FILE_PATH);
+    AbfsOutputStream stream = (AbfsOutputStream) out.getWrappedStream();
+    Assertions.assertThat(stream.getMaxConcurrentRequestCount())
+        .describedAs("maxConcurrentRequests should be " + maxConcurrentRequests)
+        .isEqualTo(maxConcurrentRequests);
+    Assertions.assertThat(stream.getMaxRequestsThatCanBeQueued())
+        .describedAs("maxRequestsToQueue should be " + maxRequestsToQueue)
+        .isEqualTo(maxRequestsToQueue);
+  }
+
+}
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java
index 4105aa1..aab0248 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.fs.azurebfs.services;
 
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Random;
@@ -54,13 +55,17 @@ public final class TestAbfsOutputStream {
   private AbfsOutputStreamContext populateAbfsOutputStreamContext(int writeBufferSize,
             boolean isFlushEnabled,
             boolean disableOutputStreamFlush,
-            boolean isAppendBlob) {
+            boolean isAppendBlob) throws IOException, IllegalAccessException {
+    AbfsConfiguration abfsConf = new AbfsConfiguration(new Configuration(),
+        accountName1);
     return new AbfsOutputStreamContext(2)
             .withWriteBufferSize(writeBufferSize)
             .enableFlush(isFlushEnabled)
             .disableOutputStreamFlush(disableOutputStreamFlush)
             .withStreamStatistics(new AbfsOutputStreamStatisticsImpl())
             .withAppendBlob(isAppendBlob)
+            .withWriteMaxConcurrentRequestCount(abfsConf.getWriteMaxConcurrentRequestCount())
+            .withMaxWriteRequestsToQueue(abfsConf.getMaxWriteRequestsToQueue())
             .build();
   }
 




[hadoop] 03/09: HADOOP-17149. ABFS: Fixing the testcase ITestGetNameSpaceEnabled


commit e481d0108aa8f152610ca16b813c1dfaa568f1cc
Author: bilaharith <52...@users.noreply.github.com>
AuthorDate: Wed Aug 5 22:31:04 2020 +0530

    HADOOP-17149. ABFS: Fixing the testcase ITestGetNameSpaceEnabled
    
    - Contributed by Bilahari T H
---
 .../fs/azurebfs/ITestGetNameSpaceEnabled.java      | 23 --------
 .../hadoop/fs/azurebfs/ITestSharedKeyAuth.java     | 61 ++++++++++++++++++++++
 2 files changed, 61 insertions(+), 23 deletions(-)

diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestGetNameSpaceEnabled.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestGetNameSpaceEnabled.java
index 4268ff2..29de126 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestGetNameSpaceEnabled.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestGetNameSpaceEnabled.java
@@ -32,8 +32,6 @@ import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
 import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
-import org.apache.hadoop.fs.azurebfs.services.AuthType;
 
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.doReturn;
@@ -46,7 +44,6 @@ import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.D
 import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.TEST_CONFIGURATION_FILE_NAME;
 import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION;
 import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_IS_HNS_ENABLED;
-import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME;
 import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_TEST_NAMESPACE_ENABLED_ACCOUNT;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
 
@@ -146,26 +143,6 @@ public class ITestGetNameSpaceEnabled extends AbstractAbfsIntegrationTest {
   }
 
   @Test
-  public void testFailedRequestWhenCredentialsNotCorrect() throws Exception {
-    Assume.assumeTrue(this.getAuthType() == AuthType.SharedKey);
-    Configuration config = this.getRawConfiguration();
-    config.setBoolean(AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION, false);
-    String accountName = this.getAccountName();
-    String configkKey = FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME + "." + accountName;
-    // Provide a wrong sharedKey
-    String secret = config.get(configkKey);
-    secret = (char) (secret.charAt(0) + 1) + secret.substring(1);
-    config.set(configkKey, secret);
-
-    AzureBlobFileSystem fs = this.getFileSystem(config);
-    intercept(AbfsRestOperationException.class,
-            "\"Server failed to authenticate the request. Make sure the value of Authorization header is formed correctly including the signature.\", 403",
-            ()-> {
-              fs.getIsNamespaceEnabled();
-            });
-  }
-
-  @Test
   public void testEnsureGetAclCallIsMadeOnceWhenConfigIsInvalid()
       throws Exception {
     unsetConfAndEnsureGetAclCallIsMadeOnce();
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestSharedKeyAuth.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestSharedKeyAuth.java
new file mode 100644
index 0000000..ab55ffa
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestSharedKeyAuth.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.azurebfs;
+
+import org.junit.Assume;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
+import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
+import org.apache.hadoop.fs.azurebfs.services.AuthType;
+
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+public class ITestSharedKeyAuth extends AbstractAbfsIntegrationTest {
+
+  public ITestSharedKeyAuth() throws Exception {
+    super();
+  }
+
+  @Test
+  public void testWithWrongSharedKey() throws Exception {
+    Assume.assumeTrue(this.getAuthType() == AuthType.SharedKey);
+    Configuration config = this.getRawConfiguration();
+    config.setBoolean(AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION,
+        false);
+    String accountName = this.getAccountName();
+    String configKey = FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME + "." + accountName;
+    // a wrong sharedKey
+    String secret = "XjUjsGherkDpljuyThd7RpljhR6uhsFjhlxRpmhgD12lnj7lhfRn8kgPt5"
+        + "+MJHS7UJNDER+jn6KP6Jnm2ONQlm==";
+    config.set(configKey, secret);
+
+    AbfsClient abfsClient = this.getFileSystem(config).getAbfsClient();
+    intercept(AbfsRestOperationException.class,
+        "\"Server failed to authenticate the request. Make sure the value of "
+            + "Authorization header is formed correctly including the "
+            + "signature.\", 403",
+        () -> {
+          abfsClient.getAclStatus("/");
+        });
+  }
+
+}

