You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2017/09/28 18:53:16 UTC

hadoop git commit: HADOOP-14768. Honoring sticky bit during Deletion when authorization is enabled in WASB Contributed by Varada Hemeswari

Repository: hadoop
Updated Branches:
  refs/heads/trunk 8e1bd114e -> a530e7ab3


HADOOP-14768. Honoring sticky bit during Deletion when authorization is enabled in WASB
Contributed by Varada Hemeswari


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a530e7ab
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a530e7ab
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a530e7ab

Branch: refs/heads/trunk
Commit: a530e7ab3b3f5bd71143a91266b46787962ac532
Parents: 8e1bd11
Author: Steve Loughran <st...@apache.org>
Authored: Thu Sep 28 19:52:56 2017 +0100
Committer: Steve Loughran <st...@apache.org>
Committed: Thu Sep 28 19:52:56 2017 +0100

----------------------------------------------------------------------
 .../hadoop/fs/azure/NativeAzureFileSystem.java  | 522 +++++++++++++++--
 ...stNativeAzureFSAuthWithBlobSpecificKeys.java |   2 +-
 .../ITestNativeAzureFSAuthorizationCaching.java |   2 +-
 ...veAzureFileSystemAuthorizationWithOwner.java | 122 ----
 .../hadoop/fs/azure/MockWasbAuthorizerImpl.java | 284 +++++-----
 .../TestNativeAzureFileSystemAuthorization.java | 555 +++++++++++++++++--
 6 files changed, 1136 insertions(+), 351 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/a530e7ab/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
index 280c0e0..5f86f84 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
@@ -40,6 +40,8 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Stack;
+import java.util.HashMap;
 
 import com.fasterxml.jackson.core.JsonParseException;
 import com.fasterxml.jackson.core.JsonParser;
@@ -80,6 +82,8 @@ import org.apache.hadoop.util.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.hadoop.fs.azure.NativeAzureFileSystemHelper.*;
+
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
 import com.microsoft.azure.storage.StorageException;
@@ -1849,31 +1853,256 @@ public class NativeAzureFileSystem extends FileSystem {
   }
 
   /**
-   * Delete the specified file or folder. The parameter
-   * skipParentFolderLastModifiedTimeUpdate
-   * is used in the case of atomic folder rename redo. In that case, there is
-   * a lease on the parent folder, so (without reworking the code) modifying
-   * the parent folder update time will fail because of a conflict with the
-   * lease. Since we are going to delete the folder soon anyway so accurate
-   * modified time is not necessary, it's easier to just skip
-   * the modified time update.
-   *
-   * @param f file path to be deleted.
-   * @param recursive specify deleting recursively or not.
-   * @param skipParentFolderLastModifiedTimeUpdate If true, don't update the folder last
-   * modified time.
-   * @return true if and only if the file is deleted
-   * @throws IOException Thrown when fail to delete file or directory.
+   * Delete file or folder with authorization checks. Most of the code
+   * is a duplicate of the actual delete implementation and will be merged
+   * once the performance and functional aspects are guaranteed not to
+   * regress existing delete semantics.
    */
