You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by tm...@apache.org on 2018/08/22 18:34:12 UTC

[2/2] hadoop git commit: HADOOP-15661. ABFS: Add support for ACL. Contributed by Junhua Gu and Da Zhou.

HADOOP-15661. ABFS: Add support for ACL.
Contributed by Junhua Gu and Da Zhou.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e70e1568
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e70e1568
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e70e1568

Branch: refs/heads/HADOOP-15407
Commit: e70e15689fca84e7fc63f83927d77290da0931a5
Parents: 2b5f3b0
Author: Thomas Marquardt <tm...@microsoft.com>
Authored: Wed Aug 22 18:31:47 2018 +0000
Committer: Thomas Marquardt <tm...@microsoft.com>
Committed: Wed Aug 22 18:31:47 2018 +0000

----------------------------------------------------------------------
 .../hadoop/fs/azurebfs/AzureBlobFileSystem.java |  199 +++-
 .../fs/azurebfs/AzureBlobFileSystemStore.java   |  351 +++++-
 .../azurebfs/constants/AbfsHttpConstants.java   |   15 +
 .../constants/HttpHeaderConfigurations.java     |    6 +
 .../InvalidAclOperationException.java           |   33 +
 .../services/AzureServiceErrorCode.java         |    1 +
 .../fs/azurebfs/services/AbfsAclHelper.java     |  202 ++++
 .../hadoop/fs/azurebfs/services/AbfsClient.java |  119 +-
 .../fs/azurebfs/services/AbfsPermission.java    |  114 ++
 .../ITestAzureBlobFileSystemBackCompat.java     |    2 +
 .../ITestAzureBlobFileSystemFileStatus.java     |   43 +-
 .../azurebfs/ITestAzureBlobFileSystemFlush.java |    6 +
 .../ITestAzureBlobFileSystemPermission.java     |  109 ++
 .../ITestAzureBlobFileSystemRandomRead.java     |    4 +-
 .../ITestAzureBlobFileSystemRename.java         |   14 +
 .../azurebfs/ITestAzureBlobFilesystemAcl.java   | 1071 ++++++++++++++++++
 .../fs/azurebfs/ITestWasbAbfsCompatibility.java |   12 +
 .../fs/azurebfs/utils/AclTestHelpers.java       |  119 ++
 .../hadoop/fs/azurebfs/utils/Parallelized.java  |   60 +
 19 files changed, 2422 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/e70e1568/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
index 2cb517b..6bec7cb 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
@@ -26,6 +26,7 @@ import java.io.OutputStream;
 import java.net.HttpURLConnection;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.List;
 import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.concurrent.Callable;
@@ -60,6 +61,8 @@ import org.apache.hadoop.fs.azurebfs.contracts.exceptions.FileSystemOperationUnh
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriAuthorityException;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriException;
 import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
+import org.apache.hadoop.fs.permission.AclEntry;
+import org.apache.hadoop.fs.permission.AclStatus;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Progressable;
@@ -154,7 +157,8 @@ public class AzureBlobFileSystem extends FileSystem {
         blockSize);
 
     try {
-      OutputStream outputStream = abfsStore.createFile(makeQualified(f), overwrite);
+      OutputStream outputStream = abfsStore.createFile(makeQualified(f), overwrite,
+          permission == null ? FsPermission.getFileDefault() : permission, FsPermission.getUMask(getConf()));
       return new FSDataOutputStream(outputStream, statistics);
     } catch(AzureBlobFileSystemException ex) {
       checkException(f, ex);
@@ -253,7 +257,8 @@ public class AzureBlobFileSystem extends FileSystem {
               AzureServiceErrorCode.INVALID_RENAME_SOURCE_PATH,
               AzureServiceErrorCode.SOURCE_PATH_NOT_FOUND,
               AzureServiceErrorCode.INVALID_SOURCE_OR_DESTINATION_RESOURCE_TYPE,
-              AzureServiceErrorCode.RENAME_DESTINATION_PARENT_PATH_NOT_FOUND);
+              AzureServiceErrorCode.RENAME_DESTINATION_PARENT_PATH_NOT_FOUND,
+              AzureServiceErrorCode.INTERNAL_OPERATION_ABORT);
       return false;
     }
 
