You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2019/03/05 10:48:09 UTC

[hadoop] branch branch-3.2 updated (ae832cc -> dc38fc5)

This is an automated email from the ASF dual-hosted git repository.

stevel pushed a change to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/hadoop.git.


    from ae832cc  HADOOP-16041. Include Hadoop version in User-Agent string for ABFS.
     new 075f6b0  HADOOP-15954. ABFS: Enable owner and group conversion for MSI and login user using OAuth.
     new dc38fc5  HADOOP-16136. ABFS: Should only transform username to short name

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../hadoop/fs/azurebfs/AbfsConfiguration.java      |  10 +
 .../hadoop/fs/azurebfs/AzureBlobFileSystem.java    |  37 +--
 .../fs/azurebfs/AzureBlobFileSystemStore.java      | 183 +++++++------
 .../fs/azurebfs/constants/ConfigurationKeys.java   |  23 +-
 .../fs/azurebfs/oauth2/IdentityTransformer.java    | 279 +++++++++++++++++++
 .../src/site/markdown/testing_azure.md             |  55 ++++
 .../fs/azurebfs/ITestAbfsIdentityTransformer.java  | 304 +++++++++++++++++++++
 7 files changed, 781 insertions(+), 110 deletions(-)
 create mode 100644 hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/IdentityTransformer.java
 create mode 100644 hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsIdentityTransformer.java


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[hadoop] 02/02: HADOOP-16136. ABFS: Should only transform username to short name

Posted by st...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

stevel pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit dc38fc598d2cb96b89be62026f5e675e170e18d3
Author: Da Zhou <da...@microsoft.com>
AuthorDate: Tue Mar 5 10:47:58 2019 +0000

    HADOOP-16136. ABFS: Should only transform username to short name
    
    Contributed by Da Zhou.
    
    (cherry picked from commit 3988e75ca385aec31ca1fc49d6cffce1ea935825)
    Signed-off-by: Steve Loughran <st...@apache.org>
---
 .../fs/azurebfs/AzureBlobFileSystemStore.java      |  8 +++--
 .../fs/azurebfs/oauth2/IdentityTransformer.java    | 39 +++++++++++-----------
 .../fs/azurebfs/ITestAbfsIdentityTransformer.java  | 25 ++++++++------
 3 files changed, 40 insertions(+), 32 deletions(-)

diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
index c2739e9..dbf78ec 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
@@ -491,10 +491,12 @@ public class AzureBlobFileSystemStore {
 
     final String transformedOwner = identityTransformer.transformIdentityForGetRequest(
               result.getResponseHeader(HttpHeaderConfigurations.X_MS_OWNER),
+              true,
               userName);
 
     final String transformedGroup = identityTransformer.transformIdentityForGetRequest(
               result.getResponseHeader(HttpHeaderConfigurations.X_MS_GROUP),
+              false,
               primaryUserGroup);
 
     return new VersionedFileStatus(
@@ -536,8 +538,8 @@ public class AzureBlobFileSystemStore {
       long blockSize = abfsConfiguration.getAzureBlockSize();
 
       for (ListResultEntrySchema entry : retrievedSchema.paths()) {
-        final String owner = identityTransformer.transformIdentityForGetRequest(entry.owner(), userName);
-        final String group = identityTransformer.transformIdentityForGetRequest(entry.group(), primaryUserGroup);
+        final String owner = identityTransformer.transformIdentityForGetRequest(entry.owner(), true, userName);
+        final String group = identityTransformer.transformIdentityForGetRequest(entry.group(), false, primaryUserGroup);
         final FsPermission fsPermission = entry.permissions() == null
                 ? new AbfsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL)
                 : AbfsPermission.valueOf(entry.permissions());
@@ -758,9 +760,11 @@ public class AzureBlobFileSystemStore {
 
     final String transformedOwner = identityTransformer.transformIdentityForGetRequest(
             result.getResponseHeader(HttpHeaderConfigurations.X_MS_OWNER),
+            true,
             userName);
     final String transformedGroup = identityTransformer.transformIdentityForGetRequest(
             result.getResponseHeader(HttpHeaderConfigurations.X_MS_GROUP),
+            false,
             primaryUserGroup);
 
     final String permissions = result.getResponseHeader(HttpHeaderConfigurations.X_MS_PERMISSIONS);
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/IdentityTransformer.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/IdentityTransformer.java
index 62b54d1..343b233 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/IdentityTransformer.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/IdentityTransformer.java
@@ -80,53 +80,54 @@ public class IdentityTransformer {
   /**
    * Perform identity transformation for the Get request results in AzureBlobFileSystemStore:
    * getFileStatus(), listStatus(), getAclStatus().
-   * Input originalUserOrGroup can be one of the following:
+   * Input originalIdentity can be one of the following:
    * 1. $superuser:
    *     by default it will be transformed to local user/group, this can be disabled by setting
    *     "fs.azure.identity.transformer.skip.superuser.replacement" to true.
    *
    * 2. User principal id:
-   *     can be transformed to localUserOrGroup, if this principal id matches the principal id set in
-   *     "fs.azure.identity.transformer.service.principal.id" and localUserOrGroup is stated in
+   *     can be transformed to localIdentity, if this principal id matches the principal id set in
+   *     "fs.azure.identity.transformer.service.principal.id" and localIdentity is stated in
    *     "fs.azure.identity.transformer.service.principal.substitution.list"
    *
    * 3. User principal name (UPN):
-   *     can be transformed to a short name(localUserOrGroup) if "fs.azure.identity.transformer.enable.short.name"
-   *     is enabled.
+   *     can be transformed to a short name(localIdentity) if originalIdentity is an owner name, and
+   *     "fs.azure.identity.transformer.enable.short.name" is enabled.
    *
-   * @param originalUserOrGroup the original user or group in the get request results: FileStatus, AclStatus.
-   * @param localUserOrGroup the local user or group, should be parsed from UserGroupInformation.
+   * @param originalIdentity the original user or group in the get request results: FileStatus, AclStatus.
+   * @param isUserName indicates whether the input originalIdentity is an owner name (true) or an owning group name (false).
+   * @param localIdentity the local user or group, should be parsed from UserGroupInformation.
    * @return owner or group after transformation.
    * */
-  public String transformIdentityForGetRequest(String originalUserOrGroup, String localUserOrGroup) {
-    if (originalUserOrGroup == null) {
-      originalUserOrGroup = localUserOrGroup;
-      // localUserOrGroup might be a full name, so continue the transformation.
+  public String transformIdentityForGetRequest(String originalIdentity, boolean isUserName, String localIdentity) {
+    if (originalIdentity == null) {
+      originalIdentity = localIdentity;
+      // localIdentity might be a full name, so continue the transformation.
     }
     // case 1: it is $superuser and replace $superuser config is enabled
-    if (!skipSuperUserReplacement && SUPER_USER.equals(originalUserOrGroup)) {
-      return localUserOrGroup;
+    if (!skipSuperUserReplacement && SUPER_USER.equals(originalIdentity)) {
+      return localIdentity;
     }
 
     if (skipUserIdentityReplacement) {
-      return originalUserOrGroup;
+      return originalIdentity;
     }
 
     // case 2: original owner is principalId set in config, and localUser
     //         is a daemon service specified in substitution list,
     //         To avoid ownership check failure in job task, replace it
     //         to local daemon user/group
-    if (originalUserOrGroup.equals(servicePrincipalId) && isInSubstitutionList(localUserOrGroup)) {
-      return localUserOrGroup;
+    if (originalIdentity.equals(servicePrincipalId) && isInSubstitutionList(localIdentity)) {
+      return localIdentity;
     }
 
     // case 3: If original owner is a fully qualified name, and
     //         short name is enabled, replace with shortName.
-    if (shouldUseShortUserName(originalUserOrGroup)) {
-      return getShortName(originalUserOrGroup);
+    if (isUserName && shouldUseShortUserName(originalIdentity)) {
+      return getShortName(originalIdentity);
     }
 
-    return originalUserOrGroup;
+    return originalIdentity;
   }
 
   /**
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsIdentityTransformer.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsIdentityTransformer.java
index 41f680e..424361b 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsIdentityTransformer.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsIdentityTransformer.java
@@ -153,13 +153,13 @@ public class ITestAbfsIdentityTransformer extends AbstractAbfsScaleTest{
     // with default config, identityTransformer should do $superUser replacement
     IdentityTransformer identityTransformer = getTransformerWithDefaultIdentityConfig(config);
     assertEquals("$superuser should be replaced with local user by default",
-            localUser, identityTransformer.transformIdentityForGetRequest(SUPER_USER, localUser));
+            localUser, identityTransformer.transformIdentityForGetRequest(SUPER_USER, true, localUser));
 
     // Disable $superuser replacement
     config.setBoolean(FS_AZURE_SKIP_SUPER_USER_REPLACEMENT, true);
     identityTransformer = getTransformerWithCustomizedIdentityConfig(config);
     assertEquals("$superuser should not be replaced",
-            SUPER_USER, identityTransformer.transformIdentityForGetRequest(SUPER_USER, localUser));
+            SUPER_USER, identityTransformer.transformIdentityForGetRequest(SUPER_USER, true, localUser));
   }
 
   @Test
@@ -170,14 +170,14 @@ public class ITestAbfsIdentityTransformer extends AbstractAbfsScaleTest{
     // Default config
     IdentityTransformer identityTransformer = getTransformerWithDefaultIdentityConfig(config);
     assertEquals("By default servicePrincipalId should not be converted for GetFileStatus(), listFileStatus(), getAcl()",
-            SERVICE_PRINCIPAL_ID, identityTransformer.transformIdentityForGetRequest(SERVICE_PRINCIPAL_ID, localUser));
+            SERVICE_PRINCIPAL_ID, identityTransformer.transformIdentityForGetRequest(SERVICE_PRINCIPAL_ID, true, localUser));
 
     resetIdentityConfig(config);
     // 1. substitution list doesn't contain currentUser
     config.set(FS_AZURE_OVERRIDE_OWNER_SP_LIST, "a,b,c,d");
     identityTransformer = getTransformerWithCustomizedIdentityConfig(config);
     assertEquals("servicePrincipalId should not be replaced if local daemon user is not in substitution list",
-            SERVICE_PRINCIPAL_ID, identityTransformer.transformIdentityForGetRequest(SERVICE_PRINCIPAL_ID, localUser));
+            SERVICE_PRINCIPAL_ID, identityTransformer.transformIdentityForGetRequest(SERVICE_PRINCIPAL_ID, true, localUser));
 
     resetIdentityConfig(config);
     // 2. substitution list contains currentUser(daemon name) but the service principal id in config doesn't match
@@ -185,7 +185,7 @@ public class ITestAbfsIdentityTransformer extends AbstractAbfsScaleTest{
     config.set(FS_AZURE_OVERRIDE_OWNER_SP, UUID.randomUUID().toString());
     identityTransformer = getTransformerWithCustomizedIdentityConfig(config);
     assertEquals("servicePrincipalId should not be replaced if it is not equal to the SPN set in config",
-            SERVICE_PRINCIPAL_ID, identityTransformer.transformIdentityForGetRequest(SERVICE_PRINCIPAL_ID, localUser));
+            SERVICE_PRINCIPAL_ID, identityTransformer.transformIdentityForGetRequest(SERVICE_PRINCIPAL_ID, true, localUser));
 
     resetIdentityConfig(config);
     // 3. substitution list contains currentUser(daemon name) and the service principal id in config matches
@@ -193,7 +193,7 @@ public class ITestAbfsIdentityTransformer extends AbstractAbfsScaleTest{
     config.set(FS_AZURE_OVERRIDE_OWNER_SP, SERVICE_PRINCIPAL_ID);
     identityTransformer = getTransformerWithCustomizedIdentityConfig(config);
     assertEquals("servicePrincipalId should be transformed to local use",
-            localUser, identityTransformer.transformIdentityForGetRequest(SERVICE_PRINCIPAL_ID, localUser));
+            localUser, identityTransformer.transformIdentityForGetRequest(SERVICE_PRINCIPAL_ID, true, localUser));
 
     resetIdentityConfig(config);
     // 4. substitution is "*" but the service principal id in config doesn't match the input
@@ -201,7 +201,7 @@ public class ITestAbfsIdentityTransformer extends AbstractAbfsScaleTest{
     config.set(FS_AZURE_OVERRIDE_OWNER_SP, UUID.randomUUID().toString());
     identityTransformer = getTransformerWithCustomizedIdentityConfig(config);
     assertEquals("servicePrincipalId should not be replaced if it is not equal to the SPN set in config",
-            SERVICE_PRINCIPAL_ID, identityTransformer.transformIdentityForGetRequest(SERVICE_PRINCIPAL_ID, localUser));
+            SERVICE_PRINCIPAL_ID, identityTransformer.transformIdentityForGetRequest(SERVICE_PRINCIPAL_ID, true, localUser));
 
     resetIdentityConfig(config);
     // 5. substitution is "*" and the service principal id in config match the input
@@ -209,7 +209,7 @@ public class ITestAbfsIdentityTransformer extends AbstractAbfsScaleTest{
     config.set(FS_AZURE_OVERRIDE_OWNER_SP, SERVICE_PRINCIPAL_ID);
     identityTransformer = getTransformerWithCustomizedIdentityConfig(config);
     assertEquals("servicePrincipalId should be transformed to local user",
-            localUser, identityTransformer.transformIdentityForGetRequest(SERVICE_PRINCIPAL_ID, localUser));
+            localUser, identityTransformer.transformIdentityForGetRequest(SERVICE_PRINCIPAL_ID, true, localUser));
   }
 
   @Test
@@ -220,13 +220,16 @@ public class ITestAbfsIdentityTransformer extends AbstractAbfsScaleTest{
     // Default config
     IdentityTransformer identityTransformer = getTransformerWithDefaultIdentityConfig(config);
     assertEquals("full name should not be transformed if shortname is not enabled",
-            FULLY_QUALIFIED_NAME, identityTransformer.transformIdentityForGetRequest(FULLY_QUALIFIED_NAME, localUser));
+            FULLY_QUALIFIED_NAME, identityTransformer.transformIdentityForGetRequest(FULLY_QUALIFIED_NAME, true, localUser));
 
     // add config to get short name
     config.setBoolean(FS_AZURE_FILE_OWNER_ENABLE_SHORTNAME, true);
     identityTransformer = getTransformerWithCustomizedIdentityConfig(config);
-    assertEquals("should convert the full name to shortname ",
-            SHORT_NAME, identityTransformer.transformIdentityForGetRequest(FULLY_QUALIFIED_NAME, localUser));
+    assertEquals("should convert the full owner name to shortname ",
+            SHORT_NAME, identityTransformer.transformIdentityForGetRequest(FULLY_QUALIFIED_NAME, true, localUser));
+
+    assertEquals("group name should not be converted to shortname ",
+            FULLY_QUALIFIED_NAME, identityTransformer.transformIdentityForGetRequest(FULLY_QUALIFIED_NAME, false, localGroup));
   }
 
   @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[hadoop] 01/02: HADOOP-15954. ABFS: Enable owner and group conversion for MSI and login user using OAuth.

Posted by st...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

stevel pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit 075f6b061cae9e1686814475ca342cb48424157c
Author: Da Zhou <da...@microsoft.com>
AuthorDate: Tue Mar 5 10:44:46 2019 +0000

    HADOOP-15954. ABFS: Enable owner and group conversion for MSI and login user using OAuth.
    
    Contributed by Da Zhou and Junhua Gu.
    
    (cherry picked from commit 1f1655028eede24197705a594b6ef19e6737db35)
    Signed-off-by: Steve Loughran <st...@apache.org>
---
 .../hadoop/fs/azurebfs/AbfsConfiguration.java      |  10 +
 .../hadoop/fs/azurebfs/AzureBlobFileSystem.java    |  37 +--
 .../fs/azurebfs/AzureBlobFileSystemStore.java      | 179 ++++++------
 .../fs/azurebfs/constants/ConfigurationKeys.java   |  23 +-
 .../fs/azurebfs/oauth2/IdentityTransformer.java    | 278 +++++++++++++++++++
 .../src/site/markdown/testing_azure.md             |  55 ++++
 .../fs/azurebfs/ITestAbfsIdentityTransformer.java  | 301 +++++++++++++++++++++
 7 files changed, 773 insertions(+), 110 deletions(-)

diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
index b9bc7f2..67055c5 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
@@ -219,6 +219,16 @@ public class AbfsConfiguration{
 
   /**
    * Returns the account-specific value if it exists, then looks for an
+   * account-agnostic value.
+   * @param key Account-agnostic configuration key
+   * @return value if one exists, else the default value
+   */
+  public String getString(String key, String defaultValue) {
+    return rawConfig.get(accountConf(key), rawConfig.get(key, defaultValue));
+  }
+
+  /**
+   * Returns the account-specific value if it exists, then looks for an
    * account-agnostic value, and finally tries the default value.
    * @param key Account-agnostic configuration key
    * @param defaultValue Value returned if none is configured
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
index b4277c2..7805ce2 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
@@ -83,9 +83,6 @@ public class AzureBlobFileSystem extends FileSystem {
   public static final Logger LOG = LoggerFactory.getLogger(AzureBlobFileSystem.class);
   private URI uri;
   private Path workingDir;
-  private UserGroupInformation userGroupInformation;
-  private String user;
-  private String primaryUserGroup;
   private AzureBlobFileSystemStore abfsStore;
   private boolean isClosed;
 
@@ -103,9 +100,7 @@ public class AzureBlobFileSystem extends FileSystem {
     LOG.debug("Initializing AzureBlobFileSystem for {}", uri);
 
     this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
-    this.userGroupInformation = UserGroupInformation.getCurrentUser();
-    this.user = userGroupInformation.getUserName();
-    this.abfsStore = new AzureBlobFileSystemStore(uri, this.isSecureScheme(), configuration, userGroupInformation);
+    this.abfsStore = new AzureBlobFileSystemStore(uri, this.isSecureScheme(), configuration);
     final AbfsConfiguration abfsConfiguration = abfsStore.getAbfsConfiguration();
 
     this.setWorkingDirectory(this.getHomeDirectory());
@@ -120,18 +115,6 @@ public class AzureBlobFileSystem extends FileSystem {
       }
     }
 
-    if (!abfsConfiguration.getSkipUserGroupMetadataDuringInitialization()) {
-      try {
-        this.primaryUserGroup = userGroupInformation.getPrimaryGroupName();
-      } catch (IOException ex) {
-        LOG.error("Failed to get primary group for {}, using user name as primary group name", user);
-        this.primaryUserGroup = this.user;
-      }
-    } else {
-      //Provide a default group name
-      this.primaryUserGroup = this.user;
-    }
-
     if (UserGroupInformation.isSecurityEnabled()) {
       this.delegationTokenEnabled = abfsConfiguration.isDelegationTokenManagerEnabled();
 
@@ -153,8 +136,8 @@ public class AzureBlobFileSystem extends FileSystem {
     final StringBuilder sb = new StringBuilder(
         "AzureBlobFileSystem{");
     sb.append("uri=").append(uri);
-    sb.append(", user='").append(user).append('\'');
-    sb.append(", primaryUserGroup='").append(primaryUserGroup).append('\'');
+    sb.append(", user='").append(abfsStore.getUser()).append('\'');
+    sb.append(", primaryUserGroup='").append(abfsStore.getPrimaryGroup()).append('\'');
     sb.append('}');
     return sb.toString();
   }
@@ -472,7 +455,7 @@ public class AzureBlobFileSystem extends FileSystem {
   public Path getHomeDirectory() {
     return makeQualified(new Path(
             FileSystemConfigurations.USER_HOME_DIRECTORY_PREFIX
-                + "/" + this.userGroupInformation.getShortUserName()));
+                + "/" + abfsStore.getUser()));
   }
 
   /**
@@ -523,12 +506,20 @@ public class AzureBlobFileSystem extends FileSystem {
     super.finalize();
   }
 
+  /**
+   * Get the username of the FS.
+   * @return the short name of the user who instantiated the FS
+   */
   public String getOwnerUser() {
-    return user;
+    return abfsStore.getUser();
   }
 
+  /**
+   * Get the group name of the owner of the FS.
+   * @return primary group name
+   */
   public String getOwnerUserPrimaryGroup() {
-    return primaryUserGroup;
+    return abfsStore.getPrimaryGroup();
   }
 
   private boolean deleteRoot() throws IOException {
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
index 5c28bd4..c2739e9 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
@@ -67,6 +67,7 @@ import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
 import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultEntrySchema;
 import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultSchema;
 import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider;
+import org.apache.hadoop.fs.azurebfs.oauth2.IdentityTransformer;
 import org.apache.hadoop.fs.azurebfs.services.AbfsAclHelper;
 import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
 import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation;
@@ -88,9 +89,7 @@ import org.apache.http.client.utils.URIBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.SUPER_USER;
 import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_ABFS_ENDPOINT;
-
 /**
  * Provides the bridging logic between Hadoop's abstract filesystem and Azure Storage.
  */
@@ -101,7 +100,6 @@ public class AzureBlobFileSystemStore {
 
   private AbfsClient client;
   private URI uri;
-  private final UserGroupInformation userGroupInformation;
   private String userName;
   private String primaryUserGroup;
   private static final String DATE_TIME_PATTERN = "E, dd MMM yyyy HH:mm:ss 'GMT'";
@@ -113,11 +111,12 @@ public class AzureBlobFileSystemStore {
   private boolean isNamespaceEnabledSet;
   private boolean isNamespaceEnabled;
   private final AuthType authType;
+  private final UserGroupInformation userGroupInformation;
+  private final IdentityTransformer identityTransformer;
 
-  public AzureBlobFileSystemStore(URI uri, boolean isSecureScheme, Configuration configuration, UserGroupInformation userGroupInformation)
-          throws AzureBlobFileSystemException, IOException {
+  public AzureBlobFileSystemStore(URI uri, boolean isSecureScheme, Configuration configuration)
+          throws IOException {
     this.uri = uri;
-
     String[] authorityParts = authorityParts(uri);
     final String fileSystemName = authorityParts[0];
     final String accountName = authorityParts[1];
@@ -127,10 +126,8 @@ public class AzureBlobFileSystemStore {
     } catch (IllegalAccessException exception) {
       throw new FileSystemOperationUnhandledException(exception);
     }
-
-    this.userGroupInformation = userGroupInformation;
+    this.userGroupInformation = UserGroupInformation.getCurrentUser();
     this.userName = userGroupInformation.getShortUserName();
-
     if (!abfsConfiguration.getSkipUserGroupMetadataDuringInitialization()) {
       try {
         this.primaryUserGroup = userGroupInformation.getPrimaryGroupName();
@@ -145,12 +142,25 @@ public class AzureBlobFileSystemStore {
 
     this.azureAtomicRenameDirSet = new HashSet<>(Arrays.asList(
         abfsConfiguration.getAzureAtomicRenameDirs().split(AbfsHttpConstants.COMMA)));
-
     this.authType = abfsConfiguration.getAuthType(accountName);
     boolean usingOauth = (authType == AuthType.OAuth);
     boolean useHttps = (usingOauth || abfsConfiguration.isHttpsAlwaysUsed()) ? true : isSecureScheme;
     initializeClient(uri, fileSystemName, accountName, useHttps);
+    this.identityTransformer = new IdentityTransformer(abfsConfiguration.getRawConfiguration());
+  }
 
+  /**
+   * @return local user name.
+   * */
+  public String getUser() {
+    return this.userName;
+  }
+
+  /**
+  * @return primary group that user belongs to.
+  * */
+  public String getPrimaryGroup() {
+    return this.primaryUserGroup;
   }
 
   private String[] authorityParts(URI uri) throws InvalidUriAuthorityException, InvalidUriException {
@@ -452,60 +462,54 @@ public class AzureBlobFileSystemStore {
             path,
             isNamespaceEnabled);
 
+    final AbfsRestOperation op;
     if (path.isRoot()) {
-      final AbfsRestOperation op = isNamespaceEnabled
-          ? client.getAclStatus(AbfsHttpConstants.FORWARD_SLASH + AbfsHttpConstants.ROOT_PATH)
-          : client.getFilesystemProperties();
-
-      final long blockSize = abfsConfiguration.getAzureBlockSize();
-      final String owner = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_OWNER);
-      final String group = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_GROUP);
-      final String permissions = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_PERMISSIONS);
-      final String eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG);
-      final String lastModified = op.getResult().getResponseHeader(HttpHeaderConfigurations.LAST_MODIFIED);
-      final boolean hasAcl = AbfsPermission.isExtendedAcl(permissions);
-
-      return new VersionedFileStatus(
-              isSuperUserOrEmpty(owner) ? userName : owner,
-              isSuperUserOrEmpty(group) ? primaryUserGroup : group,
-              permissions == null ? new AbfsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL)
-                      : AbfsPermission.valueOf(permissions),
-              hasAcl,
-              0,
-              true,
-              1,
-              blockSize,
-              parseLastModifiedTime(lastModified),
-              path,
-              eTag);
+      op = isNamespaceEnabled
+              ? client.getAclStatus(AbfsHttpConstants.FORWARD_SLASH + AbfsHttpConstants.ROOT_PATH)
+              : client.getFilesystemProperties();
     } else {
-      AbfsRestOperation op = client.getPathProperties(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), isNamespaceEnabled);
-
-      final long blockSize = abfsConfiguration.getAzureBlockSize();
-      final AbfsHttpOperation result = op.getResult();
-      final String eTag = result.getResponseHeader(HttpHeaderConfigurations.ETAG);
-      final String lastModified = result.getResponseHeader(HttpHeaderConfigurations.LAST_MODIFIED);
-      final String contentLength = result.getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH);
-      final String resourceType = result.getResponseHeader(HttpHeaderConfigurations.X_MS_RESOURCE_TYPE);
-      final String owner = result.getResponseHeader(HttpHeaderConfigurations.X_MS_OWNER);
-      final String group = result.getResponseHeader(HttpHeaderConfigurations.X_MS_GROUP);
-      final String permissions = result.getResponseHeader((HttpHeaderConfigurations.X_MS_PERMISSIONS));
-      final boolean hasAcl = AbfsPermission.isExtendedAcl(permissions);
-
-      return new VersionedFileStatus(
-              isSuperUserOrEmpty(owner) ? userName : owner,
-              isSuperUserOrEmpty(group) ? primaryUserGroup : group,
-              permissions == null ? new AbfsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL)
-                      : AbfsPermission.valueOf(permissions),
-              hasAcl,
-              parseContentLength(contentLength),
-              parseIsDirectory(resourceType),
-              1,
-              blockSize,
-              parseLastModifiedTime(lastModified),
-              path,
-              eTag);
+      op = client.getPathProperties(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), isNamespaceEnabled);
     }
+
+    final long blockSize = abfsConfiguration.getAzureBlockSize();
+    final AbfsHttpOperation result = op.getResult();
+
+    final String eTag = result.getResponseHeader(HttpHeaderConfigurations.ETAG);
+    final String lastModified = result.getResponseHeader(HttpHeaderConfigurations.LAST_MODIFIED);
+    final String permissions = result.getResponseHeader((HttpHeaderConfigurations.X_MS_PERMISSIONS));
+    final boolean hasAcl = AbfsPermission.isExtendedAcl(permissions);
+    final long contentLength;
+    final boolean resourceIsDir;
+
+    if (path.isRoot()) {
+      contentLength = 0;
+      resourceIsDir = true;
+    } else {
+      contentLength = parseContentLength(result.getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH));
+      resourceIsDir = parseIsDirectory(result.getResponseHeader(HttpHeaderConfigurations.X_MS_RESOURCE_TYPE));
+    }
+
+    final String transformedOwner = identityTransformer.transformIdentityForGetRequest(
+              result.getResponseHeader(HttpHeaderConfigurations.X_MS_OWNER),
+              userName);
+
+    final String transformedGroup = identityTransformer.transformIdentityForGetRequest(
+              result.getResponseHeader(HttpHeaderConfigurations.X_MS_GROUP),
+              primaryUserGroup);
+
+    return new VersionedFileStatus(
+            transformedOwner,
+            transformedGroup,
+            permissions == null ? new AbfsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL)
+                    : AbfsPermission.valueOf(permissions),
+            hasAcl,
+            contentLength,
+            resourceIsDir,
+            1,
+            blockSize,
+            parseLastModifiedTime(lastModified),
+            path,
+            eTag);
   }
 
   public FileStatus[] listStatus(final Path path) throws IOException {
@@ -532,8 +536,8 @@ public class AzureBlobFileSystemStore {
       long blockSize = abfsConfiguration.getAzureBlockSize();
 
       for (ListResultEntrySchema entry : retrievedSchema.paths()) {
-        final String owner = isSuperUserOrEmpty(entry.owner()) ? userName : entry.owner();
-        final String group = isSuperUserOrEmpty(entry.group()) ? primaryUserGroup : entry.group();
+        final String owner = identityTransformer.transformIdentityForGetRequest(entry.owner(), userName);
+        final String group = identityTransformer.transformIdentityForGetRequest(entry.group(), primaryUserGroup);
         final FsPermission fsPermission = entry.permissions() == null
                 ? new AbfsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL)
                 : AbfsPermission.valueOf(entry.permissions());
@@ -566,7 +570,7 @@ public class AzureBlobFileSystemStore {
 
     } while (continuation != null && !continuation.isEmpty());
 
-    return fileStatuses.toArray(new FileStatus[0]);
+    return fileStatuses.toArray(new FileStatus[fileStatuses.size()]);
   }
 
   public void setOwner(final Path path, final String owner, final String group) throws
@@ -576,20 +580,17 @@ public class AzureBlobFileSystemStore {
           "This operation is only valid for storage accounts with the hierarchical namespace enabled.");
     }
 
-    String effectiveOwner = owner;
-    String effectiveGroup = group;
-    if (authType == AuthType.SharedKey && owner.equals(userName)) {
-      effectiveOwner = SUPER_USER;
-      effectiveGroup = SUPER_USER;
-    }
-
     LOG.debug(
             "setOwner filesystem: {} path: {} owner: {} group: {}",
             client.getFileSystem(),
             path.toString(),
-            effectiveOwner,
-            effectiveGroup);
-    client.setOwner(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true), effectiveOwner, effectiveGroup);
+            owner,
+            group);
+
+    final String transformedOwner = identityTransformer.transformUserOrGroupForSetRequest(owner);
+    final String transformedGroup = identityTransformer.transformUserOrGroupForSetRequest(group);
+
+    client.setOwner(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true), transformedOwner, transformedGroup);
   }
 
   public void setPermission(final Path path, final FsPermission permission) throws
@@ -620,7 +621,9 @@ public class AzureBlobFileSystemStore {
             client.getFileSystem(),
             path.toString(),
             AclEntry.aclSpecToString(aclSpec));
-    final Map<String, String> modifyAclEntries = AbfsAclHelper.deserializeAclSpec(AclEntry.aclSpecToString(aclSpec));
+
+    final List<AclEntry> transformedAclEntries = identityTransformer.transformAclEntriesForSetRequest(aclSpec);
+    final Map<String, String> modifyAclEntries = AbfsAclHelper.deserializeAclSpec(AclEntry.aclSpecToString(transformedAclEntries));
     boolean useUpn = AbfsAclHelper.isUpnFormatAclEntries(modifyAclEntries);
 
     final AbfsRestOperation op = client.getAclStatus(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true), useUpn);
@@ -645,7 +648,9 @@ public class AzureBlobFileSystemStore {
             client.getFileSystem(),
             path.toString(),
             AclEntry.aclSpecToString(aclSpec));
-    final Map<String, String> removeAclEntries = AbfsAclHelper.deserializeAclSpec(AclEntry.aclSpecToString(aclSpec));
+
+    final List<AclEntry> transformedAclEntries = identityTransformer.transformAclEntriesForSetRequest(aclSpec);
+    final Map<String, String> removeAclEntries = AbfsAclHelper.deserializeAclSpec(AclEntry.aclSpecToString(transformedAclEntries));
     boolean isUpnFormat = AbfsAclHelper.isUpnFormatAclEntries(removeAclEntries);
 
     final AbfsRestOperation op = client.getAclStatus(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true), isUpnFormat);
