Posted to common-commits@hadoop.apache.org by tm...@apache.org on 2020/06/19 19:49:55 UTC

[hadoop] branch branch-3.3 updated (7613191 -> 63d236c)

This is an automated email from the ASF dual-hosted git repository.

tmarquardt pushed a change to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git.


    from 7613191  HDFS-15372. Files in snapshots no longer see attribute provider permissions. Contributed by Stephen O'Donnell.
     new 76ee7e5  HADOOP-17002. ABFS: Adding config to determine if the account is HNS enabled or not
     new a2f4434  HADOOP-17018. Intermittent failing of ITestAbfsStreamStatistics in ABFS (#1990)
     new af98f32  HADOOP-16916: ABFS: Delegation SAS generator for integration with Ranger
     new 11307f3  HADOOP-17004. ABFS: Improve the ABFS driver documentation
     new d639c11  HADOOP-17004. Fixing a formatting issue
     new 63d236c  HADOOP-17076: ABFS: Delegation SAS Generator Updates. Contributed by Thomas Marquardt.

The 6 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../hadoop-azure/dev-support/findbugs-exclude.xml  |  21 +-
 .../hadoop/fs/azurebfs/AbfsConfiguration.java      |  23 ++
 .../hadoop/fs/azurebfs/AzureBlobFileSystem.java    |  10 +-
 .../fs/azurebfs/AzureBlobFileSystemStore.java      | 166 +++++----
 .../fs/azurebfs/constants/ConfigurationKeys.java   |  10 +
 .../constants/FileSystemConfigurations.java        |   4 +
 ...eption.java => TrileanConversionException.java} |  19 +-
 .../apache/hadoop/fs/azurebfs/enums/Trilean.java   |  80 +++++
 .../hadoop/fs/azurebfs/enums}/package-info.java    |   2 +-
 .../fs/azurebfs/extensions/SASTokenProvider.java   |  25 +-
 .../fs/azurebfs/oauth2/AzureADAuthenticator.java   |   8 +-
 .../hadoop/fs/azurebfs/services/AbfsClient.java    | 115 ++++--
 .../fs/azurebfs/services/AbfsInputStream.java      |   8 +-
 .../azurebfs/services/AbfsInputStreamContext.java  |   3 +-
 .../fs/azurebfs/services/AbfsOutputStream.java     |  12 +-
 .../azurebfs/services/AbfsOutputStreamContext.java |   3 +-
 .../fs/azurebfs/services/AbfsRestOperation.java    |  32 +-
 .../fs/azurebfs/services/AbfsStreamContext.java    |  13 +
 .../hadoop/fs/azurebfs/utils/CachedSASToken.java   | 207 +++++++++++
 .../hadoop-azure/src/site/markdown/abfs.md         | 157 ++++++++-
 .../src/site/markdown/testing_azure.md             |  75 +++-
 .../fs/azurebfs/AbstractAbfsIntegrationTest.java   |  21 +-
 .../fs/azurebfs/ITestAbfsStreamStatistics.java     |  19 +-
 .../ITestAzureBlobFileSystemAuthorization.java     |   9 +
 .../ITestAzureBlobFileSystemCheckAccess.java       |   2 +
 .../ITestAzureBlobFileSystemDelegationSAS.java     | 384 +++++++++++++++++++++
 .../fs/azurebfs/ITestGetNameSpaceEnabled.java      | 141 +++++++-
 .../apache/hadoop/fs/azurebfs/TrileanTests.java    |  92 +++++
 .../azurebfs/constants/TestConfigurationKeys.java  |   8 +
 .../extensions/MockDelegationSASTokenProvider.java | 142 ++++++++
 .../azurebfs/extensions/MockSASTokenProvider.java  |   6 +-
 .../fs/azurebfs/utils/DelegationSASGenerator.java  | 192 +++++++++++
 .../hadoop/fs/azurebfs/utils/SASGenerator.java     | 112 +++---
 ...{SASGenerator.java => ServiceSASGenerator.java} |  75 ++--
 .../fs/azurebfs/utils/TestCachedSASToken.java      | 162 +++++++++
 35 files changed, 2086 insertions(+), 272 deletions(-)
 copy hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/{InvalidUriException.java => TrileanConversionException.java} (70%)
 create mode 100644 hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/enums/Trilean.java
 copy {hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token => hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/enums}/package-info.java (95%)
 create mode 100644 hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/CachedSASToken.java
 create mode 100644 hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelegationSAS.java
 create mode 100644 hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TrileanTests.java
 create mode 100644 hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/extensions/MockDelegationSASTokenProvider.java
 create mode 100644 hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/DelegationSASGenerator.java
 copy hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/{SASGenerator.java => ServiceSASGenerator.java} (54%)
 create mode 100644 hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/TestCachedSASToken.java


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[hadoop] 05/06: HADOOP-17004. Fixing a formatting issue

Posted by tm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tmarquardt pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit d639c119867cf382815da0e427776c59aba0f5c8
Author: bilaharith <52...@users.noreply.github.com>
AuthorDate: Thu May 21 00:21:48 2020 +0530

    HADOOP-17004. Fixing a formatting issue
    
    Contributed by Bilahari T H.
---
 hadoop-tools/hadoop-azure/src/site/markdown/abfs.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md b/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md
index 93141f1..6aa030b 100644
--- a/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md
+++ b/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md
@@ -751,7 +751,7 @@ The following configs are related to read and write operations.
 `fs.azure.io.retry.max.retries`: Sets the number of retries for IO operations.
 Currently this is used only for the server call retry logic. Used within
 AbfsClient class as part of the ExponentialRetryPolicy. The value should be
->= 0.
+greater than or equal to 0.
 
 `fs.azure.write.request.size`: To set the write buffer size. Specify the value
 in bytes. The value should be between 16384 to 104857600 both inclusive (16 KB

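For illustration, the two options documented in the hunk above can be set programmatically (a sketch; the values
shown are examples, not the defaults):

    Configuration conf = new Configuration();
    conf.setInt("fs.azure.io.retry.max.retries", 5);             // must be >= 0
    conf.setInt("fs.azure.write.request.size", 8 * 1024 * 1024); // bytes, 16 KB to 100 MB inclusive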



[hadoop] 02/06: HADOOP-17018. Intermittent failing of ITestAbfsStreamStatistics in ABFS (#1990)

Posted by tm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tmarquardt pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit a2f44344c346601607b9ac1de4598b754f9f2d72
Author: Mehakmeet Singh <me...@gmail.com>
AuthorDate: Thu May 7 16:45:28 2020 +0530

    HADOOP-17018. Intermittent failing of ITestAbfsStreamStatistics in ABFS (#1990)
    
    
    Contributed by: Mehakmeet Singh
    
    In some cases the ABFS prefetch thread runs in the background and returns some bytes from the buffer, adding an extra readOp. This makes the readOps value nondeterministic and causes intermittent failures: depending on the setup, a readOps value of either 2 or 3 may be seen.
---
 .../hadoop/fs/azurebfs/ITestAbfsStreamStatistics.java | 19 ++++++++++++++-----
 1 file changed, 14 insertions(+), 5 deletions(-)

diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsStreamStatistics.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsStreamStatistics.java
index b749f49..51531f6 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsStreamStatistics.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsStreamStatistics.java
@@ -84,12 +84,21 @@ public class ITestAbfsStreamStatistics extends AbstractAbfsIntegrationTest {
 
       LOG.info("Result of Read operation : {}", result);
       /*
-      Testing if 2 read_ops value is coming after reading full content from a
-      file (3 if anything to read from Buffer too).
-      Reason: read() call gives read_ops=1,
-      reading from AbfsClient(http GET) gives read_ops=2.
+       * Testing if 2 read_ops value is coming after reading full content
+       * from a file (3 if anything to read from Buffer too). Reason: read()
+       * call gives read_ops=1, reading from AbfsClient(http GET) gives
+       * read_ops=2.
+       *
+       * In some cases ABFS-prefetch thread runs in the background which
+       * returns some bytes from buffer and gives an extra readOp.
+       * Thus, making readOps values arbitrary and giving intermittent
+       * failures in some cases. Hence, readOps values of 2 or 3 is seen in
+       * different setups.
+       *
        */
-      assertReadWriteOps("read", 2, statistics.getReadOps());
+      assertTrue(String.format("The actual value of %d was not equal to the "
+              + "expected value of 2 or 3", statistics.getReadOps()),
+          statistics.getReadOps() == 2 || statistics.getReadOps() == 3);
 
     } finally {
       IOUtils.cleanupWithLogger(LOG, inForOneOperation,




[hadoop] 03/06: HADOOP-16916: ABFS: Delegation SAS generator for integration with Ranger

Posted by tm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tmarquardt pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit af98f32f7dbb9d71915690b66f12c33758011450
Author: Thomas Marquardt <tm...@microsoft.com>
AuthorDate: Tue May 12 17:32:52 2020 +0000

    HADOOP-16916: ABFS: Delegation SAS generator for integration with Ranger
    
    Contributed by Thomas Marquardt.
    
    DETAILS:
    
    Previously we had a SASGenerator class which generated a Service SAS, but we also need a DelegationSASGenerator.
    I separated SASGenerator into a base class and two subclasses, ServiceSASGenerator and DelegationSASGenerator.  The
    code in ServiceSASGenerator is copied from SASGenerator, but the DelegationSASGenerator code is new.  The
    DelegationSASGenerator code demonstrates how to use a Delegation SAS with minimal permissions, as would be issued
    by an authorization service such as Apache Ranger.  Adding this to the tests helps us lock in this behavior.
    
    Added a MockDelegationSASTokenProvider for testing User Delegation SAS.
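
    For orientation, a provider plugs in roughly like this (a sketch only: the method signatures are assumed from
    the SASTokenProvider interface, and RangerBackedSASProvider is a hypothetical name, not the mock used here):

        import java.io.IOException;

        import org.apache.hadoop.conf.Configuration;
        import org.apache.hadoop.fs.azurebfs.extensions.SASTokenProvider;
        import org.apache.hadoop.security.AccessControlException;

        public class RangerBackedSASProvider implements SASTokenProvider {
          @Override
          public void initialize(Configuration configuration, String accountName)
              throws IOException {
            // Load signing material or connect to the authorization service here.
          }

          @Override
          public String getSASToken(String account, String fileSystem, String path,
              String operation) throws IOException, AccessControlException {
            // "operation" is one of the SASTokenProvider constants (e.g. READ_OPERATION).
            // Authorize it for "path" and return a SAS query string scoped to just that
            // operation, or throw AccessControlException to deny the request.
            throw new AccessControlException("denied: " + operation + " on " + path);
          }
        }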
    
    Fixed the ITestAzureBlobFileSystemCheckAccess tests to assume an OAuth client ID, so that they are skipped when
    one is not configured.
    
    To improve performance, AbfsInputStream/AbfsOutputStream re-use SAS tokens until the expiry is within 120 seconds.
    After that, a new SAS is requested.  The default period of 120 seconds can be changed using the configuration
    setting "fs.azure.sas.token.renew.period.for.streams".
    
    The SASTokenProvider operation names were updated to correspond better with the ADLS Gen2 REST API, since these
    operations must be provided tokens with appropriate SAS parameters to succeed.
    
    Support for the version 2.0 AAD authentication endpoint was added to AzureADAuthenticator.  A v2.0 endpoint is
    detected by the "/oauth2/v2.0/" segment in its URL and is sent a "scope" parameter instead of "resource".
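
    For example (the endpoint shapes below are the standard AAD URL forms, shown for illustration; the detection
    string and scope value are taken from the AzureADAuthenticator diff below):

        // v1.0 endpoint: the authenticator sends resource=https://storage.azure.com/
        String v1Endpoint = "https://login.microsoftonline.com/<tenant-id>/oauth2/token";
        // v2.0 endpoint (URL contains "/oauth2/v2.0/"): sends scope=https://storage.azure.com/.default
        String v2Endpoint = "https://login.microsoftonline.com/<tenant-id>/oauth2/v2.0/token";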
    
    The getFileStatus method was mistakenly calling the ADLS Gen2 Get Properties API, which requires read permission,
    while the getFileStatus call only requires execute permission.  The ADLS Gen2 Get Status API is supposed to be used
    for this purpose, so the underlying AbfsClient.getPathStatus API was updated with an includeProperties
    parameter which is set to false for getFileStatus and true for getXAttr.
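
    Concretely (using the updated signature from the AbfsClient diff below; client and relativePath are assumed
    to be in scope):

        // getFileStatus: the Get Status action, requiring only execute (traversal) permission
        AbfsRestOperation status = client.getPathStatus(relativePath, false);
        // getXAttr: the Get Properties action, which reads user-defined properties and requires read permission
        AbfsRestOperation properties = client.getPathStatus(relativePath, true);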
    
    Added SASTokenProvider support for delete recursive.
    
    Fixed bugs in AzureBlobFileSystem where public methods were not validating the Path by calling makeQualified.  This is
    necessary to avoid passing null paths and to convert relative paths into absolute paths.
    
    Canonicalized the path used for the root path internally so that the root path can be used with SAS tokens, which
    requires that the path in the URL and the path in the SAS token match.  Internally the code sometimes used
    "//" instead of "/" for the root path.  Also related to this, the AzureBlobFileSystemStore.getRelativePath
    API was updated so that we no longer remove and then add back a preceding forward slash on paths.
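
    The resulting behavior, per the new AbfsClient.getDirectoryQueryParameter shown in the diff below:

        assertEquals("", AbfsClient.getDirectoryQueryParameter(null));      // null -> ""
        assertEquals("", AbfsClient.getDirectoryQueryParameter("/"));       // root -> ""
        assertEquals("d/f", AbfsClient.getDirectoryQueryParameter("/d/f")); // leading "/" stripped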
    
    To run the ITestAzureBlobFileSystemDelegationSAS tests, follow the instructions in testing_azure.md under the
    heading "To run Delegation SAS test cases".  You also need to set "fs.azure.enable.check.access" to true.
    
    TEST RESULTS:
    
    namespace.enabled=true
    auth.type=SharedKey
    -------------------
    $mvn -T 1C -Dparallel-tests=abfs -Dscale -DtestsThreadCount=8 clean verify
    Tests run: 63, Failures: 0, Errors: 0, Skipped: 0
    Tests run: 432, Failures: 0, Errors: 0, Skipped: 41
    Tests run: 206, Failures: 0, Errors: 0, Skipped: 24
    
    namespace.enabled=false
    auth.type=SharedKey
    -------------------
    $mvn -T 1C -Dparallel-tests=abfs -Dscale -DtestsThreadCount=8 clean verify
    Tests run: 63, Failures: 0, Errors: 0, Skipped: 0
    Tests run: 432, Failures: 0, Errors: 0, Skipped: 244
    Tests run: 206, Failures: 0, Errors: 0, Skipped: 24
    
    namespace.enabled=true
    auth.type=SharedKey
    sas.token.provider.type=MockDelegationSASTokenProvider
    enable.check.access=true
    -------------------
    $mvn -T 1C -Dparallel-tests=abfs -Dscale -DtestsThreadCount=8 clean verify
    Tests run: 63, Failures: 0, Errors: 0, Skipped: 0
    Tests run: 432, Failures: 0, Errors: 0, Skipped: 33
    Tests run: 206, Failures: 0, Errors: 0, Skipped: 24
    
    namespace.enabled=true
    auth.type=OAuth
    -------------------
    $mvn -T 1C -Dparallel-tests=abfs -Dscale -DtestsThreadCount=8 clean verify
    Tests run: 63, Failures: 0, Errors: 0, Skipped: 0
    Tests run: 432, Failures: 0, Errors: 1, Skipped: 74
    Tests run: 206, Failures: 0, Errors: 0, Skipped: 140
---
 .../hadoop-azure/dev-support/findbugs-exclude.xml  |  21 +-
 .../hadoop/fs/azurebfs/AbfsConfiguration.java      |   9 +
 .../hadoop/fs/azurebfs/AzureBlobFileSystem.java    |  10 +-
 .../fs/azurebfs/AzureBlobFileSystemStore.java      | 111 ++++---
 .../fs/azurebfs/constants/ConfigurationKeys.java   |   3 +
 .../constants/FileSystemConfigurations.java        |   1 +
 .../fs/azurebfs/extensions/SASTokenProvider.java   |  25 +-
 .../fs/azurebfs/oauth2/AzureADAuthenticator.java   |   8 +-
 .../hadoop/fs/azurebfs/services/AbfsClient.java    | 115 +++++--
 .../fs/azurebfs/services/AbfsInputStream.java      |   8 +-
 .../azurebfs/services/AbfsInputStreamContext.java  |   3 +-
 .../fs/azurebfs/services/AbfsOutputStream.java     |  12 +-
 .../azurebfs/services/AbfsOutputStreamContext.java |   3 +-
 .../fs/azurebfs/services/AbfsRestOperation.java    |  32 +-
 .../fs/azurebfs/services/AbfsStreamContext.java    |  13 +
 .../hadoop/fs/azurebfs/utils/CachedSASToken.java   | 207 ++++++++++++
 .../hadoop-azure/src/site/markdown/abfs.md         |  19 ++
 .../src/site/markdown/testing_azure.md             |  75 ++++-
 .../fs/azurebfs/AbstractAbfsIntegrationTest.java   |  21 +-
 .../ITestAzureBlobFileSystemAuthorization.java     |   9 +
 .../ITestAzureBlobFileSystemCheckAccess.java       |   2 +
 .../ITestAzureBlobFileSystemDelegationSAS.java     | 368 +++++++++++++++++++++
 .../azurebfs/constants/TestConfigurationKeys.java  |   8 +
 .../extensions/MockDelegationSASTokenProvider.java | 142 ++++++++
 .../azurebfs/extensions/MockSASTokenProvider.java  |   6 +-
 .../fs/azurebfs/utils/DelegationSASGenerator.java  | 194 +++++++++++
 .../hadoop/fs/azurebfs/utils/SASGenerator.java     | 110 +++---
 ...{SASGenerator.java => ServiceSASGenerator.java} |  75 ++---
 .../fs/azurebfs/utils/TestCachedSASToken.java      | 162 +++++++++
 29 files changed, 1536 insertions(+), 236 deletions(-)

diff --git a/hadoop-tools/hadoop-azure/dev-support/findbugs-exclude.xml b/hadoop-tools/hadoop-azure/dev-support/findbugs-exclude.xml
index 38de35e..7087d78 100644
--- a/hadoop-tools/hadoop-azure/dev-support/findbugs-exclude.xml
+++ b/hadoop-tools/hadoop-azure/dev-support/findbugs-exclude.xml
@@ -15,6 +15,23 @@
    limitations under the License.
 -->
 <FindBugsFilter>
+     <!-- This reference equality check is an intentional light weight
+          check to avoid re-validating the token when re-used. -->
+     <Match>
+       <Class name="org.apache.hadoop.fs.azurebfs.utils.CachedSASToken" />
+       <Method name="update" />
+       <Bug pattern="ES_COMPARING_PARAMETER_STRING_WITH_EQ" />
+     </Match>
+
+     <!-- This is intentional. The unsynchronized field access is safe
+          and only synchronized access is used when using the sasToken
+          for authorization. -->
+     <Match>
+       <Class name="org.apache.hadoop.fs.azurebfs.utils.CachedSASToken" />
+       <Field name="sasToken" />
+       <Bug pattern="IS2_INCONSISTENT_SYNC" />
+     </Match>
+
      <!-- It is okay to skip up to end of file. No need to check return value. -->
      <Match>
        <Class name="org.apache.hadoop.fs.azure.AzureNativeFileSystemStore" />
@@ -24,7 +41,7 @@
      </Match>
 
      <!-- Returning fully loaded array to iterate through is a convenience
-	  and helps performance. -->
+          and helps performance. -->
      <Match>
        <Class name="org.apache.hadoop.fs.azure.NativeAzureFileSystem$FolderRenamePending" />
        <Method name="getFiles" />
@@ -40,7 +57,7 @@
      </Match>
      
      <!-- Using a key set iterator is fine because this is not a performance-critical
-	  method. -->
+          method. -->
      <Match>
        <Class name="org.apache.hadoop.fs.azure.PageBlobOutputStream" />
        <Method name="logAllStackTraces" />
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
index d60bc37..354176f 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
@@ -210,6 +210,11 @@ public class AbfsConfiguration{
           DefaultValue = DEFAULT_ABFS_LATENCY_TRACK)
   private boolean trackLatency;
 
+  @LongConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS,
+      MinValue = 0,
+      DefaultValue = DEFAULT_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS_IN_SECONDS)
+  private long sasTokenRenewPeriodForStreamsInSeconds;
+
   private Map<String, String> storageAccountKeys;
 
   public AbfsConfiguration(final Configuration rawConfig, String accountName)
@@ -451,6 +456,10 @@ public class AbfsConfiguration{
     return this.isCheckAccessEnabled;
   }
 
+  public long getSasTokenRenewPeriodForStreamsInSeconds() {
+    return this.sasTokenRenewPeriodForStreamsInSeconds;
+  }
+
   public String getAzureBlockLocationHost() {
     return this.azureBlockLocationHost;
   }
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
index 700d23a..6694c13 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
@@ -679,15 +679,17 @@ public class AzureBlobFileSystem extends FileSystem {
       throw new IllegalArgumentException("A valid name and value must be specified.");
     }
 
+    Path qualifiedPath = makeQualified(path);
+
     try {
-      Hashtable<String, String> properties = abfsStore.getPathStatus(path);
+      Hashtable<String, String> properties = abfsStore.getPathStatus(qualifiedPath);
       String xAttrName = ensureValidAttributeName(name);
       boolean xAttrExists = properties.containsKey(xAttrName);
       XAttrSetFlag.validate(name, xAttrExists, flag);
 
       String xAttrValue = abfsStore.decodeAttribute(value);
       properties.put(xAttrName, xAttrValue);
-      abfsStore.setPathProperties(path, properties);
+      abfsStore.setPathProperties(qualifiedPath, properties);
     } catch (AzureBlobFileSystemException ex) {
       checkException(path, ex);
     }
@@ -712,9 +714,11 @@ public class AzureBlobFileSystem extends FileSystem {
       throw new IllegalArgumentException("A valid name must be specified.");
     }
 
+    Path qualifiedPath = makeQualified(path);
+
     byte[] value = null;
     try {
-      Hashtable<String, String> properties = abfsStore.getPathStatus(path);
+      Hashtable<String, String> properties = abfsStore.getPathStatus(qualifiedPath);
       String xAttrName = ensureValidAttributeName(name);
       if (properties.containsKey(xAttrName)) {
         String xAttrValue = properties.get(xAttrName);
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
index d37ceb3..8e0e6c1 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
@@ -247,8 +247,7 @@ public class AzureBlobFileSystemStore implements Closeable {
     LOG.debug("Get root ACL status");
     try (AbfsPerfInfo perfInfo = startTracking("getIsNamespaceEnabled",
         "getAclStatus")) {
-      AbfsRestOperation op = client.getAclStatus(
-          AbfsHttpConstants.FORWARD_SLASH + AbfsHttpConstants.ROOT_PATH);
+      AbfsRestOperation op = client.getAclStatus(AbfsHttpConstants.ROOT_PATH);
       perfInfo.registerResult(op.getResult());
       isNamespaceEnabled = Trilean.getTrilean(true);
       perfInfo.registerSuccess(true);
@@ -353,7 +352,7 @@ public class AzureBlobFileSystemStore implements Closeable {
               path);
 
       final Hashtable<String, String> parsedXmsProperties;
-      final AbfsRestOperation op = client.getPathStatus(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path));
+      final AbfsRestOperation op = client.getPathStatus(getRelativePath(path), true);
       perfInfo.registerResult(op.getResult());
 
       final String xMsProperties = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_PROPERTIES);
@@ -379,7 +378,7 @@ public class AzureBlobFileSystemStore implements Closeable {
       } catch (CharacterCodingException ex) {
         throw new InvalidAbfsRestOperationException(ex);
       }
-      final AbfsRestOperation op = client.setPathProperties(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), commaSeparatedProperties);
+      final AbfsRestOperation op = client.setPathProperties(getRelativePath(path), commaSeparatedProperties);
       perfInfo.registerResult(op.getResult()).registerSuccess(true);
     }
   }
@@ -418,7 +417,9 @@ public class AzureBlobFileSystemStore implements Closeable {
               umask.toString(),
               isNamespaceEnabled);
 
-      final AbfsRestOperation op = client.createPath(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), true, overwrite,
+      String relativePath = getRelativePath(path);
+
+      final AbfsRestOperation op = client.createPath(relativePath, true, overwrite,
               isNamespaceEnabled ? getOctalNotation(permission) : null,
               isNamespaceEnabled ? getOctalNotation(umask) : null);
       perfInfo.registerResult(op.getResult()).registerSuccess(true);
@@ -426,14 +427,14 @@ public class AzureBlobFileSystemStore implements Closeable {
       return new AbfsOutputStream(
           client,
           statistics,
-          AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path),
+          relativePath,
           0,
           populateAbfsOutputStreamContext());
     }
   }
 
   private AbfsOutputStreamContext populateAbfsOutputStreamContext() {
-    return new AbfsOutputStreamContext()
+    return new AbfsOutputStreamContext(abfsConfiguration.getSasTokenRenewPeriodForStreamsInSeconds())
             .withWriteBufferSize(abfsConfiguration.getWriteBufferSize())
             .enableFlush(abfsConfiguration.isFlushEnabled())
             .disableOutputStreamFlush(abfsConfiguration.isOutputStreamFlushDisabled())
@@ -452,7 +453,7 @@ public class AzureBlobFileSystemStore implements Closeable {
               umask,
               isNamespaceEnabled);
 
-      final AbfsRestOperation op = client.createPath(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), false, true,
+      final AbfsRestOperation op = client.createPath(getRelativePath(path), false, true,
               isNamespaceEnabled ? getOctalNotation(permission) : null,
               isNamespaceEnabled ? getOctalNotation(umask) : null);
       perfInfo.registerResult(op.getResult()).registerSuccess(true);
@@ -466,7 +467,9 @@ public class AzureBlobFileSystemStore implements Closeable {
               client.getFileSystem(),
               path);
 
-      final AbfsRestOperation op = client.getPathStatus(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path));
+      String relativePath = getRelativePath(path);
+
+      final AbfsRestOperation op = client.getPathStatus(relativePath, false);
       perfInfo.registerResult(op.getResult());
 
       final String resourceType = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_RESOURCE_TYPE);
@@ -485,14 +488,14 @@ public class AzureBlobFileSystemStore implements Closeable {
 
       // Add statistics for InputStream
       return new AbfsInputStream(client, statistics,
-              AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), contentLength,
+              relativePath, contentLength,
               populateAbfsInputStreamContext(),
               eTag);
     }
   }
 
   private AbfsInputStreamContext populateAbfsInputStreamContext() {
-    return new AbfsInputStreamContext()
+    return new AbfsInputStreamContext(abfsConfiguration.getSasTokenRenewPeriodForStreamsInSeconds())
             .withReadBufferSize(abfsConfiguration.getReadBufferSize())
             .withReadAheadQueueDepth(abfsConfiguration.getReadAheadQueueDepth())
             .withTolerateOobAppends(abfsConfiguration.getTolerateOobAppends())
@@ -507,7 +510,9 @@ public class AzureBlobFileSystemStore implements Closeable {
               path,
               overwrite);
 
-      final AbfsRestOperation op = client.getPathStatus(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path));
+      String relativePath = getRelativePath(path);
+
+      final AbfsRestOperation op = client.getPathStatus(relativePath, false);
       perfInfo.registerResult(op.getResult());
 
       final String resourceType = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_RESOURCE_TYPE);
@@ -528,7 +533,7 @@ public class AzureBlobFileSystemStore implements Closeable {
       return new AbfsOutputStream(
           client,
           statistics,
-          AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path),
+          relativePath,
           offset,
           populateAbfsOutputStreamContext());
     }
@@ -552,10 +557,13 @@ public class AzureBlobFileSystemStore implements Closeable {
 
     String continuation = null;
 
+    String sourceRelativePath = getRelativePath(source);
+    String destinationRelativePath = getRelativePath(destination);
+
     do {
       try (AbfsPerfInfo perfInfo = startTracking("rename", "renamePath")) {
-        AbfsRestOperation op = client.renamePath(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(source),
-                AbfsHttpConstants.FORWARD_SLASH + getRelativePath(destination), continuation);
+        AbfsRestOperation op = client.renamePath(sourceRelativePath,
+                destinationRelativePath, continuation);
         perfInfo.registerResult(op.getResult());
         continuation = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_CONTINUATION);
         perfInfo.registerSuccess(true);
@@ -582,10 +590,12 @@ public class AzureBlobFileSystemStore implements Closeable {
 
     String continuation = null;
 
+    String relativePath = getRelativePath(path);
+
     do {
       try (AbfsPerfInfo perfInfo = startTracking("delete", "deletePath")) {
         AbfsRestOperation op = client.deletePath(
-                AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), recursive, continuation);
+                relativePath, recursive, continuation);
         perfInfo.registerResult(op.getResult());
         continuation = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_CONTINUATION);
         perfInfo.registerSuccess(true);
@@ -611,14 +621,14 @@ public class AzureBlobFileSystemStore implements Closeable {
       if (path.isRoot()) {
         if (isNamespaceEnabled) {
           perfInfo.registerCallee("getAclStatus");
-          op = client.getAclStatus(AbfsHttpConstants.FORWARD_SLASH + AbfsHttpConstants.ROOT_PATH);
+          op = client.getAclStatus(getRelativePath(path));
         } else {
           perfInfo.registerCallee("getFilesystemProperties");
           op = client.getFilesystemProperties();
         }
       } else {
         perfInfo.registerCallee("getPathStatus");
-        op = client.getPathStatus(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path));
+        op = client.getPathStatus(getRelativePath(path), false);
       }
 
       perfInfo.registerResult(op.getResult());
@@ -698,14 +708,14 @@ public class AzureBlobFileSystemStore implements Closeable {
             path,
             startFrom);
 
-    final String relativePath = path.isRoot() ? AbfsHttpConstants.EMPTY_STRING : getRelativePath(path);
+    final String relativePath = getRelativePath(path);
     String continuation = null;
 
     // generate continuation token if a valid startFrom is provided.
     if (startFrom != null && !startFrom.isEmpty()) {
       continuation = getIsNamespaceEnabled()
               ? generateContinuationTokenForXns(startFrom)
-              : generateContinuationTokenForNonXns(path.isRoot() ? ROOT_PATH : relativePath, startFrom);
+              : generateContinuationTokenForNonXns(relativePath, startFrom);
     }
 
     ArrayList<FileStatus> fileStatuses = new ArrayList<>();
@@ -793,12 +803,13 @@ public class AzureBlobFileSystemStore implements Closeable {
   }
 
   // generate continuation token for non-xns account
-  private String generateContinuationTokenForNonXns(final String path, final String firstEntryName) {
+  private String generateContinuationTokenForNonXns(String path, final String firstEntryName) {
     Preconditions.checkArgument(!Strings.isNullOrEmpty(firstEntryName)
             && !firstEntryName.startsWith(AbfsHttpConstants.ROOT_PATH),
             "startFrom must be a dir/file name and it can not be a full path");
 
     // Notice: non-xns continuation token requires full path (first "/" is not included) for startFrom
+    path = AbfsClient.getDirectoryQueryParameter(path);
     final String startFrom = (path.isEmpty() || path.equals(ROOT_PATH))
             ? firstEntryName
             : path + ROOT_PATH + firstEntryName;
@@ -846,8 +857,7 @@ public class AzureBlobFileSystemStore implements Closeable {
       final String transformedOwner = identityTransformer.transformUserOrGroupForSetRequest(owner);
       final String transformedGroup = identityTransformer.transformUserOrGroupForSetRequest(group);
 
-      final AbfsRestOperation op = client.setOwner(
-              AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true),
+      final AbfsRestOperation op = client.setOwner(getRelativePath(path),
               transformedOwner,
               transformedGroup);
 
@@ -870,8 +880,7 @@ public class AzureBlobFileSystemStore implements Closeable {
               path.toString(),
               permission.toString());
 
-      final AbfsRestOperation op = client.setPermission(
-              AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true),
+      final AbfsRestOperation op = client.setPermission(getRelativePath(path),
               String.format(AbfsHttpConstants.PERMISSION_FORMAT, permission.toOctal()));
 
       perfInfo.registerResult(op.getResult()).registerSuccess(true);
@@ -897,7 +906,9 @@ public class AzureBlobFileSystemStore implements Closeable {
       final Map<String, String> modifyAclEntries = AbfsAclHelper.deserializeAclSpec(AclEntry.aclSpecToString(aclSpec));
       boolean useUpn = AbfsAclHelper.isUpnFormatAclEntries(modifyAclEntries);
 
-      final AbfsRestOperation op = client.getAclStatus(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true), useUpn);
+      String relativePath = getRelativePath(path);
+
+      final AbfsRestOperation op = client.getAclStatus(relativePath, useUpn);
       perfInfoGet.registerResult(op.getResult());
       final String eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG);
 
@@ -909,7 +920,7 @@ public class AzureBlobFileSystemStore implements Closeable {
 
       try (AbfsPerfInfo perfInfoSet = startTracking("modifyAclEntries", "setAcl")) {
         final AbfsRestOperation setAclOp
-                = client.setAcl(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true),
+                = client.setAcl(relativePath,
                 AbfsAclHelper.serializeAclSpec(aclEntries), eTag);
         perfInfoSet.registerResult(setAclOp.getResult())
                 .registerSuccess(true)
@@ -936,7 +947,9 @@ public class AzureBlobFileSystemStore implements Closeable {
       final Map<String, String> removeAclEntries = AbfsAclHelper.deserializeAclSpec(AclEntry.aclSpecToString(aclSpec));
       boolean isUpnFormat = AbfsAclHelper.isUpnFormatAclEntries(removeAclEntries);
 
-      final AbfsRestOperation op = client.getAclStatus(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true), isUpnFormat);
+      String relativePath = getRelativePath(path);
+
+      final AbfsRestOperation op = client.getAclStatus(relativePath, isUpnFormat);
       perfInfoGet.registerResult(op.getResult());
       final String eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG);
 
@@ -948,7 +961,7 @@ public class AzureBlobFileSystemStore implements Closeable {
 
       try (AbfsPerfInfo perfInfoSet = startTracking("removeAclEntries", "setAcl")) {
         final AbfsRestOperation setAclOp =
-                client.setAcl(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true),
+                client.setAcl(relativePath,
                 AbfsAclHelper.serializeAclSpec(aclEntries), eTag);
         perfInfoSet.registerResult(setAclOp.getResult())
                 .registerSuccess(true)
@@ -970,7 +983,9 @@ public class AzureBlobFileSystemStore implements Closeable {
               client.getFileSystem(),
               path.toString());
 
-      final AbfsRestOperation op = client.getAclStatus(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true));
+      String relativePath = getRelativePath(path);
+
+      final AbfsRestOperation op = client.getAclStatus(relativePath);
       perfInfoGet.registerResult(op.getResult());
       final String eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG);
       final Map<String, String> aclEntries = AbfsAclHelper.deserializeAclSpec(op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_ACL));
@@ -988,7 +1003,7 @@ public class AzureBlobFileSystemStore implements Closeable {
 
       try (AbfsPerfInfo perfInfoSet = startTracking("removeDefaultAcl", "setAcl")) {
         final AbfsRestOperation setAclOp =
-                client.setAcl(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true),
+                client.setAcl(relativePath,
                 AbfsAclHelper.serializeAclSpec(aclEntries), eTag);
         perfInfoSet.registerResult(setAclOp.getResult())
                 .registerSuccess(true)
@@ -1010,7 +1025,9 @@ public class AzureBlobFileSystemStore implements Closeable {
               client.getFileSystem(),
               path.toString());
 
-      final AbfsRestOperation op = client.getAclStatus(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true));
+      String relativePath = getRelativePath(path);
+
+      final AbfsRestOperation op = client.getAclStatus(relativePath);
       perfInfoGet.registerResult(op.getResult());
       final String eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG);
 
@@ -1025,7 +1042,7 @@ public class AzureBlobFileSystemStore implements Closeable {
 
       try (AbfsPerfInfo perfInfoSet = startTracking("removeAcl", "setAcl")) {
         final AbfsRestOperation setAclOp =
-                client.setAcl(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true),
+                client.setAcl(relativePath,
                 AbfsAclHelper.serializeAclSpec(newAclEntries), eTag);
         perfInfoSet.registerResult(setAclOp.getResult())
                 .registerSuccess(true)
@@ -1052,7 +1069,9 @@ public class AzureBlobFileSystemStore implements Closeable {
       final Map<String, String> aclEntries = AbfsAclHelper.deserializeAclSpec(AclEntry.aclSpecToString(aclSpec));
       final boolean isUpnFormat = AbfsAclHelper.isUpnFormatAclEntries(aclEntries);
 
-      final AbfsRestOperation op = client.getAclStatus(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true), isUpnFormat);
+      String relativePath = getRelativePath(path);
+
+      final AbfsRestOperation op = client.getAclStatus(relativePath, isUpnFormat);
       perfInfoGet.registerResult(op.getResult());
       final String eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG);
 
@@ -1064,7 +1083,7 @@ public class AzureBlobFileSystemStore implements Closeable {
 
       try (AbfsPerfInfo perfInfoSet = startTracking("setAcl", "setAcl")) {
         final AbfsRestOperation setAclOp =
-                client.setAcl(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true),
+                client.setAcl(relativePath,
                 AbfsAclHelper.serializeAclSpec(aclEntries), eTag);
         perfInfoSet.registerResult(setAclOp.getResult())
                 .registerSuccess(true)
@@ -1086,7 +1105,7 @@ public class AzureBlobFileSystemStore implements Closeable {
               client.getFileSystem(),
               path.toString());
 
-      AbfsRestOperation op = client.getAclStatus(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true));
+      AbfsRestOperation op = client.getAclStatus(getRelativePath(path));
       AbfsHttpOperation result = op.getResult();
       perfInfo.registerResult(result);
 
@@ -1130,10 +1149,8 @@ public class AzureBlobFileSystemStore implements Closeable {
       return;
     }
     try (AbfsPerfInfo perfInfo = startTracking("access", "checkAccess")) {
-      String relativePath =
-          AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true);
       final AbfsRestOperation op = this.client
-          .checkAccess(relativePath, mode.SYMBOL);
+          .checkAccess(getRelativePath(path), mode.SYMBOL);
       perfInfo.registerResult(op.getResult()).registerSuccess(true);
     }
   }
@@ -1201,22 +1218,8 @@ public class AzureBlobFileSystemStore implements Closeable {
   }
 
   private String getRelativePath(final Path path) {
-    return getRelativePath(path, false);
-  }
-
-  private String getRelativePath(final Path path, final boolean allowRootPath) {
     Preconditions.checkNotNull(path, "path");
-    final String relativePath = path.toUri().getPath();
-
-    if (relativePath.length() == 0 || (relativePath.length() == 1 && relativePath.charAt(0) == Path.SEPARATOR_CHAR)) {
-      return allowRootPath ? AbfsHttpConstants.ROOT_PATH : AbfsHttpConstants.EMPTY_STRING;
-    }
-
-    if (relativePath.charAt(0) == Path.SEPARATOR_CHAR) {
-      return relativePath.substring(1);
-    }
-
-    return relativePath;
+    return path.toUri().getPath();
   }
 
   private long parseContentLength(final String contentLength) {
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
index 597f93f..5794d32 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
@@ -142,5 +142,8 @@ public final class ConfigurationKeys {
   /** Key for SAS token provider **/
   public static final String FS_AZURE_SAS_TOKEN_PROVIDER_TYPE = "fs.azure.sas.token.provider.type";
 
+  /** For performance, AbfsInputStream/AbfsOutputStream re-use SAS tokens until the expiry is within this number of seconds. **/
+  public static final String FS_AZURE_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS = "fs.azure.sas.token.renew.period.for.streams";
+
   private ConfigurationKeys() {}
 }
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
index ef2e708..01d5202 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
@@ -76,6 +76,7 @@ public final class FileSystemConfigurations {
   public static final boolean DEFAULT_USE_UPN = false;
   public static final boolean DEFAULT_ENABLE_CHECK_ACCESS = false;
   public static final boolean DEFAULT_ABFS_LATENCY_TRACK = false;
+  public static final long DEFAULT_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS_IN_SECONDS = 120;
 
   public static final String DEFAULT_FS_AZURE_USER_AGENT_PREFIX = EMPTY_STRING;
   public static final String DEFAULT_VALUE_UNKNOWN = "UNKNOWN";
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/extensions/SASTokenProvider.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/extensions/SASTokenProvider.java
index 9cfe2bc..2cd44f1 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/extensions/SASTokenProvider.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/extensions/SASTokenProvider.java
@@ -32,22 +32,23 @@ import org.apache.hadoop.security.AccessControlException;
 @InterfaceStability.Unstable
 public interface SASTokenProvider {
 
-  String CONCAT_SOURCE_OPERATION = "concat-source";
-  String CONCAT_TARGET_OPERATION = "concat-target";
-  String CREATEFILE_OPERATION = "create";
+  String CHECK_ACCESS_OPERATION = "check-access";
+  String CREATE_FILE_OPERATION = "create-file";
   String DELETE_OPERATION = "delete";
-  String EXECUTE_OPERATION = "execute";
-  String GETACL_OPERATION = "getaclstatus";
-  String GETFILESTATUS_OPERATION = "getfilestatus";
-  String LISTSTATUS_OPERATION = "liststatus";
-  String MKDIR_OPERATION = "mkdir";
+  String DELETE_RECURSIVE_OPERATION = "delete-recursive";
+  String GET_ACL_OPERATION = "get-acl";
+  String GET_STATUS_OPERATION = "get-status";
+  String GET_PROPERTIES_OPERATION = "get-properties";
+  String LIST_OPERATION = "list";
+  String CREATE_DIRECTORY_OPERATION = "create-directory";
   String READ_OPERATION = "read";
   String RENAME_SOURCE_OPERATION = "rename-source";
   String RENAME_DESTINATION_OPERATION = "rename-destination";
-  String SETACL_OPERATION = "setacl";
-  String SETOWNER_OPERATION = "setowner";
-  String SETPERMISSION_OPERATION = "setpermission";
-  String APPEND_OPERATION = "write";
+  String SET_ACL_OPERATION = "set-acl";
+  String SET_OWNER_OPERATION = "set-owner";
+  String SET_PERMISSION_OPERATION = "set-permission";
+  String SET_PROPERTIES_OPERATION = "set-properties";
+  String WRITE_OPERATION = "write";
 
   /**
    * Initialize authorizer for Azure Blob File System.
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/AzureADAuthenticator.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/AzureADAuthenticator.java
index f836bab..435335f 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/AzureADAuthenticator.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/AzureADAuthenticator.java
@@ -52,6 +52,7 @@ public final class AzureADAuthenticator {
 
   private static final Logger LOG = LoggerFactory.getLogger(AzureADAuthenticator.class);
   private static final String RESOURCE_NAME = "https://storage.azure.com/";
+  private static final String SCOPE = "https://storage.azure.com/.default";
   private static final int CONNECT_TIMEOUT = 30 * 1000;
   private static final int READ_TIMEOUT = 30 * 1000;
 
@@ -85,9 +86,14 @@ public final class AzureADAuthenticator {
     Preconditions.checkNotNull(authEndpoint, "authEndpoint");
     Preconditions.checkNotNull(clientId, "clientId");
     Preconditions.checkNotNull(clientSecret, "clientSecret");
+    boolean isVersion2AuthenticationEndpoint = authEndpoint.contains("/oauth2/v2.0/");
 
     QueryParams qp = new QueryParams();
-    qp.add("resource", RESOURCE_NAME);
+    if (isVersion2AuthenticationEndpoint) {
+      qp.add("scope", SCOPE);
+    } else {
+      qp.add("resource", RESOURCE_NAME);
+    }
     qp.add("grant_type", "client_credentials");
     qp.add("client_id", clientId);
     qp.add("client_secret", clientSecret);
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
index 6bacde4..70d1399 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
@@ -29,6 +29,7 @@ import java.util.List;
 import java.util.Locale;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Strings;
 import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory;
 import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
 import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
@@ -207,13 +208,12 @@ public class AbfsClient implements Closeable {
 
     final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
     abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESOURCE, FILESYSTEM);
-    abfsUriQueryBuilder.addQuery(QUERY_PARAM_DIRECTORY, relativePath == null ? AbfsHttpConstants.EMPTY_STRING
-        : relativePath);
+    abfsUriQueryBuilder.addQuery(QUERY_PARAM_DIRECTORY, getDirectoryQueryParameter(relativePath));
     abfsUriQueryBuilder.addQuery(QUERY_PARAM_RECURSIVE, String.valueOf(recursive));
     abfsUriQueryBuilder.addQuery(QUERY_PARAM_CONTINUATION, continuation);
     abfsUriQueryBuilder.addQuery(QUERY_PARAM_MAXRESULTS, String.valueOf(listMaxResults));
     abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_UPN, String.valueOf(abfsConfiguration.isUpnUsed()));
-    appendSASTokenToQuery(relativePath, SASTokenProvider.LISTSTATUS_OPERATION, abfsUriQueryBuilder);
+    appendSASTokenToQuery(relativePath, SASTokenProvider.LIST_OPERATION, abfsUriQueryBuilder);
 
     final URL url = createRequestUrl(abfsUriQueryBuilder.toString());
     final AbfsRestOperation op = new AbfsRestOperation(
@@ -279,8 +279,8 @@ public class AbfsClient implements Closeable {
     abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESOURCE, isFile ? FILE : DIRECTORY);
 
     String operation = isFile
-        ? SASTokenProvider.CREATEFILE_OPERATION
-        : SASTokenProvider.MKDIR_OPERATION;
+        ? SASTokenProvider.CREATE_FILE_OPERATION
+        : SASTokenProvider.CREATE_DIRECTORY_OPERATION;
     appendSASTokenToQuery(path, operation, abfsUriQueryBuilder);
 
     final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
@@ -325,7 +325,7 @@ public class AbfsClient implements Closeable {
   }
 
   public AbfsRestOperation append(final String path, final long position, final byte[] buffer, final int offset,
-                                  final int length) throws AzureBlobFileSystemException {
+                                  final int length, final String cachedSasToken) throws AzureBlobFileSystemException {
     final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
     // JDK7 does not support PATCH, so to workaround the issue we will use
     // PUT and specify the real method in the X-Http-Method-Override header.
@@ -335,7 +335,9 @@ public class AbfsClient implements Closeable {
     final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
     abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION, APPEND_ACTION);
     abfsUriQueryBuilder.addQuery(QUERY_PARAM_POSITION, Long.toString(position));
-    appendSASTokenToQuery(path, SASTokenProvider.APPEND_OPERATION, abfsUriQueryBuilder);
+    // AbfsInputStream/AbfsOutputStream reuse SAS tokens for better performance
+    String sasTokenForReuse = appendSASTokenToQuery(path, SASTokenProvider.WRITE_OPERATION,
+        abfsUriQueryBuilder, cachedSasToken);
 
     final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
     final AbfsRestOperation op = new AbfsRestOperation(
@@ -343,12 +345,13 @@ public class AbfsClient implements Closeable {
             this,
             HTTP_METHOD_PUT,
             url,
-            requestHeaders, buffer, offset, length);
+            requestHeaders, buffer, offset, length, sasTokenForReuse);
     op.execute();
     return op;
   }
 
-  public AbfsRestOperation flush(final String path, final long position, boolean retainUncommittedData, boolean isClose)
+  public AbfsRestOperation flush(final String path, final long position, boolean retainUncommittedData,
+                                 boolean isClose, final String cachedSasToken)
       throws AzureBlobFileSystemException {
     final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
     // JDK7 does not support PATCH, so to workaround the issue we will use
@@ -361,7 +364,9 @@ public class AbfsClient implements Closeable {
     abfsUriQueryBuilder.addQuery(QUERY_PARAM_POSITION, Long.toString(position));
     abfsUriQueryBuilder.addQuery(QUERY_PARAM_RETAIN_UNCOMMITTED_DATA, String.valueOf(retainUncommittedData));
     abfsUriQueryBuilder.addQuery(QUERY_PARAM_CLOSE, String.valueOf(isClose));
-    appendSASTokenToQuery(path, SASTokenProvider.APPEND_OPERATION, abfsUriQueryBuilder);
+    // AbfsInputStream/AbfsOutputStream reuse SAS tokens for better performance
+    String sasTokenForReuse = appendSASTokenToQuery(path, SASTokenProvider.WRITE_OPERATION,
+        abfsUriQueryBuilder, cachedSasToken);
 
     final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
     final AbfsRestOperation op = new AbfsRestOperation(
@@ -369,7 +374,7 @@ public class AbfsClient implements Closeable {
             this,
             HTTP_METHOD_PUT,
             url,
-            requestHeaders);
+            requestHeaders, sasTokenForReuse);
     op.execute();
     return op;
   }
@@ -386,6 +391,7 @@ public class AbfsClient implements Closeable {
 
     final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
     abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION, SET_PROPERTIES_ACTION);
+    appendSASTokenToQuery(path, SASTokenProvider.SET_PROPERTIES_OPERATION, abfsUriQueryBuilder);
 
     final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
     final AbfsRestOperation op = new AbfsRestOperation(
@@ -398,12 +404,20 @@ public class AbfsClient implements Closeable {
     return op;
   }
 
-  public AbfsRestOperation getPathStatus(final String path) throws AzureBlobFileSystemException {
+  public AbfsRestOperation getPathStatus(final String path, final boolean includeProperties) throws AzureBlobFileSystemException {
     final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
 
     final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
+    String operation = SASTokenProvider.GET_PROPERTIES_OPERATION;
+    if (!includeProperties) {
+      // The default action (operation) is implicitly to get properties and this action requires read permission
+      // because it reads user defined properties.  If the action is getStatus or getAclStatus, then
+      // only traversal (execute) permission is required.
+      abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_ACTION, AbfsHttpConstants.GET_STATUS);
+      operation = SASTokenProvider.GET_STATUS_OPERATION;
+    }
     abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_UPN, String.valueOf(abfsConfiguration.isUpnUsed()));
-    appendSASTokenToQuery(path, SASTokenProvider.GETFILESTATUS_OPERATION, abfsUriQueryBuilder);
+    appendSASTokenToQuery(path, operation, abfsUriQueryBuilder);
 
     final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
     final AbfsRestOperation op = new AbfsRestOperation(
@@ -417,14 +431,16 @@ public class AbfsClient implements Closeable {
   }
 
   public AbfsRestOperation read(final String path, final long position, final byte[] buffer, final int bufferOffset,
-                                final int bufferLength, final String eTag) throws AzureBlobFileSystemException {
+                                final int bufferLength, final String eTag, String cachedSasToken) throws AzureBlobFileSystemException {
     final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
     requestHeaders.add(new AbfsHttpHeader(RANGE,
             String.format("bytes=%d-%d", position, position + bufferLength - 1)));
     requestHeaders.add(new AbfsHttpHeader(IF_MATCH, eTag));
 
     final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
-    appendSASTokenToQuery(path, SASTokenProvider.READ_OPERATION, abfsUriQueryBuilder);
+    // AbfsInputStream/AbfsOutputStream reuse SAS tokens for better performance
+    String sasTokenForReuse = appendSASTokenToQuery(path, SASTokenProvider.READ_OPERATION,
+        abfsUriQueryBuilder, cachedSasToken);
 
     final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
 
@@ -436,7 +452,7 @@ public class AbfsClient implements Closeable {
             requestHeaders,
             buffer,
             bufferOffset,
-            bufferLength);
+            bufferLength, sasTokenForReuse);
     op.execute();
 
     return op;
@@ -449,7 +465,8 @@ public class AbfsClient implements Closeable {
     final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
     abfsUriQueryBuilder.addQuery(QUERY_PARAM_RECURSIVE, String.valueOf(recursive));
     abfsUriQueryBuilder.addQuery(QUERY_PARAM_CONTINUATION, continuation);
-    appendSASTokenToQuery(path, SASTokenProvider.DELETE_OPERATION, abfsUriQueryBuilder);
+    String operation = recursive ? SASTokenProvider.DELETE_RECURSIVE_OPERATION : SASTokenProvider.DELETE_OPERATION;
+    appendSASTokenToQuery(path, operation, abfsUriQueryBuilder);
 
     final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
     final AbfsRestOperation op = new AbfsRestOperation(
@@ -479,7 +496,7 @@ public class AbfsClient implements Closeable {
 
     final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
     abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_ACTION, AbfsHttpConstants.SET_ACCESS_CONTROL);
-    appendSASTokenToQuery(path, SASTokenProvider.SETOWNER_OPERATION, abfsUriQueryBuilder);
+    appendSASTokenToQuery(path, SASTokenProvider.SET_OWNER_OPERATION, abfsUriQueryBuilder);
 
     final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
     final AbfsRestOperation op = new AbfsRestOperation(
@@ -504,7 +521,7 @@ public class AbfsClient implements Closeable {
 
     final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
     abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_ACTION, AbfsHttpConstants.SET_ACCESS_CONTROL);
-    appendSASTokenToQuery(path, SASTokenProvider.SETPERMISSION_OPERATION, abfsUriQueryBuilder);
+    appendSASTokenToQuery(path, SASTokenProvider.SET_PERMISSION_OPERATION, abfsUriQueryBuilder);
 
     final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
     final AbfsRestOperation op = new AbfsRestOperation(
@@ -537,7 +554,7 @@ public class AbfsClient implements Closeable {
 
     final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
     abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_ACTION, AbfsHttpConstants.SET_ACCESS_CONTROL);
-    appendSASTokenToQuery(path, SASTokenProvider.SETACL_OPERATION, abfsUriQueryBuilder);
+    appendSASTokenToQuery(path, SASTokenProvider.SET_ACL_OPERATION, abfsUriQueryBuilder);
 
     final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
     final AbfsRestOperation op = new AbfsRestOperation(
@@ -560,7 +577,7 @@ public class AbfsClient implements Closeable {
     final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
     abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_ACTION, AbfsHttpConstants.GET_ACCESS_CONTROL);
     abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_UPN, String.valueOf(useUPN));
-    appendSASTokenToQuery(path, SASTokenProvider.GETACL_OPERATION, abfsUriQueryBuilder);
+    appendSASTokenToQuery(path, SASTokenProvider.GET_ACL_OPERATION, abfsUriQueryBuilder);
 
     final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
     final AbfsRestOperation op = new AbfsRestOperation(
@@ -587,6 +604,7 @@ public class AbfsClient implements Closeable {
     AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
     abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION, CHECK_ACCESS);
     abfsUriQueryBuilder.addQuery(QUERY_FS_ACTION, rwx);
+    appendSASTokenToQuery(path, SASTokenProvider.CHECK_ACCESS_OPERATION, abfsUriQueryBuilder);
     URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
     AbfsRestOperation op = new AbfsRestOperation(
         AbfsRestOperationType.CheckAccess, this,
@@ -596,22 +614,64 @@ public class AbfsClient implements Closeable {
   }
 
   /**
+   * Get the directory query parameter used by the List Paths REST API and
+   * as the path in the continuation token.  If the input path is null or the
+   * root path "/", an empty string is returned. If the input path begins with '/',
+   * the return value is the substring beginning at offset 1.  Otherwise, the
+   * input path is returned.
+   * @param path the path to be listed.
+   * @return the value of the directory query parameter
+   */
+  public static String getDirectoryQueryParameter(final String path) {
+    String directory = path;
+    if (Strings.isNullOrEmpty(directory)) {
+      directory = AbfsHttpConstants.EMPTY_STRING;
+    } else if (directory.charAt(0) == '/') {
+      directory = directory.substring(1);
+    }
+    return directory;
+  }
+
+  /**
    * If configured for SAS AuthType, appends SAS token to queryBuilder
    * @param path
    * @param operation
    * @param queryBuilder
+   * @return sasToken - returned for optional re-use.
    * @throws SASTokenProviderException
    */
-  private void appendSASTokenToQuery(String path, String operation, AbfsUriQueryBuilder queryBuilder) throws SASTokenProviderException {
+  private String appendSASTokenToQuery(String path, String operation, AbfsUriQueryBuilder queryBuilder) throws SASTokenProviderException {
+    return appendSASTokenToQuery(path, operation, queryBuilder, null);
+  }
+
+  /**
+   * If configured for SAS AuthType, appends a SAS token to the queryBuilder.
+   * @param path the path for which the SAS token applies.
+   * @param operation the operation for which the SAS token is requested.
+   * @param queryBuilder the query builder the token is appended to.
+   * @param cachedSasToken - previously acquired SAS token to be reused.
+   * @return sasToken - returned for optional re-use.
+   * @throws SASTokenProviderException if the SAS token cannot be acquired.
+   */
+  private String appendSASTokenToQuery(String path,
+                                       String operation,
+                                       AbfsUriQueryBuilder queryBuilder,
+                                       String cachedSasToken)
+      throws SASTokenProviderException {
+    String sasToken = null;
     if (this.authType == AuthType.SAS) {
       try {
         LOG.trace("Fetch SAS token for {} on {}", operation, path);
-        String sasToken = sasTokenProvider.getSASToken(this.accountName,
-            this.filesystem, path, operation);
-        if ((sasToken == null) || sasToken.isEmpty()) {
-          throw new UnsupportedOperationException("SASToken received is empty or null");
+        if (cachedSasToken == null) {
+          sasToken = sasTokenProvider.getSASToken(this.accountName,
+              this.filesystem, path, operation);
+          if ((sasToken == null) || sasToken.isEmpty()) {
+            throw new UnsupportedOperationException("SASToken received is empty or null");
+          }
+        } else {
+          sasToken = cachedSasToken;
+          LOG.trace("Using cached SAS token.");
         }
-
         queryBuilder.setSASToken(sasToken);
         LOG.trace("SAS token fetch complete for {} on {}", operation, path);
       } catch (Exception ex) {
@@ -621,6 +681,7 @@ public class AbfsClient implements Closeable {
             ex.toString()));
       }
     }
+    return sasToken;
   }
 
   private URL createRequestUrl(final String query) throws AzureBlobFileSystemException {
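
For illustration, the path normalization performed by the new
getDirectoryQueryParameter helper can be summarized with a short,
hypothetical snippet (the class name is made up; the expected values
follow directly from the javadoc above):

```java
// Hypothetical usage of AbfsClient.getDirectoryQueryParameter, per its
// javadoc: null and the root "/" map to the empty string, a leading '/'
// is stripped, and all other paths pass through unchanged.
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;

public class DirectoryQueryParameterExample {
  public static void main(String[] args) {
    System.out.println(AbfsClient.getDirectoryQueryParameter(null));        // ""
    System.out.println(AbfsClient.getDirectoryQueryParameter("/"));         // ""
    System.out.println(AbfsClient.getDirectoryQueryParameter("/dir/file")); // "dir/file"
    System.out.println(AbfsClient.getDirectoryQueryParameter("dir/file"));  // "dir/file"
  }
}
```
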
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
index 05c093a..422fa3b 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.fs.FileSystem.Statistics;
 import org.apache.hadoop.fs.StreamCapabilities;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
+import org.apache.hadoop.fs.azurebfs.utils.CachedSASToken;
 
 import static org.apache.hadoop.util.StringUtils.toLowerCase;
 
@@ -51,6 +52,8 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
   private final boolean tolerateOobAppends; // whether tolerate Oob Appends
   private final boolean readAheadEnabled; // whether enable readAhead;
 
+  // SAS tokens can be re-used until they expire
+  private CachedSASToken cachedSasToken;
   private byte[] buffer = null;            // will be initialized on first use
 
   private long fCursor = 0;  // cursor of buffer within file - offset of next byte to read from remote server
@@ -76,6 +79,8 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
     this.tolerateOobAppends = abfsInputStreamContext.isTolerateOobAppends();
     this.eTag = eTag;
     this.readAheadEnabled = true;
+    this.cachedSasToken = new CachedSASToken(
+        abfsInputStreamContext.getSasTokenRenewPeriodForStreamsInSeconds());
   }
 
   public String getPath() {
@@ -234,7 +239,8 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
     final AbfsRestOperation op;
     AbfsPerfTracker tracker = client.getAbfsPerfTracker();
     try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker, "readRemote", "read")) {
-      op = client.read(path, position, b, offset, length, tolerateOobAppends ? "*" : eTag);
+      op = client.read(path, position, b, offset, length, tolerateOobAppends ? "*" : eTag, cachedSasToken.get());
+      cachedSasToken.update(op.getSasToken());
       perfInfo.registerResult(op.getResult()).registerSuccess(true);
       incrementReadOps();
     } catch (AzureBlobFileSystemException ex) {
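
The hunk above is one instance of the re-use cycle that AbfsInputStream
(and, further below, AbfsOutputStream) follows on every remote call:
offer the cached token to the client, then cache whatever token the
operation actually used. A minimal sketch of that cycle, assuming it
lives in the services package since getSasToken() is package-private
(the class and method names are illustrative only):

```java
package org.apache.hadoop.fs.azurebfs.services;

import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
import org.apache.hadoop.fs.azurebfs.utils.CachedSASToken;

// Sketch of the SAS re-use cycle: CachedSASToken.get() returns null once
// the token is expired or near expiry, which makes appendSASTokenToQuery
// fall back to the SASTokenProvider for a fresh token.
final class SasReuseSketch {
  static void readWithSasReuse(AbfsClient client, String path, long position,
      byte[] buffer, int offset, int length, String eTag,
      CachedSASToken cachedSasToken) throws AzureBlobFileSystemException {
    AbfsRestOperation op = client.read(path, position, buffer, offset,
        length, eTag, cachedSasToken.get());
    cachedSasToken.update(op.getSasToken());
  }
}
```
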
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java
index cba7191..a847b56 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java
@@ -29,7 +29,8 @@ public class AbfsInputStreamContext extends AbfsStreamContext {
 
   private boolean tolerateOobAppends;
 
-  public AbfsInputStreamContext() {
+  public AbfsInputStreamContext(final long sasTokenRenewPeriodForStreamsInSeconds) {
+    super(sasTokenRenewPeriodForStreamsInSeconds);
   }
 
   public AbfsInputStreamContext withReadBufferSize(final int readBufferSize) {
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
index 4297b18..89afca4 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
@@ -40,6 +40,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
+import org.apache.hadoop.fs.azurebfs.utils.CachedSASToken;
 import org.apache.hadoop.io.ElasticByteBufferPool;
 import org.apache.hadoop.fs.FileSystem.Statistics;
 import org.apache.hadoop.fs.FSExceptionMessages;
@@ -73,6 +74,9 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
   private final ThreadPoolExecutor threadExecutor;
   private final ExecutorCompletionService<Void> completionService;
 
+  // SAS tokens can be re-used until they expire
+  private CachedSASToken cachedSasToken;
+
   /**
    * Queue storing buffers with the size of the Azure block ready for
    * reuse. The pool allows reusing the blocks instead of allocating new
@@ -119,6 +123,8 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
         TimeUnit.SECONDS,
         new LinkedBlockingQueue<>());
     this.completionService = new ExecutorCompletionService<>(this.threadExecutor);
+    this.cachedSasToken = new CachedSASToken(
+        abfsOutputStreamContext.getSasTokenRenewPeriodForStreamsInSeconds());
   }
 
   /**
@@ -330,7 +336,8 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
         try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker,
                 "writeCurrentBufferToService", "append")) {
           AbfsRestOperation op = client.append(path, offset, bytes, 0,
-                  bytesLength);
+                  bytesLength, cachedSasToken.get());
+          cachedSasToken.update(op.getSasToken());
           perfInfo.registerResult(op.getResult());
           byteBufferPool.putBuffer(ByteBuffer.wrap(bytes));
           perfInfo.registerSuccess(true);
@@ -385,7 +392,8 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
     AbfsPerfTracker tracker = client.getAbfsPerfTracker();
     try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker,
             "flushWrittenBytesToServiceInternal", "flush")) {
-      AbfsRestOperation op = client.flush(path, offset, retainUncommitedData, isClose);
+      AbfsRestOperation op = client.flush(path, offset, retainUncommitedData, isClose, cachedSasToken.get());
+      cachedSasToken.update(op.getSasToken());
       perfInfo.registerResult(op.getResult()).registerSuccess(true);
     } catch (AzureBlobFileSystemException ex) {
       if (ex instanceof AbfsRestOperationException) {
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java
index e0aefbf..dcd6c45 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java
@@ -31,7 +31,8 @@ public class AbfsOutputStreamContext extends AbfsStreamContext {
 
   private AbfsOutputStreamStatistics streamStatistics;
 
-  public AbfsOutputStreamContext() {
+  public AbfsOutputStreamContext(final long sasTokenRenewPeriodForStreamsInSeconds) {
+    super(sasTokenRenewPeriodForStreamsInSeconds);
   }
 
   public AbfsOutputStreamContext withWriteBufferSize(
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
index 445c366..2f9ab88 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
@@ -53,6 +53,9 @@ public class AbfsRestOperation {
   // request body and all the download methods have a response body.
   private final boolean hasRequestBody;
 
+  // Used only by AbfsInputStream/AbfsOutputStream to reuse SAS tokens.
+  private final String sasToken;
+
   private static final Logger LOG = LoggerFactory.getLogger(AbfsClient.class);
 
   // For uploads, this is the request entity body.  For downloads,
@@ -67,6 +70,10 @@ public class AbfsRestOperation {
     return result;
   }
 
+  String getSasToken() {
+    return sasToken;
+  }
+
   /**
    * Initializes a new REST operation.
    *
@@ -80,6 +87,24 @@ public class AbfsRestOperation {
                     final String method,
                     final URL url,
                     final List<AbfsHttpHeader> requestHeaders) {
+    this(operationType, client, method, url, requestHeaders, null);
+  }
+
+  /**
+   * Initializes a new REST operation.
+   * @param operationType The type of the REST operation.
+   * @param client The Blob FS client.
+   * @param method The HTTP method (PUT, PATCH, POST, GET, HEAD, or DELETE).
+   * @param url The full URL including query string parameters.
+   * @param requestHeaders The HTTP request headers.
+   * @param sasToken A SAS token for optional re-use by AbfsInputStream/AbfsOutputStream.
+   */
+  AbfsRestOperation(final AbfsRestOperationType operationType,
+                    final AbfsClient client,
+                    final String method,
+                    final URL url,
+                    final List<AbfsHttpHeader> requestHeaders,
+                    final String sasToken) {
     this.operationType = operationType;
     this.client = client;
     this.method = method;
@@ -87,6 +112,7 @@ public class AbfsRestOperation {
     this.requestHeaders = requestHeaders;
     this.hasRequestBody = (AbfsHttpConstants.HTTP_METHOD_PUT.equals(method)
             || AbfsHttpConstants.HTTP_METHOD_PATCH.equals(method));
+    this.sasToken = sasToken;
   }
 
   /**
@@ -101,6 +127,7 @@ public class AbfsRestOperation {
    *               this will hold the response entity body.
    * @param bufferOffset An offset into the buffer where the data begins.
    * @param bufferLength The length of the data in the buffer.
+   * @param sasToken A SAS token for optional re-use by AbfsInputStream/AbfsOutputStream.
    */
   AbfsRestOperation(AbfsRestOperationType operationType,
                     AbfsClient client,
@@ -109,8 +136,9 @@ public class AbfsRestOperation {
                     List<AbfsHttpHeader> requestHeaders,
                     byte[] buffer,
                     int bufferOffset,
-                    int bufferLength) {
-    this(operationType, client, method, url, requestHeaders);
+                    int bufferLength,
+                    String sasToken) {
+    this(operationType, client, method, url, requestHeaders, sasToken);
     this.buffer = buffer;
     this.bufferOffset = bufferOffset;
     this.bufferLength = bufferLength;
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsStreamContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsStreamContext.java
index ee77f59..9cd858c 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsStreamContext.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsStreamContext.java
@@ -23,4 +23,17 @@ package org.apache.hadoop.fs.azurebfs.services;
  * to store common configs among input and output streams.
  */
 public abstract class AbfsStreamContext {
+  private long sasTokenRenewPeriodForStreamsInSeconds;
+
+  // hide default constructor
+  private AbfsStreamContext() {
+  }
+
+  public AbfsStreamContext(final long sasTokenRenewPeriodForStreamsInSeconds) {
+    this.sasTokenRenewPeriodForStreamsInSeconds = sasTokenRenewPeriodForStreamsInSeconds;
+  }
+
+  public long getSasTokenRenewPeriodForStreamsInSeconds() {
+    return sasTokenRenewPeriodForStreamsInSeconds;
+  }
 }
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/CachedSASToken.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/CachedSASToken.java
new file mode 100644
index 0000000..620b4c0
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/CachedSASToken.java
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.utils;
+
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeParseException;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS_IN_SECONDS;
+import static java.time.temporal.ChronoUnit.SECONDS;
+
+/**
+ * CachedSASToken provides a simple utility for managing renewal of the
+ * SAS tokens used by AbfsInputStream and AbfsOutputStream.  This enables
+ * SAS re-use and reduces calls to the SASTokenProvider.
+ */
+public final class CachedSASToken {
+  public static final Logger LOG = LoggerFactory.getLogger(CachedSASToken.class);
+  private final long minExpirationInSeconds;
+  private String sasToken;
+  private OffsetDateTime sasExpiry;
+
+  /**
+   * Create instance with default minimum expiration.  SAS tokens are
+   * automatically renewed when their expiration is within this period.
+   */
+  public CachedSASToken() {
+    this(DEFAULT_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS_IN_SECONDS);
+  }
+
+  /**
+   * Create instance with specified minimum expiration.  SAS tokens are
+   * automatically renewed when their expiration is within this period.
+   * @param minExpirationInSeconds the minimum remaining validity in seconds.
+   */
+  public CachedSASToken(long minExpirationInSeconds) {
+    this.minExpirationInSeconds = minExpirationInSeconds;
+  }
+
+  /**
+   * Checks if the SAS token is expired or near expiration.
+   * @param expiry the SAS expiry time.
+   * @param minExpiryInSeconds the minimum remaining validity in seconds.
+   * @return true if the SAS is expired or near expiry; otherwise false.
+   */
+  private static boolean isNearExpiry(OffsetDateTime expiry, long minExpiryInSeconds) {
+    if (expiry == OffsetDateTime.MIN) {
+      return true;
+    }
+    OffsetDateTime utcNow = OffsetDateTime.now(ZoneOffset.UTC);
+    return utcNow.until(expiry, SECONDS) <= minExpiryInSeconds;
+  }
+
+  /**
+   * Parse the expiry from the SAS token.  The expiry is the minimum of
+   * the ske and se parameters.  The se parameter is required and the
+   * ske parameter is optional.
+   * @param token an Azure Storage SAS token.
+   * @return the expiry, or OffsetDateTime.MIN if the token is invalid.
+   */
+  private static OffsetDateTime getExpiry(String token) {
+    // return MIN for all invalid input, including a null token
+    if (token == null) {
+      return OffsetDateTime.MIN;
+    }
+
+    String signedExpiry = "se=";
+    int signedExpiryLen = 3;
+
+    int start = token.indexOf(signedExpiry);
+
+    // return MIN if the required se parameter is absent
+    if (start == -1) {
+      return OffsetDateTime.MIN;
+    }
+
+    start += signedExpiryLen;
+
+    // extract the value of se parameter
+    int end = token.indexOf("&", start);
+    String seValue = (end == -1) ? token.substring(start) : token.substring(start, end);
+
+    try {
+      seValue = URLDecoder.decode(seValue, "utf-8");
+    } catch (UnsupportedEncodingException ex) {
+      LOG.error("Error decoding se query parameter ({}) from SAS.", seValue, ex);
+      return OffsetDateTime.MIN;
+    }
+
+    // parse the ISO 8601 date value; return MIN if invalid
+    OffsetDateTime seDate = OffsetDateTime.MIN;
+    try {
+      seDate = OffsetDateTime.parse(seValue, DateTimeFormatter.ISO_DATE_TIME);
+    } catch (DateTimeParseException ex) {
+      LOG.error("Error parsing se query parameter ({}) from SAS.", seValue, ex);
+    }
+
+    String signedKeyExpiry = "ske=";
+    int signedKeyExpiryLen = 4;
+
+    // if ske is present, the expiry is the minimum of ske and se
+    start = token.indexOf(signedKeyExpiry);
+
+    // return seDate if ske is absent
+    if (start == -1) {
+      return seDate;
+    }
+
+    start += signedKeyExpiryLen;
+
+    // extract the value of ske parameter
+    end = token.indexOf("&", start);
+    String skeValue = (end == -1) ? token.substring(start) : token.substring(start, end);
+
+    try {
+      skeValue = URLDecoder.decode(skeValue, "utf-8");
+    } catch (UnsupportedEncodingException ex) {
+      LOG.error("Error decoding ske query parameter ({}) from SAS.", skeValue, ex);
+      return OffsetDateTime.MIN;
+    }
+
+    // parse the ISO 8601 date value; return MIN if invalid
+    OffsetDateTime skeDate = OffsetDateTime.MIN;
+    try {
+      skeDate = OffsetDateTime.parse(skeValue, DateTimeFormatter.ISO_DATE_TIME);
+    } catch (DateTimeParseException ex) {
+      LOG.error("Error parsing ske query parameter ({}) from SAS.", skeValue, ex);
+      return OffsetDateTime.MIN;
+    }
+
+    return skeDate.isBefore(seDate) ? skeDate : seDate;
+  }
+
+  /**
+   * Updates the cached SAS token and expiry.  If the token is invalid, the cached value
+   * is cleared by setting it to null and the expiry to MIN.
+   * @param token an Azure Storage SAS token
+   */
+  public void update(String token) {
+    // quickly return if token and cached sasToken are the same reference
+    // Note: use of operator == is intentional
+    if (token == sasToken) {
+      return;
+    }
+    OffsetDateTime newExpiry = getExpiry(token);
+    boolean isInvalid = isNearExpiry(newExpiry, minExpirationInSeconds);
+    synchronized (this) {
+      if (isInvalid) {
+        sasToken = null;
+        sasExpiry = OffsetDateTime.MIN;
+      } else {
+        sasToken = token;
+        sasExpiry = newExpiry;
+      }
+    }
+  }
+
+  /**
+   * Gets the token if still valid.
+   * @return the token, or null if it is expired or near expiry.
+   */
+  public String get() {
+    // quickly return null if not set
+    if (sasToken == null) {
+      return null;
+    }
+    String token;
+    OffsetDateTime exp;
+    synchronized (this) {
+      token = sasToken;
+      exp = sasExpiry;
+    }
+    boolean isInvalid = isNearExpiry(exp, minExpirationInSeconds);
+    return isInvalid ? null : token;
+  }
+
+  @VisibleForTesting
+  void setForTesting(String token, OffsetDateTime expiry) {
+    synchronized (this) {
+      sasToken = token;
+      sasExpiry = expiry;
+    }
+  }
+}
\ No newline at end of file
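
A self-contained usage sketch of the new class (the SAS tokens below are
fabricated and the class name is hypothetical; only the se query
parameter is inspected by the cache):

```java
import java.time.OffsetDateTime;
import java.time.ZoneOffset;

import org.apache.hadoop.fs.azurebfs.utils.CachedSASToken;

public class CachedSASTokenExample {
  public static void main(String[] args) {
    // renew once fewer than 120 seconds of validity remain
    CachedSASToken cache = new CachedSASToken(120);

    // a token expiring in one hour is cached and returned by get()
    String fresh = OffsetDateTime.now(ZoneOffset.UTC).plusHours(1).toString();
    cache.update("sp=r&se=" + fresh + "&sig=FABRICATED");
    System.out.println(cache.get() != null);  // true

    // a token expiring in 30 seconds is within the renew period, so the
    // cache clears itself and get() returns null, forcing a fresh fetch
    String nearExpiry = OffsetDateTime.now(ZoneOffset.UTC).plusSeconds(30).toString();
    cache.update("sp=r&se=" + nearExpiry + "&sig=FABRICATED");
    System.out.println(cache.get() == null);  // true
  }
}
```
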
diff --git a/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md b/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md
index 2b4b14d..89f52e7 100644
--- a/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md
+++ b/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md
@@ -313,6 +313,7 @@ driven by them.
 1. Using OAuth 2.0 tokens of one form or another.
 1. Deployed in-Azure with the Azure VMs providing OAuth 2.0 tokens to the application,
  "Managed Instance".
+1. Using Shared Access Signature (SAS) tokens provided by a custom implementation of the SASTokenProvider interface.
 
 What can be changed is what secrets/credentials are used to authenticate the caller.
 
@@ -541,6 +542,24 @@ and optionally `org.apache.hadoop.fs.azurebfs.extensions.BoundDTExtension`.
 
 The declared class also holds responsibility to implement retry logic while fetching access tokens.
 
+### Shared Access Signature (SAS) Token Provider
+
+A Shared Access Signature (SAS) token provider supplies the ABFS connector with SAS
+tokens by implementing the SASTokenProvider interface.
+
+```xml
+<property>
+  <name>fs.azure.account.auth.type</name>
+  <value>SAS</value>
+</property>
+<property>
+  <name>fs.azure.sas.token.provider.type</name>
+  <value>{fully-qualified-class-name-for-implementation-of-SASTokenProvider-interface}</value>
+</property>
+```
+
+The declared class must implement `org.apache.hadoop.fs.azurebfs.extensions.SASTokenProvider`.
+
 ## <a name="technical"></a> Technical notes
 
 ### <a name="proxy"></a> Proxy setup
diff --git a/hadoop-tools/hadoop-azure/src/site/markdown/testing_azure.md b/hadoop-tools/hadoop-azure/src/site/markdown/testing_azure.md
index a26da83..87cbb97 100644
--- a/hadoop-tools/hadoop-azure/src/site/markdown/testing_azure.md
+++ b/hadoop-tools/hadoop-azure/src/site/markdown/testing_azure.md
@@ -646,7 +646,7 @@ hierarchical namespace enabled, and set the following configuration settings:
 <property>
   <name>fs.azure.account.auth.type.{YOUR_ABFS_ACCOUNT_NAME}</name>
   <value>{AUTH TYPE}</value>
-  <description>The authorization type can be SharedKey, OAuth, or Custom. The
+  <description>The authorization type can be SharedKey, OAuth, Custom, or SAS. The
   default is SharedKey.</description>
 </property>
 
@@ -794,6 +794,79 @@ hierarchical namespace enabled, and set the following configuration settings:
    -->
 
 ```
+To run Delegation SAS test cases you must use a storage account with the
+hierarchical namespace enabled and set the following configuration settings:
+
+```xml
+<!--=========================== AUTHENTICATION  OPTIONS ===================-->
+<!--=============================   FOR SAS   ===========================-->
+<!-- To run ABFS Delegation SAS tests, you must register an app, create the
+     necessary role assignments, and set the configuration discussed below:
+
+    1) Register an app:
+      a) Login to https://portal.azure.com, select your AAD directory and search for app registrations.
+      b) Click "New registration".
+      c) Provide a display name, such as "abfs-app".
+      d) Set the account type to "Accounts in this organizational directory only ({YOUR_Tenant} only - Single tenant)".
+      e) For Redirect URI, select Web and enter "http://localhost".
+      f) Click Register.
+
+    2)  Create necessary role assignments:
+      a) Login to https://portal.azure.com and find the Storage account with hierarchical namespace enabled
+         that you plan to run the tests against.
+      b) Select "Access Control (IAM)".
+      c) Select "Role assignments".
+      d) Click Add and select "Add role assignments".
+      e) For Role, enter "Storage Blob Data Owner".
+      f) Under Select, enter the name of the app you registered in step 1 and select it.
+      g) Click Save.
+      h) Repeat the above steps to create a second role assignment for the app, this time
+         for the "Storage Blob Delegator" role.
+
+    3) Generate a new client secret for the application:
+      a) Login to https://portal.azure.com and find the app registered in step 1.
+      b) Select "Certificates and secrets".
+      c) Click "New client secret".
+      d) Enter a description (e.g. Secret1).
+      e) Set the expiration period.  One year is reasonable.
+      f) Click Add.
+      g) Copy the secret and expiration to a safe location.
+
+    4) Set the following configuration values:
+-->
+
+  <property>
+    <name>fs.azure.sas.token.provider.type</name>
+    <value>org.apache.hadoop.fs.azurebfs.extensions.MockDelegationSASTokenProvider</value>
+    <description>The fully qualified class name of the SAS token provider implementation.</description>
+  </property>
+
+  <property>
+    <name>fs.azure.test.app.service.principal.tenant.id</name>
+    <value>{TID}</value>
+    <description>Tenant ID for the application's service principal.</description>
+  </property>
+
+  <property>
+    <name>fs.azure.test.app.service.principal.object.id</name>
+    <value>{OID}</value>
+    <description>Object ID for the application's service principal.</description>
+  </property>
+
+  <property>
+    <name>fs.azure.test.app.id</name>
+    <value>{app id}</value>
+    <description>The application's ID, also known as the client id.</description>
+  </property>
+
+  <property>
+    <name>fs.azure.test.app.secret</name>
+    <value>{client secret}</value>
+    <description>The application's secret, also known as the client secret.</description>
+  </property>
+
+
+```
 
 If running tests against an endpoint that uses the URL format
 http[s]://[ip]:[port]/[account]/[filesystem] instead of
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java
index 87a3dcd..f41cbd6 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java
@@ -26,6 +26,7 @@ import java.util.UUID;
 import java.util.concurrent.Callable;
 
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -79,6 +80,7 @@ public abstract class AbstractAbfsIntegrationTest extends
   private String testUrl;
   private AuthType authType;
   private boolean useConfiguredFileSystem = false;
+  private boolean usingFilesystemForSASTests = false;
 
   protected AbstractAbfsIntegrationTest() throws Exception {
     fileSystemName = TEST_CONTAINER_PREFIX + UUID.randomUUID().toString();
@@ -175,8 +177,13 @@ public abstract class AbstractAbfsIntegrationTest extends
         return;
       }
 
-      // Delete all uniquely created filesystem from the account
-      if (!useConfiguredFileSystem) {
+      if (usingFilesystemForSASTests) {
+        abfsConfig.set(FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME, AuthType.SharedKey.name());
+        AzureBlobFileSystem tempFs = (AzureBlobFileSystem) FileSystem.newInstance(rawConfig);
+        tempFs.getAbfsStore().deleteFilesystem();
+      }
+      else if (!useConfiguredFileSystem) {
+        // Delete all uniquely created filesystem from the account
         final AzureBlobFileSystemStore abfsStore = abfs.getAbfsStore();
         abfsStore.deleteFilesystem();
 
@@ -225,6 +232,16 @@ public abstract class AbstractAbfsIntegrationTest extends
     useConfiguredFileSystem = true;
   }
 
+  protected void createFilesystemForSASTests() throws Exception {
+    // The SAS tests do not have permission to create a filesystem,
+    // so first create a temporary instance of the filesystem using SharedKey,
+    // then re-use the filesystem it creates with SAS auth instead of SharedKey.
+    AzureBlobFileSystem tempFs = (AzureBlobFileSystem) FileSystem.newInstance(rawConfig);
+    Assert.assertTrue(tempFs.exists(new Path("/")));
+    abfsConfig.set(FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME, AuthType.SAS.name());
+    usingFilesystemForSASTests = true;
+  }
+
   public AzureBlobFileSystem getFileSystem() throws IOException {
     return abfs;
   }
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAuthorization.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAuthorization.java
index 94e0ce3..1278e65 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAuthorization.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAuthorization.java
@@ -209,46 +209,55 @@ public class ITestAzureBlobFileSystemAuthorization extends AbstractAbfsIntegrati
 
   @Test
   public void testSetOwnerUnauthorized() throws Exception {
+    Assume.assumeTrue(this.getFileSystem().getIsNamespaceEnabled());
     runTest(FileSystemOperations.SetOwner, true);
   }
 
   @Test
   public void testSetPermissionUnauthorized() throws Exception {
+    Assume.assumeTrue(this.getFileSystem().getIsNamespaceEnabled());
     runTest(FileSystemOperations.SetPermissions, true);
   }
 
   @Test
   public void testModifyAclEntriesUnauthorized() throws Exception {
+    Assume.assumeTrue(this.getFileSystem().getIsNamespaceEnabled());
     runTest(FileSystemOperations.ModifyAclEntries, true);
   }
 
   @Test
   public void testRemoveAclEntriesUnauthorized() throws Exception {
+    Assume.assumeTrue(this.getFileSystem().getIsNamespaceEnabled());
     runTest(FileSystemOperations.RemoveAclEntries, true);
   }
 
   @Test
   public void testRemoveDefaultAclUnauthorized() throws Exception {
+    Assume.assumeTrue(this.getFileSystem().getIsNamespaceEnabled());
     runTest(FileSystemOperations.RemoveDefaultAcl, true);
   }
 
   @Test
   public void testRemoveAclUnauthorized() throws Exception {
+    Assume.assumeTrue(this.getFileSystem().getIsNamespaceEnabled());
     runTest(FileSystemOperations.RemoveAcl, true);
   }
 
   @Test
   public void testSetAclUnauthorized() throws Exception {
+    Assume.assumeTrue(this.getFileSystem().getIsNamespaceEnabled());
     runTest(FileSystemOperations.SetAcl, true);
   }
 
   @Test
   public void testGetAclStatusAuthorized() throws Exception {
+    Assume.assumeTrue(this.getFileSystem().getIsNamespaceEnabled());
     runTest(FileSystemOperations.GetAcl, false);
   }
 
   @Test
   public void testGetAclStatusUnauthorized() throws Exception {
+    Assume.assumeTrue(this.getFileSystem().getIsNamespaceEnabled());
     runTest(FileSystemOperations.GetAcl, true);
   }
 
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCheckAccess.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCheckAccess.java
index bc5fc59..4189d66 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCheckAccess.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCheckAccess.java
@@ -273,6 +273,8 @@ public class ITestAzureBlobFileSystemCheckAccess
         isHNSEnabled);
     Assume.assumeTrue(FS_AZURE_ENABLE_CHECK_ACCESS + " is false",
         isCheckAccessEnabled);
+
+    Assume.assumeNotNull(getRawConfiguration().get(FS_AZURE_BLOB_FS_CLIENT_ID));
   }
 
   private void assertAccessible(Path testFilePath, FsAction fsAction)
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelegationSAS.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelegationSAS.java
new file mode 100644
index 0000000..07b5804
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelegationSAS.java
@@ -0,0 +1,368 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+
+import org.junit.Assume;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
+import org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys;
+import org.apache.hadoop.fs.azurebfs.extensions.MockDelegationSASTokenProvider;
+import org.apache.hadoop.fs.azurebfs.services.AuthType;
+import org.apache.hadoop.fs.permission.AclEntry;
+import org.apache.hadoop.fs.permission.AclEntryScope;
+import org.apache.hadoop.fs.permission.AclStatus;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.AccessControlException;
+
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_SAS_TOKEN_PROVIDER_TYPE;
+import static org.apache.hadoop.fs.azurebfs.utils.AclTestHelpers.aclEntry;
+import static org.apache.hadoop.fs.permission.AclEntryScope.ACCESS;
+import static org.apache.hadoop.fs.permission.AclEntryScope.DEFAULT;
+import static org.apache.hadoop.fs.permission.AclEntryType.GROUP;
+import static org.apache.hadoop.fs.permission.AclEntryType.USER;
+
+/**
+ * Tests Delegation SAS token authorization for filesystem operations.
+ */
+public class ITestAzureBlobFileSystemDelegationSAS extends AbstractAbfsIntegrationTest {
+  private static final String TEST_GROUP = UUID.randomUUID().toString();
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ITestAzureBlobFileSystemDelegationSAS.class);
+
+  public ITestAzureBlobFileSystemDelegationSAS() throws Exception {
+    // These tests rely on specific settings in azure-auth-keys.xml:
+    String sasProvider = getRawConfiguration().get(FS_AZURE_SAS_TOKEN_PROVIDER_TYPE);
+    Assume.assumeTrue(MockDelegationSASTokenProvider.class.getCanonicalName().equals(sasProvider));
+    Assume.assumeNotNull(getRawConfiguration().get(TestConfigurationKeys.FS_AZURE_TEST_APP_ID));
+    Assume.assumeNotNull(getRawConfiguration().get(TestConfigurationKeys.FS_AZURE_TEST_APP_SECRET));
+    Assume.assumeNotNull(getRawConfiguration().get(TestConfigurationKeys.FS_AZURE_TEST_APP_SERVICE_PRINCIPAL_TENANT_ID));
+    Assume.assumeNotNull(getRawConfiguration().get(TestConfigurationKeys.FS_AZURE_TEST_APP_SERVICE_PRINCIPAL_OBJECT_ID));
+    // The test uses shared key to create a random filesystem and then creates another
+    // instance of this filesystem using SAS authorization.
+    Assume.assumeTrue(this.getAuthType() == AuthType.SharedKey);
+  }
+
+  @Override
+  public void setup() throws Exception {
+    boolean isHNSEnabled = this.getConfiguration().getBoolean(
+        TestConfigurationKeys.FS_AZURE_TEST_NAMESPACE_ENABLED_ACCOUNT, false);
+    Assume.assumeTrue(isHNSEnabled);
+    createFilesystemForSASTests();
+    super.setup();
+  }
+
+  @Test
+  // Test filesystem operations access, create, mkdirs, setOwner, getFileStatus
+  public void testCheckAccess() throws Exception {
+    final AzureBlobFileSystem fs = getFileSystem();
+
+    Path rootPath = new Path("/");
+    fs.setPermission(rootPath, new FsPermission(FsAction.ALL, FsAction.READ_EXECUTE, FsAction.EXECUTE));
+    FileStatus rootStatus = fs.getFileStatus(rootPath);
+    assertEquals("The directory permissions are not expected.", "rwxr-x--x", rootStatus.getPermission().toString());
+
+    Path dirPath = new Path(UUID.randomUUID().toString());
+    fs.mkdirs(dirPath);
+    fs.setOwner(dirPath, MockDelegationSASTokenProvider.TEST_OWNER, null);
+
+    Path filePath = new Path(dirPath, "file1");
+    fs.create(filePath).close();
+    fs.setPermission(filePath, new FsPermission(FsAction.READ, FsAction.READ, FsAction.NONE));
+
+    FileStatus dirStatus = fs.getFileStatus(dirPath);
+    FileStatus fileStatus = fs.getFileStatus(filePath);
+
+    assertEquals("The owner is not expected.", MockDelegationSASTokenProvider.TEST_OWNER, dirStatus.getOwner());
+    assertEquals("The owner is not expected.", MockDelegationSASTokenProvider.TEST_OWNER, fileStatus.getOwner());
+    assertEquals("The directory permissions are not expected.", "rwxr-xr-x", dirStatus.getPermission().toString());
+    assertEquals("The file permissions are not expected.", "r--r-----", fileStatus.getPermission().toString());
+
+    assertTrue(isAccessible(fs, dirPath, FsAction.READ_WRITE));
+    assertFalse(isAccessible(fs, filePath, FsAction.READ_WRITE));
+
+    fs.setPermission(filePath, new FsPermission(FsAction.READ_WRITE, FsAction.READ, FsAction.NONE));
+    fileStatus = fs.getFileStatus(filePath);
+    assertEquals("The file permissions are not expected.", "rw-r-----", fileStatus.getPermission().toString());
+    assertTrue(isAccessible(fs, filePath, FsAction.READ_WRITE));
+
+    fs.setPermission(dirPath, new FsPermission(FsAction.EXECUTE, FsAction.NONE, FsAction.NONE));
+    dirStatus = fs.getFileStatus(dirPath);
+    assertEquals("The file permissions are not expected.", "--x------", dirStatus.getPermission().toString());
+    assertFalse(isAccessible(fs, dirPath, FsAction.READ_WRITE));
+    assertTrue(isAccessible(fs, dirPath, FsAction.EXECUTE));
+
+    fs.setPermission(dirPath, new FsPermission(FsAction.NONE, FsAction.NONE, FsAction.NONE));
+    dirStatus = fs.getFileStatus(dirPath);
+    assertEquals("The file permissions are not expected.", "---------", dirStatus.getPermission().toString());
+    assertFalse(isAccessible(fs, filePath, FsAction.READ_WRITE));
+  }
+
+  private boolean isAccessible(FileSystem fs, Path path, FsAction fsAction)
+      throws IOException {
+    try {
+      fs.access(path, fsAction);
+    } catch (AccessControlException ace) {
+      return false;
+    }
+    return true;
+  }
+
+  @Test
+  // Test filesystem operations create, create with overwrite, append and open.
+  // Test output stream operation write, flush and close
+  // Test input stream operation, read
+  public void testReadAndWrite() throws Exception {
+    final AzureBlobFileSystem fs = getFileSystem();
+    Path reqPath = new Path(UUID.randomUUID().toString());
+
+    final String msg1 = "purple";
+    final String msg2 = "yellow";
+    int expectedFileLength = msg1.length() * 2;
+
+    byte[] readBuffer = new byte[1024];
+
+    // create file with content "purplepurple"
+    try (FSDataOutputStream stream = fs.create(reqPath)) {
+      stream.writeBytes(msg1);
+      stream.hflush();
+      stream.writeBytes(msg1);
+    }
+
+    // open file and verify content is "purplepurple"
+    try (FSDataInputStream stream = fs.open(reqPath)) {
+      int bytesRead = stream.read(readBuffer, 0, readBuffer.length);
+      assertEquals(expectedFileLength, bytesRead);
+      String fileContent = new String(readBuffer, 0, bytesRead, StandardCharsets.UTF_8);
+      assertEquals(msg1 + msg1, fileContent);
+    }
+
+    // overwrite file with content "yellowyellow"
+    try (FSDataOutputStream stream = fs.create(reqPath)) {
+      stream.writeBytes(msg2);
+      stream.hflush();
+      stream.writeBytes(msg2);
+    }
+
+    // open file and verify content is "yellowyellow"
+    try (FSDataInputStream stream = fs.open(reqPath)) {
+      int bytesRead = stream.read(readBuffer, 0, readBuffer.length);
+      assertEquals(expectedFileLength, bytesRead);
+      String fileContent = new String(readBuffer, 0, bytesRead, StandardCharsets.UTF_8);
+      assertEquals(msg2 + msg2, fileContent);
+    }
+
+    // append to file so final content is "yellowyellowpurplepurple"
+    try (FSDataOutputStream stream = fs.append(reqPath)) {
+      stream.writeBytes(msg1);
+      stream.hflush();
+      stream.writeBytes(msg1);
+    }
+
+    // open file and verify content is "yellowyellowpurplepurple"
+    try (FSDataInputStream stream = fs.open(reqPath)) {
+      int bytesRead = stream.read(readBuffer, 0, readBuffer.length);
+      assertEquals(2 * expectedFileLength, bytesRead);
+      String fileContent = new String(readBuffer, 0, bytesRead, StandardCharsets.UTF_8);
+      assertEquals(msg2 + msg2 + msg1 + msg1, fileContent);
+    }
+  }
+
+  @Test
+  // Test rename file and rename folder
+  public void testRename() throws Exception {
+    final AzureBlobFileSystem fs = getFileSystem();
+    Path sourceDir = new Path(UUID.randomUUID().toString());
+    Path sourcePath = new Path(sourceDir, UUID.randomUUID().toString());
+    Path destinationPath = new Path(sourceDir, UUID.randomUUID().toString());
+    Path destinationDir = new Path(UUID.randomUUID().toString());
+
+    // create file with content "hello"
+    try (FSDataOutputStream stream = fs.create(sourcePath)) {
+      stream.writeBytes("hello");
+    }
+
+    assertFalse(fs.exists(destinationPath));
+    fs.rename(sourcePath, destinationPath);
+    assertFalse(fs.exists(sourcePath));
+    assertTrue(fs.exists(destinationPath));
+
+    assertFalse(fs.exists(destinationDir));
+    fs.rename(sourceDir, destinationDir);
+    assertFalse(fs.exists(sourceDir));
+    assertTrue(fs.exists(destinationDir));
+  }
+
+  @Test
+  // Test delete file and delete folder
+  public void testDelete() throws Exception {
+    final AzureBlobFileSystem fs = getFileSystem();
+    Path dirPath = new Path(UUID.randomUUID().toString());
+    Path filePath = new Path(dirPath, UUID.randomUUID().toString());
+
+    // create file with content "hello"
+    try (FSDataOutputStream stream = fs.create(filePath)) {
+      stream.writeBytes("hello");
+    }
+
+    assertTrue(fs.exists(filePath));
+    fs.delete(filePath, false);
+    assertFalse(fs.exists(filePath));
+
+    assertTrue(fs.exists(dirPath));
+    fs.delete(dirPath, false);
+    assertFalse(fs.exists(dirPath));
+  }
+
+  @Test
+  // Test delete folder recursive
+  public void testDeleteRecursive() throws Exception {
+    final AzureBlobFileSystem fs = getFileSystem();
+    Path dirPath = new Path(UUID.randomUUID().toString());
+    Path filePath = new Path(dirPath, UUID.randomUUID().toString());
+
+    // create file with content "hello"
+    try (FSDataOutputStream stream = fs.create(filePath)) {
+      stream.writeBytes("hello");
+    }
+
+    assertTrue(fs.exists(dirPath));
+    assertTrue(fs.exists(filePath));
+    fs.delete(dirPath, true);
+    assertFalse(fs.exists(filePath));
+    assertFalse(fs.exists(dirPath));
+  }
+
+  @Test
+  // Test list on file, directory and root path
+  public void testList() throws Exception {
+    final AzureBlobFileSystem fs = getFileSystem();
+    Path dirPath = new Path(UUID.randomUUID().toString());
+    Path filePath = new Path(dirPath, UUID.randomUUID().toString());
+
+    fs.mkdirs(dirPath);
+
+    // create file with content "hello"
+    try (FSDataOutputStream stream = fs.create(filePath)) {
+      stream.writeBytes("hello");
+    }
+
+    fs.listStatus(filePath);
+    fs.listStatus(dirPath);
+    fs.listStatus(new Path("/"));
+  }
+
+  @Test
+  // Test filesystem operations setAcl, getAclStatus, removeAcl
+  // setPermissions and getFileStatus
+  public void testAcl() throws Exception {
+    final AzureBlobFileSystem fs = getFileSystem();
+    Path reqPath = new Path(UUID.randomUUID().toString());
+
+    fs.create(reqPath).close();
+
+    fs.setAcl(reqPath, Arrays
+        .asList(aclEntry(ACCESS, GROUP, TEST_GROUP, FsAction.ALL)));
+
+    AclStatus acl = fs.getAclStatus(reqPath);
+    assertEquals(MockDelegationSASTokenProvider.TEST_OWNER, acl.getOwner());
+    assertEquals("[group::r--, group:" + TEST_GROUP + ":rwx]", acl.getEntries().toString());
+
+    fs.removeAcl(reqPath);
+    acl = fs.getAclStatus(reqPath);
+    assertEquals("[]", acl.getEntries().toString());
+
+    fs.setPermission(reqPath,
+        new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE));
+
+    FileStatus status = fs.getFileStatus(reqPath);
+    assertEquals("rwx------", status.getPermission().toString());
+
+    acl = fs.getAclStatus(reqPath);
+    assertEquals("rwx------", acl.getPermission().toString());
+  }
+
+  @Test
+  // Test getFileStatus and getAclStatus operations on root path
+  public void testRootPath() throws Exception {
+    final AzureBlobFileSystem fs = getFileSystem();
+    Path rootPath = new Path(AbfsHttpConstants.ROOT_PATH);
+
+    FileStatus status = fs.getFileStatus(rootPath);
+    assertEquals("rwxr-x---", status.getPermission().toString());
+    assertTrue(status.isDirectory());
+
+    AclStatus acl = fs.getAclStatus(rootPath);
+    assertEquals("rwxr-x---", acl.getPermission().toString());
+
+    List<AclEntry> aclSpec = new ArrayList<>();
+    int count = 0;
+    for (AclEntry entry: acl.getEntries()) {
+      aclSpec.add(entry);
+      if (entry.getScope() == AclEntryScope.DEFAULT) {
+        count++;
+      }
+    }
+    assertEquals(0, count);
+
+    aclSpec.add(aclEntry(DEFAULT, USER, "cd548981-afec-4ab9-9d39-f6f2add2fd9b", FsAction.EXECUTE));
+
+    fs.modifyAclEntries(rootPath, aclSpec);
+
+    acl = fs.getAclStatus(rootPath);
+
+    count = 0;
+    for (AclEntry entry: acl.getEntries()) {
+      aclSpec.add(entry);
+      if (entry.getScope() == AclEntryScope.DEFAULT) {
+        count++;
+      }
+    }
+    assertEquals(5, count);
+
+    fs.removeDefaultAcl(rootPath);
+
+    acl = fs.getAclStatus(rootPath);
+
+    count = 0;
+    for (AclEntry entry: acl.getEntries()) {
+      aclSpec.add(entry);
+      if (entry.getScope() == AclEntryScope.DEFAULT) {
+        count++;
+      }
+    }
+    assertEquals(0, count);
+  }
+}
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/constants/TestConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/constants/TestConfigurationKeys.java
index c8dcef3..16a3f57 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/constants/TestConfigurationKeys.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/constants/TestConfigurationKeys.java
@@ -45,6 +45,14 @@ public final class TestConfigurationKeys {
   public static final String MOCK_SASTOKENPROVIDER_FAIL_INIT = "mock.sastokenprovider.fail.init";
   public static final String MOCK_SASTOKENPROVIDER_RETURN_EMPTY_SAS_TOKEN = "mock.sastokenprovider.return.empty.sasToken";
 
+  public static final String FS_AZURE_TEST_APP_SERVICE_PRINCIPAL_TENANT_ID = "fs.azure.test.app.service.principal.tenant.id";
+
+  public static final String FS_AZURE_TEST_APP_SERVICE_PRINCIPAL_OBJECT_ID = "fs.azure.test.app.service.principal.object.id";
+
+  public static final String FS_AZURE_TEST_APP_ID = "fs.azure.test.app.id";
+
+  public static final String FS_AZURE_TEST_APP_SECRET = "fs.azure.test.app.secret";
+
   public static final String TEST_CONFIGURATION_FILE_NAME = "azure-test.xml";
   public static final String TEST_CONTAINER_PREFIX = "abfs-testcontainer-";
   public static final int TEST_TIMEOUT = 15 * 60 * 1000;
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/extensions/MockDelegationSASTokenProvider.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/extensions/MockDelegationSASTokenProvider.java
new file mode 100644
index 0000000..fa50bef
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/extensions/MockDelegationSASTokenProvider.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.extensions;
+
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
+import org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys;
+import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriException;
+import org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider;
+import org.apache.hadoop.fs.azurebfs.services.AbfsHttpHeader;
+import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation;
+import org.apache.hadoop.fs.azurebfs.utils.Base64;
+import org.apache.hadoop.fs.azurebfs.utils.DelegationSASGenerator;
+import org.apache.hadoop.fs.azurebfs.utils.SASGenerator;
+import org.apache.hadoop.security.AccessControlException;
+
+/**
+ * A mock SAS token provider implementation.
+ */
+public class MockDelegationSASTokenProvider implements SASTokenProvider {
+
+  private DelegationSASGenerator generator;
+
+  public static final String TEST_OWNER = "325f1619-4205-432f-9fce-3fd594325ce5";
+  public static final String CORRELATION_ID = "66ff4ffc-ff17-417e-a2a9-45db8c5b0b5c";
+
+  @Override
+  public void initialize(Configuration configuration, String accountName) throws IOException {
+    String appID = configuration.get(TestConfigurationKeys.FS_AZURE_TEST_APP_ID);
+    String appSecret = configuration.get(TestConfigurationKeys.FS_AZURE_TEST_APP_SECRET);
+    String sktid = configuration.get(TestConfigurationKeys.FS_AZURE_TEST_APP_SERVICE_PRINCIPAL_TENANT_ID);
+    String skoid = configuration.get(TestConfigurationKeys.FS_AZURE_TEST_APP_SERVICE_PRINCIPAL_OBJECT_ID);
+    String skt = SASGenerator.ISO_8601_FORMATTER.format(Instant.now().minusSeconds(SASGenerator.FIVE_MINUTES));
+    String ske = SASGenerator.ISO_8601_FORMATTER.format(Instant.now().plusSeconds(SASGenerator.ONE_DAY));
+    String skv = SASGenerator.AuthenticationVersion.Dec19.toString();
+
+    byte[] key = getUserDelegationKey(accountName, appID, appSecret, sktid, skt, ske, skv);
+
+    generator = new DelegationSASGenerator(key, skoid, sktid, skt, ske, skv);
+  }
+
+  // Invokes the AAD v2.0 authentication endpoint with a client credentials grant to get an
+  // access token.  See https://docs.microsoft.com/en-us/azure/active-directory/develop/v2-oauth2-client-creds-grant-flow.
+  private String getAuthorizationHeader(String accountName, String appID, String appSecret, String sktid) throws IOException {
+    String authEndPoint = String.format("https://login.microsoftonline.com/%s/oauth2/v2.0/token", sktid);
+    ClientCredsTokenProvider provider = new ClientCredsTokenProvider(authEndPoint, appID, appSecret);
+    return "Bearer " + provider.getToken().getAccessToken();
+  }
+
+  private byte[] getUserDelegationKey(String accountName, String appID, String appSecret,
+      String sktid, String skt, String ske, String skv) throws IOException {
+
+    String method = "POST";
+    String account = accountName.substring(0, accountName.indexOf(AbfsHttpConstants.DOT));
+
+    final StringBuilder sb = new StringBuilder(128);
+    sb.append("https://");
+    sb.append(account);
+    sb.append(".blob.core.windows.net/?restype=service&comp=userdelegationkey");
+
+    URL url;
+    try {
+      url = new URL(sb.toString());
+    } catch (MalformedURLException ex) {
+      throw new InvalidUriException(sb.toString());
+    }
+
+    List<AbfsHttpHeader> requestHeaders = new ArrayList<AbfsHttpHeader>();
+    requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_VERSION, skv));
+    requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.CONTENT_TYPE, "application/x-www-form-urlencoded"));
+    requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.AUTHORIZATION, getAuthorizationHeader(account, appID, appSecret, sktid)));
+
+    final StringBuilder requestBody = new StringBuilder(512);
+    requestBody.append("<?xml version=\"1.0\" encoding=\"utf-8\"?><KeyInfo><Start>");
+    requestBody.append(skt);
+    requestBody.append("</Start><Expiry>");
+    requestBody.append(ske);
+    requestBody.append("</Expiry></KeyInfo>");
+
+    AbfsHttpOperation op = new AbfsHttpOperation(url, method, requestHeaders);
+
+    byte[] requestBuffer = requestBody.toString().getBytes(StandardCharsets.UTF_8);
+    op.sendRequest(requestBuffer, 0, requestBuffer.length);
+
+    byte[] responseBuffer = new byte[4 * 1024];
+    op.processResponse(responseBuffer, 0, responseBuffer.length);
+
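+    // Extract the base64-encoded user delegation key from the <Value>
+    // element of the XML response.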
+    String responseBody = new String(responseBuffer, 0, (int) op.getBytesReceived(), StandardCharsets.UTF_8);
+    int beginIndex = responseBody.indexOf("<Value>") + "<Value>".length();
+    int endIndex = responseBody.indexOf("</Value>");
+    String value = responseBody.substring(beginIndex, endIndex);
+    return Base64.decode(value);
+  }
+
+  /**
+   * Invokes the authorizer to obtain a SAS token.
+   *
+   * @param accountName the name of the storage account.
+   * @param fileSystem the name of the fileSystem.
+   * @param path the file or directory path.
+   * @param operation the operation to be performed on the path.
+   * @return a SAS token to perform the request operation.
+   * @throws IOException if there is a network error.
+   * @throws AccessControlException if access is denied.
+   */
+  @Override
+  public String getSASToken(String accountName, String fileSystem, String path,
+                     String operation) throws IOException, AccessControlException {
+    // The user for these tests is always TEST_OWNER.  The check access operation
+    // requires suoid to check permissions for the user; it will throw if the
+    // user does not have access and otherwise succeed.
+    String saoid = SASTokenProvider.CHECK_ACCESS_OPERATION.equals(operation) ? null : TEST_OWNER;
+    String suoid = SASTokenProvider.CHECK_ACCESS_OPERATION.equals(operation) ? TEST_OWNER : null;
+    return generator.getDelegationSAS(accountName, fileSystem, path, operation,
+        saoid, suoid, CORRELATION_ID);
+  }
+}
\ No newline at end of file
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/extensions/MockSASTokenProvider.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/extensions/MockSASTokenProvider.java
index de841b0..50ac209 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/extensions/MockSASTokenProvider.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/extensions/MockSASTokenProvider.java
@@ -25,7 +25,7 @@ import org.apache.hadoop.security.AccessControlException;
 
 import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
 import org.apache.hadoop.fs.azurebfs.utils.Base64;
-import org.apache.hadoop.fs.azurebfs.utils.SASGenerator;
+import org.apache.hadoop.fs.azurebfs.utils.ServiceSASGenerator;
 
 /**
  * A mock SAS token provider implementation
@@ -33,7 +33,7 @@ import org.apache.hadoop.fs.azurebfs.utils.SASGenerator;
 public class MockSASTokenProvider implements SASTokenProvider {
 
   private byte[] accountKey;
-  private SASGenerator generator;
+  private ServiceSASGenerator generator;
   private boolean skipAuthorizationForTestSetup = false;
 
   // For testing we use a container SAS for all operations.
@@ -49,7 +49,7 @@ public class MockSASTokenProvider implements SASTokenProvider {
     } catch (Exception ex) {
       throw new IOException(ex);
     }
-    generator = new SASGenerator(accountKey);
+    generator = new ServiceSASGenerator(accountKey);
   }
 
   /**
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/DelegationSASGenerator.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/DelegationSASGenerator.java
new file mode 100644
index 0000000..f84ed6a
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/DelegationSASGenerator.java
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.utils;
+
+import java.time.Instant;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.azurebfs.extensions.SASTokenProvider;
+import org.apache.hadoop.fs.azurebfs.services.AbfsUriQueryBuilder;
+
+
+/**
+ * Test Delegation SAS generator.
+ */
+public class DelegationSASGenerator extends SASGenerator {
+  private String skoid;
+  private String sktid;
+  private String skt;
+  private String ske;
+  private final String sks = "b";
+  private String skv;
+
+  public DelegationSASGenerator(byte[] userDelegationKey, String skoid, String sktid, String skt, String ske, String skv) {
+    super(userDelegationKey);
+    this.skoid = skoid;
+    this.sktid = sktid;
+    this.skt = skt;
+    this.ske = ske;
+    this.skv = skv;
+  }
+
+  public String getDelegationSAS(String accountName, String containerName, String path, String operation,
+                                 String saoid, String suoid, String scid) {
+
+    final String sv = AuthenticationVersion.Dec19.toString();
+    final String st = ISO_8601_FORMATTER.format(Instant.now().minusSeconds(FIVE_MINUTES));
+    final String se = ISO_8601_FORMATTER.format(Instant.now().plusSeconds(ONE_DAY));
+    String sr = "b";
+    String sdd = null;
+    String sp = null;
+
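+    // Map the incoming operation to the SAS permission (sp), resource
+    // type (sr) and, for recursive delete, directory depth (sdd).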
+    switch (operation) {
+      case SASTokenProvider.CHECK_ACCESS_OPERATION:
+        sp = "e";
+        break;
+      case SASTokenProvider.WRITE_OPERATION:
+      case SASTokenProvider.CREATE_FILE_OPERATION:
+      case SASTokenProvider.CREATE_DIRECTORY_OPERATION:
+        sp = "w";
+        break;
+      case SASTokenProvider.DELETE_OPERATION:
+        sp = "d";
+        break;
+      case SASTokenProvider.DELETE_RECURSIVE_OPERATION:
+        sp = "d";
+        sr = "d";
+        sdd = Integer.toString(StringUtils.countMatches(path, "/"));
+        break;
+      case SASTokenProvider.GET_ACL_OPERATION:
+      case SASTokenProvider.GET_STATUS_OPERATION:
+        sp = "e";
+        break;
+      case SASTokenProvider.LIST_OPERATION:
+        sp = "l";
+        break;
+      case SASTokenProvider.READ_OPERATION:
+        sp = "r";
+        break;
+      case SASTokenProvider.RENAME_DESTINATION_OPERATION:
+      case SASTokenProvider.RENAME_SOURCE_OPERATION:
+        sp = "m";
+        break;
+      case SASTokenProvider.SET_ACL_OPERATION:
+        sp = "p";
+        break;
+      case SASTokenProvider.SET_OWNER_OPERATION:
+        sp = "o";
+        break;
+      case SASTokenProvider.SET_PERMISSION_OPERATION:
+        sp = "p";
+        break;
+      default:
+        throw new IllegalArgumentException(operation);
+    }
+
+    String signature = computeSignatureForSAS(sp, st, se, sv, sr, accountName, containerName,
+        path, saoid, suoid, scid);
+
+    AbfsUriQueryBuilder qb = new AbfsUriQueryBuilder();
+    qb.addQuery("skoid", skoid);
+    qb.addQuery("sktid", sktid);
+    qb.addQuery("skt", skt);
+    qb.addQuery("ske", ske);
+    qb.addQuery("sks", sks);
+    qb.addQuery("skv", skv);
+    if (saoid != null) {
+      qb.addQuery("saoid", saoid);
+    }
+    if (suoid != null) {
+      qb.addQuery("suoid", suoid);
+    }
+    if (scid != null) {
+      qb.addQuery("scid", scid);
+    }
+    qb.addQuery("sp", sp);
+    qb.addQuery("st", st);
+    qb.addQuery("se", se);
+    qb.addQuery("sv", sv);
+    qb.addQuery("sr", sr);
+    if (sdd != null) {
+      qb.addQuery("sdd", sdd);
+    }
+    qb.addQuery("sig", signature);
+    return qb.toString().substring(1);
+  }
+
+  private String computeSignatureForSAS(String sp, String st, String se, String sv,
+      String sr, String accountName, String containerName,
+      String path, String saoid, String suoid, String scid) {
+
+    StringBuilder sb = new StringBuilder();
+    sb.append(sp);
+    sb.append("\n");
+    sb.append(st);
+    sb.append("\n");
+    sb.append(se);
+    sb.append("\n");
+    // canonicalized resource
+    sb.append("/blob/");
+    sb.append(accountName);
+    sb.append("/");
+    sb.append(containerName);
+    if (path != null && !"c".equals(sr)) {
+      sb.append(path);
+    }
+    sb.append("\n");
+    sb.append(skoid);
+    sb.append("\n");
+    sb.append(sktid);
+    sb.append("\n");
+    sb.append(skt);
+    sb.append("\n");
+    sb.append(ske);
+    sb.append("\n");
+    sb.append(sks);
+    sb.append("\n");
+    sb.append(skv);
+    sb.append("\n");
+    if (saoid != null) {
+      sb.append(saoid);
+    }
+    sb.append("\n");
+    if (suoid != null) {
+      sb.append(suoid);
+    }
+    sb.append("\n");
+    if (scid != null) {
+      sb.append(scid);
+    }
+    sb.append("\n");
+
+    sb.append("\n"); // sip
+    sb.append("\n"); // spr
+    sb.append(sv);
+    sb.append("\n");
+    sb.append(sr);
+    sb.append("\n");
+    sb.append("\n"); // - For optional : rscc - ResponseCacheControl
+    sb.append("\n"); // - For optional : rscd - ResponseContentDisposition
+    sb.append("\n"); // - For optional : rsce - ResponseContentEncoding
+    sb.append("\n"); // - For optional : rscl - ResponseContentLanguage
+    sb.append("\n"); // - For optional : rsct - ResponseContentType
+
+    String stringToSign = sb.toString();
+    LOG.debug("Delegation SAS stringToSign: {}", stringToSign.replace("\n", "."));
+    return computeHmac256(stringToSign);
+  }
+}
\ No newline at end of file
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/SASGenerator.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/SASGenerator.java
index 19bf9e2..34d504a 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/SASGenerator.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/SASGenerator.java
@@ -19,104 +19,74 @@
 package org.apache.hadoop.fs.azurebfs.utils;
 
 import java.io.UnsupportedEncodingException;
+import java.nio.charset.StandardCharsets;
 import java.time.format.DateTimeFormatter;
-import java.time.Instant;
 import java.time.ZoneId;
 import java.util.Locale;
 import javax.crypto.Mac;
 import javax.crypto.spec.SecretKeySpec;
 
-import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
-import org.apache.hadoop.fs.azurebfs.services.AbfsUriQueryBuilder;
-
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 /**
- * Test container SAS generator.
+ * Test SAS generator.
  */
-public class SASGenerator {
+public abstract class SASGenerator {
+
+  public enum AuthenticationVersion {
+    Nov18("2018-11-09"),
+    Dec19("2019-12-12");
+
+    private final String ver;
+
+    AuthenticationVersion(String version) {
+      this.ver = version;
+    }
 
-  private static final String HMAC_SHA256 = "HmacSHA256";
-  private static final int TOKEN_START_PERIOD_IN_SECONDS = 5 * 60;
-  private static final int TOKEN_EXPIRY_PERIOD_IN_SECONDS = 24 * 60 * 60;
-  public static final DateTimeFormatter ISO_8601_UTC_DATE_FORMATTER =
+    @Override
+    public String toString() {
+      return ver;
+    }
+  }
+
+  protected static final Logger LOG = LoggerFactory.getLogger(SASGenerator.class);
+  public static final int FIVE_MINUTES = 5 * 60;
+  public static final int ONE_DAY = 24 * 60 * 60;
+  public static final DateTimeFormatter ISO_8601_FORMATTER =
       DateTimeFormatter
           .ofPattern("yyyy-MM-dd'T'HH:mm:ss'Z'", Locale.ROOT)
           .withZone(ZoneId.of("UTC"));
+
   private Mac hmacSha256;
   private byte[] key;
 
-  public SASGenerator(byte[] key) {
-    this.key = key;
-    initializeMac();
-  }
-
-  public String getContainerSASWithFullControl(String accountName, String containerName) {
-    String sp = "rcwdl";
-    String sv = "2018-11-09";
-    String sr = "c";
-    String st = ISO_8601_UTC_DATE_FORMATTER.format(Instant.now().minusSeconds(TOKEN_START_PERIOD_IN_SECONDS));
-    String se =
-        ISO_8601_UTC_DATE_FORMATTER.format(Instant.now().plusSeconds(TOKEN_EXPIRY_PERIOD_IN_SECONDS));
-
-    String signature = computeSignatureForSAS(sp, st, se, sv, "c",
-        accountName, containerName);
-
-    AbfsUriQueryBuilder qb = new AbfsUriQueryBuilder();
-    qb.addQuery("sp", sp);
-    qb.addQuery("st", st);
-    qb.addQuery("se", se);
-    qb.addQuery("sv", sv);
-    qb.addQuery("sr", sr);
-    qb.addQuery("sig", signature);
-    return qb.toString().substring(1);
+  // hide default constructor
+  private SASGenerator() {
   }
 
-  private String computeSignatureForSAS(String sp, String st,
-      String se, String sv, String sr, String accountName, String containerName) {
-
-    StringBuilder sb = new StringBuilder();
-    sb.append(sp);
-    sb.append("\n");
-    sb.append(st);
-    sb.append("\n");
-    sb.append(se);
-    sb.append("\n");
-    // canonicalized resource
-    sb.append("/blob/");
-    sb.append(accountName);
-    sb.append("/");
-    sb.append(containerName);
-    sb.append("\n");
-    sb.append("\n"); // si
-    sb.append("\n"); // sip
-    sb.append("\n"); // spr
-    sb.append(sv);
-    sb.append("\n");
-    sb.append(sr);
-    sb.append("\n");
-    sb.append("\n"); // - For optional : rscc - ResponseCacheControl
-    sb.append("\n"); // - For optional : rscd - ResponseContentDisposition
-    sb.append("\n"); // - For optional : rsce - ResponseContentEncoding
-    sb.append("\n"); // - For optional : rscl - ResponseContentLanguage
-    sb.append("\n"); // - For optional : rsct - ResponseContentType
-
-    String stringToSign = sb.toString();
-    return computeHmac256(stringToSign);
+  /**
+   * Called by subclasses to initialize the cryptographic SHA-256 HMAC provider.
+   * @param key - a 256-bit secret key
+   */
+  protected SASGenerator(byte[] key) {
+    this.key = key;
+    initializeMac();
   }
 
   private void initializeMac() {
     // Initializes the HMAC-SHA256 Mac and SecretKey.
     try {
-      hmacSha256 = Mac.getInstance(HMAC_SHA256);
-      hmacSha256.init(new SecretKeySpec(key, HMAC_SHA256));
+      hmacSha256 = Mac.getInstance("HmacSHA256");
+      hmacSha256.init(new SecretKeySpec(key, "HmacSHA256"));
     } catch (final Exception e) {
       throw new IllegalArgumentException(e);
     }
   }
 
-  private String computeHmac256(final String stringToSign) {
+  protected String computeHmac256(final String stringToSign) {
     byte[] utf8Bytes;
     try {
-      utf8Bytes = stringToSign.getBytes(AbfsHttpConstants.UTF_8);
+      utf8Bytes = stringToSign.getBytes(StandardCharsets.UTF_8.toString());
     } catch (final UnsupportedEncodingException e) {
       throw new IllegalArgumentException(e);
     }
@@ -126,4 +96,4 @@ public class SASGenerator {
     }
     return Base64.encode(hmac);
   }
-}
+}
\ No newline at end of file
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/SASGenerator.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/ServiceSASGenerator.java
similarity index 54%
copy from hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/SASGenerator.java
copy to hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/ServiceSASGenerator.java
index 19bf9e2..a76681b 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/SASGenerator.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/ServiceSASGenerator.java
@@ -18,47 +18,33 @@
 
 package org.apache.hadoop.fs.azurebfs.utils;
 
-import java.io.UnsupportedEncodingException;
-import java.time.format.DateTimeFormatter;
 import java.time.Instant;
-import java.time.ZoneId;
-import java.util.Locale;
-import javax.crypto.Mac;
-import javax.crypto.spec.SecretKeySpec;
 
-import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
 import org.apache.hadoop.fs.azurebfs.services.AbfsUriQueryBuilder;
 
 /**
- * Test container SAS generator.
+ * Test Service SAS generator.
  */
-public class SASGenerator {
+public class ServiceSASGenerator extends SASGenerator {
 
-  private static final String HMAC_SHA256 = "HmacSHA256";
-  private static final int TOKEN_START_PERIOD_IN_SECONDS = 5 * 60;
-  private static final int TOKEN_EXPIRY_PERIOD_IN_SECONDS = 24 * 60 * 60;
-  public static final DateTimeFormatter ISO_8601_UTC_DATE_FORMATTER =
-      DateTimeFormatter
-          .ofPattern("yyyy-MM-dd'T'HH:mm:ss'Z'", Locale.ROOT)
-          .withZone(ZoneId.of("UTC"));
-  private Mac hmacSha256;
-  private byte[] key;
-
-  public SASGenerator(byte[] key) {
-    this.key = key;
-    initializeMac();
+  /**
+   * Creates a SAS Generator for Service SAS
+   * (https://docs.microsoft.com/en-us/rest/api/storageservices/create-service-sas).
+   * @param accountKey - the storage account key
+   */
+  public ServiceSASGenerator(byte[] accountKey) {
+    super(accountKey);
   }
 
   public String getContainerSASWithFullControl(String accountName, String containerName) {
     String sp = "rcwdl";
-    String sv = "2018-11-09";
+    String sv = AuthenticationVersion.Nov18.toString();
     String sr = "c";
-    String st = ISO_8601_UTC_DATE_FORMATTER.format(Instant.now().minusSeconds(TOKEN_START_PERIOD_IN_SECONDS));
-    String se =
-        ISO_8601_UTC_DATE_FORMATTER.format(Instant.now().plusSeconds(TOKEN_EXPIRY_PERIOD_IN_SECONDS));
+    String st = ISO_8601_FORMATTER.format(Instant.now().minusSeconds(FIVE_MINUTES));
+    String se = ISO_8601_FORMATTER.format(Instant.now().plusSeconds(ONE_DAY));
 
     String signature = computeSignatureForSAS(sp, st, se, sv, "c",
-        accountName, containerName);
+        accountName, containerName, null);
 
     AbfsUriQueryBuilder qb = new AbfsUriQueryBuilder();
     qb.addQuery("sp", sp);
@@ -70,8 +56,8 @@ public class SASGenerator {
     return qb.toString().substring(1);
   }
 
-  private String computeSignatureForSAS(String sp, String st,
-      String se, String sv, String sr, String accountName, String containerName) {
+  private String computeSignatureForSAS(String sp, String st, String se, String sv,
+      String sr, String accountName, String containerName, String path) {
 
     StringBuilder sb = new StringBuilder();
     sb.append(sp);
@@ -85,6 +71,10 @@ public class SASGenerator {
     sb.append(accountName);
     sb.append("/");
     sb.append(containerName);
+    if (path != null && !"c".equals(sr)) {
+      sb.append(path);
+    }
     sb.append("\n");
     sb.append("\n"); // si
     sb.append("\n"); // sip
@@ -100,30 +90,7 @@ public class SASGenerator {
     sb.append("\n"); // - For optional : rsct - ResponseContentType
 
     String stringToSign = sb.toString();
+    LOG.debug("Service SAS stringToSign: {}", stringToSign.replace("\n", "."));
     return computeHmac256(stringToSign);
   }
-
-  private void initializeMac() {
-    // Initializes the HMAC-SHA256 Mac and SecretKey.
-    try {
-      hmacSha256 = Mac.getInstance(HMAC_SHA256);
-      hmacSha256.init(new SecretKeySpec(key, HMAC_SHA256));
-    } catch (final Exception e) {
-      throw new IllegalArgumentException(e);
-    }
-  }
-
-  private String computeHmac256(final String stringToSign) {
-    byte[] utf8Bytes;
-    try {
-      utf8Bytes = stringToSign.getBytes(AbfsHttpConstants.UTF_8);
-    } catch (final UnsupportedEncodingException e) {
-      throw new IllegalArgumentException(e);
-    }
-    byte[] hmac;
-    synchronized (this) {
-      hmac = hmacSha256.doFinal(utf8Bytes);
-    }
-    return Base64.encode(hmac);
-  }
-}
+}
\ No newline at end of file
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/TestCachedSASToken.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/TestCachedSASToken.java
new file mode 100644
index 0000000..1016d4b
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/TestCachedSASToken.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.utils;
+
+import java.io.IOException;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS_IN_SECONDS;
+import static java.time.temporal.ChronoUnit.SECONDS;
+
+/**
+ * Test CachedSASToken.
+ */
+public final class TestCachedSASToken {
+
+  @Test
+  public void testUpdateAndGet() throws IOException {
+    CachedSASToken cachedSasToken = new CachedSASToken();
+
+    String se1 = OffsetDateTime.now(ZoneOffset.UTC).plus(
+        DEFAULT_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS_IN_SECONDS * 2,
+        SECONDS).format(DateTimeFormatter.ISO_DATE_TIME);
+    String token1 = "se=" + se1;
+
+    // set first time and ensure reference equality
+    cachedSasToken.update(token1);
+    String cachedToken = cachedSasToken.get();
+    Assert.assertSame(token1, cachedToken);
+
+    // update with same token and ensure reference equality
+    cachedSasToken.update(token1);
+    cachedToken = cachedSasToken.get();
+    Assert.assertSame(token1, cachedToken);
+
+    // renew and ensure reference equality
+    String se2 = OffsetDateTime.now(ZoneOffset.UTC).plus(
+        DEFAULT_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS_IN_SECONDS * 2,
+        SECONDS).format(DateTimeFormatter.ISO_DATE_TIME);
+    String token2 = "se=" + se2;
+    cachedSasToken.update(token2);
+    cachedToken = cachedSasToken.get();
+    Assert.assertSame(token2, cachedToken);
+
+    // renew and ensure reference equality with ske
+    String se3 = OffsetDateTime.now(ZoneOffset.UTC).plus(
+        DEFAULT_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS_IN_SECONDS * 4,
+        SECONDS).format(DateTimeFormatter.ISO_DATE_TIME);
+
+    String ske3 = OffsetDateTime.now(ZoneOffset.UTC).plus(
+        DEFAULT_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS_IN_SECONDS * 2,
+        SECONDS).format(DateTimeFormatter.ISO_DATE_TIME);
+    String token3 = "se=" + se3 + "&ske=" + ske3;
+    cachedSasToken.update(token3);
+    cachedToken = cachedSasToken.get();
+    Assert.assertSame(token3, cachedToken);
+  }
+
+  @Test
+  public void testGetExpiration() throws IOException {
+    CachedSASToken cachedSasToken = new CachedSASToken();
+
+    String se = OffsetDateTime.now(ZoneOffset.UTC).plus(
+        DEFAULT_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS_IN_SECONDS - 1,
+        SECONDS).format(DateTimeFormatter.ISO_DATE_TIME);
+    OffsetDateTime seDate = OffsetDateTime.parse(se, DateTimeFormatter.ISO_DATE_TIME);
+    String token = "se=" + se;
+
+    // Bypass the normal validation provided by the update method
+    // by calling setForTesting with an expired SAS, then ensure the get
+    // method returns null (auto expiration, as the next REST operation will
+    // use SASTokenProvider to get a new SAS).
+    cachedSasToken.setForTesting(token, seDate);
+    String cachedToken = cachedSasToken.get();
+    Assert.assertNull(cachedToken);
+  }
+
+  @Test
+  public void testUpdateAndGetWithExpiredToken() throws IOException {
+    CachedSASToken cachedSasToken = new CachedSASToken();
+
+    String se1 = OffsetDateTime.now(ZoneOffset.UTC).plus(
+        DEFAULT_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS_IN_SECONDS - 1,
+        SECONDS).format(DateTimeFormatter.ISO_DATE_TIME);
+    String token1 = "se=" + se1;
+
+    // set expired token and ensure not cached
+    cachedSasToken.update(token1);
+    String cachedToken = cachedSasToken.get();
+    Assert.assertNull(cachedToken);
+
+    String se2 = OffsetDateTime.now(ZoneOffset.UTC).plus(
+        DEFAULT_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS_IN_SECONDS * 2,
+        SECONDS).format(DateTimeFormatter.ISO_DATE_TIME);
+
+    String ske2 = OffsetDateTime.now(ZoneOffset.UTC).plus(
+        DEFAULT_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS_IN_SECONDS - 1,
+        SECONDS).format(DateTimeFormatter.ISO_DATE_TIME);
+    String token2 = "se=" + se2 + "&ske=" + ske2;
+
+    // set with expired ske and ensure not cached
+    cachedSasToken.update(token2);
+    cachedToken = cachedSasToken.get();
+    Assert.assertNull(cachedToken);
+
+  }
+
+  @Test
+  public void testUpdateAndGetWithInvalidToken() throws IOException {
+    CachedSASToken cachedSasToken = new CachedSASToken();
+
+    // set and ensure that it is not cached
+    String token1 = "se=";
+    cachedSasToken.update(token1);
+    String cachedToken = cachedSasToken.get();
+    Assert.assertNull(cachedToken);
+
+    // set and ensure that it is not cached
+    String token2 = "se=xyz";
+    cachedSasToken.update(token2);
+    cachedToken = cachedSasToken.get();
+    Assert.assertNull(cachedToken);
+
+    // set and ensure that it is not cached
+    String token3 = "se=2100-01-01T00:00:00Z&ske=";
+    cachedSasToken.update(token3);
+    cachedToken = cachedSasToken.get();
+    Assert.assertNull(cachedToken);
+
+    // set and ensure that it is not cached
+    String token4 = "se=2100-01-01T00:00:00Z&ske=xyz&";
+    cachedSasToken.update(token4);
+    cachedToken = cachedSasToken.get();
+    Assert.assertNull(cachedToken);
+
+    // set and ensure that it is not cached
+    String token5 = "se=abc&ske=xyz&";
+    cachedSasToken.update(token5);
+    cachedToken = cachedSasToken.get();
+    Assert.assertNull(cachedToken);
+  }
+}
\ No newline at end of file




[hadoop] 04/06: HADOOP-17004. ABFS: Improve the ABFS driver documentation

Posted by tm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tmarquardt pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit 11307f3be9be494ec880e036c78705c41ca8ceae
Author: bilaharith <52...@users.noreply.github.com>
AuthorDate: Tue May 19 09:15:54 2020 +0530

    HADOOP-17004. ABFS: Improve the ABFS driver documentation
    
    Contributed by Bilahari T H.
---
 .../hadoop-azure/src/site/markdown/abfs.md         | 133 ++++++++++++++++++++-
 1 file changed, 130 insertions(+), 3 deletions(-)

diff --git a/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md b/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md
index 89f52e7..93141f1 100644
--- a/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md
+++ b/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md
@@ -257,7 +257,8 @@ will have the URL `abfs://container1@abfswales1.dfs.core.windows.net/`
 
 
 You can create a new container through the ABFS connector, by setting the option
- `fs.azure.createRemoteFileSystemDuringInitialization` to `true`.
+ `fs.azure.createRemoteFileSystemDuringInitialization` to `true`. Note that
+ this is not supported when the authentication type is SAS.
 
 If the container does not exist, an attempt to list it with `hadoop fs -ls`
 will fail
@@ -317,8 +318,13 @@ driven by them.
 
 What can be changed is what secrets/credentials are used to authenticate the caller.
 
-The authentication mechanism is set in `fs.azure.account.auth.type` (or the account specific variant),
-and, for the various OAuth options `fs.azure.account.oauth.provider.type`
+The authentication mechanism is set in `fs.azure.account.auth.type` (or the
+account-specific variant). The possible values are SharedKey, OAuth, Custom
+and SAS. For the various OAuth options, use the config
+`fs.azure.account.oauth.provider.type`. The supported implementations are
+ClientCredsTokenProvider, UserPasswordTokenProvider, MsiTokenProvider and
+RefreshTokenBasedTokenProvider. An IllegalArgumentException is thrown if
+the specified provider type is not one of the supported implementations.
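+
+A minimal sketch, assuming OAuth with the client-credentials provider (the
+remaining OAuth credentials are configured as shown in the sections below):
+
+```xml
+<property>
+  <name>fs.azure.account.auth.type</name>
+  <value>OAuth</value>
+</property>
+<property>
+  <name>fs.azure.account.oauth.provider.type</name>
+  <value>org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider</value>
+</property>
+```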
 
 All secrets can be stored in JCEKS files. These are encrypted and password
 protected —use them or a compatible Hadoop Key Management Store wherever
@@ -350,6 +356,15 @@ the password, "key", retrieved from the XML/JCECKs configuration files.
 *Note*: The source of the account key can be changed through a custom key provider;
 one exists to execute a shell script to retrieve it.
 
+A custom key provider class can be provided with the config
+`fs.azure.account.keyprovider`. If a key provider class is specified, it
+will be used to get the account key. Otherwise the simple key provider will
+be used, which reads the key specified for the config `fs.azure.account.key`.
+
+To retrieve the key using a shell script, specify the path to the script in
+the config `fs.azure.shellkeyprovider.script`. The ShellDecryptionKeyProvider
+class uses the specified script to retrieve the key.
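+
+A sketch of both options together; the class name placeholder and the script
+path below are illustrative, not defaults:
+
+```xml
+<property>
+  <name>fs.azure.account.keyprovider</name>
+  <value>{fully-qualified-class-name-of-the-key-provider}</value>
+</property>
+<property>
+  <name>fs.azure.shellkeyprovider.script</name>
+  <value>/path/to/keyprovider.sh</value>
+</property>
+```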
+
 ### <a name="oauth-client-credentials"></a> OAuth 2.0 Client Credentials
 
 OAuth 2.0 credentials of (client id, client secret, endpoint) are provided in the configuration/JCEKS file.
@@ -466,6 +481,13 @@ With an existing Oauth 2.0 token, make a request of the Active Directory endpoin
   </description>
 </property>
 <property>
+  <name>fs.azure.account.oauth2.refresh.endpoint</name>
+  <value></value>
+  <description>
+  Refresh token endpoint
+  </description>
+</property>
+<property>
   <name>fs.azure.account.oauth2.client.id</name>
   <value></value>
   <description>
@@ -507,6 +529,13 @@ The Azure Portal/CLI is used to create the service identity.
   </description>
 </property>
 <property>
+  <name>fs.azure.account.oauth2.msi.endpoint</name>
+  <value></value>
+  <description>
+   MSI endpoint
+  </description>
+</property>
+<property>
   <name>fs.azure.account.oauth2.client.id</name>
   <value></value>
   <description>
@@ -542,6 +571,26 @@ and optionally `org.apache.hadoop.fs.azurebfs.extensions.BoundDTExtension`.
 
 The declared class also holds responsibility to implement retry logic while fetching access tokens.
 
+### <a name="delegationtokensupportconfigoptions"></a> Delegation Token Provider
+
+A delegation token provider supplies the ABFS connector with delegation
+tokens and helps renew and cancel them by implementing the
+CustomDelegationTokenManager interface.
+
+```xml
+<property>
+  <name>fs.azure.enable.delegation.token</name>
+  <value>true</value>
+  <description>Make this true to use delegation token provider</description>
+</property>
+<property>
+  <name>fs.azure.delegation.token.provider.type</name>
+  <value>{fully-qualified-class-name-for-implementation-of-CustomDelegationTokenManager-interface}</value>
+</property>
+```
+If delegation tokens are enabled and the config
+`fs.azure.delegation.token.provider.type` is not provided, an
+IllegalArgumentException is thrown.
+
 ### Shared Access Signature (SAS) Token Provider
 
 A Shared Access Signature (SAS) token provider supplies the ABFS connector with SAS
@@ -691,6 +740,84 @@ Config `fs.azure.account.hns.enabled` provides an option to specify whether
 Config `fs.azure.enable.check.access` needs to be set true to enable
  the AzureBlobFileSystem.access().
 
+### <a name="featureconfigoptions"></a> Primary User Group Options
+The group name that is part of FileStatus and AclStatus will be set to the
+same value as the username if the config
+`fs.azure.skipUserGroupMetadataDuringInitialization` is set to true.
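+
+A minimal sketch:
+
+```xml
+<property>
+  <name>fs.azure.skipUserGroupMetadataDuringInitialization</name>
+  <value>true</value>
+</property>
+```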
+
+### <a name="ioconfigoptions"></a> IO Options
+The following configs are related to read and write operations.
+
+`fs.azure.io.retry.max.retries`: Sets the number of retries for IO operations.
+Currently this is used only for the server call retry logic, within the
+AbfsClient class as part of the ExponentialRetryPolicy. The value should be
+greater than or equal to 0.
+
+`fs.azure.write.request.size`: Sets the write buffer size in bytes. The value
+should be between 16384 and 104857600, both inclusive (16 KB to 100 MB). The
+default value is 8388608 (8 MB).
+
+`fs.azure.read.request.size`: Sets the read buffer size in bytes. The value
+should be between 16384 and 104857600, both inclusive (16 KB to 100 MB). The
+default value is 4194304 (4 MB).
+
+`fs.azure.readaheadqueue.depth`: Sets the readahead queue depth in
+AbfsInputStream. If the configured value is negative, the readahead queue
+depth is set to Runtime.getRuntime().availableProcessors(). The default
+value is -1.
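+
+A sketch using the documented defaults for the buffer sizes; the readahead
+queue depth of 2 is an arbitrary illustrative value, not a default:
+
+```xml
+<property>
+  <name>fs.azure.write.request.size</name>
+  <value>8388608</value>
+</property>
+<property>
+  <name>fs.azure.read.request.size</name>
+  <value>4194304</value>
+</property>
+<property>
+  <name>fs.azure.readaheadqueue.depth</name>
+  <value>2</value>
+</property>
+```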
+
+### <a name="securityconfigoptions"></a> Security Options
+`fs.azure.always.use.https`: Enforces the use of HTTPS instead of HTTP when
+set to true. Irrespective of this flag, AbfsClient will use HTTPS if the
+secure scheme (ABFSS) is used or OAuth is used for authentication. The
+default value is true.
+
+`fs.azure.ssl.channel.mode`: Initializes DelegatingSSLSocketFactory with the
+specified SSL channel mode. The value should be one of the
+DelegatingSSLSocketFactory.SSLChannelMode enum constants. The default value
+is DelegatingSSLSocketFactory.SSLChannelMode.Default.
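+
+A minimal sketch using the defaults described above:
+
+```xml
+<property>
+  <name>fs.azure.always.use.https</name>
+  <value>true</value>
+</property>
+<property>
+  <name>fs.azure.ssl.channel.mode</name>
+  <value>Default</value>
+</property>
+```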
+
+### <a name="serverconfigoptions"></a> Server Options
+When the config `fs.azure.io.read.tolerate.concurrent.append` is set to true,
+the If-Match header sent to the server for read calls will be set to `*`;
+otherwise it will be set to the ETag. This is a mechanism for handling reads
+with optimistic concurrency.
+Please refer to the following links for further information.
+1. https://docs.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/read
+2. https://azure.microsoft.com/de-de/blog/managing-concurrency-in-microsoft-azure-storage-2/
+
+The listStatus API fetches FileStatus information from the server page by
+page. The config `fs.azure.list.max.results` sets the maxResults URI
+parameter, which controls the page size (maximum results per call). The value
+should be greater than 0; the default is 500. The server caps this parameter
+at 5000, so even if the config is set above 5000 the response will only
+contain 5000 entries. Please refer to the following link for further
+information.
+https://docs.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/list
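+
+For example, to lower the page size (the value 1000 is an arbitrary choice
+within the server's limit):
+
+```xml
+<property>
+  <name>fs.azure.list.max.results</name>
+  <value>1000</value>
+</property>
+```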
+
+### <a name="throttlingconfigoptions"></a> Throttling Options
+ABFS driver has the capability to throttle read and write operations to achieve
+maximum throughput by minimizing errors. The errors occur when the account
+ingress or egress limits are exceeded and the server side throttles requests.
+Server-side throttling causes the retry policy to be used, but the retry policy
+sleeps for long periods of time causing the total ingress or egress throughput
+to be as much as 35% lower than optimal. The retry policy is also after the
+fact, in that it applies after a request fails. On the other hand, the
+client-side throttling implemented here happens before requests are made and
+sleeps just enough to minimize errors, allowing optimal ingress and/or egress
+throughput. By default the throttling mechanism is enabled in the driver. It
+can be disabled by setting the config `fs.azure.enable.autothrottling`
+to false.
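+
+For example, to disable client-side throttling:
+
+```xml
+<property>
+  <name>fs.azure.enable.autothrottling</name>
+  <value>false</value>
+</property>
+```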
+
+### <a name="renameconfigoptions"></a> Rename Options
+`fs.azure.atomic.rename.key`: Directories for atomic rename support can be
+specified as comma-separated values in this config. The driver prints the
+following warning log if the source of a rename belongs to one of the
+configured directories: "The atomic rename feature is not supported by the
+ABFS scheme; however, rename, create and delete operations are atomic if
+Namespace is enabled for your Azure Storage account."
+By default the value is "/hbase".
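+
+A sketch keeping the default and adding a hypothetical second directory:
+
+```xml
+<property>
+  <name>fs.azure.atomic.rename.key</name>
+  <value>/hbase,/data</value>
+</property>
+```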
+
 ### <a name="perfoptions"></a> Perf Options
 
 #### <a name="abfstracklatencyoptions"></a> 1. HTTP Request Tracking Options




[hadoop] 01/06: HADOOP-17002. ABFS: Adding config to determine if the account is HNS enabled or not

Posted by tm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tmarquardt pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit 76ee7e5494579b6f8adf1d86b17e97a63a8576ad
Author: bilaharith <52...@users.noreply.github.com>
AuthorDate: Fri Apr 24 06:16:18 2020 +0530

    HADOOP-17002. ABFS: Adding config to determine if the account is HNS enabled or not
    
    Contributed by Bilahari T H.
---
 .../hadoop/fs/azurebfs/AbfsConfiguration.java      |  14 ++
 .../fs/azurebfs/AzureBlobFileSystemStore.java      |  59 ++++++---
 .../fs/azurebfs/constants/ConfigurationKeys.java   |   7 +
 .../constants/FileSystemConfigurations.java        |   3 +
 .../exceptions/TrileanConversionException.java     |  36 ++++++
 .../apache/hadoop/fs/azurebfs/enums/Trilean.java   |  80 ++++++++++++
 .../hadoop/fs/azurebfs/enums/package-info.java     |  22 ++++
 .../hadoop-azure/src/site/markdown/abfs.md         |   5 +
 .../fs/azurebfs/ITestGetNameSpaceEnabled.java      | 141 ++++++++++++++++++++-
 .../apache/hadoop/fs/azurebfs/TrileanTests.java    |  92 ++++++++++++++
 10 files changed, 438 insertions(+), 21 deletions(-)

diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
index 78d6260..d60bc37 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.fs.azurebfs.diagnostics.BooleanConfigurationBasicValida
 import org.apache.hadoop.fs.azurebfs.diagnostics.IntegerConfigurationBasicValidator;
 import org.apache.hadoop.fs.azurebfs.diagnostics.LongConfigurationBasicValidator;
 import org.apache.hadoop.fs.azurebfs.diagnostics.StringConfigurationBasicValidator;
+import org.apache.hadoop.fs.azurebfs.enums.Trilean;
 import org.apache.hadoop.fs.azurebfs.extensions.CustomTokenProviderAdaptee;
 import org.apache.hadoop.fs.azurebfs.extensions.SASTokenProvider;
 import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider;
@@ -81,6 +82,10 @@ public class AbfsConfiguration{
   private final boolean isSecure;
   private static final Logger LOG = LoggerFactory.getLogger(AbfsConfiguration.class);
 
+  @StringConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ACCOUNT_IS_HNS_ENABLED,
+      DefaultValue = DEFAULT_FS_AZURE_ACCOUNT_IS_HNS_ENABLED)
+  private String isNamespaceEnabledAccount;
+
   @IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_WRITE_BUFFER_SIZE,
       MinValue = MIN_BUFFER_SIZE,
       MaxValue = MAX_BUFFER_SIZE,
@@ -232,6 +237,10 @@ public class AbfsConfiguration{
     }
   }
 
+  public Trilean getIsNamespaceEnabledAccount() {
+    return Trilean.getTrilean(isNamespaceEnabledAccount);
+  }
+
   /**
    * Gets the Azure Storage account name corresponding to this instance of configuration.
    * @return the Azure Storage account name
@@ -746,6 +755,11 @@ public class AbfsConfiguration{
     this.maxIoRetries = maxIoRetries;
   }
 
+  @VisibleForTesting
+  void setIsNamespaceEnabledAccount(String isNamespaceEnabledAccount) {
+    this.isNamespaceEnabledAccount = isNamespaceEnabledAccount;
+  }
+
   private String getTrimmedPasswordString(String key, String defaultValue) throws IOException {
     String value = getPasswordString(key);
     if (StringUtils.isBlank(value)) {
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
index 62145e1..d37ceb3 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
@@ -73,6 +73,8 @@ import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriException;
 import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
 import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultEntrySchema;
 import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultSchema;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.TrileanConversionException;
+import org.apache.hadoop.fs.azurebfs.enums.Trilean;
 import org.apache.hadoop.fs.azurebfs.extensions.SASTokenProvider;
 import org.apache.hadoop.fs.azurebfs.extensions.ExtensionHelper;
 import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider;
@@ -133,8 +135,7 @@ public class AzureBlobFileSystemStore implements Closeable {
 
   private final AbfsConfiguration abfsConfiguration;
   private final Set<String> azureAtomicRenameDirSet;
-  private boolean isNamespaceEnabledSet;
-  private boolean isNamespaceEnabled;
+  private Trilean isNamespaceEnabled;
   private final AuthType authType;
   private final UserGroupInformation userGroupInformation;
   private final IdentityTransformer identityTransformer;
@@ -155,6 +156,8 @@ public class AzureBlobFileSystemStore implements Closeable {
 
     LOG.trace("AbfsConfiguration init complete");
 
+    this.isNamespaceEnabled = abfsConfiguration.getIsNamespaceEnabledAccount();
+
     this.userGroupInformation = UserGroupInformation.getCurrentUser();
     this.userName = userGroupInformation.getShortUserName();
     LOG.trace("UGI init complete");
@@ -234,26 +237,33 @@ public class AzureBlobFileSystemStore implements Closeable {
   }
 
   public boolean getIsNamespaceEnabled() throws AzureBlobFileSystemException {
-    if (!isNamespaceEnabledSet) {
+    try {
+      return this.isNamespaceEnabled.toBoolean();
+    } catch (TrileanConversionException e) {
+      LOG.debug("isNamespaceEnabled is UNKNOWN; fall back and determine through"
+          + " getAcl server call", e);
+    }
 
-      LOG.debug("Get root ACL status");
-      try (AbfsPerfInfo perfInfo = startTracking("getIsNamespaceEnabled", "getAclStatus")) {
-        AbfsRestOperation op = client.getAclStatus(AbfsHttpConstants.FORWARD_SLASH + AbfsHttpConstants.ROOT_PATH);
-        perfInfo.registerResult(op.getResult());
-        isNamespaceEnabled = true;
-        perfInfo.registerSuccess(true);
-      } catch (AbfsRestOperationException ex) {
-        // Get ACL status is a HEAD request, its response doesn't contain errorCode
-        // So can only rely on its status code to determine its account type.
-        if (HttpURLConnection.HTTP_BAD_REQUEST != ex.getStatusCode()) {
-          throw ex;
-        }
-        isNamespaceEnabled = false;
+    LOG.debug("Get root ACL status");
+    try (AbfsPerfInfo perfInfo = startTracking("getIsNamespaceEnabled",
+        "getAclStatus")) {
+      AbfsRestOperation op = client.getAclStatus(
+          AbfsHttpConstants.FORWARD_SLASH + AbfsHttpConstants.ROOT_PATH);
+      perfInfo.registerResult(op.getResult());
+      isNamespaceEnabled = Trilean.getTrilean(true);
+      perfInfo.registerSuccess(true);
+    } catch (AbfsRestOperationException ex) {
+      // Get ACL status is a HEAD request; its response doesn't contain an
+      // errorCode, so we can only rely on the status code to determine the
+      // account type.
+      if (HttpURLConnection.HTTP_BAD_REQUEST != ex.getStatusCode()) {
+        throw ex;
       }
-      isNamespaceEnabledSet = true;
+
+      isNamespaceEnabled = Trilean.getTrilean(false);
     }
 
-    return isNamespaceEnabled;
+    return isNamespaceEnabled.toBoolean();
   }
 
   @VisibleForTesting
@@ -1406,4 +1416,15 @@ public class AzureBlobFileSystemStore implements Closeable {
   AbfsClient getClient() {
     return this.client;
   }
-}
\ No newline at end of file
+
+  @VisibleForTesting
+  void setClient(AbfsClient client) {
+    this.client = client;
+  }
+
+  @VisibleForTesting
+  void setNamespaceEnabled(Trilean isNamespaceEnabled){
+    this.isNamespaceEnabled = isNamespaceEnabled;
+  }
+
+}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
index 5db111b..597f93f 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
@@ -27,6 +27,13 @@ import org.apache.hadoop.classification.InterfaceStability;
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
 public final class ConfigurationKeys {
+
+  /**
+   * Config to specify if the configured account is HNS enabled or not. If
+   * this config is not set, a getAcl call is made on the account filesystem
+   * root path to determine the HNS status.
+   */
+  public static final String FS_AZURE_ACCOUNT_IS_HNS_ENABLED = "fs.azure.account.hns.enabled";
   public static final String FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME = "fs.azure.account.key";
   public static final String FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME_REGX = "fs\\.azure\\.account\\.key\\.(.*)";
   public static final String FS_AZURE_SECURE_MODE = "fs.azure.secure.mode";
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
index 3add0ef..ef2e708 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
@@ -30,6 +30,9 @@ import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_ST
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
 public final class FileSystemConfigurations {
+
+  public static final String DEFAULT_FS_AZURE_ACCOUNT_IS_HNS_ENABLED = "";
+
   public static final String USER_HOME_DIRECTORY_PREFIX = "/user";
 
   // Retry parameter defaults.
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/TrileanConversionException.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/TrileanConversionException.java
new file mode 100644
index 0000000..87eb05c
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/TrileanConversionException.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.contracts.exceptions;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Thrown when an attempt is made to convert Trilean.UNKNOWN to boolean. Only
+ * Trilean.TRUE and Trilean.FALSE can be converted to boolean.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public final class TrileanConversionException
+    extends AzureBlobFileSystemException {
+  public TrileanConversionException() {
+    super("Cannot convert Trilean.UNKNOWN to boolean");
+  }
+
+}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/enums/Trilean.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/enums/Trilean.java
new file mode 100644
index 0000000..dc5f439
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/enums/Trilean.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.enums;
+
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.TrileanConversionException;
+
+/**
+ * Enum to represent three values: TRUE, FALSE and UNKNOWN. Can be used where
+ * a boolean is not enough to hold the information.
+ */
+public enum Trilean {
+
+  FALSE, TRUE, UNKNOWN;
+
+  private static final String TRUE_STR = "true";
+  private static final String FALSE_STR = "false";
+
+  /**
+   * Converts boolean to Trilean.
+   *
+   * @param isTrue the boolean to convert.
+   * @return the corresponding Trilean for the passed boolean isTrue.
+   */
+  public static Trilean getTrilean(final boolean isTrue) {
+    if (isTrue) {
+      return Trilean.TRUE;
+    }
+
+    return Trilean.FALSE;
+  }
+
+  /**
+   * Converts String to Trilean.
+   *
+   * @param str the string to convert.
+   * @return the corresponding Trilean for the passed string str.
+   */
+  public static Trilean getTrilean(String str) {
+    if (TRUE_STR.equalsIgnoreCase(str)) {
+      return Trilean.TRUE;
+    }
+
+    if (FALSE_STR.equalsIgnoreCase(str)) {
+      return Trilean.FALSE;
+    }
+
+    return Trilean.UNKNOWN;
+  }
+
+  /**
+   * Converts the Trilean enum to boolean.
+   *
+   * @return the corresponding boolean.
+   * @throws TrileanConversionException when converting Trilean.UNKNOWN.
+   */
+  public boolean toBoolean() throws TrileanConversionException {
+    if (this == Trilean.UNKNOWN) {
+      throw new TrileanConversionException();
+    }
+
+    return this == Trilean.TRUE;
+  }
+
+}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/enums/package-info.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/enums/package-info.java
new file mode 100644
index 0000000..b2a9b0f
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/enums/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+package org.apache.hadoop.fs.azurebfs.enums;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
diff --git a/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md b/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md
index e1a2fca..2b4b14d 100644
--- a/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md
+++ b/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md
@@ -663,6 +663,11 @@ Hflush() being the only documented API that can provide persistent data
 transfer, Flush() also attempting to persist buffered data will lead to
 performance issues.
 
+### <a name="hnscheckconfigoptions"></a> HNS Check Options
+Config `fs.azure.account.hns.enabled` provides an option to specify whether
+the storage account is HNS enabled or not. If the config is not provided,
+a server call is made to determine it.
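+
+For example, for an account known to be HNS enabled:
+
+```xml
+<property>
+  <name>fs.azure.account.hns.enabled</name>
+  <value>true</value>
+</property>
+```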
+
 ### <a name="flushconfigoptions"></a> Access Options
 Config `fs.azure.enable.check.access` needs to be set true to enable
  the AzureBlobFileSystem.access().
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestGetNameSpaceEnabled.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestGetNameSpaceEnabled.java
index 74c8803..4268ff2 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestGetNameSpaceEnabled.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestGetNameSpaceEnabled.java
@@ -21,15 +21,31 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.UUID;
 
+import org.apache.hadoop.fs.azurebfs.enums.Trilean;
 import org.junit.Assume;
 import org.junit.Test;
+import org.assertj.core.api.Assertions;
 
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
+import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
 import org.apache.hadoop.fs.azurebfs.services.AuthType;
 
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_FS_AZURE_ACCOUNT_IS_HNS_ENABLED;
+import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.TEST_CONFIGURATION_FILE_NAME;
 import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_IS_HNS_ENABLED;
 import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME;
 import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_TEST_NAMESPACE_ENABLED_ACCOUNT;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
@@ -39,6 +55,9 @@ import static org.apache.hadoop.test.LambdaTestUtils.intercept;
  */
 public class ITestGetNameSpaceEnabled extends AbstractAbfsIntegrationTest {
 
+  private static final String TRUE_STR = "true";
+  private static final String FALSE_STR = "false";
+
   private boolean isUsingXNSAccount;
   public ITestGetNameSpaceEnabled() throws Exception {
     isUsingXNSAccount = getConfiguration().getBoolean(FS_AZURE_TEST_NAMESPACE_ENABLED_ACCOUNT, false);
@@ -57,7 +76,57 @@ public class ITestGetNameSpaceEnabled extends AbstractAbfsIntegrationTest {
     Assume.assumeFalse("Skip this test because the account being used for test is a XNS account",
             isUsingXNSAccount);
     assertFalse("Expecting getIsNamespaceEnabled() return false",
-            getFileSystem().getIsNamespaceEnabled());
+        getFileSystem().getIsNamespaceEnabled());
+  }
+
+  @Test
+  public void testGetIsNamespaceEnabledWhenConfigIsTrue() throws Exception {
+    AzureBlobFileSystem fs = getNewFSWithHnsConf(TRUE_STR);
+    Assertions.assertThat(fs.getIsNamespaceEnabled()).describedAs(
+        "getIsNamespaceEnabled should return true when the "
+            + "config is set as true").isTrue();
+    fs.getAbfsStore().deleteFilesystem();
+    unsetAndAssert();
+  }
+
+  @Test
+  public void testGetIsNamespaceEnabledWhenConfigIsFalse() throws Exception {
+    AzureBlobFileSystem fs = getNewFSWithHnsConf(FALSE_STR);
+    Assertions.assertThat(fs.getIsNamespaceEnabled()).describedAs(
+        "getIsNamespaceEnabled should return false when the "
+            + "config is set as false").isFalse();
+    fs.getAbfsStore().deleteFilesystem();
+    unsetAndAssert();
+  }
+
+  private void unsetAndAssert() throws Exception {
+    AzureBlobFileSystem fs = getNewFSWithHnsConf(
+        DEFAULT_FS_AZURE_ACCOUNT_IS_HNS_ENABLED);
+    boolean expectedValue = this.getConfiguration()
+        .getBoolean(FS_AZURE_TEST_NAMESPACE_ENABLED_ACCOUNT, false);
+    Assertions.assertThat(fs.getIsNamespaceEnabled()).describedAs(
+        "getIsNamespaceEnabled should return the value "
+            + "configured for fs.azure.test.namespace.enabled")
+        .isEqualTo(expectedValue);
+    fs.getAbfsStore().deleteFilesystem();
+  }
+
+  private AzureBlobFileSystem getNewFSWithHnsConf(
+      String isNamespaceEnabledAccount) throws Exception {
+    Configuration rawConfig = new Configuration();
+    rawConfig.addResource(TEST_CONFIGURATION_FILE_NAME);
+    rawConfig.set(FS_AZURE_ACCOUNT_IS_HNS_ENABLED, isNamespaceEnabledAccount);
+    rawConfig
+        .setBoolean(AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION, true);
+    rawConfig.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY,
+        getNonExistingUrl());
+    return (AzureBlobFileSystem) FileSystem.get(rawConfig);
+  }
+
+  private String getNonExistingUrl() {
+    String testUri = this.getTestUrl();
+    return getAbfsScheme() + "://" + UUID.randomUUID() + testUri
+        .substring(testUri.indexOf("@"));
   }
 
   @Test
@@ -95,4 +164,72 @@ public class ITestGetNameSpaceEnabled extends AbstractAbfsIntegrationTest {
               fs.getIsNamespaceEnabled();
             });
   }
-}
\ No newline at end of file
+
+  @Test
+  public void testEnsureGetAclCallIsMadeOnceWhenConfigIsInvalid()
+      throws Exception {
+    unsetConfAndEnsureGetAclCallIsMadeOnce();
+    ensureGetAclCallIsMadeOnceForInvalidConf(" ");
+    unsetConfAndEnsureGetAclCallIsMadeOnce();
+    ensureGetAclCallIsMadeOnceForInvalidConf("Invalid conf");
+    unsetConfAndEnsureGetAclCallIsMadeOnce();
+  }
+
+  @Test
+  public void testEnsureGetAclCallIsNeverMadeWhenConfigIsValid()
+      throws Exception {
+    unsetConfAndEnsureGetAclCallIsMadeOnce();
+    ensureGetAclCallIsNeverMadeForValidConf(FALSE_STR.toLowerCase());
+    unsetConfAndEnsureGetAclCallIsMadeOnce();
+    ensureGetAclCallIsNeverMadeForValidConf(FALSE_STR.toUpperCase());
+    unsetConfAndEnsureGetAclCallIsMadeOnce();
+    ensureGetAclCallIsNeverMadeForValidConf(TRUE_STR.toLowerCase());
+    unsetConfAndEnsureGetAclCallIsMadeOnce();
+    ensureGetAclCallIsNeverMadeForValidConf(TRUE_STR.toUpperCase());
+    unsetConfAndEnsureGetAclCallIsMadeOnce();
+  }
+
+  @Test
+  public void testEnsureGetAclCallIsMadeOnceWhenConfigIsNotPresent()
+      throws IOException {
+    unsetConfAndEnsureGetAclCallIsMadeOnce();
+  }
+
+  private void ensureGetAclCallIsMadeOnceForInvalidConf(String invalidConf)
+      throws Exception {
+    this.getFileSystem().getAbfsStore()
+        .setNamespaceEnabled(Trilean.getTrilean(invalidConf));
+    AbfsClient mockClient =
+        callAbfsGetIsNamespaceEnabledAndReturnMockAbfsClient();
+    verify(mockClient, times(1)).getAclStatus(anyString());
+  }
+
+  private void ensureGetAclCallIsNeverMadeForValidConf(String validConf)
+      throws Exception {
+    this.getFileSystem().getAbfsStore()
+        .setNamespaceEnabled(Trilean.getTrilean(validConf));
+    AbfsClient mockClient =
+        callAbfsGetIsNamespaceEnabledAndReturnMockAbfsClient();
+    verify(mockClient, never()).getAclStatus(anyString());
+  }
+
+  private void unsetConfAndEnsureGetAclCallIsMadeOnce() throws IOException {
+    this.getFileSystem().getAbfsStore().setNamespaceEnabled(Trilean.UNKNOWN);
+    AbfsClient mockClient =
+        callAbfsGetIsNamespaceEnabledAndReturnMockAbfsClient();
+    verify(mockClient, times(1)).getAclStatus(anyString());
+  }
+
+  private AbfsClient callAbfsGetIsNamespaceEnabledAndReturnMockAbfsClient()
+      throws IOException {
+    final AzureBlobFileSystem abfs = this.getFileSystem();
+    final AzureBlobFileSystemStore abfsStore = abfs.getAbfsStore();
+    final AbfsClient mockClient = mock(AbfsClient.class);
+    doReturn(mock(AbfsRestOperation.class)).when(mockClient)
+        .getAclStatus(anyString());
+    abfsStore.setClient(mockClient);
+    abfs.getIsNamespaceEnabled();
+    return mockClient;
+  }
+
+}
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TrileanTests.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TrileanTests.java
new file mode 100644
index 0000000..45467d4
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TrileanTests.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs;
+
+import org.junit.Test;
+
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.TrileanConversionException;
+import org.apache.hadoop.fs.azurebfs.enums.Trilean;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.catchThrowable;
+
+/**
+ * Tests for the enum Trilean.
+ */
+public class TrileanTests {
+
+  private static final String TRUE_STR = "true";
+  private static final String FALSE_STR = "false";
+
+  @Test
+  public void testGetTrileanForBoolean() {
+    assertThat(Trilean.getTrilean(true)).describedAs(
+        "getTrilean should return Trilean.TRUE when true is passed")
+        .isEqualTo(Trilean.TRUE);
+    assertThat(Trilean.getTrilean(false)).describedAs(
+        "getTrilean should return Trilean.FALSE when false is passed")
+        .isEqualTo(Trilean.FALSE);
+  }
+
+  @Test
+  public void testGetTrileanForString() {
+    assertThat(Trilean.getTrilean(TRUE_STR.toLowerCase())).describedAs(
+        "getTrilean should return Trilean.TRUE when true is passed")
+        .isEqualTo(Trilean.TRUE);
+    assertThat(Trilean.getTrilean(TRUE_STR.toUpperCase())).describedAs(
+        "getTrilean should return Trilean.TRUE when TRUE is passed")
+        .isEqualTo(Trilean.TRUE);
+
+    assertThat(Trilean.getTrilean(FALSE_STR.toLowerCase())).describedAs(
+        "getTrilean should return Trilean.FALSE when false is passed")
+        .isEqualTo(Trilean.FALSE);
+    assertThat(Trilean.getTrilean(FALSE_STR.toUpperCase())).describedAs(
+        "getTrilean should return Trilean.FALSE when FALSE is passed")
+        .isEqualTo(Trilean.FALSE);
+
+    testInvalidString(null);
+    testInvalidString(" ");
+    testInvalidString("invalid");
+    testInvalidString("truee");
+    testInvalidString("falsee");
+  }
+
+  private void testInvalidString(String invalidString) {
+    assertThat(Trilean.getTrilean(invalidString)).describedAs(
+        "getTrilean should return Trilean.UNKNOWN for anything not true/false")
+        .isEqualTo(Trilean.UNKNOWN);
+  }
+
+  @Test
+  public void testToBoolean() throws TrileanConversionException {
+    assertThat(Trilean.TRUE.toBoolean())
+        .describedAs("toBoolean should return true for Trilean.TRUE").isTrue();
+    assertThat(Trilean.FALSE.toBoolean())
+        .describedAs("toBoolean should return false for Trilean.FALSE")
+        .isFalse();
+
+    assertThat(catchThrowable(() -> Trilean.UNKNOWN.toBoolean())).describedAs(
+        "toBoolean on Trilean.UNKNOWN results in TrileanConversionException")
+        .isInstanceOf(TrileanConversionException.class).describedAs(
+        "Exception message should be: catchThrowable(()->Trilean.UNKNOWN"
+            + ".toBoolean())")
+        .hasMessage("Cannot convert Trilean.UNKNOWN to boolean");
+  }
+
+}
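
The Trilean enum under test is not included in this excerpt (it is added by
this patch as org.apache.hadoop.fs.azurebfs.enums.Trilean). The following is
a minimal sketch consistent with the assertions above, offered only to make
the tests easier to follow, not as the actual implementation:

    public enum Trilean {
      TRUE, FALSE, UNKNOWN;

      // A boolean maps directly to TRUE or FALSE.
      public static Trilean getTrilean(boolean isTrue) {
        return isTrue ? TRUE : FALSE;
      }

      // Only case-insensitive "true"/"false" map to TRUE/FALSE;
      // anything else, including null, is UNKNOWN.
      public static Trilean getTrilean(String str) {
        if ("true".equalsIgnoreCase(str)) {
          return TRUE;
        }
        if ("false".equalsIgnoreCase(str)) {
          return FALSE;
        }
        return UNKNOWN;
      }

      // UNKNOWN cannot be converted; the tests expect the exception
      // message "Cannot convert Trilean.UNKNOWN to boolean".
      public boolean toBoolean() throws TrileanConversionException {
        if (this == UNKNOWN) {
          throw new TrileanConversionException();
        }
        return this == TRUE;
      }
    }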


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[hadoop] 06/06: HADOOP-17076: ABFS: Delegation SAS Generator Updates Contributed by Thomas Marquardt.

Posted by tm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tmarquardt pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit 63d236c019909d321f7824bb0e043c7bddd60bf0
Author: Thomas Marquardt <tm...@microsoft.com>
AuthorDate: Wed Jun 17 23:12:22 2020 +0000

    HADOOP-17076: ABFS: Delegation SAS Generator Updates
    Contributed by Thomas Marquardt.
    
    DETAILS:
    1) The authentication version in the service has been updated from Dec19 to Feb20, so the client needs to be updated to match.
    2) Add support and test cases for getXAttr and setXAttr.
    3) Update DelegationSASGenerator and related classes to use Duration instead of int for time periods.
    4) Clean up the DelegationSASGenerator switch/case statement that maps operations to permissions.
    5) Clean up the SASGenerator classes to use String.equals instead of ==.
    
    TESTS:
    Added tests for getXAttr and setXAttr.
    
    All tests are passing against my account in eastus2euap:
    
     $mvn -T 1C -Dparallel-tests=abfs -Dscale -DtestsThreadCount=8 clean verify
     Tests run: 76, Failures: 0, Errors: 0, Skipped: 0
     Tests run: 441, Failures: 0, Errors: 0, Skipped: 33
     Tests run: 206, Failures: 0, Errors: 0, Skipped: 24
---
 .../fs/azurebfs/extensions/SASTokenProvider.java   |  2 +-
 .../ITestAzureBlobFileSystemDelegationSAS.java     | 16 +++++++++++
 .../extensions/MockDelegationSASTokenProvider.java |  4 +--
 .../fs/azurebfs/utils/DelegationSASGenerator.java  | 32 ++++++++++------------
 .../hadoop/fs/azurebfs/utils/SASGenerator.java     |  8 ++++--
 .../fs/azurebfs/utils/ServiceSASGenerator.java     |  6 ++--
 6 files changed, 42 insertions(+), 26 deletions(-)

diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/extensions/SASTokenProvider.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/extensions/SASTokenProvider.java
index 2cd44f1..a2cd292 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/extensions/SASTokenProvider.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/extensions/SASTokenProvider.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.security.AccessControlException;
 public interface SASTokenProvider {
 
   String CHECK_ACCESS_OPERATION = "check-access";
+  String CREATE_DIRECTORY_OPERATION = "create-directory";
   String CREATE_FILE_OPERATION = "create-file";
   String DELETE_OPERATION = "delete";
   String DELETE_RECURSIVE_OPERATION = "delete-recursive";
@@ -40,7 +41,6 @@ public interface SASTokenProvider {
   String GET_STATUS_OPERATION = "get-status";
   String GET_PROPERTIES_OPERATION = "get-properties";
   String LIST_OPERATION = "list";
-  String CREATE_DIRECTORY_OPERATION = "create-directory";
   String READ_OPERATION = "read";
   String RENAME_SOURCE_OPERATION = "rename-source";
   String RENAME_DESTINATION_OPERATION = "rename-destination";
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelegationSAS.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelegationSAS.java
index 07b5804..c2c691e 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelegationSAS.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelegationSAS.java
@@ -25,6 +25,7 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.UUID;
 
+import org.junit.Assert;
 import org.junit.Assume;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -365,4 +366,19 @@ public class ITestAzureBlobFileSystemDelegationSAS extends AbstractAbfsIntegrati
     }
     assertEquals(0, count);
   }
+
+  // Test filesystem operations getXAttr and setXAttr
+  @Test
+  public void testProperties() throws Exception {
+    final AzureBlobFileSystem fs = getFileSystem();
+    Path reqPath = new Path(UUID.randomUUID().toString());
+
+    fs.create(reqPath).close();
+
+    final String propertyName = "user.mime_type";
+    final byte[] propertyValue = "text/plain".getBytes("utf-8");
+    fs.setXAttr(reqPath, propertyName, propertyValue);
+
+    assertArrayEquals(propertyValue, fs.getXAttr(reqPath, propertyName));
+  }
 }
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/extensions/MockDelegationSASTokenProvider.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/extensions/MockDelegationSASTokenProvider.java
index fa50bef..121256c 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/extensions/MockDelegationSASTokenProvider.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/extensions/MockDelegationSASTokenProvider.java
@@ -55,8 +55,8 @@ public class MockDelegationSASTokenProvider implements SASTokenProvider {
     String appSecret = configuration.get(TestConfigurationKeys.FS_AZURE_TEST_APP_SECRET);
     String sktid = configuration.get(TestConfigurationKeys.FS_AZURE_TEST_APP_SERVICE_PRINCIPAL_TENANT_ID);
     String skoid = configuration.get(TestConfigurationKeys.FS_AZURE_TEST_APP_SERVICE_PRINCIPAL_OBJECT_ID);
-    String skt = SASGenerator.ISO_8601_FORMATTER.format(Instant.now().minusSeconds(SASGenerator.FIVE_MINUTES));
-    String ske = SASGenerator.ISO_8601_FORMATTER.format(Instant.now().plusSeconds(SASGenerator.ONE_DAY));
+    String skt = SASGenerator.ISO_8601_FORMATTER.format(Instant.now().minus(SASGenerator.FIVE_MINUTES));
+    String ske = SASGenerator.ISO_8601_FORMATTER.format(Instant.now().plus(SASGenerator.ONE_DAY));
     String skv = SASGenerator.AuthenticationVersion.Dec19.toString();
 
     byte[] key = getUserDelegationKey(accountName, appID, appSecret, sktid, skt, ske, skv);
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/DelegationSASGenerator.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/DelegationSASGenerator.java
index f84ed6a..6f2209a 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/DelegationSASGenerator.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/DelegationSASGenerator.java
@@ -29,12 +29,12 @@ import org.apache.hadoop.fs.azurebfs.services.AbfsUriQueryBuilder;
  * Test Delegation SAS generator.
  */
 public class DelegationSASGenerator extends SASGenerator {
-  private String skoid;
-  private String sktid;
-  private String skt;
-  private String ske;
+  private final String skoid;
+  private final String sktid;
+  private final String skt;
+  private final String ske;
   private final String sks = "b";
-  private String skv;
+  private final String skv;
 
   public DelegationSASGenerator(byte[] userDelegationKey, String skoid, String sktid, String skt, String ske, String skv) {
     super(userDelegationKey);
@@ -48,20 +48,18 @@ public class DelegationSASGenerator extends SASGenerator {
   public String getDelegationSAS(String accountName, String containerName, String path, String operation,
                                  String saoid, String suoid, String scid) {
 
-    final String sv = AuthenticationVersion.Dec19.toString();
-    final String st = ISO_8601_FORMATTER.format(Instant.now().minusSeconds(FIVE_MINUTES));
-    final String se = ISO_8601_FORMATTER.format(Instant.now().plusSeconds(ONE_DAY));
+    final String sv = AuthenticationVersion.Feb20.toString();
+    final String st = ISO_8601_FORMATTER.format(Instant.now().minus(FIVE_MINUTES));
+    final String se = ISO_8601_FORMATTER.format(Instant.now().plus(ONE_DAY));
     String sr = "b";
     String sdd = null;
-    String sp = null;
+    String sp;
 
     switch (operation) {
-      case SASTokenProvider.CHECK_ACCESS_OPERATION:
-        sp = "e";
-        break;
-      case SASTokenProvider.WRITE_OPERATION:
       case SASTokenProvider.CREATE_FILE_OPERATION:
       case SASTokenProvider.CREATE_DIRECTORY_OPERATION:
+      case SASTokenProvider.WRITE_OPERATION:
+      case SASTokenProvider.SET_PROPERTIES_OPERATION:
         sp = "w";
         break;
       case SASTokenProvider.DELETE_OPERATION:
@@ -72,6 +70,7 @@ public class DelegationSASGenerator extends SASGenerator {
         sr = "d";
         sdd = Integer.toString(StringUtils.countMatches(path, "/"));
         break;
+      case SASTokenProvider.CHECK_ACCESS_OPERATION:
       case SASTokenProvider.GET_ACL_OPERATION:
       case SASTokenProvider.GET_STATUS_OPERATION:
         sp = "e";
@@ -79,6 +78,7 @@ public class DelegationSASGenerator extends SASGenerator {
       case SASTokenProvider.LIST_OPERATION:
         sp = "l";
         break;
+      case SASTokenProvider.GET_PROPERTIES_OPERATION:
       case SASTokenProvider.READ_OPERATION:
         sp = "r";
         break;
@@ -87,14 +87,12 @@ public class DelegationSASGenerator extends SASGenerator {
         sp = "m";
         break;
       case SASTokenProvider.SET_ACL_OPERATION:
+      case SASTokenProvider.SET_PERMISSION_OPERATION:
         sp = "p";
         break;
       case SASTokenProvider.SET_OWNER_OPERATION:
         sp = "o";
         break;
-      case SASTokenProvider.SET_PERMISSION_OPERATION:
-        sp = "p";
-        break;
       default:
         throw new IllegalArgumentException(operation);
     }
@@ -146,7 +144,7 @@ public class DelegationSASGenerator extends SASGenerator {
     sb.append(accountName);
     sb.append("/");
     sb.append(containerName);
-    if (path != null && sr != "c") {
+    if (path != null && !sr.equals("c")) {
       sb.append(path);
     }
     sb.append("\n");
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/SASGenerator.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/SASGenerator.java
index 34d504a..2e9289d 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/SASGenerator.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/SASGenerator.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.fs.azurebfs.utils;
 
 import java.io.UnsupportedEncodingException;
 import java.nio.charset.StandardCharsets;
+import java.time.Duration;
 import java.time.format.DateTimeFormatter;
 import java.time.ZoneId;
 import java.util.Locale;
@@ -35,7 +36,8 @@ public abstract class SASGenerator {
 
   public enum AuthenticationVersion {
     Nov18("2018-11-09"),
-    Dec19("2019-12-12");
+    Dec19("2019-12-12"),
+    Feb20("2020-02-10");
 
     private final String ver;
 
@@ -50,8 +52,8 @@ public abstract class SASGenerator {
   }
 
   protected static final Logger LOG = LoggerFactory.getLogger(SASGenerator.class);
-  public static final int FIVE_MINUTES = 5 * 60;
-  public static final int ONE_DAY = 24 * 60 * 60;
+  public static final Duration FIVE_MINUTES = Duration.ofMinutes(5);
+  public static final Duration ONE_DAY = Duration.ofDays(1);
   public static final DateTimeFormatter ISO_8601_FORMATTER =
       DateTimeFormatter
           .ofPattern("yyyy-MM-dd'T'HH:mm:ss'Z'", Locale.ROOT)
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/ServiceSASGenerator.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/ServiceSASGenerator.java
index a76681b..3d8496e 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/ServiceSASGenerator.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/ServiceSASGenerator.java
@@ -40,8 +40,8 @@ public class ServiceSASGenerator extends SASGenerator {
     String sp = "rcwdl";
     String sv = AuthenticationVersion.Nov18.toString();
     String sr = "c";
-    String st = ISO_8601_FORMATTER.format(Instant.now().minusSeconds(FIVE_MINUTES));
-    String se = ISO_8601_FORMATTER.format(Instant.now().plusSeconds(ONE_DAY));
+    String st = ISO_8601_FORMATTER.format(Instant.now().minus(FIVE_MINUTES));
+    String se = ISO_8601_FORMATTER.format(Instant.now().plus(ONE_DAY));
 
     String signature = computeSignatureForSAS(sp, st, se, sv, "c",
         accountName, containerName, null);
@@ -71,7 +71,7 @@ public class ServiceSASGenerator extends SASGenerator {
     sb.append(accountName);
     sb.append("/");
     sb.append(containerName);
-    if (path != null && sr != "c") {
+    if (path != null && !sr.equals("c")) {
       //sb.append("/");
       sb.append(path);
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org