@@ -308,7 +313,8 @@ public class AzureBlobFileSystem extends FileSystem {
     }
 
     try {
-      abfsStore.createDirectory(makeQualified(f));
+      abfsStore.createDirectory(makeQualified(f), permission == null ? FsPermission.getDirDefault() : permission,
+          FsPermission.getUMask(getConf()));
       return true;
     } catch (AzureBlobFileSystemException ex) {
       checkException(f, ex, AzureServiceErrorCode.PATH_ALREADY_EXISTS);
@@ -457,6 +463,188 @@ public class AzureBlobFileSystem extends FileSystem {
     return true;
   }
 
+   /**
+   * Set owner of a path (i.e. a file or a directory).
+   * The parameters owner and group cannot both be null.
+   *
+   * @param path  The path
+   * @param owner If it is null, the original username remains unchanged.
+   * @param group If it is null, the original groupname remains unchanged.
+   */
+  @Override
+  public void setOwner(final Path path, final String owner, final String group)
+      throws IOException {
+    LOG.debug(
+        "AzureBlobFileSystem.setOwner path: {}", path);
+
+    if ((owner == null || owner.isEmpty()) && (group == null || group.isEmpty())) {
+      throw new IllegalArgumentException("A valid owner or group must be specified.");
+    }
+
+    try {
+      abfsStore.setOwner(makeQualified(path),
+              owner,
+              group);
+    } catch (AzureBlobFileSystemException ex) {
+      checkException(path, ex);
+    }
+  }
+
+  /**
+   * Set permission of a path.
+   *
+   * @param path       The path
+   * @param permission Access permission
+   */
+  @Override
+  public void setPermission(final Path path, final FsPermission permission)
+      throws IOException {
+    LOG.debug("AzureBlobFileSystem.setPermission path: {}", path);
+
+    if (permission == null) {
+      throw new IllegalArgumentException("The permission can't be null");
+    }
+
+    try {
+      abfsStore.setPermission(makeQualified(path),
+              permission);
+    } catch (AzureBlobFileSystemException ex) {
+      checkException(path, ex);
+    }
+  }
+
+  /**
+   * Modifies ACL entries of files and directories.  This method can add new ACL
+   * entries or modify the permissions on existing ACL entries.  All existing
+   * ACL entries that are not specified in this call are retained without
+   * changes.  (Modifications are merged into the current ACL.)
+   *
+   * @param path    Path to modify
+   * @param aclSpec List of AbfsAclEntry describing modifications
+   * @throws IOException if an ACL could not be modified
+   */
+  @Override
+  public void modifyAclEntries(final Path path, final List<AclEntry> aclSpec)
+      throws IOException {
+    LOG.debug("AzureBlobFileSystem.modifyAclEntries path: {}", path.toString());
+
+    if (aclSpec == null || aclSpec.isEmpty()) {
+      throw new IllegalArgumentException("The value of the aclSpec parameter is invalid.");
+    }
+
+    try {
+      abfsStore.modifyAclEntries(makeQualified(path),
+              aclSpec);
+    } catch (AzureBlobFileSystemException ex) {
+      checkException(path, ex);
+    }
+  }
+
+  /**
+   * Removes ACL entries from files and directories.  Other ACL entries are
+   * retained.
+   *
+   * @param path    Path to modify
+   * @param aclSpec List of AclEntry describing entries to remove
+   * @throws IOException if an ACL could not be modified
+   */
+  @Override
+  public void removeAclEntries(final Path path, final List<AclEntry> aclSpec)
+      throws IOException {
+    LOG.debug("AzureBlobFileSystem.removeAclEntries path: {}", path);
+
+    if (aclSpec == null || aclSpec.isEmpty()) {
+      throw new IllegalArgumentException("The aclSpec argument is invalid.");
+    }
+
+    try {
+      abfsStore.removeAclEntries(makeQualified(path), aclSpec);
+    } catch (AzureBlobFileSystemException ex) {
+      checkException(path, ex);
+    }
+  }
+
+  /**
+   * Removes all default ACL entries from files and directories.
+   *
+   * @param path Path to modify
+   * @throws IOException if an ACL could not be modified
+   */
+  @Override
+  public void removeDefaultAcl(final Path path) throws IOException {
+    LOG.debug("AzureBlobFileSystem.removeDefaultAcl path: {}", path);
+
+    try {
+      abfsStore.removeDefaultAcl(makeQualified(path));
+    } catch (AzureBlobFileSystemException ex) {
+      checkException(path, ex);
+    }
+  }
+
+  /**
+   * Removes all but the base ACL entries of files and directories.  The entries
+   * for user, group, and others are retained for compatibility with permission
+   * bits.
+   *
+   * @param path Path to modify
+   * @throws IOException if an ACL could not be removed
+   */
+  @Override
+  public void removeAcl(final Path path) throws IOException {
+    LOG.debug("AzureBlobFileSystem.removeAcl path: {}", path);
+
+    try {
+      abfsStore.removeAcl(makeQualified(path));
+    } catch (AzureBlobFileSystemException ex) {
+      checkException(path, ex);
+    }
+  }
+
+  /**
+   * Fully replaces ACL of files and directories, discarding all existing
+   * entries.
+   *
+   * @param path    Path to modify
+   * @param aclSpec List of AclEntry describing modifications, must include
+   *                entries for user, group, and others for compatibility with
+   *                permission bits.
+   * @throws IOException if an ACL could not be modified
+   */
+  @Override
+  public void setAcl(final Path path, final List<AclEntry> aclSpec)
+      throws IOException {
+    LOG.debug("AzureBlobFileSystem.setAcl path: {}", path);
+
+    if (aclSpec == null || aclSpec.size() == 0) {
+      throw new IllegalArgumentException("The aclSpec argument is invalid.");
+    }
+
+    try {
+      abfsStore.setAcl(makeQualified(path), aclSpec);
+    } catch (AzureBlobFileSystemException ex) {
+      checkException(path, ex);
+    }
+  }
+
+  /**
+   * Gets the ACL of a file or directory.
+   *
+   * @param path Path to get
+   * @return AbfsAclStatus describing the ACL of the file or directory
+   * @throws IOException if an ACL could not be read
+   */
+  @Override
+  public AclStatus getAclStatus(final Path path) throws IOException {
+    LOG.debug("AzureBlobFileSystem.getAclStatus path: {}", path.toString());
+
+    try {
+      return abfsStore.getAclStatus(makeQualified(path));
+    } catch (AzureBlobFileSystemException ex) {
+      checkException(path, ex);
+      return null;
+    }
+  }
+
   private FileStatus tryGetFileStatus(final Path f) {
     try {
       return getFileStatus(f);
@@ -656,4 +844,9 @@ public class AzureBlobFileSystem extends FileSystem {
   AbfsClient getAbfsClient() {
     return abfsStore.getClient();
   }
+
+  @VisibleForTesting
+  boolean getIsNamespaceEnabeld() throws AzureBlobFileSystemException {
+    return abfsStore.getIsNamespaceEnabled();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e70e1568/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
index b8da35b..58df914 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
@@ -36,8 +36,10 @@ import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Date;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Hashtable;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import javax.xml.bind.DatatypeConverter;
@@ -68,13 +70,18 @@ import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
 import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultEntrySchema;
 import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultSchema;
 import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider;
+import org.apache.hadoop.fs.azurebfs.services.AbfsAclHelper;
 import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
+import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation;
 import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream;
 import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
+import org.apache.hadoop.fs.azurebfs.services.AbfsPermission;
 import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
 import org.apache.hadoop.fs.azurebfs.services.AuthType;
 import org.apache.hadoop.fs.azurebfs.services.ExponentialRetryPolicy;
 import org.apache.hadoop.fs.azurebfs.services.SharedKeyCredentials;
+import org.apache.hadoop.fs.permission.AclEntry;
+import org.apache.hadoop.fs.permission.AclStatus;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -85,7 +92,7 @@ import org.slf4j.LoggerFactory;
 import static org.apache.hadoop.util.Time.now;
 
 /**
- * Provides the bridging logic between Hadoop's abstract filesystem and Azure Storage
+ * Provides the bridging logic between Hadoop's abstract filesystem and Azure Storage.
  */
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
@@ -103,7 +110,8 @@ public class AzureBlobFileSystemStore {
 
   private final AbfsConfiguration abfsConfiguration;
   private final Set<String> azureAtomicRenameDirSet;
-
+  private boolean isNamespaceEnabledSet;
+  private boolean isNamespaceEnabled;
 
   public AzureBlobFileSystemStore(URI uri, boolean isSecure, Configuration configuration, UserGroupInformation userGroupInformation)
           throws AzureBlobFileSystemException {
@@ -121,6 +129,20 @@ public class AzureBlobFileSystemStore {
     initializeClient(uri, isSecure);
   }
 
+  public boolean getIsNamespaceEnabled() throws AzureBlobFileSystemException {
+    if (!isNamespaceEnabledSet) {
+      LOG.debug("getFilesystemProperties for filesystem: {}",
+          client.getFileSystem());
+
+      final AbfsRestOperation op = client.getFilesystemProperties();
+      isNamespaceEnabled = Boolean.parseBoolean(
+          op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_NAMESPACE_ENABLED));
+      isNamespaceEnabledSet = true;
+    }
+
+    return isNamespaceEnabled;
+  }
+
   @VisibleForTesting
   URIBuilder getURIBuilder(final String hostName, boolean isSecure) {
     String scheme = isSecure ? FileSystemUriSchemes.HTTPS_SCHEME : FileSystemUriSchemes.HTTP_SCHEME;
@@ -197,7 +219,7 @@ public class AzureBlobFileSystemStore {
     } catch (CharacterCodingException ex) {
       throw new InvalidAbfsRestOperationException(ex);
     }
-    client.setPathProperties("/" + getRelativePath(path), commaSeparatedProperties);
+    client.setPathProperties(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), commaSeparatedProperties);
   }
 
   public void createFilesystem() throws AzureBlobFileSystemException {
@@ -214,13 +236,20 @@ public class AzureBlobFileSystemStore {
     client.deleteFilesystem();
   }
 
-  public OutputStream createFile(final Path path, final boolean overwrite) throws AzureBlobFileSystemException {
-    LOG.debug("createFile filesystem: {} path: {} overwrite: {}",
+  public OutputStream createFile(final Path path, final boolean overwrite, final FsPermission permission,
+                                 final FsPermission umask) throws AzureBlobFileSystemException {
+    boolean isNamespaceEnabled = getIsNamespaceEnabled();
+    LOG.debug("createFile filesystem: {} path: {} overwrite: {} permission: {} umask: {} isNamespaceEnabled: {}",
             client.getFileSystem(),
             path,
-            overwrite);
+            overwrite,
+            permission.toString(),
+            umask.toString(),
+            isNamespaceEnabled);
 
-    client.createPath(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), true, overwrite);
+    client.createPath(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), true, overwrite,
+        isNamespaceEnabled ? getOctalNotation(permission) : null,
+        isNamespaceEnabled ? getOctalNotation(umask) : null);
 
     final OutputStream outputStream;
     outputStream = new FSDataOutputStream(
@@ -229,16 +258,23 @@ public class AzureBlobFileSystemStore {
     return outputStream;
   }
 
-  public void createDirectory(final Path path) throws AzureBlobFileSystemException {
-    LOG.debug("createDirectory filesystem: {} path: {}",
+  public void createDirectory(final Path path, final FsPermission permission, final FsPermission umask)
+      throws AzureBlobFileSystemException {
+    boolean isNamespaceEnabled = getIsNamespaceEnabled();
+    LOG.debug("createDirectory filesystem: {} path: {} permission: {} umask: {} isNamespaceEnabled: {}",
             client.getFileSystem(),
-            path);
+            path,
+            permission,
+            umask,
+            isNamespaceEnabled);
 
-    client.createPath("/" + getRelativePath(path), false, true);
+    client.createPath(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), false, true,
+        isNamespaceEnabled ? getOctalNotation(permission) : null,
+        isNamespaceEnabled ? getOctalNotation(umask) : null);
   }
 
-  public InputStream openFileForRead(final Path path, final FileSystem.Statistics statistics) throws AzureBlobFileSystemException {
-
+  public InputStream openFileForRead(final Path path, final FileSystem.Statistics statistics)
+      throws AzureBlobFileSystemException {
     LOG.debug("openFileForRead filesystem: {} path: {}",
             client.getFileSystem(),
             path);
@@ -327,7 +363,6 @@ public class AzureBlobFileSystemStore {
 
   public void delete(final Path path, final boolean recursive)
       throws AzureBlobFileSystemException {
-
     LOG.debug("delete filesystem: {} path: {} recursive: {}",
             client.getFileSystem(),
             path,
@@ -351,19 +386,31 @@ public class AzureBlobFileSystemStore {
   }
 
   public FileStatus getFileStatus(final Path path) throws IOException {
-
-    LOG.debug("getFileStatus filesystem: {} path: {}",
+    boolean isNamespaceEnabled = getIsNamespaceEnabled();
+    LOG.debug("getFileStatus filesystem: {} path: {} isNamespaceEnabled: {}",
             client.getFileSystem(),
-           path);
+            path,
+            isNamespaceEnabled);
 
     if (path.isRoot()) {
-      AbfsRestOperation op = client.getFilesystemProperties();
+      final AbfsRestOperation op = isNamespaceEnabled
+          ? client.getAclStatus(AbfsHttpConstants.FORWARD_SLASH + AbfsHttpConstants.ROOT_PATH)
+          : client.getFilesystemProperties();
+
       final long blockSize = abfsConfiguration.getAzureBlockSize();
+      final String owner = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_OWNER);
+      final String group = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_GROUP);
+      final String permissions = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_PERMISSIONS);
       final String eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG);
       final String lastModified = op.getResult().getResponseHeader(HttpHeaderConfigurations.LAST_MODIFIED);
+      final boolean hasAcl = AbfsPermission.isExtendedAcl(permissions);
+
       return new VersionedFileStatus(
-              userGroupInformation.getUserName(),
-              userGroupInformation.getPrimaryGroupName(),
+              owner == null ? userGroupInformation.getUserName() : owner,
+              group == null ? userGroupInformation.getPrimaryGroupName() : group,
+              permissions == null ? new AbfsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL)
+                      : AbfsPermission.valueOf(permissions),
+              hasAcl,
               0,
               true,
               1,
@@ -375,14 +422,22 @@ public class AzureBlobFileSystemStore {
       AbfsRestOperation op = client.getPathProperties(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path));
 
       final long blockSize = abfsConfiguration.getAzureBlockSize();
-      final String eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG);
-      final String lastModified = op.getResult().getResponseHeader(HttpHeaderConfigurations.LAST_MODIFIED);
-      final String contentLength = op.getResult().getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH);
-      final String resourceType = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_RESOURCE_TYPE);
+      final AbfsHttpOperation result = op.getResult();
+      final String eTag = result.getResponseHeader(HttpHeaderConfigurations.ETAG);
+      final String lastModified = result.getResponseHeader(HttpHeaderConfigurations.LAST_MODIFIED);
+      final String contentLength = result.getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH);
+      final String resourceType = result.getResponseHeader(HttpHeaderConfigurations.X_MS_RESOURCE_TYPE);
+      final String owner = result.getResponseHeader(HttpHeaderConfigurations.X_MS_OWNER);
+      final String group = result.getResponseHeader(HttpHeaderConfigurations.X_MS_GROUP);
+      final String permissions = result.getResponseHeader((HttpHeaderConfigurations.X_MS_PERMISSIONS));
+      final boolean hasAcl = AbfsPermission.isExtendedAcl(permissions);
 
       return new VersionedFileStatus(
-              userGroupInformation.getUserName(),
-              userGroupInformation.getPrimaryGroupName(),
+              owner == null ? userGroupInformation.getUserName() : owner,
+              group == null ? userGroupInformation.getPrimaryGroupName() : group,
+              permissions == null ? new AbfsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL)
+                      : AbfsPermission.valueOf(permissions),
+              hasAcl,
               parseContentLength(contentLength),
               parseIsDirectory(resourceType),
               1,
@@ -417,6 +472,13 @@ public class AzureBlobFileSystemStore {
       long blockSize = abfsConfiguration.getAzureBlockSize();
 
       for (ListResultEntrySchema entry : retrievedSchema.paths()) {
+        final String owner = entry.owner() == null ? userGroupInformation.getUserName() : entry.owner();
+        final String group = entry.group() == null ? userGroupInformation.getPrimaryGroupName() : entry.group();
+        final FsPermission fsPermission = entry.permissions() == null
+                ? new AbfsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL)
+                : AbfsPermission.valueOf(entry.permissions());
+        final boolean hasAcl = AbfsPermission.isExtendedAcl(entry.permissions());
+
         long lastModifiedMillis = 0;
         long contentLength = entry.contentLength() == null ? 0 : entry.contentLength();
         boolean isDirectory = entry.isDirectory() == null ? false : entry.isDirectory();
@@ -429,8 +491,10 @@ public class AzureBlobFileSystemStore {
 
         fileStatuses.add(
                 new VersionedFileStatus(
-                        userGroupInformation.getUserName(),
-                        userGroupInformation.getPrimaryGroupName(),
+                        owner,
+                        group,
+                        fsPermission,
+                        hasAcl,
                         contentLength,
                         isDirectory,
                         1,
@@ -445,6 +509,211 @@ public class AzureBlobFileSystemStore {
     return fileStatuses.toArray(new FileStatus[0]);
   }
 
+  public void setOwner(final Path path, final String owner, final String group) throws
+          AzureBlobFileSystemException {
+    if (!getIsNamespaceEnabled()) {
+      throw new UnsupportedOperationException(
+          "This operation is only valid for storage accounts with the hierarchical namespace enabled.");
+    }
+
+    LOG.debug(
+            "setOwner filesystem: {} path: {} owner: {} group: {}",
+            client.getFileSystem(),
+            path.toString(),
+            owner,
+            group);
+    client.setOwner(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true), owner, group);
+  }
+
+  public void setPermission(final Path path, final FsPermission permission) throws
+          AzureBlobFileSystemException {
+    if (!getIsNamespaceEnabled()) {
+      throw new UnsupportedOperationException(
+          "This operation is only valid for storage accounts with the hierarchical namespace enabled.");
+    }
+
+    LOG.debug(
+            "setPermission filesystem: {} path: {} permission: {}",
+            client.getFileSystem(),
+            path.toString(),
+            permission.toString());
+    client.setPermission(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true),
+            String.format(AbfsHttpConstants.PERMISSION_FORMAT, permission.toOctal()));
+  }
+
+  public void modifyAclEntries(final Path path, final List<AclEntry> aclSpec) throws
+          AzureBlobFileSystemException {
+    if (!getIsNamespaceEnabled()) {
+      throw new UnsupportedOperationException(
+          "This operation is only valid for storage accounts with the hierarchical namespace enabled.");
+    }
+
+    LOG.debug(
+            "modifyAclEntries filesystem: {} path: {} aclSpec: {}",
+            client.getFileSystem(),
+            path.toString(),
+            AclEntry.aclSpecToString(aclSpec));
+
+    final Map<String, String> modifyAclEntries = AbfsAclHelper.deserializeAclSpec(AclEntry.aclSpecToString(aclSpec));
+
+    final AbfsRestOperation op = client.getAclStatus(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true));
+    final String eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG);
+
+    final Map<String, String> aclEntries = AbfsAclHelper.deserializeAclSpec(op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_ACL));
+
+    for (Map.Entry<String, String> modifyAclEntry : modifyAclEntries.entrySet()) {
+      aclEntries.put(modifyAclEntry.getKey(), modifyAclEntry.getValue());
+    }
+
+    if (!modifyAclEntries.containsKey(AbfsHttpConstants.ACCESS_MASK)) {
+      aclEntries.remove(AbfsHttpConstants.ACCESS_MASK);
+    }
+
+    if (!modifyAclEntries.containsKey(AbfsHttpConstants.DEFAULT_MASK)) {
+      aclEntries.remove(AbfsHttpConstants.DEFAULT_MASK);
+    }
+
+    client.setAcl(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true),
+        AbfsAclHelper.serializeAclSpec(aclEntries), eTag);
+  }
+
+  public void removeAclEntries(final Path path, final List<AclEntry> aclSpec) throws AzureBlobFileSystemException {
+    if (!getIsNamespaceEnabled()) {
+      throw new UnsupportedOperationException(
+          "This operation is only valid for storage accounts with the hierarchical namespace enabled.");
+    }
+
+    LOG.debug(
+            "removeAclEntries filesystem: {} path: {} aclSpec: {}",
+            client.getFileSystem(),
+            path.toString(),
+            AclEntry.aclSpecToString(aclSpec));
+
+    final Map<String, String> removeAclEntries = AbfsAclHelper.deserializeAclSpec(AclEntry.aclSpecToString(aclSpec));
+    final AbfsRestOperation op = client.getAclStatus(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true));
+    final String eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG);
+
+    final Map<String, String> aclEntries = AbfsAclHelper.deserializeAclSpec(op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_ACL));
+
+    AbfsAclHelper.removeAclEntriesInternal(aclEntries, removeAclEntries);
+
+    client.setAcl(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true),
+            AbfsAclHelper.serializeAclSpec(aclEntries), eTag);
+  }
+
+  public void removeDefaultAcl(final Path path) throws AzureBlobFileSystemException {
+    if (!getIsNamespaceEnabled()) {
+      throw new UnsupportedOperationException(
+          "This operation is only valid for storage accounts with the hierarchical namespace enabled.");
+    }
+
+    LOG.debug(
+            "removeDefaultAcl filesystem: {} path: {}",
+            client.getFileSystem(),
+            path.toString());
+
+    final AbfsRestOperation op = client.getAclStatus(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true));
+    final String eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG);
+    final Map<String, String> aclEntries = AbfsAclHelper.deserializeAclSpec(op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_ACL));
+    final Map<String, String> defaultAclEntries = new HashMap<>();
+
+    for (Map.Entry<String, String> aclEntry : aclEntries.entrySet()) {
+      if (aclEntry.getKey().startsWith("default:")) {
+        defaultAclEntries.put(aclEntry.getKey(), aclEntry.getValue());
+      }
+    }
+
+    for (Map.Entry<String, String> defaultAclEntry : defaultAclEntries.entrySet()) {
+      aclEntries.remove(defaultAclEntry.getKey());
+    }
+
+    client.setAcl(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true),
+        AbfsAclHelper.serializeAclSpec(aclEntries), eTag);
+  }
+
+  public void removeAcl(final Path path) throws AzureBlobFileSystemException {
+    if (!getIsNamespaceEnabled()) {
+      throw new UnsupportedOperationException(
+          "This operation is only valid for storage accounts with the hierarchical namespace enabled.");
+    }
+
+    LOG.debug(
+            "removeAcl filesystem: {} path: {}",
+            client.getFileSystem(),
+            path.toString());
+    final AbfsRestOperation op = client.getAclStatus(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true));
+    final String eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG);
+
+    final Map<String, String> aclEntries = AbfsAclHelper.deserializeAclSpec(op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_ACL));
+    final Map<String, String> newAclEntries = new HashMap<>();
+
+    newAclEntries.put(AbfsHttpConstants.ACCESS_USER, aclEntries.get(AbfsHttpConstants.ACCESS_USER));
+    newAclEntries.put(AbfsHttpConstants.ACCESS_GROUP, aclEntries.get(AbfsHttpConstants.ACCESS_GROUP));
+    newAclEntries.put(AbfsHttpConstants.ACCESS_OTHER, aclEntries.get(AbfsHttpConstants.ACCESS_OTHER));
+
+    client.setAcl(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true),
+        AbfsAclHelper.serializeAclSpec(newAclEntries), eTag);
+  }
+
+  public void setAcl(final Path path, final List<AclEntry> aclSpec) throws AzureBlobFileSystemException {
+    if (!getIsNamespaceEnabled()) {
+      throw new UnsupportedOperationException(
+          "This operation is only valid for storage accounts with the hierarchical namespace enabled.");
+    }
+
+    LOG.debug(
+            "setAcl filesystem: {} path: {} aclspec: {}",
+            client.getFileSystem(),
+            path.toString(),
+            AclEntry.aclSpecToString(aclSpec));
+    final Map<String, String> aclEntries = AbfsAclHelper.deserializeAclSpec(AclEntry.aclSpecToString(aclSpec));
+    final AbfsRestOperation op = client.getAclStatus(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true));
+    final String eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG);
+
+    final Map<String, String> getAclEntries = AbfsAclHelper.deserializeAclSpec(op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_ACL));
+    for (Map.Entry<String, String> ace : getAclEntries.entrySet()) {
+      if (ace.getKey().startsWith("default:") && (ace.getKey() != AbfsHttpConstants.DEFAULT_MASK)
+              && !aclEntries.containsKey(ace.getKey())) {
+        aclEntries.put(ace.getKey(), ace.getValue());
+      }
+    }
+
+    client.setAcl(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true),
+        AbfsAclHelper.serializeAclSpec(aclEntries), eTag);
+  }
+
+  public AclStatus getAclStatus(final Path path) throws IOException {
+    if (!getIsNamespaceEnabled()) {
+      throw new UnsupportedOperationException(
+          "This operation is only valid for storage accounts with the hierarchical namespace enabled.");
+    }
+
+    LOG.debug(
+            "getAclStatus filesystem: {} path: {}",
+            client.getFileSystem(),
+            path.toString());
+    AbfsRestOperation op = client.getAclStatus(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true));
+    AbfsHttpOperation result = op.getResult();
+
+    final String owner = result.getResponseHeader(HttpHeaderConfigurations.X_MS_OWNER);
+    final String group = result.getResponseHeader(HttpHeaderConfigurations.X_MS_GROUP);
+    final String permissions = result.getResponseHeader(HttpHeaderConfigurations.X_MS_PERMISSIONS);
+    final String aclSpecString = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_ACL);
+
+    final List<AclEntry> processedAclEntries = AclEntry.parseAclSpec(AbfsAclHelper.processAclString(aclSpecString), true);
+    final FsPermission fsPermission = permissions == null ? new AbfsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL)
+            : AbfsPermission.valueOf(permissions);
+
+    final AclStatus.Builder aclStatusBuilder = new AclStatus.Builder();
+    aclStatusBuilder.owner(owner == null ? userGroupInformation.getUserName() : owner);
+    aclStatusBuilder.group(group == null ? userGroupInformation.getPrimaryGroupName() : group);
+
+    aclStatusBuilder.setPermission(fsPermission);
+    aclStatusBuilder.stickyBit(fsPermission.getStickyBit());
+    aclStatusBuilder.addEntries(processedAclEntries);
+    return aclStatusBuilder.build();
+  }
+
   public boolean isAtomicRenameKey(String key) {
     return isKeyForDirectorySet(key, azureAtomicRenameDirSet);
   }