@@ -722,7 +727,9 @@ public class AzureBlobFileSystemStore {
             client.getFileSystem(),
             path.toString(),
             AclEntry.aclSpecToString(aclSpec));
-    final Map<String, String> aclEntries = AbfsAclHelper.deserializeAclSpec(AclEntry.aclSpecToString(aclSpec));
+
+    final List<AclEntry> transformedAclEntries = identityTransformer.transformAclEntriesForSetRequest(aclSpec);
+    final Map<String, String> aclEntries = AbfsAclHelper.deserializeAclSpec(AclEntry.aclSpecToString(transformedAclEntries));
     final boolean isUpnFormat = AbfsAclHelper.isUpnFormatAclEntries(aclEntries);
 
     final AbfsRestOperation op = client.getAclStatus(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true), isUpnFormat);
@@ -749,8 +756,13 @@ public class AzureBlobFileSystemStore {
     AbfsRestOperation op = client.getAclStatus(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true));
     AbfsHttpOperation result = op.getResult();
 
-    final String owner = result.getResponseHeader(HttpHeaderConfigurations.X_MS_OWNER);
-    final String group = result.getResponseHeader(HttpHeaderConfigurations.X_MS_GROUP);
+    final String transformedOwner = identityTransformer.transformIdentityForGetRequest(
+            result.getResponseHeader(HttpHeaderConfigurations.X_MS_OWNER),
+            userName);
+    final String transformedGroup = identityTransformer.transformIdentityForGetRequest(
+            result.getResponseHeader(HttpHeaderConfigurations.X_MS_GROUP),
+            primaryUserGroup);
+
     final String permissions = result.getResponseHeader(HttpHeaderConfigurations.X_MS_PERMISSIONS);
     final String aclSpecString = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_ACL);
 