-  public boolean delete(Path f, boolean recursive,
+  private boolean deleteWithAuthEnabled(Path f, boolean recursive,
       boolean skipParentFolderLastModifiedTimeUpdate) throws IOException {
 
-    LOG.debug("Deleting file: {}", f.toString());
+    LOG.debug("Deleting file: {}", f);
 
     Path absolutePath = makeAbsolute(f);
     Path parentPath = absolutePath.getParent();
 
-    performAuthCheck(parentPath, WasbAuthorizationOperations.WRITE, "delete", absolutePath);
+    // If delete is issued for 'root', parentPath will be null
+    // In that case, we perform auth check for root itself before
+    // proceeding for deleting contents under root.
+    if (parentPath != null) {
+      performAuthCheck(parentPath, WasbAuthorizationOperations.WRITE, "delete", absolutePath);
+    } else {
+      performAuthCheck(absolutePath, WasbAuthorizationOperations.WRITE, "delete", absolutePath);
+    }
+
+    String key = pathToKey(absolutePath);
+
+    // Capture the metadata for the path.
+    FileMetadata metaFile = null;
+    try {
+      metaFile = store.retrieveMetadata(key);
+    } catch (IOException e) {
+
+      Throwable innerException = checkForAzureStorageException(e);
+
+      if (innerException instanceof StorageException
+          && isFileNotFoundException((StorageException) innerException)) {
+
+        return false;
+      }
+      throw e;
+    }
+
+    if (null == metaFile) {
+      // The path to be deleted does not exist.
+      return false;
+    }
+
+    FileMetadata parentMetadata = null;
+    String parentKey = null;
+    if (parentPath != null) {
+      parentKey = pathToKey(parentPath);
+
+      try {
+        parentMetadata = store.retrieveMetadata(parentKey);
+      } catch (IOException e) {
+         Throwable innerException = checkForAzureStorageException(e);
+         if (innerException instanceof StorageException) {
+           // Invalid State.
+           // A FileNotFoundException is not thrown here as the API returns false
+           // if the file is not present. But failing to retrieve metadata here is
+           // an unrecoverable state and can only happen if there is a race
+           // condition, hence throwing an IOException.
+           if (isFileNotFoundException((StorageException) innerException)) {
+             throw new IOException("File " + f + " has a parent directory "
+               + parentPath + " whose metadata cannot be retrieved. Can't resolve");
+           }
+         }
+         throw e;
+      }
+
+      // Same case as unable to retrieve metadata
+      if (parentMetadata == null) {
+          throw new IOException("File " + f + " has a parent directory "
+              + parentPath + " whose metadata cannot be retrieved. Can't resolve");
+      }
+
+      if (!parentMetadata.isDir()) {
+         // Invalid state: the parent path is actually a file. Throw.
+         throw new AzureException("File " + f + " has a parent directory "
+             + parentPath + " which is also a file. Can't resolve.");
+      }
+    }
+
+    // The path exists, determine if it is a folder containing objects,
+    // an empty folder, or a simple file and take the appropriate actions.
+    if (!metaFile.isDir()) {
+      // The path specifies a file. We need to check the parent path
+      // to make sure it's a proper materialized directory before we
+      // delete the file. Otherwise we may get into a situation where
+      // the file we were deleting was the last one in an implicit directory
+      // (e.g. the blob store only contains the blob a/b and there's no
+      // corresponding directory blob a) and that would implicitly delete
+      // the directory as well, which is not correct.
+
+      if (parentPath != null && parentPath.getParent() != null) {// Not root
+
+        if (parentMetadata.getBlobMaterialization() == BlobMaterialization.Implicit) {
+          LOG.debug("Found an implicit parent directory while trying to"
+              + " delete the file {}. Creating the directory blob for"
+              + " it in {}.", f, parentKey);
+
+          store.storeEmptyFolder(parentKey,
+              createPermissionStatus(FsPermission.getDefault()));
+        } else {
+          if (!skipParentFolderLastModifiedTimeUpdate) {
+            updateParentFolderLastModifiedTime(key);
+          }
+        }
+      }
+
+      // check if the file can be deleted based on sticky bit check
+      // This check will be performed only when authorization is enabled
+      if (isStickyBitCheckViolated(metaFile, parentMetadata)) {
+        throw new WasbAuthorizationException(String.format("%s has sticky bit set. "
+          + "File %s cannot be deleted.", parentPath, f));
+      }
+
+      try {
+        if (store.delete(key)) {
+          instrumentation.fileDeleted();
+        } else {
+          return false;
+        }
+      } catch(IOException e) {
+
+        Throwable innerException = checkForAzureStorageException(e);
+
+        if (innerException instanceof StorageException
+            && isFileNotFoundException((StorageException) innerException)) {
+          return false;
+        }
+
+       throw e;
+      }
+    } else {
+      // The path specifies a folder. Recursively delete all entries under the
+      // folder.
+      LOG.debug("Directory Delete encountered: {}", f);
+      if (parentPath != null && parentPath.getParent() != null) {
+
+        if (parentMetadata.getBlobMaterialization() == BlobMaterialization.Implicit) {
+          LOG.debug("Found an implicit parent directory while trying to"
+                  + " delete the directory {}. Creating the directory blob for"
+                  + " it in {}. ", f, parentKey);
+
+          store.storeEmptyFolder(parentKey,
+                  createPermissionStatus(FsPermission.getDefault()));
+        }
+      }
+
+      // check if the folder can be deleted based on sticky bit check on parent
+      // This check will be performed only when authorization is enabled.
+      if (!metaFile.getKey().equals("/")
+          && isStickyBitCheckViolated(metaFile, parentMetadata)) {
+
+        throw new WasbAuthorizationException(String.format("%s has sticky bit set. "
+          + "File %s cannot be deleted.", parentPath, f));
+      }
+
+      // Iterate through folder contents and get the list of files
+      // and folders that can be deleted. If listing the blobs fails with a
+      // file-not-found storage error, we return false; other IOExceptions are rethrown.
+      ArrayList<FileMetadata> fileMetadataList = new ArrayList<>();
+      boolean isPartialDelete = false;
+
+      // Start time for list operation
+      long start = Time.monotonicNow();
+
+      try {
+        // Get list of files/folders that can be deleted
+        // based on authorization checks and stickybit checks
+        isPartialDelete = getFolderContentsToDelete(metaFile, fileMetadataList);
+      } catch (IOException e) {
+        Throwable innerException = checkForAzureStorageException(e);
+
+        if (innerException instanceof StorageException
+            && isFileNotFoundException((StorageException) innerException)) {
+            return false;
+        }
+        throw e;
+      }
+
+      long end = Time.monotonicNow();
+      LOG.debug("Time taken to list {} blobs for delete operation: {} ms",
+        fileMetadataList.size(), (end - start));
+
+      // Here contents holds the list of metadata of the files and folders that can be deleted
+      // under the path that is requested for delete(excluding itself).
+      final FileMetadata[] contents = fileMetadataList.toArray(new FileMetadata[fileMetadataList.size()]);
+
+      if (contents.length > 0 && !recursive) {
+          // The folder is non-empty and recursive delete was not specified.
+          // Throw an exception indicating that a non-recursive delete was
+          // specified for a non-empty folder.
+          throw new IOException("Non-recursive delete of non-empty directory "
+              + f);
+      }
+
+      // Delete all files / folders in current directory stored as list in 'contents'.
+      AzureFileSystemThreadTask task = new AzureFileSystemThreadTask() {
+        @Override
+        public boolean execute(FileMetadata file) throws IOException{
+          if (!deleteFile(file.getKey(), file.isDir())) {
+            LOG.warn("Attempt to delete non-existent {} {}",
+                file.isDir() ? "directory" : "file",
+                file.getKey());
+          }
+          return true;
+        }
+      };
+
+      AzureFileSystemThreadPoolExecutor executor = getThreadPoolExecutor(this.deleteThreadCount,
+          "AzureBlobDeleteThread", "Delete", key, AZURE_DELETE_THREADS);
+
+      if (!executor.executeParallel(contents, task)) {
+        LOG.error("Failed to delete files / subfolders in blob {}", key);
+        return false;
+      }
+
+      if (metaFile.getKey().equals("/")) {
+        LOG.error("Cannot delete root directory {}", f);
+        return false;
+      }
+
+      // Delete the current directory if all underlying contents are deleted
+      if (isPartialDelete || (store.retrieveMetadata(metaFile.getKey()) != null
+          && !deleteFile(metaFile.getKey(), metaFile.isDir()))) {
+        LOG.error("Failed delete directory : {}", f);
+        return false;
+      }
+
+      // Update parent directory last modified time
+      Path parent = absolutePath.getParent();
+      if (parent != null && parent.getParent() != null) { // not root
+        if (!skipParentFolderLastModifiedTimeUpdate) {
+          updateParentFolderLastModifiedTime(key);
+        }
+      }
+    }
+
+    // File or directory was successfully deleted.
+    LOG.debug("Delete Successful for : {}", f);
+    return true;
+  }
+
+  private boolean deleteWithoutAuth(Path f, boolean recursive,
+      boolean skipParentFolderLastModifiedTimeUpdate) throws IOException {
+
+    LOG.debug("Deleting file: {}", f);
+
+    Path absolutePath = makeAbsolute(f);
+    Path parentPath = absolutePath.getParent();
 
     String key = pathToKey(absolutePath);
 
@@ -1884,10 +2113,10 @@ public class NativeAzureFileSystem extends FileSystem {
       metaFile = store.retrieveMetadata(key);
     } catch (IOException e) {
 
-      Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
+      Throwable innerException = checkForAzureStorageException(e);
 
       if (innerException instanceof StorageException
-          && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
+          && isFileNotFoundException((StorageException) innerException)) {
 
         return false;
       }
@@ -1918,7 +2147,7 @@ public class NativeAzureFileSystem extends FileSystem {
           parentMetadata = store.retrieveMetadata(parentKey);
         } catch (IOException e) {
 
-          Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
+          Throwable innerException = checkForAzureStorageException(e);
 
           if (innerException instanceof StorageException) {
             // Invalid State.
@@ -1926,7 +2155,7 @@ public class NativeAzureFileSystem extends FileSystem {
             // if the file not present. But not retrieving metadata here is an
             // unrecoverable state and can only happen if there is a race condition
             // hence throwing a IOException
-            if (NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
+            if (isFileNotFoundException((StorageException) innerException)) {
               throw new IOException("File " + f + " has a parent directory "
                   + parentPath + " whose metadata cannot be retrieved. Can't resolve");
             }
@@ -1972,10 +2201,10 @@ public class NativeAzureFileSystem extends FileSystem {
         }
       } catch(IOException e) {
 
-        Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
+        Throwable innerException = checkForAzureStorageException(e);
 
         if (innerException instanceof StorageException
-            && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
+            && isFileNotFoundException((StorageException) innerException)) {
           return false;
         }
 
@@ -1984,7 +2213,7 @@ public class NativeAzureFileSystem extends FileSystem {
     } else {
       // The path specifies a folder. Recursively delete all entries under the
       // folder.
-      LOG.debug("Directory Delete encountered: {}", f.toString());
+      LOG.debug("Directory Delete encountered: {}", f);
       if (parentPath.getParent() != null) {
         String parentKey = pathToKey(parentPath);
         FileMetadata parentMetadata = null;
@@ -1993,7 +2222,7 @@ public class NativeAzureFileSystem extends FileSystem {
           parentMetadata = store.retrieveMetadata(parentKey);
         } catch (IOException e) {
 
-          Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
+          Throwable innerException = checkForAzureStorageException(e);
 
           if (innerException instanceof StorageException) {
             // Invalid State.
@@ -2001,7 +2230,7 @@ public class NativeAzureFileSystem extends FileSystem {
             // if the file not present. But not retrieving metadata here is an
             // unrecoverable state and can only happen if there is a race condition
             // hence throwing a IOException
-            if (NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
+            if (isFileNotFoundException((StorageException) innerException)) {
               throw new IOException("File " + f + " has a parent directory "
                   + parentPath + " whose metadata cannot be retrieved. Can't resolve");
             }
@@ -2046,10 +2275,10 @@ public class NativeAzureFileSystem extends FileSystem {
           }
           priorLastKey = listing.getPriorLastKey();
         } catch (IOException e) {
-          Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
+          Throwable innerException = checkForAzureStorageException(e);
 
           if (innerException instanceof StorageException
-              && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
+              && isFileNotFoundException((StorageException) innerException)) {
             return false;
           }
 
@@ -2067,22 +2296,7 @@ public class NativeAzureFileSystem extends FileSystem {
           // The folder is non-empty and recursive delete was not specified.
           // Throw an exception indicating that a non-recursive delete was
           // specified for a non-empty folder.
-          throw new IOException("Non-recursive delete of non-empty directory "
-              + f.toString());
-        }
-        else {
-          // Check write-permissions on sub-tree including current folder
-          // NOTE: Ideally the subtree needs read-write-execute access check.
-          // But we will simplify it to write-access check.
-          if (metaFile.isDir()) { // the absolute-path
-            performAuthCheck(absolutePath, WasbAuthorizationOperations.WRITE, "delete", absolutePath);
-          }
-          for (FileMetadata meta : contents) {
-            if (meta.isDir()) {
-              Path subTreeDir = keyToPath(meta.getKey());
-              performAuthCheck(subTreeDir, WasbAuthorizationOperations.WRITE, "delete", absolutePath);
-            }
-          }
+          throw new IOException("Non-recursive delete of non-empty directory "+ f);
         }
       }
 
@@ -2108,8 +2322,9 @@ public class NativeAzureFileSystem extends FileSystem {
       }
 
       // Delete the current directory
-      if (store.retrieveMetadata(metaFile.getKey()) != null && !deleteFile(metaFile.getKey(), metaFile.isDir())) {
-        LOG.error("Failed delete directory {}", f.toString());
+      if (store.retrieveMetadata(metaFile.getKey()) != null
+          && !deleteFile(metaFile.getKey(), metaFile.isDir())) {
+        LOG.error("Failed delete directory : {}", f);
         return false;
       }
 
@@ -2123,16 +2338,227 @@ public class NativeAzureFileSystem extends FileSystem {
     }
 
     // File or directory was successfully deleted.
-    LOG.debug("Delete Successful for : {}", f.toString());
+    LOG.debug("Delete Successful for : {}", f);
     return true;
   }
 
+  /**
+   * Delete the specified file or folder. The parameter
+   * skipParentFolderLastModifiedTimeUpdate
+   * is used in the case of atomic folder rename redo. In that case, there is
+   * a lease on the parent folder, so (without reworking the code) modifying
+   * the parent folder update time will fail because of a conflict with the
+   * lease. Since we are going to delete the folder soon anyway so accurate
+   * modified time is not necessary, it's easier to just skip
+   * the modified time update.
+   *
+   * @param f file path to be deleted.
+   * @param recursive specify deleting recursively or not.
+   * @param skipParentFolderLastModifiedTimeUpdate If true, don't update the folder last
+   * modified time.
+   * @return true if and only if the file is deleted
+   * @throws IOException Thrown when fail to delete file or directory.
+   */
+  public boolean delete(Path f, boolean recursive,
+      boolean skipParentFolderLastModifiedTimeUpdate) throws IOException {
+
+    if (this.azureAuthorization) {
+      return deleteWithAuthEnabled(f, recursive,
+        skipParentFolderLastModifiedTimeUpdate);
+    } else {
+      return deleteWithoutAuth(f, recursive,
+        skipParentFolderLastModifiedTimeUpdate);
+    }
+  }
+
   public AzureFileSystemThreadPoolExecutor getThreadPoolExecutor(int threadCount,
       String threadNamePrefix, String operation, String key, String config) {
     return new AzureFileSystemThreadPoolExecutor(threadCount, threadNamePrefix, operation, key, config);
   }
 
   /**
+   * Gets list of contents that can be deleted based on authorization check calls
+   * performed on the sub-tree for the folderToDelete.
+   *
+   * @param folderToDelete - metadata of the folder whose delete is requested.
+   * @param finalList - list of metadata of all files/folders that can be deleted.
+   *
+   * @return 'true' if one or more contents of the folderToDelete cannot be deleted (partial delete)
+   * @throws IOException Thrown when current user cannot be retrieved or listing folder contents fails.
+   */
+  private boolean getFolderContentsToDelete(FileMetadata folderToDelete,
+      ArrayList<FileMetadata> finalList) throws IOException {
+
+    final int maxListingDepth = 1;
+    Stack<FileMetadata> foldersToProcess = new Stack<FileMetadata>();
+    HashMap<String, FileMetadata> folderContentsMap = new HashMap<String, FileMetadata>();
+
+    boolean isPartialDelete = false;
+
+    Path pathToDelete = makeAbsolute(keyToPath(folderToDelete.getKey()));
+    foldersToProcess.push(folderToDelete);
+
+    while (!foldersToProcess.empty()) {
+
+      FileMetadata currentFolder = foldersToProcess.pop();
+      Path currentPath = makeAbsolute(keyToPath(currentFolder.getKey()));
+      boolean canDeleteChildren = true;
+
+      // If authorization is enabled, check for 'write' permission on current folder
+      // This check maps to subfolders 'write' check for deleting contents recursively.
+      try {
+        performAuthCheck(currentPath, WasbAuthorizationOperations.WRITE, "delete", pathToDelete);
+      } catch (WasbAuthorizationException we) {
+        LOG.debug("Authorization check failed for {}", currentPath);
+        // We cannot delete the children of currentFolder since 'write' check on parent failed
+        canDeleteChildren = false;
+      }
+
+      if (canDeleteChildren) {
+
+        // get immediate children list
+        ArrayList<FileMetadata> fileMetadataList = getChildrenMetadata(currentFolder.getKey(),
+            maxListingDepth);
+
+        // Process children of currentFolder and add them to list of contents
+        // that can be deleted. We perform stickybit check on every file and
+        // folder under currentFolder in case stickybit is set on currentFolder.
+        for (FileMetadata childItem : fileMetadataList) {
+          if (isStickyBitCheckViolated(childItem, currentFolder, false)) {
+            // Stickybit check failed for the childItem that is being processed.
+            // This file/folder cannot be deleted and neither can the parent paths be deleted.
+            // Remove parent paths from list of contents that can be deleted.
+            canDeleteChildren = false;
+            Path filePath = makeAbsolute(keyToPath(childItem.getKey()));
+            LOG.error("User does not have permissions to delete {}. "
+              + "Parent directory has sticky bit set.", filePath);
+          } else {
+            // push the child directories to the stack to process their contents
+            if (childItem.isDir()) {
+              foldersToProcess.push(childItem);
+            }
+            // Add items to list of contents that can be deleted.
+            folderContentsMap.put(childItem.getKey(), childItem);
+          }
+        }
+
+      } else {
+        // Cannot delete children since parent permission check has not passed and
+        // if there are files/folders under currentFolder they will not be deleted.
+        LOG.error("Authorization check failed. Files or folders under {} "
+          + "will not be processed for deletion.", currentPath);
+      }
+
+      if (!canDeleteChildren) {
+        // We reach here if
+        // 1. cannot delete children since 'write' check on parent failed or
+        // 2. One of the files under the current folder cannot be deleted due to stickybit check.
+        // In this case we remove all the parent paths from the list of contents
+        // that can be deleted till we reach the original path of delete request
+        String pathToRemove = currentFolder.getKey();
+        while (!pathToRemove.equals(folderToDelete.getKey())) {
+          if (folderContentsMap.containsKey(pathToRemove)) {
+            LOG.debug("Cannot delete {} since some of its contents "
+              + "cannot be deleted", pathToRemove);
+            folderContentsMap.remove(pathToRemove);
+          }
+          Path parentPath = keyToPath(pathToRemove).getParent();
+          pathToRemove = pathToKey(parentPath);
+        }
+        // Since one or more files/folders cannot be deleted return value should indicate
+        // partial delete, so that the delete on the path requested by user is not performed
+        isPartialDelete = true;
+      }
+    }
+
+    // final list of contents that can be deleted
+    for (HashMap.Entry<String, FileMetadata> entry : folderContentsMap.entrySet()) {
+      finalList.add(entry.getValue());
+    }
+
+    return isPartialDelete;
+  }
+
+  private ArrayList<FileMetadata> getChildrenMetadata(String key, int maxListingDepth)
+    throws IOException {
+
+    String priorLastKey = null;
+    ArrayList<FileMetadata> fileMetadataList = new ArrayList<FileMetadata>();
+    do {
+       PartialListing listing = store.listAll(key, AZURE_LIST_ALL,
+         maxListingDepth, priorLastKey);
+       for (FileMetadata file : listing.getFiles()) {
+         fileMetadataList.add(file);
+       }
+       priorLastKey = listing.getPriorLastKey();
+    } while (priorLastKey != null);
+
+    return fileMetadataList;
+  }
+
+  private boolean isStickyBitCheckViolated(FileMetadata metaData,
+    FileMetadata parentMetadata, boolean throwOnException) throws IOException {
+      try {
+        return isStickyBitCheckViolated(metaData, parentMetadata);
+      } catch (FileNotFoundException ex) {
+        if (throwOnException) {
+          throw ex;
+        } else {
+          LOG.debug("Encountered FileNotFoundException while performing "
+            + "stickybit check operation for {}", metaData.getKey());
+          // swallow exception and return that stickyBit check has been violated
+          return true;
+        }
+      }
+  }
+
+  /**
+   * Checks if the Current user is not permitted access to a file/folder when
+   * sticky bit is set on parent path. Only the owner of parent path
+   * and owner of the file/folder itself are permitted to perform certain
+   * operations on file/folder based on sticky bit check. Sticky bit check will
+   * be performed only when authorization is enabled.
+   *
+   * @param metaData - metadata of the file/folder whose parent has sticky bit set.
+   * @param parentMetadata - metadata of the parent.
+   *
+   * @return true if Current user violates stickybit check
+   * @throws IOException Thrown when current user cannot be retrieved.
+   */
+   private boolean isStickyBitCheckViolated(FileMetadata metaData,
+    FileMetadata parentMetadata) throws IOException {
+
+    // In case stickybit check should not be performed,
+    // return value should indicate stickybit check is not violated.
+    if (!this.azureAuthorization) {
+      return false;
+    }
+
+    // This should never happen when the sticky bit check is invoked.
+    if (parentMetadata == null) {
+      throw new FileNotFoundException(
+        String.format("Parent metadata for '%s' not found!", metaData.getKey()));
+    }
+
+    // stickybit is not set on parent and hence cannot be violated
+    if (!parentMetadata.getPermissionStatus().getPermission().getStickyBit()) {
+      return false;
+    }
+
+    String currentUser = UserGroupInformation.getCurrentUser().getShortUserName();
+    String parentDirectoryOwner = parentMetadata.getPermissionStatus().getUserName();
+    String currentFileOwner = metaData.getPermissionStatus().getUserName();
+
+    // Files/Folders with no owner set will not pass stickybit check
+    if ((parentDirectoryOwner.equalsIgnoreCase(currentUser))
+      || currentFileOwner.equalsIgnoreCase(currentUser)) {
+
+      return false;
+    }
+    return true;
+  }
+
+  /**
    * Delete the specified file or directory and increment metrics.
    * If the file or directory does not exist, the operation returns false.
    * @param path the path to a file or directory.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a530e7ab/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFSAuthWithBlobSpecificKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFSAuthWithBlobSpecificKeys.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFSAuthWithBlobSpecificKeys.java
index d7e4831..0f3d127 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFSAuthWithBlobSpecificKeys.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFSAuthWithBlobSpecificKeys.java
@@ -27,7 +27,7 @@ import static org.apache.hadoop.fs.azure.SecureStorageInterfaceImpl.KEY_USE_CONT
  * to access storage.
  */
 public class ITestNativeAzureFSAuthWithBlobSpecificKeys
-    extends ITestNativeAzureFileSystemAuthorizationWithOwner {
+    extends TestNativeAzureFileSystemAuthorization {
 
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a530e7ab/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFSAuthorizationCaching.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFSAuthorizationCaching.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFSAuthorizationCaching.java
index c73b1cc..138063d 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFSAuthorizationCaching.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFSAuthorizationCaching.java
@@ -27,7 +27,7 @@ import static org.apache.hadoop.fs.azure.CachingAuthorizer.KEY_AUTH_SERVICE_CACH
  * Test class to hold all WASB authorization caching related tests.
  */
 public class ITestNativeAzureFSAuthorizationCaching
-    extends ITestNativeAzureFileSystemAuthorizationWithOwner {
+    extends TestNativeAzureFileSystemAuthorization {
 
   private static final int DUMMY_TTL_VALUE = 5000;
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a530e7ab/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemAuthorizationWithOwner.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemAuthorizationWithOwner.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemAuthorizationWithOwner.java
deleted file mode 100644
index 3ec42f0..0000000
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemAuthorizationWithOwner.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.fs.azure;
-
-import org.apache.hadoop.fs.contract.ContractTestUtils;
-import org.apache.hadoop.security.UserGroupInformation;
-import java.security.PrivilegedExceptionAction;
-
-import org.apache.hadoop.fs.Path;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * Test class that runs wasb authorization tests with owner check enabled.
- */
-public class ITestNativeAzureFileSystemAuthorizationWithOwner
-  extends TestNativeAzureFileSystemAuthorization {
-
-  @Override
-  public void setUp() throws Exception {
-    super.setUp();
-    authorizer.init(fs.getConf(), true);
-  }
-
-  /**
-   * Test case when owner matches current user.
-   */
-  @Test
-  public void testOwnerPermissionPositive() throws Throwable {
-
-    Path parentDir = new Path("/testOwnerPermissionPositive");
-    Path testPath = new Path(parentDir, "test.data");
-
-    authorizer.addAuthRule("/", WasbAuthorizationOperations.WRITE.toString(), true);
-    authorizer.addAuthRule(testPath.toString(), WasbAuthorizationOperations.READ.toString(), true);
-    authorizer.addAuthRule(parentDir.toString(), WasbAuthorizationOperations.WRITE.toString(), true);
-    // additional rule used for assertPathExists
-    authorizer.addAuthRule(parentDir.toString(), WasbAuthorizationOperations.READ.toString(), true);
-    fs.updateWasbAuthorizer(authorizer);
-
-    try {
-      // creates parentDir with owner as current user
-      fs.mkdirs(parentDir);
-      ContractTestUtils.assertPathExists(fs, "parentDir does not exist", parentDir);
-
-      fs.create(testPath);
-      fs.getFileStatus(testPath);
-      ContractTestUtils.assertPathExists(fs, "testPath does not exist", testPath);
-
-    } finally {
-      allowRecursiveDelete(fs, parentDir.toString());
-      fs.delete(parentDir, true);
-    }
-  }
-
-  /**
-   * Negative test case for owner does not match current user.
-   */
-  @Test
-  public void testOwnerPermissionNegative() throws Throwable {
-    expectedEx.expect(WasbAuthorizationException.class);
-
-    Path parentDir = new Path("/testOwnerPermissionNegative");
-    Path childDir = new Path(parentDir, "childDir");
-
-    setExpectedFailureMessage("mkdirs", childDir);
-
-    authorizer.addAuthRule("/", WasbAuthorizationOperations.WRITE.toString(), true);
-    authorizer.addAuthRule(parentDir.toString(), WasbAuthorizationOperations.WRITE.toString(), true);
-
-    fs.updateWasbAuthorizer(authorizer);
-
-    try{
-      fs.mkdirs(parentDir);
-      UserGroupInformation ugiSuperUser = UserGroupInformation.createUserForTesting(
-          "testuser", new String[] {});
-
-      ugiSuperUser.doAs(new PrivilegedExceptionAction<Void>() {
-      @Override
-      public Void run() throws Exception {
-          fs.mkdirs(childDir);
-          return null;
-        }
-      });
-
-    } finally {
-       allowRecursiveDelete(fs, parentDir.toString());
-       fs.delete(parentDir, true);
-    }
-  }
-
-  /**
-   * Test to verify that retrieving owner information does not
-   * throw when file/folder does not exist.
-   */
-  @Test
-  public void testRetrievingOwnerDoesNotFailWhenFileDoesNotExist() throws Throwable {
-
-    Path testdirectory = new Path("/testDirectory123454565");
-
-    String owner = fs.getOwnerForPath(testdirectory);
-    assertEquals("", owner);
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a530e7ab/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockWasbAuthorizerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockWasbAuthorizerImpl.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockWasbAuthorizerImpl.java
index 7354499..1f5072d 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockWasbAuthorizerImpl.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockWasbAuthorizerImpl.java
@@ -34,165 +34,183 @@ import org.apache.hadoop.fs.Path;
 
 public class MockWasbAuthorizerImpl implements WasbAuthorizerInterface {
 
-  private Map<AuthorizationComponent, Boolean> authRules;
-  private boolean performOwnerMatch;
-  private CachingAuthorizer<CachedAuthorizerEntry, Boolean> cache;
-
-  // The full qualified URL to the root directory
-  private String qualifiedPrefixUrl;
-
-  public MockWasbAuthorizerImpl(NativeAzureFileSystem fs) {
-    qualifiedPrefixUrl = new Path("/").makeQualified(fs.getUri(),
-        fs.getWorkingDirectory())
-        .toString().replaceAll("/$", "");
-    cache = new CachingAuthorizer<>(TimeUnit.MINUTES.convert(5L, TimeUnit.MINUTES), "AUTHORIZATION");
-  }
-
-  @Override
-  public void init(Configuration conf) {
-    init(conf, false);
-  }
-
-  /*
-  authorization matches owner with currentUserShortName while evaluating auth rules
-  if currentUserShortName is set to a string that is not empty
-  */
-  public void init(Configuration conf, boolean matchOwner) {
-    cache.init(conf);
-    authRules = new HashMap<>();
-    this.performOwnerMatch = matchOwner;
-  }
-
-  public void addAuthRule(String wasbAbsolutePath,
-      String accessType, boolean access) {
-    wasbAbsolutePath = qualifiedPrefixUrl + wasbAbsolutePath;
-    AuthorizationComponent component = wasbAbsolutePath.endsWith("*")
-        ? new AuthorizationComponent("^" + wasbAbsolutePath.replace("*", ".*"),
-        accessType)
-        : new AuthorizationComponent(wasbAbsolutePath, accessType);
-
-    this.authRules.put(component, access);
-  }
-
-  @Override
-  public boolean authorize(String wasbAbsolutePath,
-      String accessType,
-      String owner)
-      throws WasbAuthorizationException {
-
-    if (wasbAbsolutePath.endsWith(
-        NativeAzureFileSystem.FolderRenamePending.SUFFIX)) {
-      return true;
-    }
+    private Map<AuthorizationComponent, Boolean> authRules;
+    private CachingAuthorizer<CachedAuthorizerEntry, Boolean> cache;
+
+    // The full qualified URL to the root directory
+    private String qualifiedPrefixUrl;
 
-    CachedAuthorizerEntry cacheKey = new CachedAuthorizerEntry(wasbAbsolutePath, accessType, owner);
-    Boolean cacheresult = cache.get(cacheKey);
-    if (cacheresult != null) {
-      return cacheresult;
+    public MockWasbAuthorizerImpl(NativeAzureFileSystem fs) {
+        qualifiedPrefixUrl = new Path("/").makeQualified(fs.getUri(),
+                fs.getWorkingDirectory())
+                .toString().replaceAll("/$", "");
+        cache = new CachingAuthorizer<>(TimeUnit.MINUTES.convert(5L, TimeUnit.MINUTES), "AUTHORIZATION");
     }
 
-    boolean authorizeresult = authorizeInternal(wasbAbsolutePath, accessType, owner);
-    cache.put(cacheKey, authorizeresult);
+    @Override
+    public void init(Configuration conf) {
+        cache.init(conf);
+        authRules = new HashMap<>();
+    }
 
-    return authorizeresult;
-  }
+    public void addAuthRuleForOwner(String wasbAbsolutePath,
+                                    String accessType, boolean access) {
+        addAuthRule(wasbAbsolutePath, accessType, "owner", access);
+    }
 
-  private boolean authorizeInternal(String wasbAbsolutePath, String accessType, String owner)
-      throws WasbAuthorizationException {
+    public void addAuthRule(String wasbAbsolutePath,
+                            String accessType, String user, boolean access) {
+        wasbAbsolutePath = qualifiedPrefixUrl + wasbAbsolutePath;
+        AuthorizationComponent component = wasbAbsolutePath.endsWith("*")
+                ? new AuthorizationComponent("^" + wasbAbsolutePath.replace("*", ".*"),
+                accessType, user)
+                : new AuthorizationComponent(wasbAbsolutePath, accessType, user);
 
-    String currentUserShortName = "";
-    if (this.performOwnerMatch) {
-      try {
-        UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
-        currentUserShortName = ugi.getShortUserName();
-      } catch (Exception e) {
-        //no op
-      }
+        this.authRules.put(component, access);
     }
 
-    // In case of root("/"), owner match does not happen because owner is returned as empty string.
-    // we try to force owner match just for purpose of tests to make sure all operations work seemlessly with owner.
-    if (this.performOwnerMatch
-        && StringUtils.equalsIgnoreCase(wasbAbsolutePath,
-        qualifiedPrefixUrl + "/")) {
-      owner = currentUserShortName;
-    }
+    @Override
+    public boolean authorize(String wasbAbsolutePath,
+                             String accessType,
+                             String owner)
+            throws WasbAuthorizationException {
+
+        if (wasbAbsolutePath.endsWith(
+                NativeAzureFileSystem.FolderRenamePending.SUFFIX)) {
+            return true;
+        }
 
-    boolean shouldEvaluateOwnerAccess = owner != null && !owner.isEmpty()
-        && this.performOwnerMatch;
-
-    boolean isOwnerMatch = StringUtils.equalsIgnoreCase(currentUserShortName,
-        owner);
-
-    AuthorizationComponent component =
-        new AuthorizationComponent(wasbAbsolutePath, accessType);
-
-    if (authRules.containsKey(component)) {
-      return shouldEvaluateOwnerAccess ? isOwnerMatch && authRules.get(
-          component) : authRules.get(component);
-    } else {
-      // Regex-pattern match if we don't have a straight match
-      for (Map.Entry<AuthorizationComponent, Boolean> entry : authRules.entrySet()) {
-        AuthorizationComponent key = entry.getKey();
-        String keyPath = key.getWasbAbsolutePath();
-        String keyAccess = key.getAccessType();
-
-        if (keyPath.endsWith("*") && Pattern.matches(keyPath, wasbAbsolutePath)
-            && keyAccess.equals(accessType)) {
-          return shouldEvaluateOwnerAccess
-              ? isOwnerMatch && entry.getValue()
-              : entry.getValue();
+        CachedAuthorizerEntry cacheKey = new CachedAuthorizerEntry(wasbAbsolutePath, accessType, owner);
+        Boolean cacheresult = cache.get(cacheKey);
+        if (cacheresult != null) {
+            return cacheresult;
         }
-      }
-      return false;
+
+        boolean authorizeresult = authorizeInternal(wasbAbsolutePath, accessType, owner);
+        cache.put(cacheKey, authorizeresult);
+
+        return authorizeresult;
     }
-  }
 
-  public void deleteAllAuthRules() {
-    authRules.clear();
-    cache.clear();
-  }
+    private boolean authorizeInternal(String wasbAbsolutePath, String accessType, String owner)
+            throws WasbAuthorizationException {
 
-  private static class AuthorizationComponent {
+        String currentUserShortName = "";
+        try {
+            UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+            currentUserShortName = ugi.getShortUserName();
+        } catch (Exception e) {
+            //no op
+        }
 
-    private final String wasbAbsolutePath;
-    private final String accessType;
+        // In case of root("/"), owner match does not happen
+        // because owner is returned as empty string.
+        // we try to force owner match just for purpose of tests
+        // to make sure all operations work seamlessly with owner.
+        if (StringUtils.equalsIgnoreCase(wasbAbsolutePath, qualifiedPrefixUrl + "/")) {
+            owner = currentUserShortName;
+        }
 
-    AuthorizationComponent(String wasbAbsolutePath,
-        String accessType) {
-      this.wasbAbsolutePath = wasbAbsolutePath;
-      this.accessType = accessType;
+        AuthorizationComponent component = new AuthorizationComponent(wasbAbsolutePath,
+                accessType, currentUserShortName);
+
+        return processRules(authRules, component, owner);
     }
 
-    @Override
-    public int hashCode() {
-      return this.wasbAbsolutePath.hashCode() ^ this.accessType.hashCode();
+    private boolean processRules(Map<AuthorizationComponent, Boolean> authRules,
+                                 AuthorizationComponent component, String owner) {
+        // Resolve the request against the rule table; unmatched requests are denied.
+        // Direct match of rules and access request
+        if (authRules.containsKey(component)) {
+            return authRules.get(component);
+        } else {
+            // Regex-pattern match if we don't have a straight match for path
+            // Current user match if we don't have an owner match
+            for (Map.Entry<AuthorizationComponent, Boolean> entry : authRules.entrySet()) {
+                AuthorizationComponent key = entry.getKey();
+                String keyPath = key.getWasbAbsolutePath();
+                String keyAccess = key.getAccessType();
+                String keyUser = key.getUser();
+                // "owner" rule on the exact path; requester-first equals() tolerates a null owner.
+                boolean foundMatchingOwnerRule = keyPath.equals(component.getWasbAbsolutePath())
+                        && keyAccess.equals(component.getAccessType())
+                        && keyUser.equalsIgnoreCase("owner")
+                        && component.getUser().equals(owner);
+                // Wildcard-path rule granted to the requesting user by name.
+                boolean foundMatchingPatternRule = keyPath.endsWith("*")
+                        && Pattern.matches(keyPath, component.getWasbAbsolutePath())
+                        && keyAccess.equals(component.getAccessType())
+                        && keyUser.equalsIgnoreCase(component.getUser());
+                // Wildcard-path rule granted to "owner"; same null-safe owner comparison.
+                boolean foundMatchingPatternOwnerRule = keyPath.endsWith("*")
+                        && Pattern.matches(keyPath, component.getWasbAbsolutePath())
+                        && keyAccess.equals(component.getAccessType())
+                        && keyUser.equalsIgnoreCase("owner")
+                        && component.getUser().equals(owner);
+
+                if (foundMatchingOwnerRule
+                        || foundMatchingPatternRule
+                        || foundMatchingPatternOwnerRule) {
+                    return entry.getValue();
+                }
+            }
+            return false;
+        }
     }
 
-    @Override
-    public boolean equals(Object obj) {
+    public void deleteAllAuthRules() {
+        authRules.clear();
+        cache.clear();
+    }
+
+    private static class AuthorizationComponent {
+
+      private final String wasbAbsolutePath;
+      private final String accessType;
+      private final String user;
 
-      if (obj == this) {
-        return true;
+      AuthorizationComponent(String wasbAbsolutePath,
+          String accessType, String user) {
+        this.wasbAbsolutePath = wasbAbsolutePath;
+        this.accessType = accessType;
+        this.user = user;
       }
 
-      if (obj == null
-          || !(obj instanceof AuthorizationComponent)) {
-        return false;
+      @Override
+      public int hashCode() {
+        return this.wasbAbsolutePath.hashCode() ^ this.accessType.hashCode();
       }
 
-      return ((AuthorizationComponent) obj).
+      @Override
+      public boolean equals(Object obj) {
+
+        if (obj == this) {
+          return true;
+        }
+
+        if (obj == null
+            || !(obj instanceof AuthorizationComponent)) {
+            return false;
+        }
+
+        return ((AuthorizationComponent) obj).
           getWasbAbsolutePath().equals(this.wasbAbsolutePath)
           && ((AuthorizationComponent) obj).
-          getAccessType().equals(this.accessType);
-    }
+          getAccessType().equals(this.accessType)
+          && ((AuthorizationComponent) obj).
+          getUser().equals(this.user);
+      }
 
-    public String getWasbAbsolutePath() {
-      return this.wasbAbsolutePath;
-    }
+      public String getWasbAbsolutePath() {
+        return this.wasbAbsolutePath;
+      }
 
-    public String getAccessType() {
-      return accessType;
-    }
-  }
+      public String getAccessType() {
+        return accessType;
+      }
+
+      public String getUser() {
+        return user;
+      }
+   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a530e7ab/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestNativeAzureFileSystemAuthorization.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestNativeAzureFileSystemAuthorization.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestNativeAzureFileSystemAuthorization.java
index 4bf6f04..2129b33 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestNativeAzureFileSystemAuthorization.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestNativeAzureFileSystemAuthorization.java
@@ -19,12 +19,14 @@
 package org.apache.hadoop.fs.azure;
 
 import java.security.PrivilegedExceptionAction;
+import java.io.IOException;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
 
@@ -47,6 +49,13 @@ public class TestNativeAzureFileSystemAuthorization
   @VisibleForTesting
   protected MockWasbAuthorizerImpl authorizer;
 
+  @VisibleForTesting
+  protected static final short STICKYBIT_PERMISSION_CONSTANT = 01700; // octal: sticky bit + rwx------
+  @VisibleForTesting
+  protected static final String READ = WasbAuthorizationOperations.READ.toString();
+  @VisibleForTesting
+  protected static final String WRITE = WasbAuthorizationOperations.WRITE.toString();
+
   @Override
   public Configuration createConfiguration() {
     Configuration conf = super.createConfiguration();
@@ -80,14 +89,16 @@ public class TestNativeAzureFileSystemAuthorization
   /**
    * Setup up permissions to allow a recursive delete for cleanup purposes.
    */
-  protected void allowRecursiveDelete(NativeAzureFileSystem fs, String path) {
+  protected void allowRecursiveDelete(NativeAzureFileSystem fs, String path)
+      throws IOException {
 
     int index = path.lastIndexOf('/');
     String parent = (index == 0) ? "/" : path.substring(0, index);
 
     authorizer.deleteAllAuthRules();
-    authorizer.addAuthRule(parent, WasbAuthorizationOperations.WRITE.toString(), true);
-    authorizer.addAuthRule((path.endsWith("*") ? path : path+"*"), WasbAuthorizationOperations.WRITE.toString(), true);
+    authorizer.addAuthRule(parent, WRITE, getCurrentUserShortName(), true);
+    authorizer.addAuthRule((path.endsWith("*") ? path : path+"*"), WRITE,
+        getCurrentUserShortName(), true);
     fs.updateWasbAuthorizer(authorizer);
   }
 
@@ -101,6 +112,13 @@ public class TestNativeAzureFileSystemAuthorization
   }
 
   /**
+   * Returns the short name of the current user, used as the user in auth rules.
+   */
+  protected String getCurrentUserShortName() throws IOException {
+    return UserGroupInformation.getCurrentUser().getShortUserName();
+  }
+
+  /**
    * Positive test to verify Create access check.
    * The file is created directly under an existing folder.
    * No intermediate folders need to be created.
@@ -112,7 +130,7 @@ public class TestNativeAzureFileSystemAuthorization
     Path parentDir = new Path("/");
     Path testPath = new Path(parentDir, "test.dat");
 
-    authorizer.addAuthRule("/", WasbAuthorizationOperations.WRITE.toString(), true);
+    authorizer.addAuthRuleForOwner("/", WRITE, true);
     fs.updateWasbAuthorizer(authorizer);
 
     try {
@@ -137,7 +155,7 @@ public class TestNativeAzureFileSystemAuthorization
     Path parentDir = new Path("/testCreateAccessCheckPositive/1/2/3");
     Path testPath = new Path(parentDir, "test.dat");
 
-    authorizer.addAuthRule("/", WasbAuthorizationOperations.WRITE.toString(), true);
+    authorizer.addAuthRuleForOwner("/", WRITE, true);
     fs.updateWasbAuthorizer(authorizer);
 
     try {
@@ -164,7 +182,7 @@ public class TestNativeAzureFileSystemAuthorization
 
     setExpectedFailureMessage("create", testPath);
 
-    authorizer.addAuthRule("/", WasbAuthorizationOperations.WRITE.toString(), true);
+    authorizer.addAuthRuleForOwner("/", WRITE, true);
     fs.updateWasbAuthorizer(authorizer);
 
     try {
@@ -188,8 +206,8 @@ public class TestNativeAzureFileSystemAuthorization
     Path parentDir = new Path("/");
     Path testPath = new Path(parentDir, "test.dat");
 
-    authorizer.addAuthRule("/", WasbAuthorizationOperations.WRITE.toString(), true);
-    authorizer.addAuthRule(testPath.toString(), WasbAuthorizationOperations.WRITE.toString(), true);
+    authorizer.addAuthRuleForOwner("/", WRITE, true);
+    authorizer.addAuthRuleForOwner(testPath.toString(), WRITE, true);
     fs.updateWasbAuthorizer(authorizer);
 
     try {
@@ -215,7 +233,7 @@ public class TestNativeAzureFileSystemAuthorization
 
     setExpectedFailureMessage("create", testPath);
 
-    authorizer.addAuthRule("/", WasbAuthorizationOperations.WRITE.toString(), false);
+    authorizer.addAuthRuleForOwner("/", WRITE, false);
     fs.updateWasbAuthorizer(authorizer);
 
     try {
@@ -239,8 +257,8 @@ public class TestNativeAzureFileSystemAuthorization
     Path intermediateFolders = new Path(parentDir, "1/2/3/");
     Path testPath = new Path(intermediateFolders, "test.dat");
 
-    authorizer.addAuthRule("/", WasbAuthorizationOperations.WRITE.toString(), true);
-    authorizer.addAuthRule(testPath.toString(), WasbAuthorizationOperations.READ.toString(), true);
+    authorizer.addAuthRuleForOwner("/", WRITE, true);
+    authorizer.addAuthRuleForOwner(testPath.toString(), READ, true);
     fs.updateWasbAuthorizer(authorizer);
 
     try {
@@ -266,8 +284,8 @@ public class TestNativeAzureFileSystemAuthorization
 
     setExpectedFailureMessage("liststatus", testPath);
 
-    authorizer.addAuthRule("/", WasbAuthorizationOperations.WRITE.toString(), true);
-    authorizer.addAuthRule(testPath.toString(), WasbAuthorizationOperations.READ.toString(), false);
+    authorizer.addAuthRuleForOwner("/", WRITE, true);
+    authorizer.addAuthRuleForOwner(testPath.toString(), READ, false);
     fs.updateWasbAuthorizer(authorizer);
 
     try {
@@ -291,8 +309,10 @@ public class TestNativeAzureFileSystemAuthorization
     Path srcPath = new Path(parentDir, "test1.dat");
     Path dstPath = new Path(parentDir, "test2.dat");
 
-    authorizer.addAuthRule("/", WasbAuthorizationOperations.WRITE.toString(), true); /* to create parentDir */
-    authorizer.addAuthRule(parentDir.toString(), WasbAuthorizationOperations.WRITE.toString(), true); /* for rename */
+    /* to create parentDir */
+    authorizer.addAuthRuleForOwner("/", WRITE, true);
+    /* for rename */
+    authorizer.addAuthRuleForOwner(parentDir.toString(), WRITE, true);
     fs.updateWasbAuthorizer(authorizer);
 
     try {
@@ -321,8 +341,9 @@ public class TestNativeAzureFileSystemAuthorization
 
     setExpectedFailureMessage("rename", srcPath);
 
-    authorizer.addAuthRule("/", WasbAuthorizationOperations.WRITE.toString(), true); /* to create parent dir */
-    authorizer.addAuthRule(parentDir.toString(), WasbAuthorizationOperations.WRITE.toString(), false);
+    /* to create parent dir */
+    authorizer.addAuthRuleForOwner("/", WRITE, true);
+    authorizer.addAuthRuleForOwner(parentDir.toString(), WRITE, false);
     fs.updateWasbAuthorizer(authorizer);
 
     try {
@@ -352,9 +373,9 @@ public class TestNativeAzureFileSystemAuthorization
 
     setExpectedFailureMessage("rename", dstPath);
 
-    authorizer.addAuthRule("/", WasbAuthorizationOperations.WRITE.toString(), true); /* to create parent dir */
-    authorizer.addAuthRule(parentSrcDir.toString(), WasbAuthorizationOperations.WRITE.toString(), true);
-    authorizer.addAuthRule(parentDstDir.toString(), WasbAuthorizationOperations.WRITE.toString(), false);
+    authorizer.addAuthRuleForOwner("/", WRITE, true); /* to create parent dir */
+    authorizer.addAuthRuleForOwner(parentSrcDir.toString(), WRITE, true);
+    authorizer.addAuthRuleForOwner(parentDstDir.toString(), WRITE, false);
     fs.updateWasbAuthorizer(authorizer);
 
     try {
@@ -381,9 +402,9 @@ public class TestNativeAzureFileSystemAuthorization
     Path parentDstDir = new Path("/testRenameAccessCheckPositiveDst");
     Path dstPath = new Path(parentDstDir, "test2.dat");
 
-    authorizer.addAuthRule("/", WasbAuthorizationOperations.WRITE.toString(), true); /* to create parent dirs */
-    authorizer.addAuthRule(parentSrcDir.toString(), WasbAuthorizationOperations.WRITE.toString(), true);
-    authorizer.addAuthRule(parentDstDir.toString(), WasbAuthorizationOperations.WRITE.toString(), true);
+    authorizer.addAuthRuleForOwner("/", WRITE, true); /* to create parent dirs */
+    authorizer.addAuthRuleForOwner(parentSrcDir.toString(), WRITE, true);
+    authorizer.addAuthRuleForOwner(parentDstDir.toString(), WRITE, true);
     fs.updateWasbAuthorizer(authorizer);
 
     try {
@@ -412,8 +433,8 @@ public class TestNativeAzureFileSystemAuthorization
     Path parentDir = new Path("/testReadAccessCheckPositive");
     Path testPath = new Path(parentDir, "test.dat");
 
-    authorizer.addAuthRule("/", WasbAuthorizationOperations.WRITE.toString(), true);
-    authorizer.addAuthRule(testPath.toString(), WasbAuthorizationOperations.READ.toString(), true);
+    authorizer.addAuthRuleForOwner("/", WRITE, true);
+    authorizer.addAuthRuleForOwner(testPath.toString(), READ, true);
     fs.updateWasbAuthorizer(authorizer);
 
     FSDataInputStream inputStream = null;
@@ -453,8 +474,8 @@ public class TestNativeAzureFileSystemAuthorization
 
     setExpectedFailureMessage("read", testPath);
 
-    authorizer.addAuthRule("/", WasbAuthorizationOperations.WRITE.toString(), true);
-    authorizer.addAuthRule(testPath.toString(), WasbAuthorizationOperations.READ.toString(), false);
+    authorizer.addAuthRuleForOwner("/", WRITE, true);
+    authorizer.addAuthRuleForOwner(testPath.toString(), READ, false);
     fs.updateWasbAuthorizer(authorizer);
 
     FSDataInputStream inputStream = null;
@@ -490,7 +511,7 @@ public class TestNativeAzureFileSystemAuthorization
     Path parentDir = new Path("/");
     Path testPath = new Path(parentDir, "test.dat");
 
-    authorizer.addAuthRule("/", WasbAuthorizationOperations.WRITE.toString(), true);
+    authorizer.addAuthRuleForOwner("/", WRITE, true);
     fs.updateWasbAuthorizer(authorizer);
     try {
       fs.create(testPath);
@@ -514,7 +535,7 @@ public class TestNativeAzureFileSystemAuthorization
 
     setExpectedFailureMessage("delete", testPath);
 
-    authorizer.addAuthRule("/", WasbAuthorizationOperations.WRITE.toString(), true);
+    authorizer.addAuthRuleForOwner("/", WRITE, true);
     fs.updateWasbAuthorizer(authorizer);
     try {
       fs.create(testPath);
@@ -523,7 +544,7 @@ public class TestNativeAzureFileSystemAuthorization
 
       /* Remove permissions for delete to force failure */
       authorizer.deleteAllAuthRules();
-      authorizer.addAuthRule("/", WasbAuthorizationOperations.WRITE.toString(), false);
+      authorizer.addAuthRuleForOwner("/", WRITE, false);
       fs.updateWasbAuthorizer(authorizer);
 
       fs.delete(testPath, false);
@@ -531,7 +552,7 @@ public class TestNativeAzureFileSystemAuthorization
     finally {
       /* Restore permissions to force a successful delete */
       authorizer.deleteAllAuthRules();
-      authorizer.addAuthRule("/", WasbAuthorizationOperations.WRITE.toString(), true);
+      authorizer.addAuthRuleForOwner("/", WRITE, true);
       fs.updateWasbAuthorizer(authorizer);
 
       fs.delete(testPath, false);
@@ -550,9 +571,9 @@ public class TestNativeAzureFileSystemAuthorization
     Path parentDir = new Path("/testDeleteIntermediateFolder");
     Path testPath = new Path(parentDir, "1/2/test.dat");
 
-    authorizer.addAuthRule("/", WasbAuthorizationOperations.WRITE.toString(), true); // for create and delete
-    authorizer.addAuthRule("/testDeleteIntermediateFolder*",
-        WasbAuthorizationOperations.WRITE.toString(), true); // for recursive delete
+    authorizer.addAuthRuleForOwner("/", WRITE, true); // for create and delete
+    authorizer.addAuthRuleForOwner("/testDeleteIntermediateFolder*",
+        WRITE, true); // for recursive delete
     fs.updateWasbAuthorizer(authorizer);
 
     try {
@@ -568,6 +589,365 @@ public class TestNativeAzureFileSystemAuthorization
   }
 
   /**
+   * Verifies that a failed write check on child1 leaves child1 and the parent
+   * folder undeleted, while the authorized sibling child2 is removed.
+   */
+  @Test
+  public void testDeleteAuthCheckFailureLeavesFilesUndeleted() throws Throwable {
+
+    Path parentDir = new Path("/testDeleteAuthCheckFailureLeavesFilesUndeleted");
+    Path testPath1 = new Path(parentDir, "child1/test.dat");
+    Path testPath2 = new Path(parentDir, "child2/test.dat");
+
+    authorizer.addAuthRuleForOwner("/", WRITE, true);
+    authorizer.addAuthRuleForOwner("/testDeleteAuthCheckFailureLeavesFilesUndeleted*",
+        WRITE, true);
+    fs.updateWasbAuthorizer(authorizer);
+
+    try {
+      fs.create(testPath1);
+      fs.create(testPath2);
+      ContractTestUtils.assertPathExists(fs, "testPath1 was not created", testPath1);
+      ContractTestUtils.assertPathExists(fs, "testPath2 was not created", testPath2);
+
+      // revoke write on one of the child folders
+      authorizer.deleteAllAuthRules();
+      authorizer.addAuthRuleForOwner("/", WRITE, true);
+      authorizer.addAuthRuleForOwner("/testDeleteAuthCheckFailureLeavesFilesUndeleted",
+        WRITE, true);
+      authorizer.addAuthRuleForOwner("/testDeleteAuthCheckFailureLeavesFilesUndeleted/child2",
+        WRITE, true);
+      authorizer.addAuthRuleForOwner("/testDeleteAuthCheckFailureLeavesFilesUndeleted/child1",
+          WRITE, false);
+
+      assertFalse(fs.delete(parentDir, true));
+
+      // Assert that only child2 (its file and the child2 folder itself) is deleted
+      ContractTestUtils.assertPathExists(fs, "child1 is deleted!", testPath1);
+      ContractTestUtils.assertPathDoesNotExist(fs, "child2 exists after deletion!", testPath2);
+      ContractTestUtils.assertPathDoesNotExist(fs, "child2 exists after deletion!",
+          new Path(parentDir, "child2"));
+      ContractTestUtils.assertPathExists(fs, "parentDir is deleted!", parentDir);
+
+    }
+    finally {
+      allowRecursiveDelete(fs, parentDir.toString());
+      fs.delete(parentDir, true);
+    }
+  }
+
+  /**
+   * Positive test: a file is deleted by its creator although the parent is sticky.
+   * @throws Throwable if the test hits an unexpected error
+   */
+  @Test
+  public void testSingleFileDeleteWithStickyBitPositive() throws Throwable {
+
+    final Path stickyDir = new Path("/testSingleFileDeleteWithStickyBitPositive");
+    final Path ownedFile = new Path(stickyDir, "test.dat");
+
+    authorizer.addAuthRuleForOwner("/", WRITE, true);
+    authorizer.addAuthRuleForOwner(stickyDir.toString(), WRITE, true);
+    fs.updateWasbAuthorizer(authorizer);
+
+    try {
+      fs.create(ownedFile);
+      ContractTestUtils.assertPathExists(fs, "testPath was not created", ownedFile);
+
+      // mark the parent directory sticky before attempting the delete
+      final FsPermission stickyPermission = new FsPermission(STICKYBIT_PERMISSION_CONSTANT);
+      fs.setPermission(stickyDir, stickyPermission);
+
+      final boolean deleted = fs.delete(ownedFile, true);
+      assertTrue(deleted);
+      ContractTestUtils.assertPathDoesNotExist(fs, "testPath exists after deletion!", ownedFile);
+    }
+    finally {
+      allowRecursiveDelete(fs, stickyDir.toString());
+      fs.delete(stickyDir, true);
+    }
+  }
+
+  /**
+   * Negative test to verify file delete fails when sticky bit is set on parent
+   * and non-owner user performs delete; the test folder is always cleaned up.
+   * @throws Throwable the exception registered with expectedEx, or a test failure
+   */
+  @Test
+  public void testSingleFileDeleteWithStickyBitNegative() throws Throwable {
+
+    Path parentDir = new Path("/testSingleFileDeleteWithStickyBitNegative");
+    Path testPath = new Path(parentDir, "test.dat");
+
+    expectedEx.expect(WasbAuthorizationException.class);
+    expectedEx.expectMessage(String.format("%s has sticky bit set. File %s cannot be deleted.",
+        parentDir.toString(), testPath.toString()));
+
+    authorizer.addAuthRuleForOwner("/", WRITE, true);
+    authorizer.addAuthRuleForOwner("/testSingleFileDeleteWithStickyBitNegative", WRITE, true);
+    fs.updateWasbAuthorizer(authorizer);
+
+    try {
+      fs.create(testPath);
+      ContractTestUtils.assertPathExists(fs, "testPath was not created", testPath);
+      // set stickybit on parent directory
+      fs.setPermission(parentDir, new FsPermission(STICKYBIT_PERMISSION_CONSTANT));
+
+      UserGroupInformation dummyUser = UserGroupInformation.createUserForTesting(
+          "dummyUser", new String[] {"dummygroup"});
+      dummyUser.doAs(new PrivilegedExceptionAction<Void>() {
+        @Override
+        public Void run() throws Exception {
+          authorizer.addAuthRule(parentDir.toString(), WRITE, getCurrentUserShortName(), true);
+          fs.delete(testPath, true);
+          return null;
+        }
+      });
+    }
+    finally {
+      try {
+        ContractTestUtils.assertPathExists(fs, "testPath should not be deleted!", testPath);
+      } finally {
+        // cleanup must run even when the assertion above fails
+        allowRecursiveDelete(fs, parentDir.toString());
+        fs.delete(parentDir, true);
+      }
+    }
+  }
+
+  /**
+   * Positive case: a recursive delete issued by the same user that created
+   * the file and folder removes the whole tree although a sub-folder is sticky.
+   * @throws Throwable if the test hits an unexpected error
+   */
+  @Test
+  public void testRecursiveDeleteSucceedsWithStickybit() throws Throwable {
+
+    final Path rootOfTree = new Path("/testRecursiveDeleteSucceedsWithStickybit");
+    final Path stickyChild = new Path(rootOfTree, "child");
+    final Path nestedFile = new Path(rootOfTree, "child/test.dat");
+    final Path nestedFolder = new Path(rootOfTree, "child/testDirectory");
+
+    authorizer.addAuthRuleForOwner("/", WRITE, true);
+    authorizer.addAuthRuleForOwner("/testRecursiveDeleteSucceedsWithStickybit*", WRITE, true);
+    fs.updateWasbAuthorizer(authorizer);
+
+    try {
+      fs.create(nestedFile);
+      ContractTestUtils.assertPathExists(fs, "file was not created", nestedFile);
+      fs.mkdirs(nestedFolder);
+      ContractTestUtils.assertPathExists(fs, "folder was not created", nestedFolder);
+      // make the intermediate "child" directory sticky
+      fs.setPermission(stickyChild, new FsPermission(STICKYBIT_PERMISSION_CONSTANT));
+      // delete runs in the same user context that created everything above
+      final boolean deleted = fs.delete(rootOfTree, true);
+      assertTrue(deleted);
+      ContractTestUtils.assertPathDoesNotExist(fs, "parentDir exists after deletion!", rootOfTree);
+    }
+    finally {
+      allowRecursiveDelete(fs, rootOfTree.toString());
+      fs.delete(rootOfTree, true);
+    }
+  }
+
+  /**
+   * Negative test: with the sticky bit set on the "child" folder, a recursive
+   * delete issued as dummyUser is expected to return false and to leave the
+   * parent folder, the file and the sub-folder in place.
+   */
+  @Test
+  public void testRecursiveDeleteFailsWithStickybit() throws Throwable {
+
+    Path parentDir = new Path("/testRecursiveDeleteFailsWithStickybit");
+    Path testFilePath = new Path(parentDir, "child/test.dat");
+    Path testFolderPath = new Path(parentDir, "child/testDirectory");
+
+    authorizer.addAuthRuleForOwner("/", WRITE, true);
+    authorizer.addAuthRuleForOwner("/testRecursiveDeleteFailsWithStickybit*",
+        WRITE, true);
+    fs.updateWasbAuthorizer(authorizer);
+
+    try {
+      fs.create(testFilePath);
+      ContractTestUtils.assertPathExists(fs, "file was not created", testFilePath);
+      fs.mkdirs(testFolderPath);
+      ContractTestUtils.assertPathExists(fs, "folder was not created", testFolderPath);
+
+      // set stickybit on child directory
+      fs.setPermission(new Path(parentDir, "child"),
+        new FsPermission(STICKYBIT_PERMISSION_CONSTANT));
+
+      UserGroupInformation dummyUser = UserGroupInformation.createUserForTesting(
+          "dummyUser", new String[] {"dummygroup"});
+
+      dummyUser.doAs(new PrivilegedExceptionAction<Void>() {
+        @Override
+        public Void run() throws Exception {
+          // grant WRITE to the current (doAs) user, presumably dummyUser, on root and the tree
+          authorizer.addAuthRule("/", WRITE,
+              getCurrentUserShortName(), true);
+          authorizer.addAuthRule("/testRecursiveDeleteFailsWithStickybit*",
+              WRITE, getCurrentUserShortName(), true);
+
+          assertFalse(fs.delete(parentDir, true));
+          return null;
+        }
+      });
+
+      ContractTestUtils.assertPathExists(fs, "parentDir is deleted!", parentDir);
+      ContractTestUtils.assertPathExists(fs, "file is deleted!", testFilePath);
+      ContractTestUtils.assertPathExists(fs, "folder is deleted!", testFolderPath);
+    }
+    finally {
+      allowRecursiveDelete(fs, parentDir.toString());
+      fs.delete(parentDir, true);
+    }
+  }
+
+  /**
+   * Test delete scenario where the sticky bit check leaves entries not created
+   * by the deleting user intact while the entry that user created is deleted;
+   * the recursive delete itself is expected to return false.
+   */
+  @Test
+  public void testDeleteSucceedsForOnlyFilesOwnedByUserWithStickybitSet()
+    throws Throwable {
+
+    Path parentDir = new Path("/testDeleteSucceedsForOnlyFilesOwnedByUserWithStickybitSet");
+    Path testFilePath = new Path(parentDir, "test.dat");
+    Path testFolderPath = new Path(parentDir, "testDirectory");
+
+    authorizer.addAuthRuleForOwner("/", WRITE, true);
+    authorizer.addAuthRuleForOwner(
+        "/testDeleteSucceedsForOnlyFilesOwnedByUserWithStickybitSet*",
+        WRITE, true);
+    fs.updateWasbAuthorizer(authorizer);
+
+    try {
+      fs.create(testFilePath);
+      ContractTestUtils.assertPathExists(fs, "file was not created", testFilePath);
+
+      fs.setPermission(parentDir, new FsPermission(STICKYBIT_PERMISSION_CONSTANT));
+
+      UserGroupInformation dummyUser = UserGroupInformation.createUserForTesting(
+          "dummyuser", new String[] {"dummygroup"});
+      dummyUser.doAs(new PrivilegedExceptionAction<Void>() {
+        @Override
+        public Void run() throws Exception {
+          authorizer.addAuthRule("/", WRITE,
+              getCurrentUserShortName(), true);
+          authorizer.addAuthRule("/testDeleteSucceedsForOnlyFilesOwnedByUserWithStickybitSet*",
+              WRITE, getCurrentUserShortName(), true);
+
+          fs.create(testFolderPath); // NOTE(review): create() is used, so this looks like a file rather than a folder; presumably owned by dummyuser
+          ContractTestUtils.assertPathExists(fs, "folder was not created", testFolderPath);
+          assertFalse(fs.delete(parentDir, true));
+
+          ContractTestUtils.assertPathDoesNotExist(fs, "folder should have been deleted!",
+            testFolderPath);
+          ContractTestUtils.assertPathExists(fs, "parentDir is deleted!", parentDir);
+          ContractTestUtils.assertPathExists(fs, "file is deleted!", testFilePath);
+          return null;
+        }
+      });
+    }
+    finally {
+      allowRecursiveDelete(fs, parentDir.toString());
+      fs.delete(parentDir, true);
+    }
+  }
+
+  /**
+   * Sticky-bit scenario in which the user who created the parent directory
+   * deletes a child file created by somebody else, and the delete succeeds.
+   * This follows the sticky bit behaviour given in the hdfs permission
+   * guide: the sticky bit can be set on directories, preventing anyone
+   * except the superuser, directory owner or file owner from deleting or
+   * moving the files within the directory.
+   * @throws Throwable if the test hits an unexpected error
+   */
+  @Test
+  public void testDeleteSucceedsForParentDirectoryOwnerUserWithStickybit() throws Throwable {
+
+    final Path ownedDir = new Path("/testDeleteSucceedsForParentDirectoryOwnerUserWithStickybit");
+    final Path foreignFile = new Path(ownedDir, "test.dat");
+
+    authorizer.addAuthRuleForOwner("/", WRITE, true);
+    authorizer.addAuthRuleForOwner(
+        "/testDeleteSucceedsForParentDirectoryOwnerUserWithStickybit*",
+        WRITE, true);
+    fs.updateWasbAuthorizer(authorizer);
+
+    try {
+      // the directory is made by the user running the test
+      fs.mkdirs(ownedDir);
+      ContractTestUtils.assertPathExists(fs, "folder was not created", ownedDir);
+
+      // the child file is made while running as dummyUser
+      final String[] dummyGroups = new String[] {"dummygroup"};
+      final UserGroupInformation dummyUser =
+          UserGroupInformation.createUserForTesting("dummyUser", dummyGroups);
+      dummyUser.doAs(new PrivilegedExceptionAction<Void>() {
+        @Override
+        public Void run() throws Exception {
+          authorizer.addAuthRule(ownedDir.toString(), WRITE, getCurrentUserShortName(), true);
+          fs.create(foreignFile);
+          ContractTestUtils.assertPathExists(fs, "file was not created", foreignFile);
+
+          fs.setPermission(ownedDir, new FsPermission(STICKYBIT_PERMISSION_CONSTANT));
+          return null;
+        }
+      });
+
+      // back in the original user context: delete the whole directory
+      final boolean deleted = fs.delete(ownedDir, true);
+      assertTrue(deleted);
+      ContractTestUtils.assertPathDoesNotExist(fs, "parentDir is not deleted!", ownedDir);
+      ContractTestUtils.assertPathDoesNotExist(fs, "file is not deleted!", foreignFile);
+    }
+    finally {
+      allowRecursiveDelete(fs, ownedDir.toString());
+      fs.delete(ownedDir, true);
+    }
+  }
+
+  /**
+   * Test to verify that a recursive delete on root returns false, removes
+   * the test folder and its children, and leaves root itself in place.
+   * @throws Throwable
+   */
+  @Test
+  public void testDeleteScenarioForRoot() throws Throwable {
+    Path rootPath = new Path("/");
+    Path parentDir = new Path("/testDeleteScenarioForRoot");
+    Path testPath1 = new Path(parentDir, "child1/test.dat");
+    Path testPath2 = new Path(parentDir, "child2/testFolder");
+
+    authorizer.addAuthRuleForOwner("/", WRITE, true);
+    authorizer.addAuthRuleForOwner("/testDeleteScenarioForRoot*",
+            WRITE, true);
+    fs.updateWasbAuthorizer(authorizer);
+
+    try {
+      fs.create(testPath1);
+      fs.create(testPath2);
+      ContractTestUtils.assertPathExists(fs, "testPath1 was not created", testPath1);
+      ContractTestUtils.assertPathExists(fs, "testPath2 was not created", testPath2);
+
+      assertFalse(fs.delete(rootPath, true));
+
+      ContractTestUtils.assertPathDoesNotExist(fs, "file exists after deletion!", testPath1);
+      ContractTestUtils.assertPathDoesNotExist(fs, "folder exists after deletion!", testPath2);
+      ContractTestUtils.assertPathDoesNotExist(fs, "parentDir exists after deletion!", parentDir);
+      ContractTestUtils.assertPathExists(fs, "Root should not have been deleted!", rootPath);
+    }
+    finally {
+      allowRecursiveDelete(fs, parentDir.toString());
+      fs.delete(parentDir, true);
+    }
+  }
+
+  /**
    * Positive test for getFileStatus. No permissions are required for getting filestatus.
    * @throws Throwable
    */
@@ -587,7 +967,7 @@ public class TestNativeAzureFileSystemAuthorization
 
     Path testPath = new Path("/testMkdirsAccessCheckPositive/1/2/3");
 
-    authorizer.addAuthRule("/", WasbAuthorizationOperations.WRITE.toString(), true);
+    authorizer.addAuthRuleForOwner("/", WRITE, true);
     fs.updateWasbAuthorizer(authorizer);
 
     try {
@@ -609,7 +989,7 @@ public class TestNativeAzureFileSystemAuthorization
 
     Path testPath = new Path("/testMkdirsWithExistingHierarchyCheckPositive1");
 
-    authorizer.addAuthRule("/", WasbAuthorizationOperations.WRITE.toString(), true);
+    authorizer.addAuthRuleForOwner("/", WRITE, true);
     fs.updateWasbAuthorizer(authorizer);
 
     try {
@@ -636,11 +1016,11 @@ public class TestNativeAzureFileSystemAuthorization
     Path childPath2 = new Path(childPath1, "2");
     Path childPath3 = new Path(childPath2, "3");
 
-    authorizer.addAuthRule("/",
-        WasbAuthorizationOperations.WRITE.toString(), true);
+    authorizer.addAuthRuleForOwner("/",
+        WRITE, true);
 
-    authorizer.addAuthRule(childPath1.toString(),
-        WasbAuthorizationOperations.WRITE.toString(), true);
+    authorizer.addAuthRuleForOwner(childPath1.toString(),
+        WRITE, true);
 
     fs.updateWasbAuthorizer(authorizer);
 
@@ -675,7 +1055,7 @@ public class TestNativeAzureFileSystemAuthorization
 
     setExpectedFailureMessage("mkdirs", testPath);
 
-    authorizer.addAuthRule("/", WasbAuthorizationOperations.WRITE.toString(), false);
+    authorizer.addAuthRuleForOwner("/", WRITE, false);
     fs.updateWasbAuthorizer(authorizer);
 
     try {
@@ -697,13 +1077,96 @@ public class TestNativeAzureFileSystemAuthorization
 
     Path testPath = new Path("/");
 
-    authorizer.addAuthRule(testPath.toString(), WasbAuthorizationOperations.READ.toString(), true);
+    authorizer.addAuthRuleForOwner(testPath.toString(), READ, true);
     fs.updateWasbAuthorizer(authorizer);
 
     Path testPathWithTripleSlash = new Path("wasb:///" + testPath);
     fs.listStatus(testPathWithTripleSlash);
   }
 
+  /**
+   * Positive test: with owner rules in place, create, stat and delete succeed under parentDir.
+   */
+  @Test
+  public void testOwnerPermissionPositive() throws Throwable {
+
+    Path parentDir = new Path("/testOwnerPermissionPositive");
+    Path testPath = new Path(parentDir, "test.data");
+
+    authorizer.addAuthRuleForOwner("/", WRITE, true);
+    authorizer.addAuthRuleForOwner(testPath.toString(), READ, true);
+    authorizer.addAuthRuleForOwner(parentDir.toString(), WRITE, true);
+    // additional rule used for assertPathExists
+    authorizer.addAuthRuleForOwner(parentDir.toString(), READ, true);
+    fs.updateWasbAuthorizer(authorizer);
+
+    try {
+      // creates parentDir with owner as current user
+      fs.mkdirs(parentDir);
+      ContractTestUtils.assertPathExists(fs, "parentDir does not exist", parentDir);
+
+      fs.create(testPath);
+      fs.getFileStatus(testPath);
+      ContractTestUtils.assertPathExists(fs, "testPath does not exist", testPath);
+
+      fs.delete(parentDir, true);
+      ContractTestUtils.assertPathDoesNotExist(fs, "testPath exists after deletion!", testPath);
+
+    } finally {
+      allowRecursiveDelete(fs, parentDir.toString());
+      fs.delete(parentDir, true);
+    }
+  }
+
+  /**
+   * Negative case: mkdirs under parentDir, run as a different user, must hit the expected failure.
+   */
+  @Test
+  public void testOwnerPermissionNegative() throws Throwable {
+
+    final Path ownedDir = new Path("/testOwnerPermissionNegative");
+    final Path deniedDir = new Path(ownedDir, "childDir");
+
+    setExpectedFailureMessage("mkdirs", deniedDir);
+
+    // write rules are registered through addAuthRuleForOwner only
+    authorizer.addAuthRuleForOwner("/", WRITE, true);
+    authorizer.addAuthRuleForOwner(ownedDir.toString(), WRITE, true);
+    fs.updateWasbAuthorizer(authorizer);
+
+    try {
+      fs.mkdirs(ownedDir);
+
+      final String[] noGroups = new String[] {};
+      final UserGroupInformation otherUser =
+          UserGroupInformation.createUserForTesting("testuser", noGroups);
+      otherUser.doAs(new PrivilegedExceptionAction<Void>() {
+        @Override
+        public Void run() throws Exception {
+          fs.mkdirs(deniedDir);
+          return null;
+        }
+      });
+    } finally {
+      allowRecursiveDelete(fs, ownedDir.toString());
+      fs.delete(ownedDir, true);
+    }
+  }
+
+  /**
+   * Checks that asking for the owner of a path this test never creates
+   * yields an empty string instead of throwing.
+   */
+  @Test
+  public void testRetrievingOwnerDoesNotFailWhenFileDoesNotExist()
+    throws Throwable {
+
+    final Path missingPath = new Path("/testDirectory123454565");
+
+    final String reportedOwner = fs.getOwnerForPath(missingPath);
+    assertEquals("", reportedOwner);
+  }
+
   /**
    * Negative test for setOwner when Authorization is enabled.
    */
@@ -714,7 +1177,7 @@ public class TestNativeAzureFileSystemAuthorization
 
     Path testPath = new Path("/testSetOwnerNegative");
 
-    authorizer.addAuthRule("/", WasbAuthorizationOperations.WRITE.toString(), true);
+    authorizer.addAuthRuleForOwner("/", WRITE, true);
     fs.updateWasbAuthorizer(authorizer);
 
     String owner = null;
@@ -748,7 +1211,7 @@ public class TestNativeAzureFileSystemAuthorization
 
     Path testPath = new Path("/testSetOwnerPositive");
 
-    authorizer.addAuthRule("/", WasbAuthorizationOperations.WRITE.toString(), true);
+    authorizer.addAuthRuleForOwner("/", WRITE, true);
     fs.updateWasbAuthorizer(authorizer);
 
     String newOwner = "newowner";
@@ -793,7 +1256,7 @@ public class TestNativeAzureFileSystemAuthorization
     Path testPath = new Path("/testSetOwnerPositiveWildcard");
 
     authorizer.init(conf);
-    authorizer.addAuthRule("/", WasbAuthorizationOperations.WRITE.toString(), true);
+    authorizer.addAuthRuleForOwner("/", WRITE, true);
     fs.updateWasbAuthorizer(authorizer);
 
     String newOwner = "newowner";
@@ -839,7 +1302,7 @@ public class TestNativeAzureFileSystemAuthorization
     Path testPath = new Path("/testSetOwnerFailsForIllegalSetup");
 
     authorizer.init(conf);
-    authorizer.addAuthRule("/", WasbAuthorizationOperations.WRITE.toString(), true);
+    authorizer.addAuthRuleForOwner("/", WRITE, true);
     fs.updateWasbAuthorizer(authorizer);
 
     String owner = null;


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org