You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2019/02/07 21:59:25 UTC

[hadoop] branch trunk updated (546c5d7 -> 1f16550)

This is an automated email from the ASF dual-hosted git repository.

stevel pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git.


    from 546c5d7  HADOOP-16032. Distcp It should clear sub directory ACL before applying new ACL on.
     new 668817a  Revert "HADOOP-15954. ABFS: Enable owner and group conversion for MSI and login user using OAuth."
     new 1f16550  HADOOP-15954. ABFS: Enable owner and group conversion for MSI and login user using OAuth.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../main/java/org/apache/hadoop/fs/shell/Ls.java   | 35 ++--------------------
 .../src/site/markdown/FileSystemShell.md           |  5 +---
 .../java/org/apache/hadoop/fs/shell/TestLs.java    | 35 +---------------------
 .../hadoop-common/src/test/resources/testConf.xml  |  6 +---
 .../hdfs/tools/TestStoragePolicyCommands.java      | 17 -----------
 .../src/test/resources/testErasureCodingConf.xml   | 19 ------------
 .../fs/azurebfs/oauth2/IdentityTransformer.java    |  7 +++--
 .../hadoop/fs/azurebfs/oauth2/package-info.java    |  8 +----
 8 files changed, 12 insertions(+), 120 deletions(-)


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[hadoop] 02/02: HADOOP-15954. ABFS: Enable owner and group conversion for MSI and login user using OAuth.

Posted by st...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

stevel pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit 1f1655028eede24197705a594b6ef19e6737db35
Author: Da Zhou <da...@microsoft.com>
AuthorDate: Thu Feb 7 21:58:21 2019 +0000

    HADOOP-15954. ABFS: Enable owner and group conversion for MSI and login user using OAuth.
    
    Contributed by Da Zhou and Junhua Gu.
---
 .../hadoop/fs/azurebfs/AbfsConfiguration.java      |  10 +
 .../hadoop/fs/azurebfs/AzureBlobFileSystem.java    |  37 +--
 .../fs/azurebfs/AzureBlobFileSystemStore.java      | 179 ++++++------
 .../fs/azurebfs/constants/ConfigurationKeys.java   |  23 +-
 .../fs/azurebfs/oauth2/IdentityTransformer.java    | 278 +++++++++++++++++++
 .../src/site/markdown/testing_azure.md             |  55 ++++
 .../fs/azurebfs/ITestAbfsIdentityTransformer.java  | 301 +++++++++++++++++++++
 7 files changed, 773 insertions(+), 110 deletions(-)

diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
index b9bc7f2..67055c5 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
@@ -219,6 +219,16 @@ public class AbfsConfiguration{
 
   /**
    * Returns the account-specific value if it exists, then looks for an
+   * account-agnostic value, and finally tries the default value.
+   * @param key Account-agnostic configuration key
+   * @return value if one exists, else the default value
+   */
+  public String getString(String key, String defaultValue) {
+    return rawConfig.get(accountConf(key), rawConfig.get(key, defaultValue));
+  }
+
+  /**
+   * Returns the account-specific value if it exists, then looks for an
    * account-agnostic value, and finally tries the default value.
    * @param key Account-agnostic configuration key
    * @param defaultValue Value returned if none is configured
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
index 4c24ac8..e321e9e 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
@@ -83,9 +83,6 @@ public class AzureBlobFileSystem extends FileSystem {
   public static final Logger LOG = LoggerFactory.getLogger(AzureBlobFileSystem.class);
   private URI uri;
   private Path workingDir;
-  private UserGroupInformation userGroupInformation;
-  private String user;
-  private String primaryUserGroup;
   private AzureBlobFileSystemStore abfsStore;
   private boolean isClosed;
 
@@ -103,9 +100,7 @@ public class AzureBlobFileSystem extends FileSystem {
     LOG.debug("Initializing AzureBlobFileSystem for {}", uri);
 
     this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
-    this.userGroupInformation = UserGroupInformation.getCurrentUser();
-    this.user = userGroupInformation.getUserName();
-    this.abfsStore = new AzureBlobFileSystemStore(uri, this.isSecureScheme(), configuration, userGroupInformation);
+    this.abfsStore = new AzureBlobFileSystemStore(uri, this.isSecureScheme(), configuration);
     final AbfsConfiguration abfsConfiguration = abfsStore.getAbfsConfiguration();
 
     this.setWorkingDirectory(this.getHomeDirectory());
@@ -120,18 +115,6 @@ public class AzureBlobFileSystem extends FileSystem {
       }
     }
 
-    if (!abfsConfiguration.getSkipUserGroupMetadataDuringInitialization()) {
-      try {
-        this.primaryUserGroup = userGroupInformation.getPrimaryGroupName();
-      } catch (IOException ex) {
-        LOG.error("Failed to get primary group for {}, using user name as primary group name", user);
-        this.primaryUserGroup = this.user;
-      }
-    } else {
-      //Provide a default group name
-      this.primaryUserGroup = this.user;
-    }
-
     if (UserGroupInformation.isSecurityEnabled()) {
       this.delegationTokenEnabled = abfsConfiguration.isDelegationTokenManagerEnabled();
 
@@ -153,8 +136,8 @@ public class AzureBlobFileSystem extends FileSystem {
     final StringBuilder sb = new StringBuilder(
         "AzureBlobFileSystem{");
     sb.append("uri=").append(uri);
-    sb.append(", user='").append(user).append('\'');
-    sb.append(", primaryUserGroup='").append(primaryUserGroup).append('\'');
+    sb.append(", user='").append(abfsStore.getUser()).append('\'');
+    sb.append(", primaryUserGroup='").append(abfsStore.getPrimaryGroup()).append('\'');
     sb.append('}');
     return sb.toString();
   }
@@ -503,7 +486,7 @@ public class AzureBlobFileSystem extends FileSystem {
   public Path getHomeDirectory() {
     return makeQualified(new Path(
             FileSystemConfigurations.USER_HOME_DIRECTORY_PREFIX
-                + "/" + this.userGroupInformation.getShortUserName()));
+                + "/" + abfsStore.getUser()));
   }
 
   /**
@@ -554,12 +537,20 @@ public class AzureBlobFileSystem extends FileSystem {
     super.finalize();
   }
 
+  /**
+   * Get the username of the FS.
+   * @return the short name of the user who instantiated the FS
+   */
   public String getOwnerUser() {
-    return user;
+    return abfsStore.getUser();
   }
 
+  /**
+   * Get the group name of the owner of the FS.
+   * @return primary group name
+   */
   public String getOwnerUserPrimaryGroup() {
-    return primaryUserGroup;
+    return abfsStore.getPrimaryGroup();
   }
 
   private boolean deleteRoot() throws IOException {
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
index 5c28bd4..c2739e9 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
@@ -67,6 +67,7 @@ import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
 import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultEntrySchema;
 import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultSchema;
 import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider;
+import org.apache.hadoop.fs.azurebfs.oauth2.IdentityTransformer;
 import org.apache.hadoop.fs.azurebfs.services.AbfsAclHelper;
 import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
 import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation;
@@ -88,9 +89,7 @@ import org.apache.http.client.utils.URIBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.SUPER_USER;
 import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_ABFS_ENDPOINT;
-
 /**
  * Provides the bridging logic between Hadoop's abstract filesystem and Azure Storage.
  */
@@ -101,7 +100,6 @@ public class AzureBlobFileSystemStore {
 
   private AbfsClient client;
   private URI uri;
-  private final UserGroupInformation userGroupInformation;
   private String userName;
   private String primaryUserGroup;
   private static final String DATE_TIME_PATTERN = "E, dd MMM yyyy HH:mm:ss 'GMT'";
@@ -113,11 +111,12 @@ public class AzureBlobFileSystemStore {
   private boolean isNamespaceEnabledSet;
   private boolean isNamespaceEnabled;
   private final AuthType authType;
+  private final UserGroupInformation userGroupInformation;
+  private final IdentityTransformer identityTransformer;
 
-  public AzureBlobFileSystemStore(URI uri, boolean isSecureScheme, Configuration configuration, UserGroupInformation userGroupInformation)
-          throws AzureBlobFileSystemException, IOException {
+  public AzureBlobFileSystemStore(URI uri, boolean isSecureScheme, Configuration configuration)
+          throws IOException {
     this.uri = uri;
-
     String[] authorityParts = authorityParts(uri);
     final String fileSystemName = authorityParts[0];
     final String accountName = authorityParts[1];
@@ -127,10 +126,8 @@ public class AzureBlobFileSystemStore {
     } catch (IllegalAccessException exception) {
       throw new FileSystemOperationUnhandledException(exception);
     }
-
-    this.userGroupInformation = userGroupInformation;
+    this.userGroupInformation = UserGroupInformation.getCurrentUser();
     this.userName = userGroupInformation.getShortUserName();
-
     if (!abfsConfiguration.getSkipUserGroupMetadataDuringInitialization()) {
       try {
         this.primaryUserGroup = userGroupInformation.getPrimaryGroupName();
@@ -145,12 +142,25 @@ public class AzureBlobFileSystemStore {
 
     this.azureAtomicRenameDirSet = new HashSet<>(Arrays.asList(
         abfsConfiguration.getAzureAtomicRenameDirs().split(AbfsHttpConstants.COMMA)));
-
     this.authType = abfsConfiguration.getAuthType(accountName);
     boolean usingOauth = (authType == AuthType.OAuth);
     boolean useHttps = (usingOauth || abfsConfiguration.isHttpsAlwaysUsed()) ? true : isSecureScheme;
     initializeClient(uri, fileSystemName, accountName, useHttps);
+    this.identityTransformer = new IdentityTransformer(abfsConfiguration.getRawConfiguration());
+  }
 
+  /**
+   * @return local user name.
+   * */
+  public String getUser() {
+    return this.userName;
+  }
+
+  /**
+   * @return primary group that the user belongs to.
+   * */
+  public String getPrimaryGroup() {
+    return this.primaryUserGroup;
   }
 
   private String[] authorityParts(URI uri) throws InvalidUriAuthorityException, InvalidUriException {
@@ -452,60 +462,54 @@ public class AzureBlobFileSystemStore {
             path,
             isNamespaceEnabled);
 
+    final AbfsRestOperation op;
     if (path.isRoot()) {
-      final AbfsRestOperation op = isNamespaceEnabled
-          ? client.getAclStatus(AbfsHttpConstants.FORWARD_SLASH + AbfsHttpConstants.ROOT_PATH)
-          : client.getFilesystemProperties();
-
-      final long blockSize = abfsConfiguration.getAzureBlockSize();
-      final String owner = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_OWNER);
-      final String group = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_GROUP);
-      final String permissions = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_PERMISSIONS);
-      final String eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG);
-      final String lastModified = op.getResult().getResponseHeader(HttpHeaderConfigurations.LAST_MODIFIED);
-      final boolean hasAcl = AbfsPermission.isExtendedAcl(permissions);
-
-      return new VersionedFileStatus(
-              isSuperUserOrEmpty(owner) ? userName : owner,
-              isSuperUserOrEmpty(group) ? primaryUserGroup : group,
-              permissions == null ? new AbfsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL)
-                      : AbfsPermission.valueOf(permissions),
-              hasAcl,
-              0,
-              true,
-              1,
-              blockSize,
-              parseLastModifiedTime(lastModified),
-              path,
-              eTag);
+      op = isNamespaceEnabled
+              ? client.getAclStatus(AbfsHttpConstants.FORWARD_SLASH + AbfsHttpConstants.ROOT_PATH)
+              : client.getFilesystemProperties();
     } else {
-      AbfsRestOperation op = client.getPathProperties(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), isNamespaceEnabled);
-
-      final long blockSize = abfsConfiguration.getAzureBlockSize();
-      final AbfsHttpOperation result = op.getResult();
-      final String eTag = result.getResponseHeader(HttpHeaderConfigurations.ETAG);
-      final String lastModified = result.getResponseHeader(HttpHeaderConfigurations.LAST_MODIFIED);
-      final String contentLength = result.getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH);
-      final String resourceType = result.getResponseHeader(HttpHeaderConfigurations.X_MS_RESOURCE_TYPE);
-      final String owner = result.getResponseHeader(HttpHeaderConfigurations.X_MS_OWNER);
-      final String group = result.getResponseHeader(HttpHeaderConfigurations.X_MS_GROUP);
-      final String permissions = result.getResponseHeader((HttpHeaderConfigurations.X_MS_PERMISSIONS));
-      final boolean hasAcl = AbfsPermission.isExtendedAcl(permissions);
-
-      return new VersionedFileStatus(
-              isSuperUserOrEmpty(owner) ? userName : owner,
-              isSuperUserOrEmpty(group) ? primaryUserGroup : group,
-              permissions == null ? new AbfsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL)
-                      : AbfsPermission.valueOf(permissions),
-              hasAcl,
-              parseContentLength(contentLength),
-              parseIsDirectory(resourceType),
-              1,
-              blockSize,
-              parseLastModifiedTime(lastModified),
-              path,
-              eTag);
+      op = client.getPathProperties(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), isNamespaceEnabled);
     }
+
+    final long blockSize = abfsConfiguration.getAzureBlockSize();
+    final AbfsHttpOperation result = op.getResult();
+
+    final String eTag = result.getResponseHeader(HttpHeaderConfigurations.ETAG);
+    final String lastModified = result.getResponseHeader(HttpHeaderConfigurations.LAST_MODIFIED);
+    final String permissions = result.getResponseHeader((HttpHeaderConfigurations.X_MS_PERMISSIONS));
+    final boolean hasAcl = AbfsPermission.isExtendedAcl(permissions);
+    final long contentLength;
+    final boolean resourceIsDir;
+
+    if (path.isRoot()) {
+      contentLength = 0;
+      resourceIsDir = true;
+    } else {
+      contentLength = parseContentLength(result.getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH));
+      resourceIsDir = parseIsDirectory(result.getResponseHeader(HttpHeaderConfigurations.X_MS_RESOURCE_TYPE));
+    }
+
+    final String transformedOwner = identityTransformer.transformIdentityForGetRequest(
+              result.getResponseHeader(HttpHeaderConfigurations.X_MS_OWNER),
+              userName);
+
+    final String transformedGroup = identityTransformer.transformIdentityForGetRequest(
+              result.getResponseHeader(HttpHeaderConfigurations.X_MS_GROUP),
+              primaryUserGroup);
+
+    return new VersionedFileStatus(
+            transformedOwner,
+            transformedGroup,
+            permissions == null ? new AbfsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL)
+                    : AbfsPermission.valueOf(permissions),
+            hasAcl,
+            contentLength,
+            resourceIsDir,
+            1,
+            blockSize,
+            parseLastModifiedTime(lastModified),
+            path,
+            eTag);
   }
 
   public FileStatus[] listStatus(final Path path) throws IOException {
@@ -532,8 +536,8 @@ public class AzureBlobFileSystemStore {
       long blockSize = abfsConfiguration.getAzureBlockSize();
 
       for (ListResultEntrySchema entry : retrievedSchema.paths()) {
-        final String owner = isSuperUserOrEmpty(entry.owner()) ? userName : entry.owner();
-        final String group = isSuperUserOrEmpty(entry.group()) ? primaryUserGroup : entry.group();
+        final String owner = identityTransformer.transformIdentityForGetRequest(entry.owner(), userName);
+        final String group = identityTransformer.transformIdentityForGetRequest(entry.group(), primaryUserGroup);
         final FsPermission fsPermission = entry.permissions() == null
                 ? new AbfsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL)
                 : AbfsPermission.valueOf(entry.permissions());
@@ -566,7 +570,7 @@ public class AzureBlobFileSystemStore {
 
     } while (continuation != null && !continuation.isEmpty());
 
-    return fileStatuses.toArray(new FileStatus[0]);
+    return fileStatuses.toArray(new FileStatus[fileStatuses.size()]);
   }
 
   public void setOwner(final Path path, final String owner, final String group) throws
@@ -576,20 +580,17 @@ public class AzureBlobFileSystemStore {
           "This operation is only valid for storage accounts with the hierarchical namespace enabled.");
     }
 
-    String effectiveOwner = owner;
-    String effectiveGroup = group;
-    if (authType == AuthType.SharedKey && owner.equals(userName)) {
-      effectiveOwner = SUPER_USER;
-      effectiveGroup = SUPER_USER;
-    }
-
     LOG.debug(
             "setOwner filesystem: {} path: {} owner: {} group: {}",
             client.getFileSystem(),
             path.toString(),
-            effectiveOwner,
-            effectiveGroup);
-    client.setOwner(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true), effectiveOwner, effectiveGroup);
+            owner,
+            group);
+
+    final String transformedOwner = identityTransformer.transformUserOrGroupForSetRequest(owner);
+    final String transformedGroup = identityTransformer.transformUserOrGroupForSetRequest(group);
+
+    client.setOwner(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true), transformedOwner, transformedGroup);
   }
 
   public void setPermission(final Path path, final FsPermission permission) throws
@@ -620,7 +621,9 @@ public class AzureBlobFileSystemStore {
             client.getFileSystem(),
             path.toString(),
             AclEntry.aclSpecToString(aclSpec));
-    final Map<String, String> modifyAclEntries = AbfsAclHelper.deserializeAclSpec(AclEntry.aclSpecToString(aclSpec));
+
+    final List<AclEntry> transformedAclEntries = identityTransformer.transformAclEntriesForSetRequest(aclSpec);
+    final Map<String, String> modifyAclEntries = AbfsAclHelper.deserializeAclSpec(AclEntry.aclSpecToString(transformedAclEntries));
     boolean useUpn = AbfsAclHelper.isUpnFormatAclEntries(modifyAclEntries);
 
     final AbfsRestOperation op = client.getAclStatus(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true), useUpn);
@@ -645,7 +648,9 @@ public class AzureBlobFileSystemStore {
             client.getFileSystem(),
             path.toString(),
             AclEntry.aclSpecToString(aclSpec));
-    final Map<String, String> removeAclEntries = AbfsAclHelper.deserializeAclSpec(AclEntry.aclSpecToString(aclSpec));
+
+    final List<AclEntry> transformedAclEntries = identityTransformer.transformAclEntriesForSetRequest(aclSpec);
+    final Map<String, String> removeAclEntries = AbfsAclHelper.deserializeAclSpec(AclEntry.aclSpecToString(transformedAclEntries));
     boolean isUpnFormat = AbfsAclHelper.isUpnFormatAclEntries(removeAclEntries);
 
     final AbfsRestOperation op = client.getAclStatus(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true), isUpnFormat);
@@ -722,7 +727,9 @@ public class AzureBlobFileSystemStore {
             client.getFileSystem(),
             path.toString(),
             AclEntry.aclSpecToString(aclSpec));
-    final Map<String, String> aclEntries = AbfsAclHelper.deserializeAclSpec(AclEntry.aclSpecToString(aclSpec));
+
+    final List<AclEntry> transformedAclEntries = identityTransformer.transformAclEntriesForSetRequest(aclSpec);
+    final Map<String, String> aclEntries = AbfsAclHelper.deserializeAclSpec(AclEntry.aclSpecToString(transformedAclEntries));
     final boolean isUpnFormat = AbfsAclHelper.isUpnFormatAclEntries(aclEntries);
 
     final AbfsRestOperation op = client.getAclStatus(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true), isUpnFormat);
@@ -749,8 +756,13 @@ public class AzureBlobFileSystemStore {
     AbfsRestOperation op = client.getAclStatus(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true));
     AbfsHttpOperation result = op.getResult();
 
-    final String owner = result.getResponseHeader(HttpHeaderConfigurations.X_MS_OWNER);
-    final String group = result.getResponseHeader(HttpHeaderConfigurations.X_MS_GROUP);
+    final String transformedOwner = identityTransformer.transformIdentityForGetRequest(
+            result.getResponseHeader(HttpHeaderConfigurations.X_MS_OWNER),
+            userName);
+    final String transformedGroup = identityTransformer.transformIdentityForGetRequest(
+            result.getResponseHeader(HttpHeaderConfigurations.X_MS_GROUP),
+            primaryUserGroup);
+
     final String permissions = result.getResponseHeader(HttpHeaderConfigurations.X_MS_PERMISSIONS);
     final String aclSpecString = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_ACL);
 
@@ -759,8 +771,8 @@ public class AzureBlobFileSystemStore {
             : AbfsPermission.valueOf(permissions);
 
     final AclStatus.Builder aclStatusBuilder = new AclStatus.Builder();
-    aclStatusBuilder.owner(isSuperUserOrEmpty(owner)? userName : owner);
-    aclStatusBuilder.group(isSuperUserOrEmpty(group) ? primaryUserGroup : group);
+    aclStatusBuilder.owner(transformedOwner);
+    aclStatusBuilder.group(transformedGroup);
 
     aclStatusBuilder.setPermission(fsPermission);
     aclStatusBuilder.stickyBit(fsPermission.getStickyBit());
@@ -944,11 +956,6 @@ public class AzureBlobFileSystemStore {
     return false;
   }
 
-  private boolean isSuperUserOrEmpty(final String name) {
-      return name == null || name.equals(SUPER_USER);
-  }
-
-
   private static class VersionedFileStatus extends FileStatus {
     private final String version;
 
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
index d079d94..8cd86bf 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
@@ -55,7 +55,28 @@ public final class ConfigurationKeys {
   public static final String FS_AZURE_USER_AGENT_PREFIX_KEY = "fs.azure.user.agent.prefix";
   public static final String FS_AZURE_SSL_CHANNEL_MODE_KEY = "fs.azure.ssl.channel.mode";
   public static final String FS_AZURE_USE_UPN = "fs.azure.use.upn";
-
+  /** User principal names (UPNs) have the format “{alias}@{domain}”. If true,
+   *  only {alias} is included when a UPN would otherwise appear in the output
+   *  of APIs like getFileStatus, getOwner, getAclStatus, etc. Default is false. **/
+  public static final String FS_AZURE_FILE_OWNER_ENABLE_SHORTNAME = "fs.azure.identity.transformer.enable.short.name";
+  /** If the domain name is specified and “fs.azure.identity.transformer.enable.short.name”
+   *  is true, then the {alias} part of a UPN can be specified as input to APIs like setOwner and
+   *  setAcl and it will be transformed to a UPN by appending @ and the domain specified by
+   *  this configuration property. **/
+  public static final String FS_AZURE_FILE_OWNER_DOMAINNAME = "fs.azure.identity.transformer.domain.name";
+  /** An Azure Active Directory object ID (oid) used as the replacement for names contained in the
+   * list specified by “fs.azure.identity.transformer.service.principal.substitution.list”.
+   * Note that instead of an oid, this can also be set to $superuser. **/
+  public static final String FS_AZURE_OVERRIDE_OWNER_SP = "fs.azure.identity.transformer.service.principal.id";
+  /** A comma-separated list of names to be replaced with the service principal ID specified by
+   * “fs.azure.identity.transformer.service.principal.id”. This substitution occurs
+   * when setOwner, setAcl, modifyAclEntries, or removeAclEntries are invoked with identities
+   * contained in the substitution list. Note that in a non-secure cluster, the asterisk symbol "*"
+   * can be used to match all users/groups. **/
+  public static final String FS_AZURE_OVERRIDE_OWNER_SP_LIST = "fs.azure.identity.transformer.service.principal.substitution.list";
+  /** By default this is set to false, so “$superuser” is replaced with the current user when it appears as the owner
+   * or owning group of a file or directory. To disable this replacement, set it to true. **/
+  public static final String FS_AZURE_SKIP_SUPER_USER_REPLACEMENT = "fs.azure.identity.transformer.skip.superuser.replacement";
   public static final String AZURE_KEY_ACCOUNT_KEYPROVIDER = "fs.azure.account.keyprovider";
   public static final String AZURE_KEY_ACCOUNT_SHELLKEYPROVIDER_SCRIPT = "fs.azure.shellkeyprovider.script";
 
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/IdentityTransformer.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/IdentityTransformer.java
new file mode 100644
index 0000000..62b54d1
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/IdentityTransformer.java
@@ -0,0 +1,278 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.azurebfs.oauth2;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Locale;
+
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.permission.AclEntry;
+import org.apache.hadoop.fs.permission.AclEntryType;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.AT;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.STAR;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.SUPER_USER;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_SKIP_SUPER_USER_REPLACEMENT;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_FILE_OWNER_DOMAINNAME;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_FILE_OWNER_ENABLE_SHORTNAME;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_OVERRIDE_OWNER_SP;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_OVERRIDE_OWNER_SP_LIST;
+
+/**
+ * Perform transformation for Azure Active Directory identities used in owner, group and acls.
+ */
+public class IdentityTransformer {
+  private static final Logger LOG = LoggerFactory.getLogger(IdentityTransformer.class);
+
+  private boolean isSecure;
+  private String servicePrincipalId;
+  private String serviceWhiteList;
+  private String domainName;
+  private boolean enableShortName;
+  private boolean skipUserIdentityReplacement;
+  private boolean skipSuperUserReplacement;
+  private boolean domainIsSet;
+  private static final String UUID_PATTERN = "^[0-9a-f]{8}-[0-9a-f]{4}-[1-5][0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$";
+
+  public IdentityTransformer(Configuration configuration) throws IOException {
+    Preconditions.checkNotNull(configuration, "configuration");
+    this.isSecure = UserGroupInformation.getCurrentUser().isSecurityEnabled();
+    this.servicePrincipalId = configuration.get(FS_AZURE_OVERRIDE_OWNER_SP, "");
+    this.serviceWhiteList = configuration.get(FS_AZURE_OVERRIDE_OWNER_SP_LIST, "");
+    this.domainName = configuration.get(FS_AZURE_FILE_OWNER_DOMAINNAME, "");
+    this.enableShortName = configuration.getBoolean(FS_AZURE_FILE_OWNER_ENABLE_SHORTNAME, false);
+
+    // - "servicePrincipalId" and "serviceWhiteList" are required for
+    //    transformation between localUserOrGroup and principalId,$superuser
+    // - "enableShortName" is required for transformation between shortName and fullyQualifiedName.
+    this.skipUserIdentityReplacement = servicePrincipalId.isEmpty() && serviceWhiteList.isEmpty() && !enableShortName;
+    this.skipSuperUserReplacement = configuration.getBoolean(FS_AZURE_SKIP_SUPER_USER_REPLACEMENT, false);
+
+    if (enableShortName){
+      // need to check the domain setting only when short name is enabled.
+      // if shortName is not enabled, transformer won't transform a shortName to
+      // a fully qualified name.
+      this.domainIsSet = !domainName.isEmpty();
+    }
+  }
+
+  /**
+   * Perform identity transformation for the Get request results in AzureBlobFileSystemStore:
+   * getFileStatus(), listStatus(), getAclStatus().
+   * Input originalUserOrGroup can be one of the following:
+   * 1. $superuser:
+   *     by default it will be transformed to local user/group, this can be disabled by setting
+   *     "fs.azure.identity.transformer.skip.superuser.replacement" to true.
+   *
+   * 2. User principal id:
+   *     can be transformed to localUserOrGroup, if this principal id matches the principal id set in
+   *     "fs.azure.identity.transformer.service.principal.id" and localUserOrGroup is stated in
+   *     "fs.azure.identity.transformer.service.principal.substitution.list"
+   *
+   * 3. User principal name (UPN):
+   *     can be transformed to a short name(localUserOrGroup) if "fs.azure.identity.transformer.enable.short.name"
+   *     is enabled.
+   *
+   * @param originalUserOrGroup the original user or group in the get request results: FileStatus, AclStatus.
+   * @param localUserOrGroup the local user or group, should be parsed from UserGroupInformation.
+   * @return owner or group after transformation.
+   * */
+  public String transformIdentityForGetRequest(String originalUserOrGroup, String localUserOrGroup) {
+    if (originalUserOrGroup == null) {
+      originalUserOrGroup = localUserOrGroup;
+      // localUserOrGroup might be a full name, so continue the transformation.
+    }
+    // case 1: it is $superuser and replace $superuser config is enabled
+    if (!skipSuperUserReplacement && SUPER_USER.equals(originalUserOrGroup)) {
+      return localUserOrGroup;
+    }
+
+    if (skipUserIdentityReplacement) {
+      return originalUserOrGroup;
+    }
+
+    // case 2: original owner is principalId set in config, and localUser
+    //         is a daemon service specified in substitution list,
+    //         To avoid ownership check failure in job task, replace it
+    //         to local daemon user/group
+    if (originalUserOrGroup.equals(servicePrincipalId) && isInSubstitutionList(localUserOrGroup)) {
+      return localUserOrGroup;
+    }
+
+    // case 3: If original owner is a fully qualified name, and
+    //         short name is enabled, replace with shortName.
+    if (shouldUseShortUserName(originalUserOrGroup)) {
+      return getShortName(originalUserOrGroup);
+    }
+
+    return originalUserOrGroup;
+  }
+
+  /**
+   * Perform Identity transformation when setting owner on a path.
+   * There are four possible inputs:
+   * 1.short name; 2.$superuser; 3.Fully qualified name; 4. principal id.
+   *
+   * short name could be transformed to:
+   *    - A service principal id or $superuser, if short name belongs to a daemon service
+   *      stated in substitution list AND "fs.azure.identity.transformer.service.principal.id"
+   *      is set with $superuser or a principal id.
+   *    - Fully qualified name, if "fs.azure.identity.transformer.domain.name" is set in configuration.
+   *
+   * $superuser, fully qualified name and principalId should not be transformed.
+   *
+   * @param userOrGroup the user or group to be set as owner.
+   * @return user or group after transformation.
+   * */
+  public String transformUserOrGroupForSetRequest(String userOrGroup) {
+    if (userOrGroup == null || userOrGroup.isEmpty() || skipUserIdentityReplacement) {
+      return userOrGroup;
+    }
+
+    // case 1: when the owner to be set is stated in substitution list.
+    if (isInSubstitutionList(userOrGroup)) {
+      return servicePrincipalId;
+    }
+
+    // case 2: when the owner is a short name of the user principal name(UPN).
+    if (shouldUseFullyQualifiedUserName(userOrGroup)) {
+      return getFullyQualifiedName(userOrGroup);
+    }
+
+    return userOrGroup;
+  }
+
+  /**
+   * Perform Identity transformation when calling setAcl(), removeAclEntries() and modifyAclEntries().
+   * If the AclEntry type is a user or group, and its name is one of the following:
+   * 1.short name; 2.$superuser; 3.Fully qualified name; 4. principal id.
+   * Short name could be transformed to:
+   *    - A service principal id or $superuser, if short name belongs to a daemon service
+   *      stated in substitution list AND "fs.azure.identity.transformer.service.principal.id"
+   *      is set with $superuser or a principal id.
+   *    - A fully qualified name, if the AclEntry type is User AND if "fs.azure.identity.transformer.domain.name"
+   *    is set in configuration. This is to make the behavior consistent with HDI.
+   *
+   * $superuser, fully qualified name and principal id should not be transformed.
+   *
+   * @param aclEntries list of AclEntry
+   * @return list of AclEntry after the identity transformation.
+   * */
+  public List<AclEntry> transformAclEntriesForSetRequest(final List<AclEntry> aclEntries) {
+    if (skipUserIdentityReplacement) {
+      return aclEntries;
+    }
+
+    for (int i = 0; i < aclEntries.size(); i++) {
+      AclEntry aclEntry = aclEntries.get(i);
+      String name = aclEntry.getName();
+      String transformedName = name;
+      if (name == null || name.isEmpty() || aclEntry.getType().equals(AclEntryType.OTHER) || aclEntry.getType().equals(AclEntryType.MASK)) {
+        continue;
+      }
+
+      // case 1: when the user or group name to be set is stated in substitution list.
+      if (isInSubstitutionList(name)) {
+        transformedName = servicePrincipalId;
+      } else if (aclEntry.getType().equals(AclEntryType.USER) // case 2: when the owner is a short name
+              && shouldUseFullyQualifiedUserName(name)) {     //         of the user principal name (UPN).
+        // Notice: for group type ACL entry, if name is shortName.
+        //         It won't be converted to Full Name. This is
+        //         to make the behavior consistent with HDI.
+        transformedName = getFullyQualifiedName(name);
+      }
+
+      // Avoid unnecessary new AclEntry allocation
+      if (transformedName.equals(name)) {
+        continue;
+      }
+
+      AclEntry.Builder aclEntryBuilder = new AclEntry.Builder();
+      aclEntryBuilder.setType(aclEntry.getType());
+      aclEntryBuilder.setName(transformedName);
+      aclEntryBuilder.setScope(aclEntry.getScope());
+      aclEntryBuilder.setPermission(aclEntry.getPermission());
+
+      // Replace the original AclEntry
+      aclEntries.set(i, aclEntryBuilder.build());
+    }
+    return aclEntries;
+  }
+
+  /**
+   * Internal method to identify if owner name returned by the ADLS backend is short name or not.
+   * If name contains "@", this code assumes that whatever comes after '@' is domain name and ignores it.
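+   * @param owner the owner name to check; may be null.
+   * @return true if owner is non-null and does not contain "@", false otherwise.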
+   */
+  private boolean isShortUserName(String owner) {
+    return (owner != null) && !owner.contains(AT);
+  }
+
+  private boolean shouldUseShortUserName(String owner){
+    return enableShortName && !isShortUserName(owner);
+  }
+
+  private String getShortName(String userName) {
+    if (userName == null)    {
+      return  null;
+    }
+
+    if (isShortUserName(userName)) {
+      return userName;
+    }
+
+    String userNameBeforeAt = userName.substring(0, userName.indexOf(AT));
+    if (isSecure) {
+      //In secure clusters we apply auth to local rules to lowercase all short localUser names (notice /L at the end),
+      //E.G. : RULE:[1:$1@$0](.*@FOO.ONMICROSOFT.COM)s/@.*///L. Ideally we should use the HadoopKerberosName class to get
+      // new HadoopKerberosName(arg).getShortName. However,
+      //1. ADLS can report the Realm in lower case while returning file owner names( ie. : Some.User@realm.onmicrosoft.com)
+      //2. The RULE specification does not allow specifying character classes to do case insensitive matches
+      //Due to this, we end up using a forced lowercase version of the manually shortened name
+      return userNameBeforeAt.toLowerCase(Locale.ENGLISH);
+    }
+    return userNameBeforeAt;
+  }
+
+  private String getFullyQualifiedName(String name){
+    if (domainIsSet && (name != null) && !name.contains(AT)){
+      return name + AT + domainName;
+    }
+    return name;
+  }
+
+  private boolean shouldUseFullyQualifiedUserName(String owner){
+    return domainIsSet && !SUPER_USER.equals(owner) && !isUuid(owner) && enableShortName && isShortUserName(owner);
+  }
+
+  private boolean isInSubstitutionList(String localUserName) {
+    return serviceWhiteList.contains(STAR) || serviceWhiteList.contains(localUserName);
+  }
+
+  private boolean isUuid(String input) {
+    if (input == null) return false;
+    return input.matches(UUID_PATTERN);
+  }
+}
diff --git a/hadoop-tools/hadoop-azure/src/site/markdown/testing_azure.md b/hadoop-tools/hadoop-azure/src/site/markdown/testing_azure.md
index c2afe74..40a372d 100644
--- a/hadoop-tools/hadoop-azure/src/site/markdown/testing_azure.md
+++ b/hadoop-tools/hadoop-azure/src/site/markdown/testing_azure.md
@@ -733,6 +733,61 @@ hierarchical namespace enabled, and set the following configuration settings:
      <description>AAD client id.</description>
    </property>
   -->
+
+  <!--
+    <property>
+        <name>fs.azure.identity.transformer.enable.short.name</name>
+        <value>true/false</value>
+        <description>
+          User principal names (UPNs) have the format “{alias}@{domain}”.
+          If true, only {alias} is included when a UPN would otherwise appear in the output
+          of APIs like getFileStatus, getOwner, getAclStatus, etc, default is false.
+        </description>
+    </property>
+
+    <property>
+        <name>fs.azure.identity.transformer.domain.name</name>
+        <value>domain name of the user's upn</value>
+        <description>
+          If the domain name is specified and “fs.azure.identity.transformer.enable.short.name”
+          is true, then the {alias} part of a UPN can be specified as input to APIs like setOwner,
+          setAcl, modifyAclEntries, or removeAclEntries, and it will be transformed to a UPN by appending @ and the domain specified by
+          this configuration property.
+        </description>
+    </property>
+
+    <property>
+        <name>fs.azure.identity.transformer.service.principal.id</name>
+        <value>service principal object id</value>
+        <description>
+          An Azure Active Directory object ID (oid) used as the replacement for names contained
+          in the list specified by “fs.azure.identity.transformer.service.principal.substitution.list”.
+          Notice that instead of setting oid, you can also set $superuser here.
+        </description>
+    </property>
+
+    <property>
+        <name>fs.azure.identity.transformer.skip.superuser.replacement</name>
+        <value>true/false</value>
+        <description>
+          If false, “$superuser” is replaced with the current user when it appears as the owner
+          or owning group of a file or directory. The default is false.
+        </description>
+    </property>
+
+    <property>
+        <name>fs.azure.identity.transformer.service.principal.substitution.list</name>
+        <value>mapred,hdfs,yarn,hive,tez</value>
+        <description>
+           A comma separated list of names to be replaced with the service principal ID specified by
+           “fs.azure.identity.transformer.service.principal.id”.  This substitution occurs
+           when setOwner, setAcl, modifyAclEntries, or removeAclEntries are invoked with identities
+           contained in the substitution list. Notice that when in non-secure cluster, asterisk symbol *
+           can be used to match all user/group.
+        </description>
+    </property>
+   -->
+
 ```
 
 If running tests against an endpoint that uses the URL format
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsIdentityTransformer.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsIdentityTransformer.java
new file mode 100644
index 0000000..41f680e
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsIdentityTransformer.java
@@ -0,0 +1,301 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.UUID;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.fs.azurebfs.oauth2.IdentityTransformer;
+import org.apache.hadoop.fs.permission.AclEntry;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.SUPER_USER;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_FILE_OWNER_DOMAINNAME;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_FILE_OWNER_ENABLE_SHORTNAME;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_OVERRIDE_OWNER_SP;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_OVERRIDE_OWNER_SP_LIST;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_SKIP_SUPER_USER_REPLACEMENT;
+import static org.apache.hadoop.fs.azurebfs.utils.AclTestHelpers.aclEntry;
+import static org.apache.hadoop.fs.permission.AclEntryScope.ACCESS;
+import static org.apache.hadoop.fs.permission.AclEntryScope.DEFAULT;
+import static org.apache.hadoop.fs.permission.AclEntryType.GROUP;
+import static org.apache.hadoop.fs.permission.AclEntryType.MASK;
+import static org.apache.hadoop.fs.permission.AclEntryType.OTHER;
+import static org.apache.hadoop.fs.permission.AclEntryType.USER;
+import static org.apache.hadoop.fs.permission.FsAction.ALL;
+
+/**
+ * Test IdentityTransformer.
+ */
+//@RunWith(Parameterized.class)
+public class ITestAbfsIdentityTransformer extends AbstractAbfsScaleTest{
+  private final UserGroupInformation userGroupInfo;
+  private final String localUser;
+  private final String localGroup;
+  private static final String DAEMON = "daemon";
+  private static final String ASTERISK = "*";
+  private static final String SHORT_NAME = "abc";
+  private static final String DOMAIN = "domain.com";
+  private static final String FULLY_QUALIFIED_NAME = "abc@domain.com";
+  private static final String SERVICE_PRINCIPAL_ID = UUID.randomUUID().toString();
+
+  public ITestAbfsIdentityTransformer() throws Exception {
+    super();
+    userGroupInfo = UserGroupInformation.getCurrentUser();
+    localUser = userGroupInfo.getShortUserName();
+    localGroup = userGroupInfo.getPrimaryGroupName();
+  }
+
+  @Test
+  public void testDaemonServiceSettingIdentity() throws IOException {
+    Configuration config = this.getRawConfiguration();
+    resetIdentityConfig(config);
+    // Default config
+    IdentityTransformer identityTransformer = getTransformerWithDefaultIdentityConfig(config);
+    assertEquals("Identity should not change for default config",
+            DAEMON, identityTransformer.transformUserOrGroupForSetRequest(DAEMON));
+
+    // Add service principal id
+    config.set(FS_AZURE_OVERRIDE_OWNER_SP, SERVICE_PRINCIPAL_ID);
+
+    // case 1: substitution list doesn't contain daemon
+    config.set(FS_AZURE_OVERRIDE_OWNER_SP_LIST, "a,b,c,d");
+    identityTransformer = getTransformerWithCustomizedIdentityConfig(config);
+    assertEquals("Identity should not change when substitution list doesn't contain daemon",
+            DAEMON, identityTransformer.transformUserOrGroupForSetRequest(DAEMON));
+
+    // case 2: substitution list contains daemon name
+    config.set(FS_AZURE_OVERRIDE_OWNER_SP_LIST, DAEMON + ",a,b,c,d");
+    identityTransformer = getTransformerWithCustomizedIdentityConfig(config);
+    assertEquals("Identity should be replaced to servicePrincipalId",
+            SERVICE_PRINCIPAL_ID, identityTransformer.transformUserOrGroupForSetRequest(DAEMON));
+
+    // case 3: substitution list is *
+    config.set(FS_AZURE_OVERRIDE_OWNER_SP_LIST, ASTERISK);
+    identityTransformer = getTransformerWithCustomizedIdentityConfig(config);
+    assertEquals("Identity should be replaced to servicePrincipalId",
+            SERVICE_PRINCIPAL_ID, identityTransformer.transformUserOrGroupForSetRequest(DAEMON));
+  }
+
+  @Test
+  public void testFullyQualifiedNameSettingIdentity() throws IOException {
+    Configuration config = this.getRawConfiguration();
+    // Default config
+    IdentityTransformer identityTransformer = getTransformerWithDefaultIdentityConfig(config);
+    assertEquals("short name should not be converted to full name by default",
+            SHORT_NAME, identityTransformer.transformUserOrGroupForSetRequest(SHORT_NAME));
+
+    resetIdentityConfig(config);
+
+    // Add config to get fully qualified username
+    config.setBoolean(FS_AZURE_FILE_OWNER_ENABLE_SHORTNAME, true);
+    config.set(FS_AZURE_FILE_OWNER_DOMAINNAME, DOMAIN);
+    identityTransformer = getTransformerWithCustomizedIdentityConfig(config);
+    assertEquals("short name should be converted to full name",
+            FULLY_QUALIFIED_NAME, identityTransformer.transformUserOrGroupForSetRequest(SHORT_NAME));
+  }
+
+  @Test
+  public void testNoOpForSettingOidAsIdentity() throws IOException {
+    Configuration config = this.getRawConfiguration();
+    resetIdentityConfig(config);
+
+    config.setBoolean(FS_AZURE_FILE_OWNER_ENABLE_SHORTNAME, true);
+    config.set(FS_AZURE_FILE_OWNER_DOMAINNAME, DOMAIN);
+    config.set(FS_AZURE_OVERRIDE_OWNER_SP, UUID.randomUUID().toString());
+    config.set(FS_AZURE_OVERRIDE_OWNER_SP_LIST, "a,b,c,d");
+
+    IdentityTransformer identityTransformer = getTransformerWithCustomizedIdentityConfig(config);
+    final String principalId = UUID.randomUUID().toString();
+    assertEquals("Identity should not be changed when owner is already a principal id ",
+            principalId, identityTransformer.transformUserOrGroupForSetRequest(principalId));
+  }
+
+  @Test
+  public void testNoOpWhenSettingSuperUserAsdentity() throws IOException {
+    Configuration config = this.getRawConfiguration();
+    resetIdentityConfig(config);
+
+    config.setBoolean(FS_AZURE_FILE_OWNER_ENABLE_SHORTNAME, true);
+    config.set(FS_AZURE_FILE_OWNER_DOMAINNAME, DOMAIN);
+    // Default config
+    IdentityTransformer identityTransformer = getTransformerWithDefaultIdentityConfig(config);
+    assertEquals("Identity should not be changed because it is not in substitution list",
+            SUPER_USER, identityTransformer.transformUserOrGroupForSetRequest(SUPER_USER));
+  }
+
+  @Test
+  public void testIdentityReplacementForSuperUserGetRequest() throws IOException {
+    Configuration config = this.getRawConfiguration();
+    resetIdentityConfig(config);
+
+    // with default config, identityTransformer should do $superUser replacement
+    IdentityTransformer identityTransformer = getTransformerWithDefaultIdentityConfig(config);
+    assertEquals("$superuser should be replaced with local user by default",
+            localUser, identityTransformer.transformIdentityForGetRequest(SUPER_USER, localUser));
+
+    // Disable $superuser replacement
+    config.setBoolean(FS_AZURE_SKIP_SUPER_USER_REPLACEMENT, true);
+    identityTransformer = getTransformerWithCustomizedIdentityConfig(config);
+    assertEquals("$superuser should not be replaced",
+            SUPER_USER, identityTransformer.transformIdentityForGetRequest(SUPER_USER, localUser));
+  }
+
+  @Test
+  public void testIdentityReplacementForDaemonServiceGetRequest() throws IOException {
+    Configuration config = this.getRawConfiguration();
+    resetIdentityConfig(config);
+
+    // Default config
+    IdentityTransformer identityTransformer = getTransformerWithDefaultIdentityConfig(config);
+    assertEquals("By default servicePrincipalId should not be converted for GetFileStatus(), listFileStatus(), getAcl()",
+            SERVICE_PRINCIPAL_ID, identityTransformer.transformIdentityForGetRequest(SERVICE_PRINCIPAL_ID, localUser));
+
+    resetIdentityConfig(config);
+    // 1. substitution list doesn't contain currentUser
+    config.set(FS_AZURE_OVERRIDE_OWNER_SP_LIST, "a,b,c,d");
+    identityTransformer = getTransformerWithCustomizedIdentityConfig(config);
+    assertEquals("servicePrincipalId should not be replaced if local daemon user is not in substitution list",
+            SERVICE_PRINCIPAL_ID, identityTransformer.transformIdentityForGetRequest(SERVICE_PRINCIPAL_ID, localUser));
+
+    resetIdentityConfig(config);
+    // 2. substitution list contains currentUser(daemon name) but the service principal id in config doesn't match
+    config.set(FS_AZURE_OVERRIDE_OWNER_SP_LIST, localUser + ",a,b,c,d");
+    config.set(FS_AZURE_OVERRIDE_OWNER_SP, UUID.randomUUID().toString());
+    identityTransformer = getTransformerWithCustomizedIdentityConfig(config);
+    assertEquals("servicePrincipalId should not be replaced if it is not equal to the SPN set in config",
+            SERVICE_PRINCIPAL_ID, identityTransformer.transformIdentityForGetRequest(SERVICE_PRINCIPAL_ID, localUser));
+
+    resetIdentityConfig(config);
+    // 3. substitution list contains currentUser(daemon name) and the service principal id in config matches
+    config.set(FS_AZURE_OVERRIDE_OWNER_SP_LIST, localUser + ",a,b,c,d");
+    config.set(FS_AZURE_OVERRIDE_OWNER_SP, SERVICE_PRINCIPAL_ID);
+    identityTransformer = getTransformerWithCustomizedIdentityConfig(config);
+    assertEquals("servicePrincipalId should be transformed to local use",
+            localUser, identityTransformer.transformIdentityForGetRequest(SERVICE_PRINCIPAL_ID, localUser));
+
+    resetIdentityConfig(config);
+    // 4. substitution is "*" but the service principal id in config doesn't match the input
+    config.set(FS_AZURE_OVERRIDE_OWNER_SP_LIST, ASTERISK);
+    config.set(FS_AZURE_OVERRIDE_OWNER_SP, UUID.randomUUID().toString());
+    identityTransformer = getTransformerWithCustomizedIdentityConfig(config);
+    assertEquals("servicePrincipalId should not be replaced if it is not equal to the SPN set in config",
+            SERVICE_PRINCIPAL_ID, identityTransformer.transformIdentityForGetRequest(SERVICE_PRINCIPAL_ID, localUser));
+
+    resetIdentityConfig(config);
+    // 5. substitution is "*" and the service principal id in config match the input
+    config.set(FS_AZURE_OVERRIDE_OWNER_SP_LIST, ASTERISK);
+    config.set(FS_AZURE_OVERRIDE_OWNER_SP, SERVICE_PRINCIPAL_ID);
+    identityTransformer = getTransformerWithCustomizedIdentityConfig(config);
+    assertEquals("servicePrincipalId should be transformed to local user",
+            localUser, identityTransformer.transformIdentityForGetRequest(SERVICE_PRINCIPAL_ID, localUser));
+  }
+
+  @Test
+  public void testIdentityReplacementForKinitUserGetRequest() throws IOException {
+    Configuration config = this.getRawConfiguration();
+    resetIdentityConfig(config);
+
+    // Default config
+    IdentityTransformer identityTransformer = getTransformerWithDefaultIdentityConfig(config);
+    assertEquals("full name should not be transformed if shortname is not enabled",
+            FULLY_QUALIFIED_NAME, identityTransformer.transformIdentityForGetRequest(FULLY_QUALIFIED_NAME, localUser));
+
+    // add config to get short name
+    config.setBoolean(FS_AZURE_FILE_OWNER_ENABLE_SHORTNAME, true);
+    identityTransformer = getTransformerWithCustomizedIdentityConfig(config);
+    assertEquals("should convert the full name to shortname ",
+            SHORT_NAME, identityTransformer.transformIdentityForGetRequest(FULLY_QUALIFIED_NAME, localUser));
+  }
+
+  @Test
+  public void transformAclEntriesForSetRequest() throws IOException {
+    Configuration config = this.getRawConfiguration();
+    resetIdentityConfig(config);
+
+    List<AclEntry> aclEntriesToBeTransformed = Lists.newArrayList(
+            aclEntry(ACCESS, USER, DAEMON, ALL),
+            aclEntry(ACCESS, USER, FULLY_QUALIFIED_NAME,ALL),
+            aclEntry(DEFAULT, USER, SUPER_USER, ALL),
+            aclEntry(DEFAULT, USER, SERVICE_PRINCIPAL_ID, ALL),
+            aclEntry(DEFAULT, USER, SHORT_NAME, ALL),
+            aclEntry(DEFAULT, GROUP, DAEMON, ALL),
+            aclEntry(DEFAULT, GROUP, SHORT_NAME, ALL), // Notice: for group type ACL entry, if name is shortName,
+            aclEntry(DEFAULT, OTHER, ALL),             //         It won't be converted to Full Name. This is
+            aclEntry(DEFAULT, MASK, ALL)               //         to make the behavior consistent with HDI.
+    );
+
+    // Default config should not change the identities
+    IdentityTransformer identityTransformer = getTransformerWithDefaultIdentityConfig(config);
+    checkAclEntriesList(aclEntriesToBeTransformed, identityTransformer.transformAclEntriesForSetRequest(aclEntriesToBeTransformed));
+
+    resetIdentityConfig(config);
+    // With config
+    config.set(FS_AZURE_OVERRIDE_OWNER_SP_LIST, DAEMON + ",a,b,c,d");
+    config.setBoolean(FS_AZURE_FILE_OWNER_ENABLE_SHORTNAME, true);
+    config.set(FS_AZURE_FILE_OWNER_DOMAINNAME, DOMAIN);
+    config.set(FS_AZURE_OVERRIDE_OWNER_SP, SERVICE_PRINCIPAL_ID);
+    identityTransformer = getTransformerWithCustomizedIdentityConfig(config);
+
+    // expected acl entries
+    List<AclEntry> expectedAclEntries = Lists.newArrayList(
+            aclEntry(ACCESS, USER, SERVICE_PRINCIPAL_ID, ALL),
+            aclEntry(ACCESS, USER, FULLY_QUALIFIED_NAME, ALL),
+            aclEntry(DEFAULT, USER, SUPER_USER, ALL),
+            aclEntry(DEFAULT, USER, SERVICE_PRINCIPAL_ID, ALL),
+            aclEntry(DEFAULT, USER, FULLY_QUALIFIED_NAME, ALL),
+            aclEntry(DEFAULT, GROUP, SERVICE_PRINCIPAL_ID, ALL),
+            aclEntry(DEFAULT, GROUP, SHORT_NAME, ALL),
+            aclEntry(DEFAULT, OTHER, ALL),
+            aclEntry(DEFAULT, MASK, ALL)
+    );
+
+    checkAclEntriesList(identityTransformer.transformAclEntriesForSetRequest(aclEntriesToBeTransformed), expectedAclEntries);
+
+  }
+
+  private void resetIdentityConfig(Configuration config) {
+    config.unset(FS_AZURE_FILE_OWNER_ENABLE_SHORTNAME);
+    config.unset(FS_AZURE_FILE_OWNER_DOMAINNAME);
+    config.unset(FS_AZURE_OVERRIDE_OWNER_SP);
+    config.unset(FS_AZURE_OVERRIDE_OWNER_SP_LIST);
+    config.unset(FS_AZURE_SKIP_SUPER_USER_REPLACEMENT);
+  }
+
+  private IdentityTransformer getTransformerWithDefaultIdentityConfig(Configuration config) throws IOException {
+    resetIdentityConfig(config);
+    return new IdentityTransformer(config);
+  }
+
+  private IdentityTransformer getTransformerWithCustomizedIdentityConfig(Configuration config) throws IOException {
+    return new IdentityTransformer(config);
+  }
+
+  private void checkAclEntriesList(List<AclEntry> aclEntries, List<AclEntry> expected) {
+    assertTrue("list size not equals", aclEntries.size() == expected.size());
+    for (int i = 0; i < aclEntries.size(); i++) {
+      assertEquals("Identity doesn't match", expected.get(i).getName(), aclEntries.get(i).getName());
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[hadoop] 01/02: Revert "HADOOP-15954. ABFS: Enable owner and group conversion for MSI and login user using OAuth."

Posted by st...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

stevel pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit 668817a6cefa6025ddfe082ed71d7d317d811381
Author: Steve Loughran <st...@apache.org>
AuthorDate: Thu Feb 7 21:56:43 2019 +0000

    Revert "HADOOP-15954. ABFS: Enable owner and group conversion for MSI and login user using OAuth."
    
    (accidentally mixed in two patches)
    
    This reverts commit fa8cd1bf28f5b81849ba351a2d7225fbc580350d.
---
 .../main/java/org/apache/hadoop/fs/shell/Ls.java   |  35 +--
 .../src/site/markdown/FileSystemShell.md           |   5 +-
 .../java/org/apache/hadoop/fs/shell/TestLs.java    |  35 +--
 .../hadoop-common/src/test/resources/testConf.xml  |   6 +-
 .../hdfs/tools/TestStoragePolicyCommands.java      |  17 --
 .../src/test/resources/testErasureCodingConf.xml   |  19 --
 .../hadoop/fs/azurebfs/AbfsConfiguration.java      |  10 -
 .../hadoop/fs/azurebfs/AzureBlobFileSystem.java    |  37 ++-
 .../fs/azurebfs/AzureBlobFileSystemStore.java      | 179 ++++++------
 .../fs/azurebfs/constants/ConfigurationKeys.java   |  23 +-
 .../fs/azurebfs/oauth2/IdentityTransformer.java    | 275 -------------------
 .../hadoop/fs/azurebfs/oauth2/package-info.java    |   8 +-
 .../src/site/markdown/testing_azure.md             |  55 ----
 .../fs/azurebfs/ITestAbfsIdentityTransformer.java  | 301 ---------------------
 14 files changed, 117 insertions(+), 888 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Ls.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Ls.java
index b2bdc84..efc541c 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Ls.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Ls.java
@@ -33,7 +33,6 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.BlockStoragePolicySpi;
 import org.apache.hadoop.fs.ContentSummary;
 
 /**
@@ -58,15 +57,13 @@ class Ls extends FsCommand {
   private static final String OPTION_ATIME = "u";
   private static final String OPTION_SIZE = "S";
   private static final String OPTION_ECPOLICY = "e";
-  private static final String OPTION_SPOLICY = "sp";
 
   public static final String NAME = "ls";
   public static final String USAGE = "[-" + OPTION_PATHONLY + "] [-" +
       OPTION_DIRECTORY + "] [-" + OPTION_HUMAN + "] [-" +
       OPTION_HIDENONPRINTABLE + "] [-" + OPTION_RECURSIVE + "] [-" +
       OPTION_MTIME + "] [-" + OPTION_SIZE + "] [-" + OPTION_REVERSE + "] [-" +
-      OPTION_ATIME + "] [-" + OPTION_ECPOLICY + "] [-" + OPTION_SPOLICY
-      + "] [<path> ...]";
+      OPTION_ATIME + "] [-" + OPTION_ECPOLICY +"] [<path> ...]";
 
   public static final String DESCRIPTION =
       "List the contents that match the specified file pattern. If " +
@@ -99,9 +96,7 @@ class Ls extends FsCommand {
           "  Use time of last access instead of modification for\n" +
           "      display and sorting.\n"+
           "  -" + OPTION_ECPOLICY +
-          "  Display the erasure coding policy of files and directories.\n" +
-          "  -" + OPTION_SPOLICY +
-          "  Display the storage policy of files and directories.\n";
+          "  Display the erasure coding policy of files and directories.\n";
 
   protected final SimpleDateFormat dateFormat =
     new SimpleDateFormat("yyyy-MM-dd HH:mm");
@@ -115,7 +110,6 @@ class Ls extends FsCommand {
   private boolean orderSize;
   private boolean useAtime;
   private boolean displayECPolicy;
-  private boolean displaySPolicy;
   private Comparator<PathData> orderComparator;
 
   protected boolean humanReadable = false;
@@ -141,8 +135,7 @@ class Ls extends FsCommand {
     CommandFormat cf = new CommandFormat(0, Integer.MAX_VALUE,
         OPTION_PATHONLY, OPTION_DIRECTORY, OPTION_HUMAN,
         OPTION_HIDENONPRINTABLE, OPTION_RECURSIVE, OPTION_REVERSE,
-        OPTION_MTIME, OPTION_SIZE, OPTION_ATIME, OPTION_ECPOLICY,
-        OPTION_SPOLICY);
+        OPTION_MTIME, OPTION_SIZE, OPTION_ATIME, OPTION_ECPOLICY);
     cf.parse(args);
     pathOnly = cf.getOpt(OPTION_PATHONLY);
     dirRecurse = !cf.getOpt(OPTION_DIRECTORY);
@@ -154,7 +147,6 @@ class Ls extends FsCommand {
     orderSize = !orderTime && cf.getOpt(OPTION_SIZE);
     useAtime = cf.getOpt(OPTION_ATIME);
     displayECPolicy = cf.getOpt(OPTION_ECPOLICY);
-    displaySPolicy = cf.getOpt(OPTION_SPOLICY);
     if (args.isEmpty()) args.add(Path.CUR_DIR);
 
     initialiseOrderComparator();
@@ -237,16 +229,6 @@ class Ls extends FsCommand {
     return this.displayECPolicy;
   }
 
-  /**
-   * Should storage policies be displayed.
-   * @return true display storage policies, false doesn't display storage
-   *         policies
-   */
-  @VisibleForTesting
-  boolean isDisplaySPolicy() {
-    return this.displaySPolicy;
-  }
-
   @Override
   protected void processPathArgument(PathData item) throws IOException {
     if (isDisplayECPolicy() && item.fs.getContentSummary(item.path)
@@ -316,7 +298,6 @@ class Ls extends FsCommand {
           stat.getOwner(),
           stat.getGroup(),
           contentSummary.getErasureCodingPolicy(),
-          displaySPolicy ? item.fs.getStoragePolicy(item.path).getName() : "",
           formatSize(stat.getLen()),
           dateFormat.format(new Date(isUseAtime()
               ? stat.getAccessTime()
@@ -330,7 +311,6 @@ class Ls extends FsCommand {
           (stat.isFile() ? stat.getReplication() : "-"),
           stat.getOwner(),
           stat.getGroup(),
-          displaySPolicy ? item.fs.getStoragePolicy(item.path).getName() : "",
           formatSize(stat.getLen()),
           dateFormat.format(new Date(isUseAtime()
               ? stat.getAccessTime()
@@ -369,15 +349,6 @@ class Ls extends FsCommand {
       }
       fmt.append((maxEC > 0) ? "%-" + maxEC + "s " : "%s");
     }
-    int maxSpolicy = 0;
-    if (displaySPolicy) {
-      if (items.length != 0) {
-        for (BlockStoragePolicySpi s : items[0].fs.getAllStoragePolicies()) {
-          maxSpolicy = maxLength(maxSpolicy, s.getName());
-        }
-      }
-    }
-    fmt.append((maxSpolicy > 0) ? "%-" + maxSpolicy + "s " : "%s");
     fmt.append("%"  + maxLen   + "s ");
     fmt.append("%s %s"); // mod time & path
     lineFormat = fmt.toString();
diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/FileSystemShell.md b/hadoop-common-project/hadoop-common/src/site/markdown/FileSystemShell.md
index fea082a..d9567b9 100644
--- a/hadoop-common-project/hadoop-common/src/site/markdown/FileSystemShell.md
+++ b/hadoop-common-project/hadoop-common/src/site/markdown/FileSystemShell.md
@@ -423,7 +423,7 @@ Return usage output.
 ls
 ----
 
-Usage: `hadoop fs -ls [-C] [-d] [-h] [-q] [-R] [-t] [-S] [-r] [-u] [-e] [-sp] <args> `
+Usage: `hadoop fs -ls [-C] [-d] [-h] [-q] [-R] [-t] [-S] [-r] [-u] [-e] <args> `
 
 Options:
 
@@ -437,9 +437,6 @@ Options:
 * -r: Reverse the sort order.
 * -u: Use access time rather than modification time for display and sorting.  
 * -e: Display the erasure coding policy of files and directories only.
-* -sp: Display the storage policy of files and directories.
-
-For a file/directory with no explicitly set storage policy the ls command with -sp returns the default storage policy.
 
 For a file ls returns stat on the file with the following format:
 
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestLs.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestLs.java
index 128805e..4a4f453 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestLs.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestLs.java
@@ -88,7 +88,6 @@ public class TestLs {
     assertFalse(ls.isOrderTime());
     assertFalse(ls.isUseAtime());
     assertFalse(ls.isDisplayECPolicy());
-    assertFalse(ls.isDisplaySPolicy());
   }
 
   // check the -C option is recognised
@@ -107,7 +106,6 @@ public class TestLs {
     assertFalse(ls.isOrderTime());
     assertFalse(ls.isUseAtime());
     assertFalse(ls.isDisplayECPolicy());
-    assertFalse(ls.isDisplaySPolicy());
   }
 
   // check the -d option is recognised
@@ -126,7 +124,6 @@ public class TestLs {
     assertFalse(ls.isOrderTime());
     assertFalse(ls.isUseAtime());
     assertFalse(ls.isDisplayECPolicy());
-    assertFalse(ls.isDisplaySPolicy());
   }
 
   // check the -h option is recognised
@@ -145,7 +142,6 @@ public class TestLs {
     assertFalse(ls.isOrderTime());
     assertFalse(ls.isUseAtime());
     assertFalse(ls.isDisplayECPolicy());
-    assertFalse(ls.isDisplaySPolicy());
   }
 
   // check the -R option is recognised
@@ -164,7 +160,6 @@ public class TestLs {
     assertFalse(ls.isOrderTime());
     assertFalse(ls.isUseAtime());
     assertFalse(ls.isDisplayECPolicy());
-    assertFalse(ls.isDisplaySPolicy());
   }
 
   // check the -r option is recognised
@@ -183,7 +178,6 @@ public class TestLs {
     assertFalse(ls.isOrderTime());
     assertFalse(ls.isUseAtime());
     assertFalse(ls.isDisplayECPolicy());
-    assertFalse(ls.isDisplaySPolicy());
   }
 
   // check the -S option is recognised
@@ -202,7 +196,6 @@ public class TestLs {
     assertFalse(ls.isOrderTime());
     assertFalse(ls.isUseAtime());
     assertFalse(ls.isDisplayECPolicy());
-    assertFalse(ls.isDisplaySPolicy());
   }
 
   // check the -t option is recognised
@@ -221,7 +214,6 @@ public class TestLs {
     assertTrue(ls.isOrderTime());
     assertFalse(ls.isUseAtime());
     assertFalse(ls.isDisplayECPolicy());
-    assertFalse(ls.isDisplaySPolicy());
   }
 
   // check the precedence of the -t and -S options
@@ -241,7 +233,6 @@ public class TestLs {
     assertTrue(ls.isOrderTime());
     assertFalse(ls.isUseAtime());
     assertFalse(ls.isDisplayECPolicy());
-    assertFalse(ls.isDisplaySPolicy());
   }
 
   // check the precedence of the -t, -S and -r options
@@ -262,7 +253,6 @@ public class TestLs {
     assertTrue(ls.isOrderTime());
     assertFalse(ls.isUseAtime());
     assertFalse(ls.isDisplayECPolicy());
-    assertFalse(ls.isDisplaySPolicy());
   }
 
   // chheck the -u option is recognised
@@ -281,10 +271,9 @@ public class TestLs {
     assertFalse(ls.isOrderTime());
     assertTrue(ls.isUseAtime());
     assertFalse(ls.isDisplayECPolicy());
-    assertFalse(ls.isDisplaySPolicy());
   }
 
-  // check the -e option is recognised
+  // chheck the -e option is recognised
   @Test
   public void processOptionsDisplayECPolicy() throws IOException {
     LinkedList<String> options = new LinkedList<String>();
@@ -299,29 +288,9 @@ public class TestLs {
     assertFalse(ls.isOrderSize());
     assertFalse(ls.isOrderTime());
     assertFalse(ls.isUseAtime());
-    assertFalse(ls.isDisplaySPolicy());
     assertTrue(ls.isDisplayECPolicy());
   }
 
-  // check the -sp option is recognised
-  @Test
-  public void processOptionsDisplaySPolicy() throws IOException {
-    LinkedList<String> options = new LinkedList<String>();
-    options.add("-sp");
-    Ls ls = new Ls();
-    ls.processOptions(options);
-    assertFalse(ls.isPathOnly());
-    assertTrue(ls.isDirRecurse());
-    assertFalse(ls.isHumanReadable());
-    assertFalse(ls.isRecursive());
-    assertFalse(ls.isOrderReverse());
-    assertFalse(ls.isOrderSize());
-    assertFalse(ls.isOrderTime());
-    assertFalse(ls.isUseAtime());
-    assertFalse(ls.isDisplayECPolicy());
-    assertTrue(ls.isDisplaySPolicy());
-  }
-
   // check all options is handled correctly
   @Test
   public void processOptionsAll() throws IOException {
@@ -335,7 +304,6 @@ public class TestLs {
     options.add("-S"); // size order
     options.add("-u"); // show atime
     options.add("-e"); // show EC policies
-    options.add("-sp"); // show storage policies
     Ls ls = new Ls();
     ls.processOptions(options);
     assertTrue(ls.isPathOnly());
@@ -347,7 +315,6 @@ public class TestLs {
     assertTrue(ls.isOrderTime());
     assertTrue(ls.isUseAtime());
     assertTrue(ls.isDisplayECPolicy());
-    assertTrue(ls.isDisplaySPolicy());
   }
 
   // check listing of a single file
diff --git a/hadoop-common-project/hadoop-common/src/test/resources/testConf.xml b/hadoop-common-project/hadoop-common/src/test/resources/testConf.xml
index bac109d..1798563 100644
--- a/hadoop-common-project/hadoop-common/src/test/resources/testConf.xml
+++ b/hadoop-common-project/hadoop-common/src/test/resources/testConf.xml
@@ -54,7 +54,7 @@
       <comparators>
         <comparator>
           <type>RegexpComparator</type>
-          <expected-output>^-ls \[-C\] \[-d\] \[-h\] \[-q\] \[-R\] \[-t\] \[-S\] \[-r\] \[-u\] \[-e\] \[-sp\] \[&lt;path&gt; \.\.\.\] :( |\t)*</expected-output>
+          <expected-output>^-ls \[-C\] \[-d\] \[-h\] \[-q\] \[-R\] \[-t\] \[-S\] \[-r\] \[-u\] \[-e\] \[&lt;path&gt; \.\.\.\] :( |\t)*</expected-output>
         </comparator>
         <comparator>
           <type>RegexpComparator</type>
@@ -140,10 +140,6 @@
           <type>RegexpComparator</type>
           <expected-output>^( |\t)*-e\s+Display the erasure coding policy of files and directories\.</expected-output>
         </comparator>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^( |\t)*-sp\s+Display the storage policy of files and directories\.</expected-output>
-        </comparator>
       </comparators>
     </test>
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestStoragePolicyCommands.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestStoragePolicyCommands.java
index 8a042ba..ad77684 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestStoragePolicyCommands.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestStoragePolicyCommands.java
@@ -22,7 +22,6 @@ import java.net.URISyntaxException;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FsShell;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -167,20 +166,4 @@ public class TestStoragePolicyCommands {
     DFSTestUtil.toolRun(admin, "-getStoragePolicy -path /fooz", 2,
         "File/Directory does not exist: /fooz");
   }
-
-  @Test
-  public void testLsWithSpParameter() throws Exception {
-    Path file = new Path("/foo/bar");
-    DFSTestUtil.createFile(fs, file, SIZE, REPL, 0);
-    fs.setStoragePolicy(file, "COLD");
-    FsShell shell = new FsShell(conf);
-    DFSTestUtil.toolRun(shell, "-ls -sp /foo", 0, "COLD");
-  }
-
-  @Test
-  public void testLsWithSpParameterUnsupportedFs() throws Exception {
-    FsShell shell = new FsShell(conf);
-    DFSTestUtil.toolRun(shell, "-ls -sp file://", -1,
-        "UnsupportedOperationException");
-  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/testErasureCodingConf.xml b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/testErasureCodingConf.xml
index 036f8af..c280eca 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/testErasureCodingConf.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/testErasureCodingConf.xml
@@ -1035,25 +1035,6 @@
     </test>
 
     <test>
-      <description>ls: Using both -e and -sp to list both erasure and storage policy</description>
-      <test-commands>
-        <command>-fs NAMENODE -mkdir -p /ecdir</command>
-        <ec-admin-command>-fs NAMENODE -setPolicy -path /ecdir -policy RS-6-3-1024k</ec-admin-command>
-        <command>-fs NAMENODE -touchz /ecdir/file1</command>
-        <command>-fs NAMENODE -ls -e -sp /ecdir</command>
-      </test-commands>
-      <cleanup-commands>
-        <command>-fs NAMENODE -rmdir /ecdir</command>
-      </cleanup-commands>
-      <comparators>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^-rw-r--r--( )*1( )*USERNAME( )*supergroup( )*(RS-6-3-1024k)( )*(HOT)( )*0( )*[0-9]{4,}-[0-9]{2,}-[0-9]{2,} [0-9]{2,}:[0-9]{2,}( )*/ecdir/file1</expected-output>
-        </comparator>
-      </comparators>
-    </test>
-
-    <test>
       <description>ls: file with disabled EC Policy</description>
       <test-commands>
         <command>-fs NAMENODE -mkdir -p /ecdir</command>
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
index 67055c5..b9bc7f2 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
@@ -219,16 +219,6 @@ public class AbfsConfiguration{
 
   /**
    * Returns the account-specific value if it exists, then looks for an
-   * account-agnostic value.
-   * @param key Account-agnostic configuration key
-   * @return value if one exists, else the default value
-   */
-  public String getString(String key, String defaultValue) {
-    return rawConfig.get(accountConf(key), rawConfig.get(key, defaultValue));
-  }
-
-  /**
-   * Returns the account-specific value if it exists, then looks for an
    * account-agnostic value, and finally tries the default value.
    * @param key Account-agnostic configuration key
    * @param defaultValue Value returned if none is configured
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
index e321e9e..4c24ac8 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
@@ -83,6 +83,9 @@ public class AzureBlobFileSystem extends FileSystem {
   public static final Logger LOG = LoggerFactory.getLogger(AzureBlobFileSystem.class);
   private URI uri;
   private Path workingDir;
+  private UserGroupInformation userGroupInformation;
+  private String user;
+  private String primaryUserGroup;
   private AzureBlobFileSystemStore abfsStore;
   private boolean isClosed;
 
@@ -100,7 +103,9 @@ public class AzureBlobFileSystem extends FileSystem {
     LOG.debug("Initializing AzureBlobFileSystem for {}", uri);
 
     this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
-    this.abfsStore = new AzureBlobFileSystemStore(uri, this.isSecureScheme(), configuration);
+    this.userGroupInformation = UserGroupInformation.getCurrentUser();
+    this.user = userGroupInformation.getUserName();
+    this.abfsStore = new AzureBlobFileSystemStore(uri, this.isSecureScheme(), configuration, userGroupInformation);
     final AbfsConfiguration abfsConfiguration = abfsStore.getAbfsConfiguration();
 
     this.setWorkingDirectory(this.getHomeDirectory());
@@ -115,6 +120,18 @@ public class AzureBlobFileSystem extends FileSystem {
       }
     }
 
+    if (!abfsConfiguration.getSkipUserGroupMetadataDuringInitialization()) {
+      try {
+        this.primaryUserGroup = userGroupInformation.getPrimaryGroupName();
+      } catch (IOException ex) {
+        LOG.error("Failed to get primary group for {}, using user name as primary group name", user);
+        this.primaryUserGroup = this.user;
+      }
+    } else {
+      //Provide a default group name
+      this.primaryUserGroup = this.user;
+    }
+
     if (UserGroupInformation.isSecurityEnabled()) {
       this.delegationTokenEnabled = abfsConfiguration.isDelegationTokenManagerEnabled();
 
@@ -136,8 +153,8 @@ public class AzureBlobFileSystem extends FileSystem {
     final StringBuilder sb = new StringBuilder(
         "AzureBlobFileSystem{");
     sb.append("uri=").append(uri);
-    sb.append(", user='").append(abfsStore.getUser()).append('\'');
-    sb.append(", primaryUserGroup='").append(abfsStore.getPrimaryGroup()).append('\'');
+    sb.append(", user='").append(user).append('\'');
+    sb.append(", primaryUserGroup='").append(primaryUserGroup).append('\'');
     sb.append('}');
     return sb.toString();
   }
@@ -486,7 +503,7 @@ public class AzureBlobFileSystem extends FileSystem {
   public Path getHomeDirectory() {
     return makeQualified(new Path(
             FileSystemConfigurations.USER_HOME_DIRECTORY_PREFIX
-                + "/" + abfsStore.getUser()));
+                + "/" + this.userGroupInformation.getShortUserName()));
   }
 
   /**
@@ -537,20 +554,12 @@ public class AzureBlobFileSystem extends FileSystem {
     super.finalize();
   }
 
-  /**
-   * Get the username of the FS.
-   * @return the short name of the user who instantiated the FS
-   */
   public String getOwnerUser() {
-    return abfsStore.getUser();
+    return user;
   }
 
-  /**
-   * Get the group name of the owner of the FS.
-   * @return primary group name
-   */
   public String getOwnerUserPrimaryGroup() {
-    return abfsStore.getPrimaryGroup();
+    return primaryUserGroup;
   }
 
   private boolean deleteRoot() throws IOException {
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
index c2739e9..5c28bd4 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
@@ -67,7 +67,6 @@ import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
 import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultEntrySchema;
 import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultSchema;
 import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider;
-import org.apache.hadoop.fs.azurebfs.oauth2.IdentityTransformer;
 import org.apache.hadoop.fs.azurebfs.services.AbfsAclHelper;
 import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
 import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation;
@@ -89,7 +88,9 @@ import org.apache.http.client.utils.URIBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.SUPER_USER;
 import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_ABFS_ENDPOINT;
+
 /**
  * Provides the bridging logic between Hadoop's abstract filesystem and Azure Storage.
  */
@@ -100,6 +101,7 @@ public class AzureBlobFileSystemStore {
 
   private AbfsClient client;
   private URI uri;
+  private final UserGroupInformation userGroupInformation;
   private String userName;
   private String primaryUserGroup;
   private static final String DATE_TIME_PATTERN = "E, dd MMM yyyy HH:mm:ss 'GMT'";
@@ -111,12 +113,11 @@ public class AzureBlobFileSystemStore {
   private boolean isNamespaceEnabledSet;
   private boolean isNamespaceEnabled;
   private final AuthType authType;
-  private final UserGroupInformation userGroupInformation;
-  private final IdentityTransformer identityTransformer;
 
-  public AzureBlobFileSystemStore(URI uri, boolean isSecureScheme, Configuration configuration)
-          throws IOException {
+  public AzureBlobFileSystemStore(URI uri, boolean isSecureScheme, Configuration configuration, UserGroupInformation userGroupInformation)
+          throws AzureBlobFileSystemException, IOException {
     this.uri = uri;
+
     String[] authorityParts = authorityParts(uri);
     final String fileSystemName = authorityParts[0];
     final String accountName = authorityParts[1];
@@ -126,8 +127,10 @@ public class AzureBlobFileSystemStore {
     } catch (IllegalAccessException exception) {
       throw new FileSystemOperationUnhandledException(exception);
     }
-    this.userGroupInformation = UserGroupInformation.getCurrentUser();
+
+    this.userGroupInformation = userGroupInformation;
     this.userName = userGroupInformation.getShortUserName();
+
     if (!abfsConfiguration.getSkipUserGroupMetadataDuringInitialization()) {
       try {
         this.primaryUserGroup = userGroupInformation.getPrimaryGroupName();
@@ -142,25 +145,12 @@ public class AzureBlobFileSystemStore {
 
     this.azureAtomicRenameDirSet = new HashSet<>(Arrays.asList(
         abfsConfiguration.getAzureAtomicRenameDirs().split(AbfsHttpConstants.COMMA)));
+
     this.authType = abfsConfiguration.getAuthType(accountName);
     boolean usingOauth = (authType == AuthType.OAuth);
     boolean useHttps = (usingOauth || abfsConfiguration.isHttpsAlwaysUsed()) ? true : isSecureScheme;
     initializeClient(uri, fileSystemName, accountName, useHttps);
-    this.identityTransformer = new IdentityTransformer(abfsConfiguration.getRawConfiguration());
-  }
 
-  /**
-   * @return local user name.
-   * */
-  public String getUser() {
-    return this.userName;
-  }
-
-  /**
-  * @return primary group that user belongs to.
-  * */
-  public String getPrimaryGroup() {
-    return this.primaryUserGroup;
   }
 
   private String[] authorityParts(URI uri) throws InvalidUriAuthorityException, InvalidUriException {
@@ -462,54 +452,60 @@ public class AzureBlobFileSystemStore {
             path,
             isNamespaceEnabled);
 
-    final AbfsRestOperation op;
-    if (path.isRoot()) {
-      op = isNamespaceEnabled
-              ? client.getAclStatus(AbfsHttpConstants.FORWARD_SLASH + AbfsHttpConstants.ROOT_PATH)
-              : client.getFilesystemProperties();
-    } else {
-      op = client.getPathProperties(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), isNamespaceEnabled);
-    }
-
-    final long blockSize = abfsConfiguration.getAzureBlockSize();
-    final AbfsHttpOperation result = op.getResult();
-
-    final String eTag = result.getResponseHeader(HttpHeaderConfigurations.ETAG);
-    final String lastModified = result.getResponseHeader(HttpHeaderConfigurations.LAST_MODIFIED);
-    final String permissions = result.getResponseHeader((HttpHeaderConfigurations.X_MS_PERMISSIONS));
-    final boolean hasAcl = AbfsPermission.isExtendedAcl(permissions);
-    final long contentLength;
-    final boolean resourceIsDir;
-
     if (path.isRoot()) {
-      contentLength = 0;
-      resourceIsDir = true;
+      final AbfsRestOperation op = isNamespaceEnabled
+          ? client.getAclStatus(AbfsHttpConstants.FORWARD_SLASH + AbfsHttpConstants.ROOT_PATH)
+          : client.getFilesystemProperties();
+
+      final long blockSize = abfsConfiguration.getAzureBlockSize();
+      final String owner = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_OWNER);
+      final String group = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_GROUP);
+      final String permissions = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_PERMISSIONS);
+      final String eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG);
+      final String lastModified = op.getResult().getResponseHeader(HttpHeaderConfigurations.LAST_MODIFIED);
+      final boolean hasAcl = AbfsPermission.isExtendedAcl(permissions);
+
+      return new VersionedFileStatus(
+              isSuperUserOrEmpty(owner) ? userName : owner,
+              isSuperUserOrEmpty(group) ? primaryUserGroup : group,
+              permissions == null ? new AbfsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL)
+                      : AbfsPermission.valueOf(permissions),
+              hasAcl,
+              0,
+              true,
+              1,
+              blockSize,
+              parseLastModifiedTime(lastModified),
+              path,
+              eTag);
     } else {
-      contentLength = parseContentLength(result.getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH));
-      resourceIsDir = parseIsDirectory(result.getResponseHeader(HttpHeaderConfigurations.X_MS_RESOURCE_TYPE));
+      AbfsRestOperation op = client.getPathProperties(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), isNamespaceEnabled);
+
+      final long blockSize = abfsConfiguration.getAzureBlockSize();
+      final AbfsHttpOperation result = op.getResult();
+      final String eTag = result.getResponseHeader(HttpHeaderConfigurations.ETAG);
+      final String lastModified = result.getResponseHeader(HttpHeaderConfigurations.LAST_MODIFIED);
+      final String contentLength = result.getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH);
+      final String resourceType = result.getResponseHeader(HttpHeaderConfigurations.X_MS_RESOURCE_TYPE);
+      final String owner = result.getResponseHeader(HttpHeaderConfigurations.X_MS_OWNER);
+      final String group = result.getResponseHeader(HttpHeaderConfigurations.X_MS_GROUP);
+      final String permissions = result.getResponseHeader((HttpHeaderConfigurations.X_MS_PERMISSIONS));
+      final boolean hasAcl = AbfsPermission.isExtendedAcl(permissions);
+
+      return new VersionedFileStatus(
+              isSuperUserOrEmpty(owner) ? userName : owner,
+              isSuperUserOrEmpty(group) ? primaryUserGroup : group,
+              permissions == null ? new AbfsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL)
+                      : AbfsPermission.valueOf(permissions),
+              hasAcl,
+              parseContentLength(contentLength),
+              parseIsDirectory(resourceType),
+              1,
+              blockSize,
+              parseLastModifiedTime(lastModified),
+              path,
+              eTag);
     }
-
-    final String transformedOwner = identityTransformer.transformIdentityForGetRequest(
-              result.getResponseHeader(HttpHeaderConfigurations.X_MS_OWNER),
-              userName);
-
-    final String transformedGroup = identityTransformer.transformIdentityForGetRequest(
-              result.getResponseHeader(HttpHeaderConfigurations.X_MS_GROUP),
-              primaryUserGroup);
-
-    return new VersionedFileStatus(
-            transformedOwner,
-            transformedGroup,
-            permissions == null ? new AbfsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL)
-                    : AbfsPermission.valueOf(permissions),
-            hasAcl,
-            contentLength,
-            resourceIsDir,
-            1,
-            blockSize,
-            parseLastModifiedTime(lastModified),
-            path,
-            eTag);
   }
 
   public FileStatus[] listStatus(final Path path) throws IOException {
@@ -536,8 +532,8 @@ public class AzureBlobFileSystemStore {
       long blockSize = abfsConfiguration.getAzureBlockSize();
 
       for (ListResultEntrySchema entry : retrievedSchema.paths()) {
-        final String owner = identityTransformer.transformIdentityForGetRequest(entry.owner(), userName);
-        final String group = identityTransformer.transformIdentityForGetRequest(entry.group(), primaryUserGroup);
+        final String owner = isSuperUserOrEmpty(entry.owner()) ? userName : entry.owner();
+        final String group = isSuperUserOrEmpty(entry.group()) ? primaryUserGroup : entry.group();
         final FsPermission fsPermission = entry.permissions() == null
                 ? new AbfsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL)
                 : AbfsPermission.valueOf(entry.permissions());
@@ -570,7 +566,7 @@ public class AzureBlobFileSystemStore {
 
     } while (continuation != null && !continuation.isEmpty());
 
-    return fileStatuses.toArray(new FileStatus[fileStatuses.size()]);
+    return fileStatuses.toArray(new FileStatus[0]);
   }
 
   public void setOwner(final Path path, final String owner, final String group) throws
@@ -580,17 +576,20 @@ public class AzureBlobFileSystemStore {
           "This operation is only valid for storage accounts with the hierarchical namespace enabled.");
     }
 
+    String effectiveOwner = owner;
+    String effectiveGroup = group;
+    if (authType == AuthType.SharedKey && owner.equals(userName)) {
+      effectiveOwner = SUPER_USER;
+      effectiveGroup = SUPER_USER;
+    }
+
     LOG.debug(
             "setOwner filesystem: {} path: {} owner: {} group: {}",
             client.getFileSystem(),
             path.toString(),
-            owner,
-            group);
-
-    final String transformedOwner = identityTransformer.transformUserOrGroupForSetRequest(owner);
-    final String transformedGroup = identityTransformer.transformUserOrGroupForSetRequest(group);
-
-    client.setOwner(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true), transformedOwner, transformedGroup);
+            effectiveOwner,
+            effectiveGroup);
+    client.setOwner(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true), effectiveOwner, effectiveGroup);
   }
 
   public void setPermission(final Path path, final FsPermission permission) throws
@@ -621,9 +620,7 @@ public class AzureBlobFileSystemStore {
             client.getFileSystem(),
             path.toString(),
             AclEntry.aclSpecToString(aclSpec));
-
-    final List<AclEntry> transformedAclEntries = identityTransformer.transformAclEntriesForSetRequest(aclSpec);
-    final Map<String, String> modifyAclEntries = AbfsAclHelper.deserializeAclSpec(AclEntry.aclSpecToString(transformedAclEntries));
+    final Map<String, String> modifyAclEntries = AbfsAclHelper.deserializeAclSpec(AclEntry.aclSpecToString(aclSpec));
     boolean useUpn = AbfsAclHelper.isUpnFormatAclEntries(modifyAclEntries);
 
     final AbfsRestOperation op = client.getAclStatus(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true), useUpn);
@@ -648,9 +645,7 @@ public class AzureBlobFileSystemStore {
             client.getFileSystem(),
             path.toString(),
             AclEntry.aclSpecToString(aclSpec));
-
-    final List<AclEntry> transformedAclEntries = identityTransformer.transformAclEntriesForSetRequest(aclSpec);
-    final Map<String, String> removeAclEntries = AbfsAclHelper.deserializeAclSpec(AclEntry.aclSpecToString(transformedAclEntries));
+    final Map<String, String> removeAclEntries = AbfsAclHelper.deserializeAclSpec(AclEntry.aclSpecToString(aclSpec));
     boolean isUpnFormat = AbfsAclHelper.isUpnFormatAclEntries(removeAclEntries);
 
     final AbfsRestOperation op = client.getAclStatus(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true), isUpnFormat);
@@ -727,9 +722,7 @@ public class AzureBlobFileSystemStore {
             client.getFileSystem(),
             path.toString(),
             AclEntry.aclSpecToString(aclSpec));
-
-    final List<AclEntry> transformedAclEntries = identityTransformer.transformAclEntriesForSetRequest(aclSpec);
-    final Map<String, String> aclEntries = AbfsAclHelper.deserializeAclSpec(AclEntry.aclSpecToString(transformedAclEntries));
+    final Map<String, String> aclEntries = AbfsAclHelper.deserializeAclSpec(AclEntry.aclSpecToString(aclSpec));
     final boolean isUpnFormat = AbfsAclHelper.isUpnFormatAclEntries(aclEntries);
 
     final AbfsRestOperation op = client.getAclStatus(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true), isUpnFormat);
@@ -756,13 +749,8 @@ public class AzureBlobFileSystemStore {
     AbfsRestOperation op = client.getAclStatus(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true));
     AbfsHttpOperation result = op.getResult();
 
-    final String transformedOwner = identityTransformer.transformIdentityForGetRequest(
-            result.getResponseHeader(HttpHeaderConfigurations.X_MS_OWNER),
-            userName);
-    final String transformedGroup = identityTransformer.transformIdentityForGetRequest(
-            result.getResponseHeader(HttpHeaderConfigurations.X_MS_GROUP),
-            primaryUserGroup);
-
+    final String owner = result.getResponseHeader(HttpHeaderConfigurations.X_MS_OWNER);
+    final String group = result.getResponseHeader(HttpHeaderConfigurations.X_MS_GROUP);
     final String permissions = result.getResponseHeader(HttpHeaderConfigurations.X_MS_PERMISSIONS);
     final String aclSpecString = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_ACL);
 
@@ -771,8 +759,8 @@ public class AzureBlobFileSystemStore {
             : AbfsPermission.valueOf(permissions);
 
     final AclStatus.Builder aclStatusBuilder = new AclStatus.Builder();
-    aclStatusBuilder.owner(transformedOwner);
-    aclStatusBuilder.group(transformedGroup);
+    aclStatusBuilder.owner(isSuperUserOrEmpty(owner)? userName : owner);
+    aclStatusBuilder.group(isSuperUserOrEmpty(group) ? primaryUserGroup : group);
 
     aclStatusBuilder.setPermission(fsPermission);
     aclStatusBuilder.stickyBit(fsPermission.getStickyBit());
@@ -956,6 +944,11 @@ public class AzureBlobFileSystemStore {
     return false;
   }
 
+  private boolean isSuperUserOrEmpty(final String name) {
+      return name == null || name.equals(SUPER_USER);
+  }
+
+
   private static class VersionedFileStatus extends FileStatus {
     private final String version;
 
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
index 8cd86bf..d079d94 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
@@ -55,28 +55,7 @@ public final class ConfigurationKeys {
   public static final String FS_AZURE_USER_AGENT_PREFIX_KEY = "fs.azure.user.agent.prefix";
   public static final String FS_AZURE_SSL_CHANNEL_MODE_KEY = "fs.azure.ssl.channel.mode";
   public static final String FS_AZURE_USE_UPN = "fs.azure.use.upn";
-  /** User principal names (UPNs) have the format “{alias}@{domain}”. If true,
-   *  only {alias} is included when a UPN would otherwise appear in the output
-   *  of APIs like getFileStatus, getOwner, getAclStatus, etc. Default is false. **/
-  public static final String FS_AZURE_FILE_OWNER_ENABLE_SHORTNAME = "fs.azure.identity.transformer.enable.short.name";
-  /** If the domain name is specified and “fs.azure.identity.transformer.enable.short.name”
-   *  is true, then the {alias} part of a UPN can be specified as input to APIs like setOwner and
-   *  setAcl and it will be transformed to a UPN by appending @ and the domain specified by
-   *  this configuration property. **/
-  public static final String FS_AZURE_FILE_OWNER_DOMAINNAME = "fs.azure.identity.transformer.domain.name";
-  /** An Azure Active Directory object ID (oid) used as the replacement for names contained in the
-   * list specified by “fs.azure.identity.transformer.service.principal.substitution.list.
-   * Notice that instead of setting oid, you can also set $superuser.**/
-  public static final String FS_AZURE_OVERRIDE_OWNER_SP = "fs.azure.identity.transformer.service.principal.id";
-  /** A comma separated list of names to be replaced with the service principal ID specified by
-   * “fs.default.identity.transformer.service.principal.id”. This substitution occurs
-   * when setOwner, setAcl, modifyAclEntries, or removeAclEntries are invoked with identities
-   * contained in the substitution list. Notice that when in non-secure cluster, asterisk symbol "*"
-   * can be used to match all user/group. **/
-  public static final String FS_AZURE_OVERRIDE_OWNER_SP_LIST = "fs.azure.identity.transformer.service.principal.substitution.list";
-  /** By default this is set as false, so “$superuser” is replaced with the current user when it appears as the owner
-   * or owning group of a file or directory. To disable it, set it as true. **/
-  public static final String FS_AZURE_SKIP_SUPER_USER_REPLACEMENT = "fs.azure.identity.transformer.skip.superuser.replacement";
+
   public static final String AZURE_KEY_ACCOUNT_KEYPROVIDER = "fs.azure.account.keyprovider";
   public static final String AZURE_KEY_ACCOUNT_SHELLKEYPROVIDER_SCRIPT = "fs.azure.shellkeyprovider.script";
 
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/IdentityTransformer.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/IdentityTransformer.java
deleted file mode 100644
index 90bd5e1..0000000
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/IdentityTransformer.java
+++ /dev/null
@@ -1,275 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.fs.azurebfs.oauth2;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Locale;
-
-import com.google.common.base.Preconditions;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.permission.AclEntry;
-import org.apache.hadoop.fs.permission.AclEntryType;
-import org.apache.hadoop.security.UserGroupInformation;
-
-import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.AT;
-import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.STAR;
-import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.SUPER_USER;
-import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_SKIP_SUPER_USER_REPLACEMENT;
-import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_FILE_OWNER_DOMAINNAME;
-import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_FILE_OWNER_ENABLE_SHORTNAME;
-import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_OVERRIDE_OWNER_SP;
-import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_OVERRIDE_OWNER_SP_LIST;
-
-/**
- * Perform transformation for Azure Active Directory identities used in owner, group and acls.
- */
-public class IdentityTransformer {
-
-  private boolean isSecure;
-  private String servicePrincipalId;
-  private String serviceWhiteList;
-  private String domainName;
-  private boolean enableShortName;
-  private boolean skipUserIdentityReplacement;
-  private boolean skipSuperUserReplacement;
-  private boolean domainIsSet;
-  private static final String UUID_PATTERN = "^[0-9a-f]{8}-[0-9a-f]{4}-[1-5][0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$";
-
-  public IdentityTransformer(Configuration configuration) throws IOException {
-    Preconditions.checkNotNull(configuration, "configuration");
-    this.isSecure = UserGroupInformation.getCurrentUser().isSecurityEnabled();
-    this.servicePrincipalId = configuration.get(FS_AZURE_OVERRIDE_OWNER_SP, "");
-    this.serviceWhiteList = configuration.get(FS_AZURE_OVERRIDE_OWNER_SP_LIST, "");
-    this.domainName = configuration.get(FS_AZURE_FILE_OWNER_DOMAINNAME, "");
-    this.enableShortName = configuration.getBoolean(FS_AZURE_FILE_OWNER_ENABLE_SHORTNAME, false);
-
-    // - "servicePrincipalId" and "serviceWhiteList" are required for
-    //    transformation between localUserOrGroup and principalId,$superuser
-    // - "enableShortName" is required for transformation between shortName and fullyQualifiedName.
-    this.skipUserIdentityReplacement = servicePrincipalId.isEmpty() && serviceWhiteList.isEmpty() && !enableShortName;
-    this.skipSuperUserReplacement = configuration.getBoolean(FS_AZURE_SKIP_SUPER_USER_REPLACEMENT, false);
-
-    if (enableShortName){
-      // need to check the domain setting only when short name is enabled.
-      // if shortName is not enabled, transformer won't transform a shortName to
-      // a fully qualified name.
-      this.domainIsSet = !domainName.isEmpty();
-    }
-  }
-
-  /**
-   * Perform identity transformation for the Get request results in AzureBlobFileSystemStore:
-   * getFileStatus(), listStatus(), getAclStatus().
-   * Input originalUserOrGroup can be one of the following:
-   * 1. $superuser:
-   *     by default it will be transformed to local user/group, this can be disabled by setting
-   *     "fs.azure.identity.transformer.skip.superuser.replacement" to true.
-   *
-   * 2. User principal id:
-   *     can be transformed to localUserOrGroup, if this principal id matches the principal id set in
-   *     "fs.azure.identity.transformer.service.principal.id" and localUserOrGroup is stated in
-   *     "fs.azure.identity.transformer.service.principal.substitution.list"
-   *
-   * 3. User principal name (UPN):
-   *     can be transformed to a short name(localUserOrGroup) if "fs.azure.identity.transformer.enable.short.name"
-   *     is enabled.
-   *
-   * @param originalUserOrGroup the original user or group in the get request results: FileStatus, AclStatus.
-   * @param localUserOrGroup the local user or group, should be parsed from UserGroupInformation.
-   * @return owner or group after transformation.
-   * */
-  public String transformIdentityForGetRequest(String originalUserOrGroup, String localUserOrGroup) {
-    if (originalUserOrGroup == null) {
-      originalUserOrGroup = localUserOrGroup;
-      // localUserOrGroup might be a full name, so continue the transformation.
-    }
-    // case 1: it is $superuser and replace $superuser config is enabled
-    if (!skipSuperUserReplacement && SUPER_USER.equals(originalUserOrGroup)) {
-      return localUserOrGroup;
-    }
-
-    if (skipUserIdentityReplacement) {
-      return originalUserOrGroup;
-    }
-
-    // case 2: original owner is principalId set in config, and localUser
-    //         is a daemon service specified in substitution list,
-    //         To avoid ownership check failure in job task, replace it
-    //         to local daemon user/group
-    if (originalUserOrGroup.equals(servicePrincipalId) && isInSubstitutionList(localUserOrGroup)) {
-      return localUserOrGroup;
-    }
-
-    // case 3: If original owner is a fully qualified name, and
-    //         short name is enabled, replace with shortName.
-    if (shouldUseShortUserName(originalUserOrGroup)) {
-      return getShortName(originalUserOrGroup);
-    }
-
-    return originalUserOrGroup;
-  }
-
-  /**
-   * Perform Identity transformation when setting owner on a path.
-   * There are four possible input:
-   * 1.short name; 2.$superuser; 3.Fully qualified name; 4. principal id.
-   *
-   * short name could be transformed to:
-   *    - A service principal id or $superuser, if short name belongs a daemon service
-   *      stated in substitution list AND "fs.azure.identity.transformer.service.principal.id"
-   *      is set with $superuser or a principal id.
-   *    - Fully qualified name, if "fs.azure.identity.transformer.domain.name" is set in configuration.
-   *
-   * $superuser, fully qualified name and principalId should not be transformed.
-   *
-   * @param userOrGroup the user or group to be set as owner.
-   * @return user or group after transformation.
-   * */
-  public String transformUserOrGroupForSetRequest(String userOrGroup) {
-    if (userOrGroup == null || userOrGroup.isEmpty() || skipUserIdentityReplacement) {
-      return userOrGroup;
-    }
-
-    // case 1: when the owner to be set is stated in substitution list.
-    if (isInSubstitutionList(userOrGroup)) {
-      return servicePrincipalId;
-    }
-
-    // case 2: when the owner is a short name of the user principal name(UPN).
-    if (shouldUseFullyQualifiedUserName(userOrGroup)) {
-      return getFullyQualifiedName(userOrGroup);
-    }
-
-    return userOrGroup;
-  }
-
-  /**
-   * Perform Identity transformation when calling setAcl(),removeAclEntries() and modifyAclEntries()
-   * If the AclEntry type is a user or group, and its name is one of the following:
-   * 1.short name; 2.$superuser; 3.Fully qualified name; 4. principal id.
-   * Short name could be transformed to:
-   *    - A service principal id or $superuser, if short name belongs a daemon service
-   *      stated in substitution list AND "fs.azure.identity.transformer.service.principal.id"
-   *      is set with $superuser or a principal id.
-   *    - A fully qualified name, if the AclEntry type is User AND if "fs.azure.identity.transformer.domain.name"
-   *    is set in configuration. This is to make the behavior consistent with HDI.
-   *
-   * $superuser, fully qualified name and principal id should not be transformed.
-   *
-   * @param aclEntries list of AclEntry
-   * @return list of AclEntry after the identity transformation.
-   * */
-  public List<AclEntry> transformAclEntriesForSetRequest(final List<AclEntry> aclEntries) {
-    if (skipUserIdentityReplacement) {
-      return aclEntries;
-    }
-
-    for (int i = 0; i < aclEntries.size(); i++) {
-      AclEntry aclEntry = aclEntries.get(i);
-      String name = aclEntry.getName();
-      String transformedName = name;
-      if (name == null || name.isEmpty() || aclEntry.getType().equals(AclEntryType.OTHER) || aclEntry.getType().equals(AclEntryType.MASK)) {
-        continue;
-      }
-
-      // case 1: when the user or group name to be set is stated in substitution list.
-      if (isInSubstitutionList(name)) {
-        transformedName = servicePrincipalId;
-      } else if (aclEntry.getType().equals(AclEntryType.USER) // case 2: when the owner is a short name
-              && shouldUseFullyQualifiedUserName(name)) {     //         of the user principal name (UPN).
-        // Notice: for group type ACL entry, if name is shortName.
-        //         It won't be converted to Full Name. This is
-        //         to make the behavior consistent with HDI.
-        transformedName = getFullyQualifiedName(name);
-      }
-
-      // Avoid unnecessary new AclEntry allocation
-      if (transformedName.equals(name)) {
-        continue;
-      }
-
-      AclEntry.Builder aclEntryBuilder = new AclEntry.Builder();
-      aclEntryBuilder.setType(aclEntry.getType());
-      aclEntryBuilder.setName(transformedName);
-      aclEntryBuilder.setScope(aclEntry.getScope());
-      aclEntryBuilder.setPermission(aclEntry.getPermission());
-
-      // Replace the original AclEntry
-      aclEntries.set(i, aclEntryBuilder.build());
-    }
-    return aclEntries;
-  }
-
-  /**
-   * Internal method to identify if owner name returned by the ADLS backend is short name or not.
-   * If name contains "@", this code assumes that whatever comes after '@' is domain name and ignores it.
-   * @param owner owner name
-   * @return true if it is non null & contain an "@"
-   */
-  private boolean isShortUserName(String owner) {
-    return (owner != null) && !owner.contains(AT);
-  }
-
-  private boolean shouldUseShortUserName(String owner){
-    return enableShortName && !isShortUserName(owner);
-  }
-
-  private String getShortName(String userName) {
-    if (userName == null)    {
-      return  null;
-    }
-
-    if (isShortUserName(userName)) {
-      return userName;
-    }
-
-    String userNameBeforeAt = userName.substring(0, userName.indexOf(AT));
-    if (isSecure) {
-      //In secure clusters we apply auth to local rules to lowercase all short localUser names (notice /L at the end),
-      //E.G. : RULE:[1:$1@$0](.*@FOO.ONMICROSOFT.COM)s/@.*/// Ideally we should use the HadoopKerberosName class to get
-      // new HadoopKerberosName(arg).getShortName. However,
-      //1. ADLS can report the Realm in lower case while returning file owner names( ie. : Some.User@realm.onmicrosoft.com)
-      //2. The RULE specification does not allow specifying character classes to do case insensitive matches
-      //Due to this, we end up using a forced lowercase version of the manually shortened name
-      return userNameBeforeAt.toLowerCase(Locale.ENGLISH);
-    }
-    return userNameBeforeAt;
-  }
-
-  private String getFullyQualifiedName(String name){
-    if (domainIsSet && (name != null) && !name.contains(AT)){
-      return name + AT + domainName;
-    }
-    return name;
-  }
-
-  private boolean shouldUseFullyQualifiedUserName(String owner){
-    return domainIsSet && !SUPER_USER.equals(owner) && !isUuid(owner) && enableShortName && isShortUserName(owner);
-  }
-
-  private boolean isInSubstitutionList(String localUserName) {
-    return serviceWhiteList.contains(STAR) || serviceWhiteList.contains(localUserName);
-  }
-
-  private boolean isUuid(String input) {
-    if (input == null) return false;
-    return input.matches(UUID_PATTERN);
-  }
-}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/package-info.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/package-info.java
index 9bca4e6..bad1a85 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/package-info.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/package-info.java
@@ -15,10 +15,4 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
-@InterfaceAudience.Private
-@InterfaceStability.Evolving
-package org.apache.hadoop.fs.azurebfs.oauth2;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
+package org.apache.hadoop.fs.azurebfs.oauth2;
\ No newline at end of file
diff --git a/hadoop-tools/hadoop-azure/src/site/markdown/testing_azure.md b/hadoop-tools/hadoop-azure/src/site/markdown/testing_azure.md
index 40a372d..c2afe74 100644
--- a/hadoop-tools/hadoop-azure/src/site/markdown/testing_azure.md
+++ b/hadoop-tools/hadoop-azure/src/site/markdown/testing_azure.md
@@ -733,61 +733,6 @@ hierarchical namespace enabled, and set the following configuration settings:
      <description>AAD client id.</description>
    </property>
   -->
-
-  <!--
-    <property>
-        <name>fs.azure.identity.transformer.enable.short.name</name>
-        <value>true/false</value>
-        <description>
-          User principal names (UPNs) have the format “{alias}@{domain}”.
-          If true, only {alias} is included when a UPN would otherwise appear in the output
-          of APIs like getFileStatus, getOwner, getAclStatus, etc, default is false.
-        </description>
-    </property>
-
-    <property>
-        <name>fs.azure.identity.transformer.domain.name</name>
-        <value>domain name of the user's upn</value>
-        <description>
-          If the domain name is specified and “fs.azure.identity.transformer.enable.short.name”
-          is true, then the {alias} part of a UPN can be specified as input to APIs like setOwner,
-          setAcl, modifyAclEntries, or removeAclEntries, and it will be transformed to a UPN by appending @ and the domain specified by
-          this configuration property.
-        </description>
-    </property>
-
-    <property>
-        <name>fs.azure.identity.transformer.service.principal.id</name>
-        <value>service principal object id</value>
-        <description>
-          An Azure Active Directory object ID (oid) used as the replacement for names contained
-          in the list specified by “fs.azure.identity.transformer.service.principal.substitution.list”.
-          Notice that instead of setting oid, you can also set $superuser here.
-        </description>
-    </property>
-
-    <property>
-        <name>fs.azure.identity.transformer.skip.superuser.replacement</name>
-        <value>true/false</value>
-        <description>
-          If false, “$superuser” is replaced with the current user when it appears as the owner
-          or owning group of a file or directory. The default is false.
-        </description>
-    </property>
-
-    <property>
-        <name>fs.azure.identity.transformer.service.principal.substitution.list</name>
-        <value>mapred,hdfs,yarn,hive,tez</value>
-        <description>
-           A comma separated list of names to be replaced with the service principal ID specified by
-           “fs.azure.identity.transformer.service.principal.id”.  This substitution occurs
-           when setOwner, setAcl, modifyAclEntries, or removeAclEntries are invoked with identities
-           contained in the substitution list. Notice that when in non-secure cluster, asterisk symbol *
-           can be used to match all user/group.
-        </description>
-    </property>
-   -->
-
 ```
 
 If running tests against an endpoint that uses the URL format
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsIdentityTransformer.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsIdentityTransformer.java
deleted file mode 100644
index 41f680e..0000000
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsIdentityTransformer.java
+++ /dev/null
@@ -1,301 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.fs.azurebfs;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.UUID;
-
-import com.google.common.collect.Lists;
-import org.apache.hadoop.fs.azurebfs.oauth2.IdentityTransformer;
-import org.apache.hadoop.fs.permission.AclEntry;
-import org.junit.Test;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.UserGroupInformation;
-
-import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.SUPER_USER;
-import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_FILE_OWNER_DOMAINNAME;
-import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_FILE_OWNER_ENABLE_SHORTNAME;
-import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_OVERRIDE_OWNER_SP;
-import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_OVERRIDE_OWNER_SP_LIST;
-import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_SKIP_SUPER_USER_REPLACEMENT;
-import static org.apache.hadoop.fs.azurebfs.utils.AclTestHelpers.aclEntry;
-import static org.apache.hadoop.fs.permission.AclEntryScope.ACCESS;
-import static org.apache.hadoop.fs.permission.AclEntryScope.DEFAULT;
-import static org.apache.hadoop.fs.permission.AclEntryType.GROUP;
-import static org.apache.hadoop.fs.permission.AclEntryType.MASK;
-import static org.apache.hadoop.fs.permission.AclEntryType.OTHER;
-import static org.apache.hadoop.fs.permission.AclEntryType.USER;
-import static org.apache.hadoop.fs.permission.FsAction.ALL;
-
-/**
- * Test IdentityTransformer.
- */
-//@RunWith(Parameterized.class)
-public class ITestAbfsIdentityTransformer extends AbstractAbfsScaleTest{
-  private final UserGroupInformation userGroupInfo;
-  private final String localUser;
-  private final String localGroup;
-  private static final String DAEMON = "daemon";
-  private static final String ASTERISK = "*";
-  private static final String SHORT_NAME = "abc";
-  private static final String DOMAIN = "domain.com";
-  private static final String FULLY_QUALIFIED_NAME = "abc@domain.com";
-  private static final String SERVICE_PRINCIPAL_ID = UUID.randomUUID().toString();
-
-  public ITestAbfsIdentityTransformer() throws Exception {
-    super();
-    userGroupInfo = UserGroupInformation.getCurrentUser();
-    localUser = userGroupInfo.getShortUserName();
-    localGroup = userGroupInfo.getPrimaryGroupName();
-  }
-
-  @Test
-  public void testDaemonServiceSettingIdentity() throws IOException {
-    Configuration config = this.getRawConfiguration();
-    resetIdentityConfig(config);
-    // Default config
-    IdentityTransformer identityTransformer = getTransformerWithDefaultIdentityConfig(config);
-    assertEquals("Identity should not change for default config",
-            DAEMON, identityTransformer.transformUserOrGroupForSetRequest(DAEMON));
-
-    // Add service principal id
-    config.set(FS_AZURE_OVERRIDE_OWNER_SP, SERVICE_PRINCIPAL_ID);
-
-    // case 1: substitution list doesn't contain daemon
-    config.set(FS_AZURE_OVERRIDE_OWNER_SP_LIST, "a,b,c,d");
-    identityTransformer = getTransformerWithCustomizedIdentityConfig(config);
-    assertEquals("Identity should not change when substitution list doesn't contain daemon",
-            DAEMON, identityTransformer.transformUserOrGroupForSetRequest(DAEMON));
-
-    // case 2: substitution list contains daemon name
-    config.set(FS_AZURE_OVERRIDE_OWNER_SP_LIST, DAEMON + ",a,b,c,d");
-    identityTransformer = getTransformerWithCustomizedIdentityConfig(config);
-    assertEquals("Identity should be replaced to servicePrincipalId",
-            SERVICE_PRINCIPAL_ID, identityTransformer.transformUserOrGroupForSetRequest(DAEMON));
-
-    // case 3: substitution list is *
-    config.set(FS_AZURE_OVERRIDE_OWNER_SP_LIST, ASTERISK);
-    identityTransformer = getTransformerWithCustomizedIdentityConfig(config);
-    assertEquals("Identity should be replaced to servicePrincipalId",
-            SERVICE_PRINCIPAL_ID, identityTransformer.transformUserOrGroupForSetRequest(DAEMON));
-  }
-
-  @Test
-  public void testFullyQualifiedNameSettingIdentity() throws IOException {
-    Configuration config = this.getRawConfiguration();
-    // Default config
-    IdentityTransformer identityTransformer = getTransformerWithDefaultIdentityConfig(config);
-    assertEquals("short name should not be converted to full name by default",
-            SHORT_NAME, identityTransformer.transformUserOrGroupForSetRequest(SHORT_NAME));
-
-    resetIdentityConfig(config);
-
-    // Add config to get fully qualified username
-    config.setBoolean(FS_AZURE_FILE_OWNER_ENABLE_SHORTNAME, true);
-    config.set(FS_AZURE_FILE_OWNER_DOMAINNAME, DOMAIN);
-    identityTransformer = getTransformerWithCustomizedIdentityConfig(config);
-    assertEquals("short name should be converted to full name",
-            FULLY_QUALIFIED_NAME, identityTransformer.transformUserOrGroupForSetRequest(SHORT_NAME));
-  }
-
-  @Test
-  public void testNoOpForSettingOidAsIdentity() throws IOException {
-    Configuration config = this.getRawConfiguration();
-    resetIdentityConfig(config);
-
-    config.setBoolean(FS_AZURE_FILE_OWNER_ENABLE_SHORTNAME, true);
-    config.set(FS_AZURE_FILE_OWNER_DOMAINNAME, DOMAIN);
-    config.set(FS_AZURE_OVERRIDE_OWNER_SP, UUID.randomUUID().toString());
-    config.set(FS_AZURE_OVERRIDE_OWNER_SP_LIST, "a,b,c,d");
-
-    IdentityTransformer identityTransformer = getTransformerWithCustomizedIdentityConfig(config);
-    final String principalId = UUID.randomUUID().toString();
-    assertEquals("Identity should not be changed when owner is already a principal id ",
-            principalId, identityTransformer.transformUserOrGroupForSetRequest(principalId));
-  }
-
-  @Test
-  public void testNoOpWhenSettingSuperUserAsdentity() throws IOException {
-    Configuration config = this.getRawConfiguration();
-    resetIdentityConfig(config);
-
-    config.setBoolean(FS_AZURE_FILE_OWNER_ENABLE_SHORTNAME, true);
-    config.set(FS_AZURE_FILE_OWNER_DOMAINNAME, DOMAIN);
-    // Default config
-    IdentityTransformer identityTransformer = getTransformerWithDefaultIdentityConfig(config);
-    assertEquals("Identity should not be changed because it is not in substitution list",
-            SUPER_USER, identityTransformer.transformUserOrGroupForSetRequest(SUPER_USER));
-  }
-
-  @Test
-  public void testIdentityReplacementForSuperUserGetRequest() throws IOException {
-    Configuration config = this.getRawConfiguration();
-    resetIdentityConfig(config);
-
-    // with default config, identityTransformer should do $superUser replacement
-    IdentityTransformer identityTransformer = getTransformerWithDefaultIdentityConfig(config);
-    assertEquals("$superuser should be replaced with local user by default",
-            localUser, identityTransformer.transformIdentityForGetRequest(SUPER_USER, localUser));
-
-    // Disable $supeuser replacement
-    config.setBoolean(FS_AZURE_SKIP_SUPER_USER_REPLACEMENT, true);
-    identityTransformer = getTransformerWithCustomizedIdentityConfig(config);
-    assertEquals("$superuser should not be replaced",
-            SUPER_USER, identityTransformer.transformIdentityForGetRequest(SUPER_USER, localUser));
-  }
-
-  @Test
-  public void testIdentityReplacementForDaemonServiceGetRequest() throws IOException {
-    Configuration config = this.getRawConfiguration();
-    resetIdentityConfig(config);
-
-    // Default config
-    IdentityTransformer identityTransformer = getTransformerWithDefaultIdentityConfig(config);
-    assertEquals("By default servicePrincipalId should not be converted for GetFileStatus(), listFileStatus(), getAcl()",
-            SERVICE_PRINCIPAL_ID, identityTransformer.transformIdentityForGetRequest(SERVICE_PRINCIPAL_ID, localUser));
-
-    resetIdentityConfig(config);
-    // 1. substitution list doesn't contain currentUser
-    config.set(FS_AZURE_OVERRIDE_OWNER_SP_LIST, "a,b,c,d");
-    identityTransformer = getTransformerWithCustomizedIdentityConfig(config);
-    assertEquals("servicePrincipalId should not be replaced if local daemon user is not in substitution list",
-            SERVICE_PRINCIPAL_ID, identityTransformer.transformIdentityForGetRequest(SERVICE_PRINCIPAL_ID, localUser));
-
-    resetIdentityConfig(config);
-    // 2. substitution list contains currentUser(daemon name) but the service principal id in config doesn't match
-    config.set(FS_AZURE_OVERRIDE_OWNER_SP_LIST, localUser + ",a,b,c,d");
-    config.set(FS_AZURE_OVERRIDE_OWNER_SP, UUID.randomUUID().toString());
-    identityTransformer = getTransformerWithCustomizedIdentityConfig(config);
-    assertEquals("servicePrincipalId should not be replaced if it is not equal to the SPN set in config",
-            SERVICE_PRINCIPAL_ID, identityTransformer.transformIdentityForGetRequest(SERVICE_PRINCIPAL_ID, localUser));
-
-    resetIdentityConfig(config);
-    // 3. substitution list contains currentUser(daemon name) and the service principal id in config matches
-    config.set(FS_AZURE_OVERRIDE_OWNER_SP_LIST, localUser + ",a,b,c,d");
-    config.set(FS_AZURE_OVERRIDE_OWNER_SP, SERVICE_PRINCIPAL_ID);
-    identityTransformer = getTransformerWithCustomizedIdentityConfig(config);
-    assertEquals("servicePrincipalId should be transformed to local use",
-            localUser, identityTransformer.transformIdentityForGetRequest(SERVICE_PRINCIPAL_ID, localUser));
-
-    resetIdentityConfig(config);
-    // 4. substitution is "*" but the service principal id in config doesn't match the input
-    config.set(FS_AZURE_OVERRIDE_OWNER_SP_LIST, ASTERISK);
-    config.set(FS_AZURE_OVERRIDE_OWNER_SP, UUID.randomUUID().toString());
-    identityTransformer = getTransformerWithCustomizedIdentityConfig(config);
-    assertEquals("servicePrincipalId should not be replaced if it is not equal to the SPN set in config",
-            SERVICE_PRINCIPAL_ID, identityTransformer.transformIdentityForGetRequest(SERVICE_PRINCIPAL_ID, localUser));
-
-    resetIdentityConfig(config);
-    // 5. substitution is "*" and the service principal id in config match the input
-    config.set(FS_AZURE_OVERRIDE_OWNER_SP_LIST, ASTERISK);
-    config.set(FS_AZURE_OVERRIDE_OWNER_SP, SERVICE_PRINCIPAL_ID);
-    identityTransformer = getTransformerWithCustomizedIdentityConfig(config);
-    assertEquals("servicePrincipalId should be transformed to local user",
-            localUser, identityTransformer.transformIdentityForGetRequest(SERVICE_PRINCIPAL_ID, localUser));
-  }
-
-  @Test
-  public void testIdentityReplacementForKinitUserGetRequest() throws IOException {
-    Configuration config = this.getRawConfiguration();
-    resetIdentityConfig(config);
-
-    // Default config
-    IdentityTransformer identityTransformer = getTransformerWithDefaultIdentityConfig(config);
-    assertEquals("full name should not be transformed if shortname is not enabled",
-            FULLY_QUALIFIED_NAME, identityTransformer.transformIdentityForGetRequest(FULLY_QUALIFIED_NAME, localUser));
-
-    // add config to get short name
-    config.setBoolean(FS_AZURE_FILE_OWNER_ENABLE_SHORTNAME, true);
-    identityTransformer = getTransformerWithCustomizedIdentityConfig(config);
-    assertEquals("should convert the full name to shortname ",
-            SHORT_NAME, identityTransformer.transformIdentityForGetRequest(FULLY_QUALIFIED_NAME, localUser));
-  }
-
-  @Test
-  public void transformAclEntriesForSetRequest() throws IOException {
-    Configuration config = this.getRawConfiguration();
-    resetIdentityConfig(config);
-
-    List<AclEntry> aclEntriesToBeTransformed = Lists.newArrayList(
-            aclEntry(ACCESS, USER, DAEMON, ALL),
-            aclEntry(ACCESS, USER, FULLY_QUALIFIED_NAME,ALL),
-            aclEntry(DEFAULT, USER, SUPER_USER, ALL),
-            aclEntry(DEFAULT, USER, SERVICE_PRINCIPAL_ID, ALL),
-            aclEntry(DEFAULT, USER, SHORT_NAME, ALL),
-            aclEntry(DEFAULT, GROUP, DAEMON, ALL),
-            aclEntry(DEFAULT, GROUP, SHORT_NAME, ALL), // Notice: for group type ACL entry, if name is shortName,
-            aclEntry(DEFAULT, OTHER, ALL),             //         It won't be converted to Full Name. This is
-            aclEntry(DEFAULT, MASK, ALL)               //         to make the behavior consistent with HDI.
-    );
-
-    // Default config should not change the identities
-    IdentityTransformer identityTransformer = getTransformerWithDefaultIdentityConfig(config);
-    checkAclEntriesList(aclEntriesToBeTransformed, identityTransformer.transformAclEntriesForSetRequest(aclEntriesToBeTransformed));
-
-    resetIdentityConfig(config);
-    // With config
-    config.set(FS_AZURE_OVERRIDE_OWNER_SP_LIST, DAEMON + ",a,b,c,d");
-    config.setBoolean(FS_AZURE_FILE_OWNER_ENABLE_SHORTNAME, true);
-    config.set(FS_AZURE_FILE_OWNER_DOMAINNAME, DOMAIN);
-    config.set(FS_AZURE_OVERRIDE_OWNER_SP, SERVICE_PRINCIPAL_ID);
-    identityTransformer = getTransformerWithCustomizedIdentityConfig(config);
-
-    // expected acl entries
-    List<AclEntry> expectedAclEntries = Lists.newArrayList(
-            aclEntry(ACCESS, USER, SERVICE_PRINCIPAL_ID, ALL),
-            aclEntry(ACCESS, USER, FULLY_QUALIFIED_NAME, ALL),
-            aclEntry(DEFAULT, USER, SUPER_USER, ALL),
-            aclEntry(DEFAULT, USER, SERVICE_PRINCIPAL_ID, ALL),
-            aclEntry(DEFAULT, USER, FULLY_QUALIFIED_NAME, ALL),
-            aclEntry(DEFAULT, GROUP, SERVICE_PRINCIPAL_ID, ALL),
-            aclEntry(DEFAULT, GROUP, SHORT_NAME, ALL),
-            aclEntry(DEFAULT, OTHER, ALL),
-            aclEntry(DEFAULT, MASK, ALL)
-    );
-
-    checkAclEntriesList(identityTransformer.transformAclEntriesForSetRequest(aclEntriesToBeTransformed), expectedAclEntries);
-
-  }
-
-  private void resetIdentityConfig(Configuration config) {
-    config.unset(FS_AZURE_FILE_OWNER_ENABLE_SHORTNAME);
-    config.unset(FS_AZURE_FILE_OWNER_DOMAINNAME);
-    config.unset(FS_AZURE_OVERRIDE_OWNER_SP);
-    config.unset(FS_AZURE_OVERRIDE_OWNER_SP_LIST);
-    config.unset(FS_AZURE_SKIP_SUPER_USER_REPLACEMENT);
-  }
-
-  private IdentityTransformer getTransformerWithDefaultIdentityConfig(Configuration config) throws IOException {
-    resetIdentityConfig(config);
-    return new IdentityTransformer(config);
-  }
-
-  private IdentityTransformer getTransformerWithCustomizedIdentityConfig(Configuration config) throws IOException {
-    return new IdentityTransformer(config);
-  }
-
-  private void checkAclEntriesList(List<AclEntry> aclEntries, List<AclEntry> expected) {
-    assertTrue("list size not equals", aclEntries.size() == expected.size());
-    for (int i = 0; i < aclEntries.size(); i++) {
-      assertEquals("Identity doesn't match", expected.get(i).getName(), aclEntries.get(i).getName());
-    }
-  }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org