@@ -759,8 +771,8 @@ public class AzureBlobFileSystemStore {
             : AbfsPermission.valueOf(permissions);
 
     final AclStatus.Builder aclStatusBuilder = new AclStatus.Builder();
-    aclStatusBuilder.owner(isSuperUserOrEmpty(owner)? userName : owner);
-    aclStatusBuilder.group(isSuperUserOrEmpty(group) ? primaryUserGroup : group);
+    aclStatusBuilder.owner(transformedOwner);
+    aclStatusBuilder.group(transformedGroup);
 
     aclStatusBuilder.setPermission(fsPermission);
     aclStatusBuilder.stickyBit(fsPermission.getStickyBit());
@@ -944,11 +956,6 @@ public class AzureBlobFileSystemStore {
     return false;
   }
 
-  private boolean isSuperUserOrEmpty(final String name) {
-      return name == null || name.equals(SUPER_USER);
-  }
-
-
   private static class VersionedFileStatus extends FileStatus {
     private final String version;
 
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
index d079d94..8cd86bf 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
@@ -55,7 +55,28 @@ public final class ConfigurationKeys {
   public static final String FS_AZURE_USER_AGENT_PREFIX_KEY = "fs.azure.user.agent.prefix";
   public static final String FS_AZURE_SSL_CHANNEL_MODE_KEY = "fs.azure.ssl.channel.mode";
   public static final String FS_AZURE_USE_UPN = "fs.azure.use.upn";
-
+  /** User principal names (UPNs) have the format “{alias}@{domain}”. If true,
+   *  only {alias} is included when a UPN would otherwise appear in the output
+   *  of APIs like getFileStatus, getOwner, getAclStatus, etc. Default is false. **/
+  public static final String FS_AZURE_FILE_OWNER_ENABLE_SHORTNAME = "fs.azure.identity.transformer.enable.short.name";
+  /** If the domain name is specified and “fs.azure.identity.transformer.enable.short.name”
+   *  is true, then the {alias} part of a UPN can be specified as input to APIs like setOwner and
+   *  setAcl and it will be transformed to a UPN by appending @ and the domain specified by
+   *  this configuration property. **/
+  public static final String FS_AZURE_FILE_OWNER_DOMAINNAME = "fs.azure.identity.transformer.domain.name";
+  /** An Azure Active Directory object ID (oid) used as the replacement for names contained in the
+   * list specified by “fs.azure.identity.transformer.service.principal.substitution.list”.
+   * Notice that instead of setting oid, you can also set $superuser.**/
+  public static final String FS_AZURE_OVERRIDE_OWNER_SP = "fs.azure.identity.transformer.service.principal.id";
+  /** A comma separated list of names to be replaced with the service principal ID specified by
+   * “fs.azure.identity.transformer.service.principal.id”. This substitution occurs
+   * when setOwner, setAcl, modifyAclEntries, or removeAclEntries are invoked with identities
+   * contained in the substitution list. Note that in a non-secure cluster, the asterisk symbol "*"
+   * can be used to match all users/groups. **/
+  public static final String FS_AZURE_OVERRIDE_OWNER_SP_LIST = "fs.azure.identity.transformer.service.principal.substitution.list";
+  /** By default this is set as false, so “$superuser” is replaced with the current user when it appears as the owner
+   * or owning group of a file or directory. To disable this replacement, set it to true. **/
+  public static final String FS_AZURE_SKIP_SUPER_USER_REPLACEMENT = "fs.azure.identity.transformer.skip.superuser.replacement";
   public static final String AZURE_KEY_ACCOUNT_KEYPROVIDER = "fs.azure.account.keyprovider";
   public static final String AZURE_KEY_ACCOUNT_SHELLKEYPROVIDER_SCRIPT = "fs.azure.shellkeyprovider.script";
 
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/IdentityTransformer.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/IdentityTransformer.java
new file mode 100644
index 0000000..62b54d1
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/IdentityTransformer.java
@@ -0,0 +1,278 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.azurebfs.oauth2;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Locale;
+
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.permission.AclEntry;
+import org.apache.hadoop.fs.permission.AclEntryType;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.AT;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.STAR;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.SUPER_USER;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_SKIP_SUPER_USER_REPLACEMENT;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_FILE_OWNER_DOMAINNAME;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_FILE_OWNER_ENABLE_SHORTNAME;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_OVERRIDE_OWNER_SP;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_OVERRIDE_OWNER_SP_LIST;
+
+/**
+ * Perform transformation for Azure Active Directory identities used in owner, group and acls.
+ */
+public class IdentityTransformer {
+  private static final Logger LOG = LoggerFactory.getLogger(IdentityTransformer.class);
+
+  private boolean isSecure;
+  private String servicePrincipalId;
+  private String serviceWhiteList;
+  private String domainName;
+  private boolean enableShortName;
+  private boolean skipUserIdentityReplacement;
+  private boolean skipSuperUserReplacement;
+  private boolean domainIsSet;
+  private static final String UUID_PATTERN = "^[0-9a-f]{8}-[0-9a-f]{4}-[1-5][0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$";
+
+  public IdentityTransformer(Configuration configuration) throws IOException {
+    Preconditions.checkNotNull(configuration, "configuration");
+    this.isSecure = UserGroupInformation.getCurrentUser().isSecurityEnabled();
+    this.servicePrincipalId = configuration.get(FS_AZURE_OVERRIDE_OWNER_SP, "");
+    this.serviceWhiteList = configuration.get(FS_AZURE_OVERRIDE_OWNER_SP_LIST, "");
+    this.domainName = configuration.get(FS_AZURE_FILE_OWNER_DOMAINNAME, "");
+    this.enableShortName = configuration.getBoolean(FS_AZURE_FILE_OWNER_ENABLE_SHORTNAME, false);
+
+    // - "servicePrincipalId" and "serviceWhiteList" are required for
+    //    transformation between localUserOrGroup and principalId,$superuser
+    // - "enableShortName" is required for transformation between shortName and fullyQualifiedName.
+    this.skipUserIdentityReplacement = servicePrincipalId.isEmpty() && serviceWhiteList.isEmpty() && !enableShortName;
+    this.skipSuperUserReplacement = configuration.getBoolean(FS_AZURE_SKIP_SUPER_USER_REPLACEMENT, false);
+
+    if (enableShortName){
+      // need to check the domain setting only when short name is enabled.
+      // if shortName is not enabled, transformer won't transform a shortName to
+      // a fully qualified name.
+      this.domainIsSet = !domainName.isEmpty();
+    }
+  }
+
+  /**
+   * Perform identity transformation for the Get request results in AzureBlobFileSystemStore:
+   * getFileStatus(), listStatus(), getAclStatus().
+   * Input originalUserOrGroup can be one of the following:
+   * 1. $superuser:
+   *     by default it will be transformed to local user/group, this can be disabled by setting
+   *     "fs.azure.identity.transformer.skip.superuser.replacement" to true.
+   *
+   * 2. User principal id:
+   *     can be transformed to localUserOrGroup, if this principal id matches the principal id set in
+   *     "fs.azure.identity.transformer.service.principal.id" and localUserOrGroup is stated in
+   *     "fs.azure.identity.transformer.service.principal.substitution.list"
+   *
+   * 3. User principal name (UPN):
+   *     can be transformed to a short name(localUserOrGroup) if "fs.azure.identity.transformer.enable.short.name"
+   *     is enabled.
+   *
+   * @param originalUserOrGroup the original user or group in the get request results: FileStatus, AclStatus.
+   * @param localUserOrGroup the local user or group, should be parsed from UserGroupInformation.
+   * @return owner or group after transformation.
+   * */
+  public String transformIdentityForGetRequest(String originalUserOrGroup, String localUserOrGroup) {
+    if (originalUserOrGroup == null) {
+      originalUserOrGroup = localUserOrGroup;
+      // localUserOrGroup might be a full name, so continue the transformation.
+    }
+    // case 1: it is $superuser and replace $superuser config is enabled
+    if (!skipSuperUserReplacement && SUPER_USER.equals(originalUserOrGroup)) {
+      return localUserOrGroup;
+    }
+
+    if (skipUserIdentityReplacement) {
+      return originalUserOrGroup;
+    }
+
+    // case 2: original owner is principalId set in config, and localUser
+    //         is a daemon service specified in substitution list,
+    //         To avoid ownership check failure in job task, replace it
+    //         to local daemon user/group
+    if (originalUserOrGroup.equals(servicePrincipalId) && isInSubstitutionList(localUserOrGroup)) {
+      return localUserOrGroup;
+    }
+
+    // case 3: If original owner is a fully qualified name, and
+    //         short name is enabled, replace with shortName.
+    if (shouldUseShortUserName(originalUserOrGroup)) {
+      return getShortName(originalUserOrGroup);
+    }
+
+    return originalUserOrGroup;
+  }
+
+  /**
+   * Perform Identity transformation when setting owner on a path.
+   * There are four possible inputs:
+   * 1.short name; 2.$superuser; 3.Fully qualified name; 4. principal id.
+   *
+   * short name could be transformed to:
+   *    - A service principal id or $superuser, if short name belongs to a daemon service
+   *      stated in substitution list AND "fs.azure.identity.transformer.service.principal.id"
+   *      is set with $superuser or a principal id.
+   *    - Fully qualified name, if "fs.azure.identity.transformer.domain.name" is set in configuration.
+   *
+   * $superuser, fully qualified name and principalId should not be transformed.
+   *
+   * @param userOrGroup the user or group to be set as owner.
+   * @return user or group after transformation.
+   * */
+  public String transformUserOrGroupForSetRequest(String userOrGroup) {
+    if (userOrGroup == null || userOrGroup.isEmpty() || skipUserIdentityReplacement) {
+      return userOrGroup;
+    }
+
+    // case 1: when the owner to be set is stated in substitution list.
+    if (isInSubstitutionList(userOrGroup)) {
+      return servicePrincipalId;
+    }
+
+    // case 2: when the owner is a short name of the user principal name(UPN).
+    if (shouldUseFullyQualifiedUserName(userOrGroup)) {
+      return getFullyQualifiedName(userOrGroup);
+    }
+
+    return userOrGroup;
+  }
+
+  /**
+   * Perform Identity transformation when calling setAcl(),removeAclEntries() and modifyAclEntries()
+   * If the AclEntry type is a user or group, and its name is one of the following:
+   * 1.short name; 2.$superuser; 3.Fully qualified name; 4. principal id.
+   * Short name could be transformed to:
+   *    - A service principal id or $superuser, if short name belongs to a daemon service
+   *      stated in substitution list AND "fs.azure.identity.transformer.service.principal.id"
+   *      is set with $superuser or a principal id.
+   *    - A fully qualified name, if the AclEntry type is User AND if "fs.azure.identity.transformer.domain.name"
+   *    is set in configuration. This is to make the behavior consistent with HDI.
+   *
+   * $superuser, fully qualified name and principal id should not be transformed.
+   *
+   * @param aclEntries list of AclEntry
+   * @return list of AclEntry after the identity transformation.
+   * */
+  public List<AclEntry> transformAclEntriesForSetRequest(final List<AclEntry> aclEntries) {
+    if (skipUserIdentityReplacement) {
+      return aclEntries;
+    }
+
+    for (int i = 0; i < aclEntries.size(); i++) {
+      AclEntry aclEntry = aclEntries.get(i);
+      String name = aclEntry.getName();
+      String transformedName = name;
+      if (name == null || name.isEmpty() || aclEntry.getType().equals(AclEntryType.OTHER) || aclEntry.getType().equals(AclEntryType.MASK)) {
+        continue;
+      }
+
+      // case 1: when the user or group name to be set is stated in substitution list.
+      if (isInSubstitutionList(name)) {
+        transformedName = servicePrincipalId;
+      } else if (aclEntry.getType().equals(AclEntryType.USER) // case 2: when the owner is a short name
+              && shouldUseFullyQualifiedUserName(name)) {     //         of the user principal name (UPN).
+        // Notice: for a group type ACL entry, a short name
+        //         won't be converted to a full name. This is
+        //         to make the behavior consistent with HDI.
+        transformedName = getFullyQualifiedName(name);
+      }
+
+      // Avoid unnecessary new AclEntry allocation
+      if (transformedName.equals(name)) {
+        continue;
+      }
+
+      AclEntry.Builder aclEntryBuilder = new AclEntry.Builder();
+      aclEntryBuilder.setType(aclEntry.getType());
+      aclEntryBuilder.setName(transformedName);
+      aclEntryBuilder.setScope(aclEntry.getScope());
+      aclEntryBuilder.setPermission(aclEntry.getPermission());
+
+      // Replace the original AclEntry
+      aclEntries.set(i, aclEntryBuilder.build());
+    }
+    return aclEntries;
+  }
+
+  /**
+   * Internal method to identify if owner name returned by the ADLS backend is short name or not.
+   * If name contains "@", this code assumes that whatever comes after '@' is domain name and ignores it.
+   * @param owner the owner name to check; may be null.
+   * @return true if owner is non-null and does not contain "@".
+   */
+  private boolean isShortUserName(String owner) {
+    return (owner != null) && !owner.contains(AT);
+  }
+
+  private boolean shouldUseShortUserName(String owner){
+    return enableShortName && !isShortUserName(owner);
+  }
+
+  private String getShortName(String userName) {
+    if (userName == null)    {
+      return  null;
+    }
+
+    if (isShortUserName(userName)) {
+      return userName;
+    }
+
+    String userNameBeforeAt = userName.substring(0, userName.indexOf(AT));
+    if (isSecure) {
+      // In secure clusters we apply auth-to-local rules that lowercase all short local user names (notice /L at the end),
+      // e.g.: RULE:[1:$1@$0](.*@FOO.ONMICROSOFT.COM)s/@.*///L. Ideally we should use the HadoopKerberosName class, i.e.
+      // new HadoopKerberosName(arg).getShortName(). However,
+      // 1. ADLS can report the realm in lower case when returning file owner names (e.g. Some.User@realm.onmicrosoft.com)
+      // 2. The RULE specification does not allow specifying character classes to do case-insensitive matches
+      // Due to this, we end up using a forced lowercase version of the manually shortened name.
+      return userNameBeforeAt.toLowerCase(Locale.ENGLISH);
+    }
+    return userNameBeforeAt;
+  }
+
+  private String getFullyQualifiedName(String name){
+    if (domainIsSet && (name != null) && !name.contains(AT)){
+      return name + AT + domainName;
+    }
+    return name;
+  }
+
+  private boolean shouldUseFullyQualifiedUserName(String owner){
+    return domainIsSet && !SUPER_USER.equals(owner) && !isUuid(owner) && enableShortName && isShortUserName(owner);
+  }
+
+  private boolean isInSubstitutionList(String localUserName) {
+    return serviceWhiteList.contains(STAR) || serviceWhiteList.contains(localUserName);
+  }
+
+  private boolean isUuid(String input) {
+    if (input == null) return false;
+    return input.matches(UUID_PATTERN);
+  }
+}
diff --git a/hadoop-tools/hadoop-azure/src/site/markdown/testing_azure.md b/hadoop-tools/hadoop-azure/src/site/markdown/testing_azure.md
index c2afe74..40a372d 100644
--- a/hadoop-tools/hadoop-azure/src/site/markdown/testing_azure.md
+++ b/hadoop-tools/hadoop-azure/src/site/markdown/testing_azure.md
@@ -733,6 +733,61 @@ hierarchical namespace enabled, and set the following configuration settings:
      <description>AAD client id.</description>
    </property>
   -->
+
+  <!--
+    <property>
+        <name>fs.azure.identity.transformer.enable.short.name</name>
+        <value>true/false</value>
+        <description>
+          User principal names (UPNs) have the format “{alias}@{domain}”.
+          If true, only {alias} is included when a UPN would otherwise appear in the output
+          of APIs like getFileStatus, getOwner, getAclStatus, etc. The default is false.
+        </description>
+    </property>
+
+    <property>
+        <name>fs.azure.identity.transformer.domain.name</name>
+        <value>domain name of the user's upn</value>
+        <description>
+          If the domain name is specified and “fs.azure.identity.transformer.enable.short.name”
+          is true, then the {alias} part of a UPN can be specified as input to APIs like setOwner,
+          setAcl, modifyAclEntries, or removeAclEntries, and it will be transformed to a UPN by appending @ and the domain specified by
+          this configuration property.
+        </description>
+    </property>
+
+    <property>
+        <name>fs.azure.identity.transformer.service.principal.id</name>
+        <value>service principal object id</value>
+        <description>
+          An Azure Active Directory object ID (oid) used as the replacement for names contained
+          in the list specified by “fs.azure.identity.transformer.service.principal.substitution.list”.
+          Notice that instead of setting oid, you can also set $superuser here.
+        </description>
+    </property>
+
+    <property>
+        <name>fs.azure.identity.transformer.skip.superuser.replacement</name>
+        <value>true/false</value>
+        <description>
+          If false, “$superuser” is replaced with the current user when it appears as the owner
+          or owning group of a file or directory. The default is false.
+        </description>
+    </property>
+
+    <property>
+        <name>fs.azure.identity.transformer.service.principal.substitution.list</name>
+        <value>mapred,hdfs,yarn,hive,tez</value>
+        <description>
+           A comma separated list of names to be replaced with the service principal ID specified by
+           “fs.azure.identity.transformer.service.principal.id”.  This substitution occurs
+           when setOwner, setAcl, modifyAclEntries, or removeAclEntries are invoked with identities
+           contained in the substitution list. Notice that when in non-secure cluster, asterisk symbol *
+           can be used to match all user/group.
+        </description>
+    </property>
+   -->
+
 ```
 
 If running tests against an endpoint that uses the URL format
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsIdentityTransformer.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsIdentityTransformer.java
new file mode 100644
index 0000000..41f680e
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsIdentityTransformer.java
@@ -0,0 +1,301 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.UUID;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.fs.azurebfs.oauth2.IdentityTransformer;
+import org.apache.hadoop.fs.permission.AclEntry;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.SUPER_USER;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_FILE_OWNER_DOMAINNAME;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_FILE_OWNER_ENABLE_SHORTNAME;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_OVERRIDE_OWNER_SP;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_OVERRIDE_OWNER_SP_LIST;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_SKIP_SUPER_USER_REPLACEMENT;
+import static org.apache.hadoop.fs.azurebfs.utils.AclTestHelpers.aclEntry;
+import static org.apache.hadoop.fs.permission.AclEntryScope.ACCESS;
+import static org.apache.hadoop.fs.permission.AclEntryScope.DEFAULT;
+import static org.apache.hadoop.fs.permission.AclEntryType.GROUP;
+import static org.apache.hadoop.fs.permission.AclEntryType.MASK;
+import static org.apache.hadoop.fs.permission.AclEntryType.OTHER;
+import static org.apache.hadoop.fs.permission.AclEntryType.USER;
+import static org.apache.hadoop.fs.permission.FsAction.ALL;
+
+/**
+ * Tests how IdentityTransformer maps user/group identities for set and get requests.
+ */
+//@RunWith(Parameterized.class)
+public class ITestAbfsIdentityTransformer extends AbstractAbfsScaleTest{
+  private final UserGroupInformation userGroupInfo;
+  private final String localUser;
+  private final String localGroup;
+  private static final String DAEMON = "daemon";
+  private static final String ASTERISK = "*";
+  private static final String SHORT_NAME = "abc";
+  private static final String DOMAIN = "domain.com";
+  private static final String FULLY_QUALIFIED_NAME = "abc@domain.com";
+  private static final String SERVICE_PRINCIPAL_ID = UUID.randomUUID().toString();
+
+  public ITestAbfsIdentityTransformer() throws Exception {
+    super();
+    userGroupInfo = UserGroupInformation.getCurrentUser();
+    localUser = userGroupInfo.getShortUserName();
+    localGroup = userGroupInfo.getPrimaryGroupName();
+  }
+
+  @Test
+  public void testDaemonServiceSettingIdentity() throws IOException {
+    Configuration config = this.getRawConfiguration();
+    resetIdentityConfig(config);
+    // Default config: the helper unsets every identity transformer setting first
+    IdentityTransformer identityTransformer = getTransformerWithDefaultIdentityConfig(config);
+    assertEquals("Identity should not change for default config",
+            DAEMON, identityTransformer.transformUserOrGroupForSetRequest(DAEMON));
+
+    // Set the service principal id; it stays configured for all three cases below
+    config.set(FS_AZURE_OVERRIDE_OWNER_SP, SERVICE_PRINCIPAL_ID);
+
+    // case 1: substitution list doesn't contain the daemon name
+    config.set(FS_AZURE_OVERRIDE_OWNER_SP_LIST, "a,b,c,d");
+    identityTransformer = getTransformerWithCustomizedIdentityConfig(config);
+    assertEquals("Identity should not change when substitution list doesn't contain daemon",
+            DAEMON, identityTransformer.transformUserOrGroupForSetRequest(DAEMON));
+
+    // case 2: substitution list contains the daemon name
+    config.set(FS_AZURE_OVERRIDE_OWNER_SP_LIST, DAEMON + ",a,b,c,d");
+    identityTransformer = getTransformerWithCustomizedIdentityConfig(config);
+    assertEquals("Identity should be replaced to servicePrincipalId",
+            SERVICE_PRINCIPAL_ID, identityTransformer.transformUserOrGroupForSetRequest(DAEMON));
+
+    // case 3: substitution list is the single entry "*"
+    config.set(FS_AZURE_OVERRIDE_OWNER_SP_LIST, ASTERISK);
+    identityTransformer = getTransformerWithCustomizedIdentityConfig(config);
+    assertEquals("Identity should be replaced to servicePrincipalId",
+            SERVICE_PRINCIPAL_ID, identityTransformer.transformUserOrGroupForSetRequest(DAEMON));
+  }
+
+  @Test
+  public void testFullyQualifiedNameSettingIdentity() throws IOException {
+    Configuration config = this.getRawConfiguration();
+    // Default config: the helper unsets every identity transformer setting first
+    IdentityTransformer identityTransformer = getTransformerWithDefaultIdentityConfig(config);
+    assertEquals("short name should not be converted to full name by default",
+            SHORT_NAME, identityTransformer.transformUserOrGroupForSetRequest(SHORT_NAME));
+
+    resetIdentityConfig(config);
+
+    // Enable short names and set the domain: SHORT_NAME is expected to become FULLY_QUALIFIED_NAME
+    config.setBoolean(FS_AZURE_FILE_OWNER_ENABLE_SHORTNAME, true);
+    config.set(FS_AZURE_FILE_OWNER_DOMAINNAME, DOMAIN);
+    identityTransformer = getTransformerWithCustomizedIdentityConfig(config);
+    assertEquals("short name should be converted to full name",
+            FULLY_QUALIFIED_NAME, identityTransformer.transformUserOrGroupForSetRequest(SHORT_NAME));
+  }
+
+  @Test
+  public void testNoOpForSettingOidAsIdentity() throws IOException {
+    Configuration config = this.getRawConfiguration();
+    resetIdentityConfig(config);
+
+    config.setBoolean(FS_AZURE_FILE_OWNER_ENABLE_SHORTNAME, true);
+    config.set(FS_AZURE_FILE_OWNER_DOMAINNAME, DOMAIN);
+    config.set(FS_AZURE_OVERRIDE_OWNER_SP, UUID.randomUUID().toString());
+    config.set(FS_AZURE_OVERRIDE_OWNER_SP_LIST, "a,b,c,d");
+
+    IdentityTransformer identityTransformer = getTransformerWithCustomizedIdentityConfig(config);
+    final String principalId = UUID.randomUUID().toString();
+    assertEquals("Identity should not be changed when owner is already a principal id ",
+            principalId, identityTransformer.transformUserOrGroupForSetRequest(principalId));
+  }
+
+  @Test
+  public void testNoOpWhenSettingSuperUserAsdentity() throws IOException {
+    Configuration config = this.getRawConfiguration();
+    resetIdentityConfig(config);
+
+    config.setBoolean(FS_AZURE_FILE_OWNER_ENABLE_SHORTNAME, true);
+    config.set(FS_AZURE_FILE_OWNER_DOMAINNAME, DOMAIN);
+    // NOTE(review): this helper calls resetIdentityConfig, so the two settings above are unset again
+    IdentityTransformer identityTransformer = getTransformerWithDefaultIdentityConfig(config);
+    assertEquals("Identity should not be changed because it is not in substitution list",
+            SUPER_USER, identityTransformer.transformUserOrGroupForSetRequest(SUPER_USER));
+  }
+
+  @Test
+  public void testIdentityReplacementForSuperUserGetRequest() throws IOException {
+    Configuration config = this.getRawConfiguration();
+    resetIdentityConfig(config);
+
+    // with default config, identityTransformer should replace $superuser with the local user
+    IdentityTransformer identityTransformer = getTransformerWithDefaultIdentityConfig(config);
+    assertEquals("$superuser should be replaced with local user by default",
+            localUser, identityTransformer.transformIdentityForGetRequest(SUPER_USER, localUser));
+
+    // Disable $superuser replacement
+    config.setBoolean(FS_AZURE_SKIP_SUPER_USER_REPLACEMENT, true);
+    identityTransformer = getTransformerWithCustomizedIdentityConfig(config);
+    assertEquals("$superuser should not be replaced",
+            SUPER_USER, identityTransformer.transformIdentityForGetRequest(SUPER_USER, localUser));
+  }
+
+  @Test
+  public void testIdentityReplacementForDaemonServiceGetRequest() throws IOException {
+    Configuration config = this.getRawConfiguration();
+    resetIdentityConfig(config);
+
+    // Default config
+    IdentityTransformer identityTransformer = getTransformerWithDefaultIdentityConfig(config);
+    assertEquals("By default servicePrincipalId should not be converted for GetFileStatus(), listFileStatus(), getAcl()",
+            SERVICE_PRINCIPAL_ID, identityTransformer.transformIdentityForGetRequest(SERVICE_PRINCIPAL_ID, localUser));
+
+    resetIdentityConfig(config);
+    // 1. substitution list doesn't contain currentUser
+    config.set(FS_AZURE_OVERRIDE_OWNER_SP_LIST, "a,b,c,d");
+    identityTransformer = getTransformerWithCustomizedIdentityConfig(config);
+    assertEquals("servicePrincipalId should not be replaced if local daemon user is not in substitution list",
+            SERVICE_PRINCIPAL_ID, identityTransformer.transformIdentityForGetRequest(SERVICE_PRINCIPAL_ID, localUser));
+
+    resetIdentityConfig(config);
+    // 2. substitution list contains currentUser(daemon name) but the service principal id in config doesn't match
+    config.set(FS_AZURE_OVERRIDE_OWNER_SP_LIST, localUser + ",a,b,c,d");
+    config.set(FS_AZURE_OVERRIDE_OWNER_SP, UUID.randomUUID().toString());
+    identityTransformer = getTransformerWithCustomizedIdentityConfig(config);
+    assertEquals("servicePrincipalId should not be replaced if it is not equal to the SPN set in config",
+            SERVICE_PRINCIPAL_ID, identityTransformer.transformIdentityForGetRequest(SERVICE_PRINCIPAL_ID, localUser));
+
+    resetIdentityConfig(config);
+    // 3. substitution list contains currentUser(daemon name) and the service principal id in config matches
+    config.set(FS_AZURE_OVERRIDE_OWNER_SP_LIST, localUser + ",a,b,c,d");
+    config.set(FS_AZURE_OVERRIDE_OWNER_SP, SERVICE_PRINCIPAL_ID);
+    identityTransformer = getTransformerWithCustomizedIdentityConfig(config);
+    assertEquals("servicePrincipalId should be transformed to local user",
+            localUser, identityTransformer.transformIdentityForGetRequest(SERVICE_PRINCIPAL_ID, localUser));
+
+    resetIdentityConfig(config);
+    // 4. substitution is "*" but the service principal id in config doesn't match the input
+    config.set(FS_AZURE_OVERRIDE_OWNER_SP_LIST, ASTERISK);
+    config.set(FS_AZURE_OVERRIDE_OWNER_SP, UUID.randomUUID().toString());
+    identityTransformer = getTransformerWithCustomizedIdentityConfig(config);
+    assertEquals("servicePrincipalId should not be replaced if it is not equal to the SPN set in config",
+            SERVICE_PRINCIPAL_ID, identityTransformer.transformIdentityForGetRequest(SERVICE_PRINCIPAL_ID, localUser));
+
+    resetIdentityConfig(config);
+    // 5. substitution is "*" and the service principal id in config matches the input
+    config.set(FS_AZURE_OVERRIDE_OWNER_SP_LIST, ASTERISK);
+    config.set(FS_AZURE_OVERRIDE_OWNER_SP, SERVICE_PRINCIPAL_ID);
+    identityTransformer = getTransformerWithCustomizedIdentityConfig(config);
+    assertEquals("servicePrincipalId should be transformed to local user",
+            localUser, identityTransformer.transformIdentityForGetRequest(SERVICE_PRINCIPAL_ID, localUser));
+  }
+
+  @Test
+  public void testIdentityReplacementForKinitUserGetRequest() throws IOException {
+    Configuration config = this.getRawConfiguration();
+    resetIdentityConfig(config);
+
+    // Default config: the helper unsets every identity transformer setting first
+    IdentityTransformer identityTransformer = getTransformerWithDefaultIdentityConfig(config);
+    assertEquals("full name should not be transformed if shortname is not enabled",
+            FULLY_QUALIFIED_NAME, identityTransformer.transformIdentityForGetRequest(FULLY_QUALIFIED_NAME, localUser));
+
+    // enable short names: FULLY_QUALIFIED_NAME is expected to come back as SHORT_NAME
+    config.setBoolean(FS_AZURE_FILE_OWNER_ENABLE_SHORTNAME, true);
+    identityTransformer = getTransformerWithCustomizedIdentityConfig(config);
+    assertEquals("should convert the full name to shortname ",
+            SHORT_NAME, identityTransformer.transformIdentityForGetRequest(FULLY_QUALIFIED_NAME, localUser));
+  }
+
+  @Test
+  public void transformAclEntriesForSetRequest() throws IOException {
+    Configuration config = this.getRawConfiguration();
+    resetIdentityConfig(config);
+
+    List<AclEntry> aclEntriesToBeTransformed = Lists.newArrayList(
+            aclEntry(ACCESS, USER, DAEMON, ALL),
+            aclEntry(ACCESS, USER, FULLY_QUALIFIED_NAME, ALL),
+            aclEntry(DEFAULT, USER, SUPER_USER, ALL),
+            aclEntry(DEFAULT, USER, SERVICE_PRINCIPAL_ID, ALL),
+            aclEntry(DEFAULT, USER, SHORT_NAME, ALL),
+            aclEntry(DEFAULT, GROUP, DAEMON, ALL),
+            aclEntry(DEFAULT, GROUP, SHORT_NAME, ALL), // Note: for a group type ACL entry, a short name
+            aclEntry(DEFAULT, OTHER, ALL),             //       is not converted to a full name. This
+            aclEntry(DEFAULT, MASK, ALL)               //       keeps the behavior consistent with HDI.
+    );
+
+    // Default config should not change the identities (actual first, expected second, as in the helper)
+    IdentityTransformer identityTransformer = getTransformerWithDefaultIdentityConfig(config);
+    checkAclEntriesList(identityTransformer.transformAclEntriesForSetRequest(aclEntriesToBeTransformed), aclEntriesToBeTransformed);
+
+    resetIdentityConfig(config);
+    // With config
+    config.set(FS_AZURE_OVERRIDE_OWNER_SP_LIST, DAEMON + ",a,b,c,d");
+    config.setBoolean(FS_AZURE_FILE_OWNER_ENABLE_SHORTNAME, true);
+    config.set(FS_AZURE_FILE_OWNER_DOMAINNAME, DOMAIN);
+    config.set(FS_AZURE_OVERRIDE_OWNER_SP, SERVICE_PRINCIPAL_ID);
+    identityTransformer = getTransformerWithCustomizedIdentityConfig(config);
+
+    // expected acl entries
+    List<AclEntry> expectedAclEntries = Lists.newArrayList(
+            aclEntry(ACCESS, USER, SERVICE_PRINCIPAL_ID, ALL),
+            aclEntry(ACCESS, USER, FULLY_QUALIFIED_NAME, ALL),
+            aclEntry(DEFAULT, USER, SUPER_USER, ALL),
+            aclEntry(DEFAULT, USER, SERVICE_PRINCIPAL_ID, ALL),
+            aclEntry(DEFAULT, USER, FULLY_QUALIFIED_NAME, ALL),
+            aclEntry(DEFAULT, GROUP, SERVICE_PRINCIPAL_ID, ALL),
+            aclEntry(DEFAULT, GROUP, SHORT_NAME, ALL),
+            aclEntry(DEFAULT, OTHER, ALL),
+            aclEntry(DEFAULT, MASK, ALL)
+    );
+
+    checkAclEntriesList(identityTransformer.transformAclEntriesForSetRequest(aclEntriesToBeTransformed), expectedAclEntries);
+
+  }
+
+  private void resetIdentityConfig(Configuration config) {
+    config.unset(FS_AZURE_FILE_OWNER_ENABLE_SHORTNAME);
+    config.unset(FS_AZURE_FILE_OWNER_DOMAINNAME);
+    config.unset(FS_AZURE_OVERRIDE_OWNER_SP);
+    config.unset(FS_AZURE_OVERRIDE_OWNER_SP_LIST);
+    config.unset(FS_AZURE_SKIP_SUPER_USER_REPLACEMENT);
+  }
+
+  private IdentityTransformer getTransformerWithDefaultIdentityConfig(Configuration config) throws IOException {
+    resetIdentityConfig(config);
+    return new IdentityTransformer(config);
+  }
+
+  private IdentityTransformer getTransformerWithCustomizedIdentityConfig(Configuration config) throws IOException {
+    return new IdentityTransformer(config);
+  }
+
+  private void checkAclEntriesList(List<AclEntry> aclEntries, List<AclEntry> expected) {
+    assertEquals("list size not equals", expected.size(), aclEntries.size()); // reports both sizes on failure
+    for (int i = 0; i < aclEntries.size(); i++) {
+      assertEquals("Identity doesn't match", expected.get(i).getName(), aclEntries.get(i).getName());
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org