@@ -507,19 +776,24 @@ public class AzureBlobFileSystemStore {
     this.client =  new AbfsClient(baseUrl, creds, abfsConfiguration, new ExponentialRetryPolicy(), tokenProvider);
   }
 
+  private String getOctalNotation(FsPermission fsPermission) {
+    Preconditions.checkNotNull(fsPermission, "fsPermission");
+    return String.format(AbfsHttpConstants.PERMISSION_FORMAT, fsPermission.toOctal());
+  }
+
   private String getRelativePath(final Path path) {
+    return getRelativePath(path, false);
+  }
+
+  private String getRelativePath(final Path path, final boolean allowRootPath) {
     Preconditions.checkNotNull(path, "path");
     final String relativePath = path.toUri().getPath();
 
-    if (relativePath.isEmpty()) {
-      return relativePath;
+    if (relativePath.length() == 0 || (relativePath.length() == 1 && relativePath.charAt(0) == Path.SEPARATOR_CHAR)) {
+      return allowRootPath ? AbfsHttpConstants.ROOT_PATH : AbfsHttpConstants.EMPTY_STRING;
     }
 
     if (relativePath.charAt(0) == Path.SEPARATOR_CHAR) {
-      if (relativePath.length() == 1) {
-        return AbfsHttpConstants.EMPTY_STRING;
-      }
-
       return relativePath.substring(1);
     }
 
@@ -644,15 +918,17 @@ public class AzureBlobFileSystemStore {
     private final String version;
 
     VersionedFileStatus(
-            final String owner, final String group,
+            final String owner, final String group, final FsPermission fsPermission, final boolean hasAcl,
             final long length, final boolean isdir, final int blockReplication,
             final long blocksize, final long modificationTime, final Path path,
             String version) {
       super(length, isdir, blockReplication, blocksize, modificationTime, 0,
-              new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL),
+              fsPermission,
               owner,
               group,
-              path);
+              null,
+              path,
+              hasAcl, false, false);
 
       this.version = version;
     }
@@ -717,5 +993,4 @@ public class AzureBlobFileSystemStore {
   AbfsClient getClient() {
     return this.client;
   }
-
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e70e1568/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java
index f80bc60..447b681 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java
@@ -34,6 +34,8 @@ public final class AbfsHttpConstants {
   public static final String APPEND_ACTION = "append";
   public static final String FLUSH_ACTION = "flush";
   public static final String SET_PROPERTIES_ACTION = "setProperties";
+  public static final String SET_ACCESS_CONTROL = "setAccessControl";
+  public static final String GET_ACCESS_CONTROL = "getAccessControl";
   public static final String DEFAULT_TIMEOUT = "90";
 
   public static final String JAVA_VERSION = "java.version";
@@ -58,6 +60,7 @@ public final class AbfsHttpConstants {
   public static final String PLUS = "+";
   public static final String STAR = "*";
   public static final String COMMA = ",";
+  public static final String COLON = ":";
   public static final String EQUAL = "=";
   public static final String QUESTION_MARK = "?";
   public static final String AND_MARK = "&";
@@ -72,5 +75,17 @@ public final class AbfsHttpConstants {
   public static final String APPLICATION_JSON = "application/json";
   public static final String APPLICATION_OCTET_STREAM = "application/octet-stream";
 
+  public static final String ROOT_PATH = "/";
+  public static final String ACCESS_MASK = "mask:";
+  public static final String ACCESS_USER = "user:";
+  public static final String ACCESS_GROUP = "group:";
+  public static final String ACCESS_OTHER = "other:";
+  public static final String DEFAULT_MASK = "default:mask:";
+  public static final String DEFAULT_USER = "default:user:";
+  public static final String DEFAULT_GROUP = "default:group:";
+  public static final String DEFAULT_OTHER = "default:other:";
+  public static final String DEFAULT_SCOPE = "default:";
+  public static final String PERMISSION_FORMAT = "%04d";
+
   private AbfsHttpConstants() {}
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e70e1568/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpHeaderConfigurations.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpHeaderConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpHeaderConfigurations.java
index 4603b5f..c8d4390 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpHeaderConfigurations.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpHeaderConfigurations.java
@@ -52,6 +52,12 @@ public final class HttpHeaderConfigurations {
   public static final String X_MS_PROPERTIES = "x-ms-properties";
   public static final String X_MS_RENAME_SOURCE = "x-ms-rename-source";
   public static final String LAST_MODIFIED = "Last-Modified";
+  public static final String X_MS_OWNER = "x-ms-owner";
+  public static final String X_MS_GROUP = "x-ms-group";
+  public static final String X_MS_ACL = "x-ms-acl";
+  public static final String X_MS_PERMISSIONS = "x-ms-permissions";
+  public static final String X_MS_UMASK = "x-ms-umask";
+  public static final String X_MS_NAMESPACE_ENABLED = "x-ms-namespace-enabled";
 
   private HttpHeaderConfigurations() {}
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e70e1568/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/InvalidAclOperationException.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/InvalidAclOperationException.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/InvalidAclOperationException.java
new file mode 100644
index 0000000..9c186ba
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/InvalidAclOperationException.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.contracts.exceptions;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Thrown when there is an attempt to perform an invalid operation on an ACL.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public final class InvalidAclOperationException extends AzureBlobFileSystemException {
+  public InvalidAclOperationException(String message) {
+    super(message);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e70e1568/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AzureServiceErrorCode.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AzureServiceErrorCode.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AzureServiceErrorCode.java
index 63bf8d0..60e7f92 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AzureServiceErrorCode.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AzureServiceErrorCode.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.classification.InterfaceStability;
 public enum AzureServiceErrorCode {
   FILE_SYSTEM_ALREADY_EXISTS("FilesystemAlreadyExists", HttpURLConnection.HTTP_CONFLICT, null),
   PATH_ALREADY_EXISTS("PathAlreadyExists", HttpURLConnection.HTTP_CONFLICT, null),
+  INTERNAL_OPERATION_ABORT("InternalOperationAbortError", HttpURLConnection.HTTP_CONFLICT, null),
   PATH_CONFLICT("PathConflict", HttpURLConnection.HTTP_CONFLICT, null),
   FILE_SYSTEM_NOT_FOUND("FilesystemNotFound", HttpURLConnection.HTTP_NOT_FOUND, null),
   PATH_NOT_FOUND("PathNotFound", HttpURLConnection.HTTP_NOT_FOUND, null),

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e70e1568/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAclHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAclHelper.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAclHelper.java
new file mode 100644
index 0000000..c28da2c
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAclHelper.java
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidAclOperationException;
+import org.apache.hadoop.fs.permission.FsAction;
+
+/**
+ * AbfsAclHelper provides convenience methods to implement modifyAclEntries / removeAclEntries / removeAcl / removeDefaultAcl
+ * from setAcl and getAcl.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public final class AbfsAclHelper {
+
+  private AbfsAclHelper() {
+    // not called
+  }
+
+  public static Map<String, String> deserializeAclSpec(final String aclSpecString) {
+    final Map<String, String> aclEntries  = new HashMap<>();
+    final String[] aclArray = aclSpecString.split(AbfsHttpConstants.COMMA);
+    for (String acl : aclArray) {
+      int idx = acl.lastIndexOf(AbfsHttpConstants.COLON);
+      aclEntries.put(acl.substring(0, idx), acl.substring(idx + 1));
+    }
+    return aclEntries;
+  }
+
+  public static String serializeAclSpec(final Map<String, String> aclEntries) {
+    final StringBuilder sb = new StringBuilder();
+    for (Map.Entry<String, String> aclEntry : aclEntries.entrySet()) {
+      sb.append(aclEntry.getKey() + AbfsHttpConstants.COLON + aclEntry.getValue() + AbfsHttpConstants.COMMA);
+    }
+    if (sb.length() > 0) {
+      sb.setLength(sb.length() - 1);
+    }
+    return sb.toString();
+  }
+
+  public static String processAclString(final String aclSpecString) {
+    final List<String> aclEntries = Arrays.asList(aclSpecString.split(AbfsHttpConstants.COMMA));
+    final StringBuilder sb = new StringBuilder();
+
+    boolean containsMask = false;
+    for (int i = aclEntries.size() - 1; i >= 0; i--) {
+      String ace = aclEntries.get(i);
+      if (ace.startsWith(AbfsHttpConstants.ACCESS_OTHER)|| ace.startsWith(AbfsHttpConstants.ACCESS_USER + AbfsHttpConstants.COLON)) {
+        // skip
+      } else if (ace.startsWith(AbfsHttpConstants.ACCESS_MASK)) {
+        containsMask = true;
+        // skip
+      } else if (ace.startsWith(AbfsHttpConstants.ACCESS_GROUP + AbfsHttpConstants.COLON) && !containsMask) {
+        // skip
+      } else {
+        sb.insert(0, ace + AbfsHttpConstants.COMMA);
+      }
+    }
+
+    return sb.length() == 0 ? AbfsHttpConstants.EMPTY_STRING : sb.substring(0, sb.length() - 1);
+  }
+
+  public static void removeAclEntriesInternal(Map<String, String> aclEntries, Map<String, String> toRemoveEntries)
+      throws AzureBlobFileSystemException {
+    boolean accessAclTouched = false;
+    boolean defaultAclTouched = false;
+
+    final Set<String> removeIndicationSet = new HashSet<>();
+
+    for (String entryKey : toRemoveEntries.keySet()) {
+      final boolean isDefaultAcl = isDefaultAce(entryKey);
+      if (removeNamedAceAndUpdateSet(entryKey, isDefaultAcl, removeIndicationSet, aclEntries)) {
+        if (isDefaultAcl) {
+          defaultAclTouched = true;
+        } else {
+          accessAclTouched = true;
+        }
+      }
+    }
+    if (accessAclTouched) {
+      if (removeIndicationSet.contains(AbfsHttpConstants.ACCESS_MASK)) {
+        aclEntries.remove(AbfsHttpConstants.ACCESS_MASK);
+      }
+      recalculateMask(aclEntries, false);
+    }
+    if (defaultAclTouched) {
+      if (removeIndicationSet.contains(AbfsHttpConstants.DEFAULT_MASK)) {
+        aclEntries.remove(AbfsHttpConstants.DEFAULT_MASK);
+      }
+      if (removeIndicationSet.contains(AbfsHttpConstants.DEFAULT_USER)) {
+        aclEntries.put(AbfsHttpConstants.DEFAULT_USER, aclEntries.get(AbfsHttpConstants.ACCESS_USER));
+      }
+      if (removeIndicationSet.contains(AbfsHttpConstants.DEFAULT_GROUP)) {
+        aclEntries.put(AbfsHttpConstants.DEFAULT_GROUP, aclEntries.get(AbfsHttpConstants.ACCESS_GROUP));
+      }
+      if (removeIndicationSet.contains(AbfsHttpConstants.DEFAULT_OTHER)) {
+        aclEntries.put(AbfsHttpConstants.DEFAULT_OTHER, aclEntries.get(AbfsHttpConstants.ACCESS_OTHER));
+      }
+      recalculateMask(aclEntries, true);
+    }
+  }
+
+  private static boolean removeNamedAceAndUpdateSet(String entry, boolean isDefaultAcl, Set<String> removeIndicationSet,
+                                                    Map<String, String> aclEntries)
+      throws AzureBlobFileSystemException {
+    final int startIndex = isDefaultAcl ? 1 : 0;
+    final String[] entryParts = entry.split(AbfsHttpConstants.COLON);
+    final String tag = isDefaultAcl ? AbfsHttpConstants.DEFAULT_SCOPE + entryParts[startIndex] + AbfsHttpConstants.COLON
+        : entryParts[startIndex] + AbfsHttpConstants.COLON;
+
+    if ((entry.equals(AbfsHttpConstants.ACCESS_USER) || entry.equals(AbfsHttpConstants.ACCESS_GROUP)
+        || entry.equals(AbfsHttpConstants.ACCESS_OTHER))
+        && !isNamedAce(entry)) {
+      throw new InvalidAclOperationException("Cannot remove user, group or other entry from access ACL.");
+    }
+
+    boolean touched = false;
+    if (!isNamedAce(entry)) {
+      removeIndicationSet.add(tag); // this must not be a access user, group or other
+      touched = true;
+    } else {
+      if (aclEntries.remove(entry) != null) {
+        touched = true;
+      }
+    }
+    return touched;
+  }
+
+  private static void recalculateMask(Map<String, String> aclEntries, boolean isDefaultMask) {
+    FsAction umask = FsAction.NONE;
+    if (!isExtendAcl(aclEntries, isDefaultMask)) {
+      return;
+    }
+
+    for (Map.Entry<String, String> aclEntry : aclEntries.entrySet()) {
+      if (isDefaultMask) {
+        if ((isDefaultAce(aclEntry.getKey()) && isNamedAce(aclEntry.getKey()))
+            || aclEntry.getKey().equals(AbfsHttpConstants.DEFAULT_GROUP)) {
+          umask = umask.or(FsAction.getFsAction(aclEntry.getValue()));
+        }
+      } else {
+        if ((!isDefaultAce(aclEntry.getKey()) && isNamedAce(aclEntry.getKey()))
+            || aclEntry.getKey().equals(AbfsHttpConstants.ACCESS_GROUP)) {
+          umask = umask.or(FsAction.getFsAction(aclEntry.getValue()));
+        }
+      }
+    }
+
+    aclEntries.put(isDefaultMask ? AbfsHttpConstants.DEFAULT_MASK : AbfsHttpConstants.ACCESS_MASK, umask.SYMBOL);
+  }
+
+  private static boolean isExtendAcl(Map<String, String> aclEntries, boolean checkDefault) {
+    for (String entryKey : aclEntries.keySet()) {
+      if (checkDefault && !(entryKey.equals(AbfsHttpConstants.DEFAULT_USER)
+          || entryKey.equals(AbfsHttpConstants.DEFAULT_GROUP)
+          || entryKey.equals(AbfsHttpConstants.DEFAULT_OTHER) || !isDefaultAce(entryKey))) {
+        return true;
+      }
+      if (!checkDefault && !(entryKey.equals(AbfsHttpConstants.ACCESS_USER)
+          || entryKey.equals(AbfsHttpConstants.ACCESS_GROUP)
+          || entryKey.equals(AbfsHttpConstants.ACCESS_OTHER) || isDefaultAce(entryKey))) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  private static boolean isDefaultAce(String entry) {
+    return entry.startsWith(AbfsHttpConstants.DEFAULT_SCOPE);
+  }
+
+  private static boolean isNamedAce(String entry) {
+    return entry.charAt(entry.length() - 1) != AbfsHttpConstants.COLON.charAt(0);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e70e1568/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
index f5c9f18..18773b6 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
@@ -29,6 +29,9 @@ import java.util.Locale;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.fs.azurebfs.utils.SSLSocketFactoryEx;
+import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
+import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
+import org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -159,7 +162,8 @@ public class AbfsClient {
 
     final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
     abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESOURCE, FILESYSTEM);
-    abfsUriQueryBuilder.addQuery(QUERY_PARAM_DIRECTORY, relativePath == null ? "" : relativePath);
+    abfsUriQueryBuilder.addQuery(QUERY_PARAM_DIRECTORY, relativePath == null ? AbfsHttpConstants.EMPTY_STRING
+        : relativePath);
     abfsUriQueryBuilder.addQuery(QUERY_PARAM_RECURSIVE, String.valueOf(recursive));
     abfsUriQueryBuilder.addQuery(QUERY_PARAM_CONTINUATION, continuation);
     abfsUriQueryBuilder.addQuery(QUERY_PARAM_MAXRESULTS, String.valueOf(listMaxResults));
@@ -206,11 +210,19 @@ public class AbfsClient {
     return op;
   }
 
-  public AbfsRestOperation createPath(final String path, final boolean isFile, final boolean overwrite)
-          throws AzureBlobFileSystemException {
+  public AbfsRestOperation createPath(final String path, final boolean isFile, final boolean overwrite,
+                                      final String permission, final String umask) throws AzureBlobFileSystemException {
     final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
     if (!overwrite) {
-      requestHeaders.add(new AbfsHttpHeader(IF_NONE_MATCH, "*"));
+      requestHeaders.add(new AbfsHttpHeader(IF_NONE_MATCH, AbfsHttpConstants.STAR));
+    }
+
+    if (permission != null && !permission.isEmpty()) {
+      requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_PERMISSIONS, permission));
+    }
+
+    if (umask != null && !umask.isEmpty()) {
+      requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_UMASK, umask));
     }
 
     final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
@@ -269,7 +281,6 @@ public class AbfsClient {
     return op;
   }
 
-
   public AbfsRestOperation flush(final String path, final long position, boolean retainUncommittedData)
       throws AzureBlobFileSystemException {
     final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
@@ -373,6 +384,104 @@ public class AbfsClient {
     return op;
   }
 
+  public AbfsRestOperation setOwner(final String path, final String owner, final String group)
+      throws AzureBlobFileSystemException {
+    final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+    // JDK7 does not support PATCH, so to workaround the issue we will use
+    // PUT and specify the real method in the X-Http-Method-Override header.
+    requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE,
+            HTTP_METHOD_PATCH));
+
+    if (owner != null && !owner.isEmpty()) {
+      requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_OWNER, owner));
+    }
+    if (group != null && !group.isEmpty()) {
+      requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_GROUP, group));
+    }
+
+    final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
+    abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_ACTION, AbfsHttpConstants.SET_ACCESS_CONTROL);
+
+    final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
+    final AbfsRestOperation op = new AbfsRestOperation(
+        this,
+        AbfsHttpConstants.HTTP_METHOD_PUT,
+        url,
+        requestHeaders);
+    op.execute();
+    return op;
+  }
+
+  public AbfsRestOperation setPermission(final String path, final String permission)
+      throws AzureBlobFileSystemException {
+    final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+    // JDK7 does not support PATCH, so to workaround the issue we will use
+    // PUT and specify the real method in the X-Http-Method-Override header.
+    requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE,
+            HTTP_METHOD_PATCH));
+
+    requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_PERMISSIONS, permission));
+
+    final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
+    abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_ACTION, AbfsHttpConstants.SET_ACCESS_CONTROL);
+
+    final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
+    final AbfsRestOperation op = new AbfsRestOperation(
+        this,
+        AbfsHttpConstants.HTTP_METHOD_PUT,
+        url,
+        requestHeaders);
+    op.execute();
+    return op;
+  }
+
+  public AbfsRestOperation setAcl(final String path, final String aclSpecString) throws AzureBlobFileSystemException {
+    return setAcl(path, aclSpecString, AbfsHttpConstants.EMPTY_STRING);
+  }
+
+  public AbfsRestOperation setAcl(final String path, final String aclSpecString, final String eTag)
+      throws AzureBlobFileSystemException {
+    final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+    // JDK7 does not support PATCH, so to workaround the issue we will use
+    // PUT and specify the real method in the X-Http-Method-Override header.
+    requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE,
+            HTTP_METHOD_PATCH));
+
+    requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_ACL, aclSpecString));
+
+    if (eTag != null && !eTag.isEmpty()) {
+      requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.IF_MATCH, eTag));
+    }
+
+    final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
+    abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_ACTION, AbfsHttpConstants.SET_ACCESS_CONTROL);
+
+    final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
+    final AbfsRestOperation op = new AbfsRestOperation(
+        this,
+        AbfsHttpConstants.HTTP_METHOD_PUT,
+        url,
+        requestHeaders);
+    op.execute();
+    return op;
+  }
+
+  public AbfsRestOperation getAclStatus(final String path) throws AzureBlobFileSystemException {
+    final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+
+    final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
+    abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_ACTION, AbfsHttpConstants.GET_ACCESS_CONTROL);
+
+    final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
+    final AbfsRestOperation op = new AbfsRestOperation(
+        this,
+        AbfsHttpConstants.HTTP_METHOD_HEAD,
+        url,
+        requestHeaders);
+    op.execute();
+    return op;
+  }
+
   private URL createRequestUrl(final String query) throws AzureBlobFileSystemException {
     return createRequestUrl(EMPTY_STRING, query);
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e70e1568/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsPermission.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsPermission.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsPermission.java
new file mode 100644
index 0000000..9c44610
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsPermission.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+
+/**
+ * The AbfsPermission for AbfsClient.
+ */
+public class AbfsPermission extends FsPermission {
+  private static final int STICKY_BIT_OCTAL_VALUE = 01000;
+  private final boolean aclBit;
+
+  public AbfsPermission(Short aShort, boolean aclBitStatus) {
+    super(aShort);
+    this.aclBit = aclBitStatus;
+  }
+
+  public AbfsPermission(FsAction u, FsAction g, FsAction o) {
+    super(u, g, o, false);
+    this.aclBit = false;
+  }
+
+  /**
+   * Returns true if there is also an ACL (access control list).
+   *
+   * @return boolean true if there is also an ACL (access control list).
+   * @deprecated Get acl bit from the {@link org.apache.hadoop.fs.FileStatus}
+   * object.
+   */
+  public boolean getAclBit() {
+    return aclBit;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj instanceof FsPermission) {
+      FsPermission that = (FsPermission) obj;
+      return this.getUserAction() == that.getUserAction()
+          && this.getGroupAction() == that.getGroupAction()
+          && this.getOtherAction() == that.getOtherAction()
+          && this.getStickyBit() == that.getStickyBit();
+    }
+    return false;
+  }
+
+  /**
+   * Create a AbfsPermission from a abfs symbolic permission string
+   * @param abfsSymbolicPermission e.g. "rw-rw-rw-+" / "rw-rw-rw-"
+   * @return a permission object for the provided string representation
+   */
+  public static AbfsPermission valueOf(final String abfsSymbolicPermission) {
+    if (abfsSymbolicPermission == null) {
+      return null;
+    }
+
+    final boolean isExtendedAcl = abfsSymbolicPermission.charAt(abfsSymbolicPermission.length() - 1) == '+';
+
+    final String abfsRawSymbolicPermission = isExtendedAcl ? abfsSymbolicPermission.substring(0, abfsSymbolicPermission.length() - 1)
+        : abfsSymbolicPermission;
+
+    int n = 0;
+    for (int i = 0; i < abfsRawSymbolicPermission.length(); i++) {
+      n = n << 1;
+      char c = abfsRawSymbolicPermission.charAt(i);
+      n += (c == '-' || c == 'T' || c == 'S') ? 0: 1;
+    }
+
+    // Add sticky bit value if set
+    if (abfsRawSymbolicPermission.charAt(abfsRawSymbolicPermission.length() - 1) == 't'
+        || abfsRawSymbolicPermission.charAt(abfsRawSymbolicPermission.length() - 1) == 'T') {
+      n += STICKY_BIT_OCTAL_VALUE;
+    }
+
+    return new AbfsPermission((short) n, isExtendedAcl);
+  }
+
+  /**
+   * Check whether abfs symbolic permission string is a extended Acl
+   * @param abfsSymbolicPermission e.g. "rw-rw-rw-+" / "rw-rw-rw-"
+   * @return true if the permission string indicates the existence of an
+   * extended ACL; otherwise false.
+   */
+  public static boolean isExtendedAcl(final String abfsSymbolicPermission) {
+    if (abfsSymbolicPermission == null) {
+      return false;
+    }
+
+    return abfsSymbolicPermission.charAt(abfsSymbolicPermission.length() - 1) == '+';
+  }
+
+  @Override
+  public int hashCode() {
+    return toShort();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e70e1568/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemBackCompat.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemBackCompat.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemBackCompat.java
index 6207a47..800b95a 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemBackCompat.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemBackCompat.java
@@ -43,6 +43,8 @@ public class ITestAzureBlobFileSystemBackCompat extends
   @Test
   public void testBlobBackCompat() throws Exception {
     final AzureBlobFileSystem fs = this.getFileSystem();
+    // test only valid for non-namespace enabled account
+    Assume.assumeFalse(fs.getIsNamespaceEnabeld());
     String storageConnectionString = getBlobConnectionString();
     CloudStorageAccount storageAccount = CloudStorageAccount.parse(storageConnectionString);
     CloudBlobClient blobClient = storageAccount.createCloudBlobClient();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e70e1568/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFileStatus.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFileStatus.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFileStatus.java
index 13abaf8..9daac2a 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFileStatus.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFileStatus.java
@@ -20,11 +20,12 @@ package org.apache.hadoop.fs.azurebfs;
 
 import java.io.IOException;
 
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.azurebfs.services.AuthType;
 import org.junit.Test;
 
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
 
 /**
@@ -32,9 +33,17 @@ import org.apache.hadoop.fs.permission.FsPermission;
  */
 public class ITestAzureBlobFileSystemFileStatus extends
     AbstractAbfsIntegrationTest {
+  private static final String DEFAULT_FILE_PERMISSION_VALUE = "640";
+  private static final String DEFAULT_DIR_PERMISSION_VALUE = "750";
+  private static final String DEFAULT_UMASK_VALUE = "027";
+
   private static final Path TEST_FILE = new Path("testFile");
   private static final Path TEST_FOLDER = new Path("testDir");
 
+  public ITestAzureBlobFileSystemFileStatus() {
+    super();
+  }
+
   @Test
   public void testEnsureStatusWorksForRoot() throws Exception {
     final AzureBlobFileSystem fs = this.getFileSystem();
@@ -48,20 +57,32 @@ public class ITestAzureBlobFileSystemFileStatus extends
   public void testFileStatusPermissionsAndOwnerAndGroup() throws Exception {
     final AzureBlobFileSystem fs = this.getFileSystem();
     touch(TEST_FILE);
-    validateStatus(fs, TEST_FILE);
+    validateStatus(fs, TEST_FILE, false);
   }
 
-  private FileStatus validateStatus(final AzureBlobFileSystem fs, final Path name)
+  private FileStatus validateStatus(final AzureBlobFileSystem fs, final Path name, final boolean isDir)
       throws IOException {
     FileStatus fileStatus = fs.getFileStatus(name);
+    fs.getConf().set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, DEFAULT_UMASK_VALUE);
+
     String errorInStatus = "error in " + fileStatus + " from " + fs;
-    assertEquals(errorInStatus + ": permission",
-        new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL),
-        fileStatus.getPermission());
-    assertEquals(errorInStatus + ": owner",
-        fs.getOwnerUser(), fileStatus.getOwner());
-    assertEquals(errorInStatus + ": group",
-        fs.getOwnerUserPrimaryGroup(), fileStatus.getGroup());
+
+    // When running with Oauth, the owner and group info retrieved from server will be digit ids.
+    if (this.getAuthType() != AuthType.OAuth && !fs.isSecure()) {
+      assertEquals(errorInStatus + ": owner",
+              fs.getOwnerUser(), fileStatus.getOwner());
+      assertEquals(errorInStatus + ": group",
+              fs.getOwnerUserPrimaryGroup(), fileStatus.getGroup());
+    } else {
+      if (isDir) {
+        assertEquals(errorInStatus + ": permission",
+                new FsPermission(DEFAULT_DIR_PERMISSION_VALUE), fileStatus.getPermission());
+      } else {
+        assertEquals(errorInStatus + ": permission",
+                new FsPermission(DEFAULT_FILE_PERMISSION_VALUE), fileStatus.getPermission());
+      }
+    }
+
     return fileStatus;
   }
 
@@ -70,7 +91,7 @@ public class ITestAzureBlobFileSystemFileStatus extends
     final AzureBlobFileSystem fs = this.getFileSystem();
     fs.mkdirs(TEST_FOLDER);
 
-    validateStatus(fs, TEST_FOLDER);
+    validateStatus(fs, TEST_FOLDER, true);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e70e1568/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFlush.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFlush.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFlush.java
index b02d723..8a6207a 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFlush.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFlush.java
@@ -218,6 +218,9 @@ public class ITestAzureBlobFileSystemFlush extends AbstractAbfsScaleTest {
     String wasbUrl = testAccount.getFileSystem().getName();
     String abfsUrl = wasbUrlToAbfsUrl(wasbUrl);
     final AzureBlobFileSystem fs = this.getFileSystem(abfsUrl);
+    // test only valid for non-namespace enabled account
+    Assume.assumeFalse(fs.getIsNamespaceEnabeld());
+
     byte[] buffer = getRandomBytesArray();
     CloudBlockBlob blob = testAccount.getBlobReference(TEST_FILE_PATH.toString().substring(1));
     try (FSDataOutputStream stream = getStreamAfterWrite(fs, TEST_FILE_PATH, buffer, true)) {
@@ -238,6 +241,9 @@ public class ITestAzureBlobFileSystemFlush extends AbstractAbfsScaleTest {
     String wasbUrl = testAccount.getFileSystem().getName();
     String abfsUrl = wasbUrlToAbfsUrl(wasbUrl);
     final AzureBlobFileSystem fs = this.getFileSystem(abfsUrl);
+    // test only valid for non-namespace enabled account
+    Assume.assumeFalse(fs.getIsNamespaceEnabeld());
+
     byte[] buffer = getRandomBytesArray();
     CloudBlockBlob blob = testAccount.getBlobReference(TEST_FILE_PATH.toString().substring(1));
     try (FSDataOutputStream stream = getStreamAfterWrite(fs, TEST_FILE_PATH, buffer, false)) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e70e1568/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemPermission.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemPermission.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemPermission.java
new file mode 100644
index 0000000..2f265d1
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemPermission.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.UUID;
+
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.azurebfs.services.AuthType;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.azurebfs.utils.Parallelized;
+
+/**
+ * Test permission operations.
+ */
+@RunWith(Parallelized.class)
+public class ITestAzureBlobFileSystemPermission extends AbstractAbfsIntegrationTest{
+
+  private static Path testRoot = new Path("/test");
+  private static final String DEFAULT_UMASK_VALUE = "027";
+  private static final FsPermission DEFAULT_UMASK_PERMISSION = new FsPermission(DEFAULT_UMASK_VALUE);
+  private static final int KILOBYTE = 1024;
+  private FsPermission permission;
+
+  private Path path;
+
+  public ITestAzureBlobFileSystemPermission(FsPermission testPermission) throws Exception {
+    super();
+    permission = testPermission;
+
+    Assume.assumeTrue(this.getAuthType() == AuthType.OAuth);
+  }
+
+  @Parameterized.Parameters(name = "{0}")
+  public static Collection abfsCreateNonRecursiveTestData()
+      throws Exception {
+    /*
+      Test Data
+      File/Folder name, User permission, Group permission, Other Permission,
+      Parent already exist
+      shouldCreateSucceed, expectedExceptionIfFileCreateFails
+    */
+    final Collection<Object[]> datas = new ArrayList<>();
+    for (FsAction g : FsAction.values()) {
+      for (FsAction o : FsAction.values()) {
+        datas.add(new Object[] {new FsPermission(FsAction.ALL, g, o)});
+      }
+    }
+    return datas;
+  }
+
+  @Test
+  public void testFilePermission() throws Exception {
+
+    final AzureBlobFileSystem fs = this.getFileSystem();
+    fs.getConf().set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, DEFAULT_UMASK_VALUE);
+    path = new Path(testRoot, UUID.randomUUID().toString());
+
+    fs.mkdirs(path.getParent(),
+        new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE));
+    fs.removeDefaultAcl(path.getParent());
+
+    fs.create(path, permission, true, KILOBYTE, (short) 1, KILOBYTE - 1, null);
+    FileStatus status = fs.getFileStatus(path);
+    Assert.assertEquals(permission.applyUMask(DEFAULT_UMASK_PERMISSION), status.getPermission());
+  }
+
+  @Test
+  public void testFolderPermission() throws Exception {
+    final AzureBlobFileSystem fs = this.getFileSystem();
+    fs.getConf().set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "027");
+
+    path = new Path(testRoot, UUID.randomUUID().toString());
+
+    fs.mkdirs(path.getParent(),
+        new FsPermission(FsAction.ALL, FsAction.WRITE, FsAction.NONE));
+    fs.removeDefaultAcl(path.getParent());
+
+    fs.mkdirs(path, permission);
+    FileStatus status = fs.getFileStatus(path);
+    Assert.assertEquals(permission.applyUMask(DEFAULT_UMASK_PERMISSION), status.getPermission());
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e70e1568/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRandomRead.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRandomRead.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRandomRead.java
index 13c5bc8..38e7133 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRandomRead.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRandomRead.java
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.fs.azurebfs;
 
-
 import java.io.EOFException;
 import java.io.IOException;
 import java.util.Random;
@@ -524,6 +523,9 @@ public class ITestAzureBlobFileSystemRandomRead extends
   }
 
   private void createTestFile() throws Exception {
+    final AzureBlobFileSystem abFs = this.getFileSystem();
+    // test only valid for non-namespace enabled account
+    Assume.assumeFalse(abFs.getIsNamespaceEnabeld());
     FileSystem fs = this.getWasbFileSystem();
 
     if (fs.exists(TEST_FILE_PATH)) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e70e1568/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java
index 07426c4..c97e840 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java
@@ -25,6 +25,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
+import org.junit.Assert;
 import org.junit.Test;
 
 import org.apache.hadoop.fs.FileStatus;
@@ -133,4 +134,17 @@ public class ITestAzureBlobFileSystemRename extends
         new Path(fs.getUri().toString() + "/s"),
         false);
   }
+
+  @Test
+  public void testPosixRenameDirectory() throws Exception {
+    final AzureBlobFileSystem fs = this.getFileSystem();
+    fs.mkdirs(new Path("testDir2/test1/test2/test3"));
+    fs.mkdirs(new Path("testDir2/test4"));
+    Assert.assertTrue(fs.rename(new Path("testDir2/test1/test2/test3"), new Path("testDir2/test4")));
+    assertTrue(fs.exists(new Path("testDir2")));
+    assertTrue(fs.exists(new Path("testDir2/test1/test2")));
+    assertTrue(fs.exists(new Path("testDir2/test4")));
+    assertTrue(fs.exists(new Path("testDir2/test4/test3")));
+    assertFalse(fs.exists(new Path("testDir2/test1/test2/test3")));
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org