You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by jh...@apache.org on 2017/04/20 21:53:34 UTC

[16/50] hadoop git commit: HADOOP-14274. Azure: Simplify Ranger-WASB policy model. Contributed by Sivaguru Sankaridurg

HADOOP-14274. Azure: Simplify Ranger-WASB policy model. Contributed by Sivaguru Sankaridurg


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0cab5722
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0cab5722
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0cab5722

Branch: refs/heads/YARN-5734
Commit: 0cab57223e3f54be17a5f27cefdb6d1da1b073e5
Parents: b053fdc
Author: Mingliang Liu <li...@apache.org>
Authored: Wed Apr 12 16:07:10 2017 -0700
Committer: Mingliang Liu <li...@apache.org>
Committed: Wed Apr 12 16:07:10 2017 -0700

----------------------------------------------------------------------
 .../hadoop/fs/azure/NativeAzureFileSystem.java  | 129 +++--
 .../fs/azure/RemoteWasbAuthorizerImpl.java      |  11 +-
 .../fs/azure/WasbAuthorizationOperations.java   |   2 -
 .../hadoop/fs/azure/MockWasbAuthorizerImpl.java |  22 +-
 .../TestNativeAzureFileSystemAuthorization.java | 554 ++++++++++++++++---
 .../fs/azure/TestWasbRemoteCallHelper.java      |   6 +-
 .../TestAzureFileSystemInstrumentation.java     |   3 +-
 7 files changed, 603 insertions(+), 124 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/0cab5722/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
index 5469944..e06522b 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
@@ -1426,13 +1426,20 @@ public class NativeAzureFileSystem extends FileSystem {
     return store;
   }
 
-  private void performAuthCheck(String path, String accessType,
-      String operation) throws WasbAuthorizationException, IOException {
+  /**
+   * @param requestingAccessForPath - The path to the ancestor/parent/subtree/file that needs to be
+   *                                checked before granting access to originalPath
+   * @param accessType - The type of access READ/WRITE being requested
+   * @param operation - A string describing the operation being performed ("delete", "create" etc.).
+   * @param originalPath - The originalPath that was being accessed
+   */
+  private void performAuthCheck(String requestingAccessForPath, WasbAuthorizationOperations accessType,
+      String operation, String originalPath) throws WasbAuthorizationException, IOException {
 
     if (azureAuthorization && this.authorizer != null &&
-        !this.authorizer.authorize(path, accessType)) {
+        !this.authorizer.authorize(requestingAccessForPath, accessType.toString())) {
       throw new WasbAuthorizationException(operation
-          + " operation for Path : " + path + " not allowed");
+          + " operation for Path : " + originalPath + " not allowed");
     }
   }
 
@@ -1459,8 +1466,7 @@ public class NativeAzureFileSystem extends FileSystem {
 
     Path absolutePath = makeAbsolute(f);
 
-    performAuthCheck(absolutePath.toString(),
-        WasbAuthorizationOperations.WRITE.toString(), "append");
+    performAuthCheck(absolutePath.toString(), WasbAuthorizationOperations.WRITE, "append", absolutePath.toString());
 
     String key = pathToKey(absolutePath);
     FileMetadata meta = null;
@@ -1663,9 +1669,9 @@ public class NativeAzureFileSystem extends FileSystem {
     }
 
     Path absolutePath = makeAbsolute(f);
+    Path ancestor = getAncestor(absolutePath);
 
-    performAuthCheck(absolutePath.toString(),
-        WasbAuthorizationOperations.WRITE.toString(), "create");
+    performAuthCheck(ancestor.toString(), WasbAuthorizationOperations.WRITE, "create", absolutePath.toString());
 
     String key = pathToKey(absolutePath);
 
@@ -1678,6 +1684,9 @@ public class NativeAzureFileSystem extends FileSystem {
       if (!overwrite) {
         throw new FileAlreadyExistsException("File already exists:" + f);
       }
+      else {
+        performAuthCheck(absolutePath.toString(), WasbAuthorizationOperations.WRITE, "create", absolutePath.toString());
+      }
     }
 
     Path parentFolder = absolutePath.getParent();
@@ -1768,7 +1777,7 @@ public class NativeAzureFileSystem extends FileSystem {
 
   /**
    * Delete the specified file or folder. The parameter
-   * skipParentFolderLastModifidedTimeUpdate
+   * skipParentFolderLastModifiedTimeUpdate
    * is used in the case of atomic folder rename redo. In that case, there is
    * a lease on the parent folder, so (without reworking the code) modifying
    * the parent folder update time will fail because of a conflict with the
@@ -1778,20 +1787,20 @@ public class NativeAzureFileSystem extends FileSystem {
    *
    * @param f file path to be deleted.
    * @param recursive specify deleting recursively or not.
-   * @param skipParentFolderLastModifidedTimeUpdate If true, don't update the folder last
+   * @param skipParentFolderLastModifiedTimeUpdate If true, don't update the folder last
    * modified time.
    * @return true if and only if the file is deleted
    * @throws IOException Thrown when fail to delete file or directory.
    */
   public boolean delete(Path f, boolean recursive,
-      boolean skipParentFolderLastModifidedTimeUpdate) throws IOException {
+      boolean skipParentFolderLastModifiedTimeUpdate) throws IOException {
 
     LOG.debug("Deleting file: {}", f.toString());
 
     Path absolutePath = makeAbsolute(f);
+    Path parentPath = absolutePath.getParent();
 
-    performAuthCheck(absolutePath.toString(),
-        WasbAuthorizationOperations.EXECUTE.toString(), "delete");
+    performAuthCheck(parentPath.toString(), WasbAuthorizationOperations.WRITE, "delete", absolutePath.toString());
 
     String key = pathToKey(absolutePath);
 
@@ -1827,7 +1836,7 @@ public class NativeAzureFileSystem extends FileSystem {
       // (e.g. the blob store only contains the blob a/b and there's no
       // corresponding directory blob a) and that would implicitly delete
       // the directory as well, which is not correct.
-      Path parentPath = absolutePath.getParent();
+
       if (parentPath.getParent() != null) {// Not root
         String parentKey = pathToKey(parentPath);
 
@@ -1876,7 +1885,7 @@ public class NativeAzureFileSystem extends FileSystem {
           store.storeEmptyFolder(parentKey,
               createPermissionStatus(FsPermission.getDefault()));
         } else {
-          if (!skipParentFolderLastModifidedTimeUpdate) {
+          if (!skipParentFolderLastModifiedTimeUpdate) {
             updateParentFolderLastModifiedTime(key);
           }
         }
@@ -1903,7 +1912,6 @@ public class NativeAzureFileSystem extends FileSystem {
       // The path specifies a folder. Recursively delete all entries under the
       // folder.
       LOG.debug("Directory Delete encountered: {}", f.toString());
-      Path parentPath = absolutePath.getParent();
       if (parentPath.getParent() != null) {
         String parentKey = pathToKey(parentPath);
         FileMetadata parentMetadata = null;
@@ -1981,12 +1989,30 @@ public class NativeAzureFileSystem extends FileSystem {
 
       final FileMetadata[] contents = fileMetadataList.toArray(new FileMetadata[fileMetadataList.size()]);
 
-      if (!recursive && contents.length > 0) {
+      if (contents.length > 0) {
+        if (!recursive) {
           // The folder is non-empty and recursive delete was not specified.
           // Throw an exception indicating that a non-recursive delete was
           // specified for a non-empty folder.
           throw new IOException("Non-recursive delete of non-empty directory "
               + f.toString());
+        }
+        else {
+          // Check write-permissions on sub-tree including current folder
+          // NOTE: Ideally the subtree needs a read-write-execute access check,
+          // but we simplify it to a write-access check.
+          if (metaFile.isDir()) { // the absolute-path
+            performAuthCheck(absolutePath.toString(), WasbAuthorizationOperations.WRITE, "delete",
+                absolutePath.toString());
+          }
+          for (FileMetadata meta : contents) {
+            if (meta.isDir()) {
+              Path subTreeDir = keyToPath(meta.getKey());
+              performAuthCheck(subTreeDir.toString(), WasbAuthorizationOperations.WRITE, "delete",
+                  absolutePath.toString());
+            }
+          }
+        }
       }
 
       // Delete all files / folders in current directory stored as list in 'contents'.
@@ -2014,7 +2040,7 @@ public class NativeAzureFileSystem extends FileSystem {
       // Update parent directory last modified time
       Path parent = absolutePath.getParent();
       if (parent != null && parent.getParent() != null) { // not root
-        if (!skipParentFolderLastModifidedTimeUpdate) {
+        if (!skipParentFolderLastModifiedTimeUpdate) {
           updateParentFolderLastModifiedTime(key);
         }
       }
@@ -2064,8 +2090,8 @@ public class NativeAzureFileSystem extends FileSystem {
     // Capture the absolute path and the path to key.
     Path absolutePath = makeAbsolute(f);
 
-    performAuthCheck(absolutePath.toString(),
-        WasbAuthorizationOperations.EXECUTE.toString(), "getFileStatus");
+    performAuthCheck(absolutePath.toString(), WasbAuthorizationOperations.READ, "getFileStatus",
+        absolutePath.toString());
 
     String key = pathToKey(absolutePath);
     if (key.length() == 0) { // root always exists
@@ -2166,8 +2192,7 @@ public class NativeAzureFileSystem extends FileSystem {
 
     Path absolutePath = makeAbsolute(f);
 
-    performAuthCheck(absolutePath.toString(),
-        WasbAuthorizationOperations.EXECUTE.toString(), "list");
+    performAuthCheck(absolutePath.toString(), WasbAuthorizationOperations.READ, "liststatus", absolutePath.toString());
 
     String key = pathToKey(absolutePath);
     Set<FileStatus> status = new TreeSet<FileStatus>();
@@ -2375,6 +2400,24 @@ public class NativeAzureFileSystem extends FileSystem {
         permission);
   }
 
+  private Path getAncestor(Path f) throws IOException {
+
+    for (Path current = f.getParent(), parent = current.getParent();
+         parent != null; // Stop when you get to the root
+         current = parent, parent = current.getParent()) {
+
+      String currentKey = pathToKey(current);
+      FileMetadata currentMetadata = store.retrieveMetadata(currentKey);
+      if (currentMetadata != null) {
+        Path ancestor = keyToPath(currentMetadata.getKey());
+        LOG.debug("Found ancestor {}, for path: {}", ancestor.toString(), f.toString());
+        return ancestor;
+      }
+    }
+
+    return new Path("/");
+  }
+
   @Override
   public boolean mkdirs(Path f, FsPermission permission) throws IOException {
       return mkdirs(f, permission, false);
@@ -2391,9 +2434,9 @@ public class NativeAzureFileSystem extends FileSystem {
     }
 
     Path absolutePath = makeAbsolute(f);
+    Path ancestor = getAncestor(absolutePath);
 
-    performAuthCheck(absolutePath.toString(),
-        WasbAuthorizationOperations.EXECUTE.toString(), "mkdirs");
+    performAuthCheck(ancestor.toString(), WasbAuthorizationOperations.WRITE, "mkdirs", absolutePath.toString());
 
     PermissionStatus permissionStatus = null;
     if(noUmask) {
@@ -2449,8 +2492,7 @@ public class NativeAzureFileSystem extends FileSystem {
 
     Path absolutePath = makeAbsolute(f);
 
-    performAuthCheck(absolutePath.toString(),
-        WasbAuthorizationOperations.READ.toString(), "read");
+    performAuthCheck(absolutePath.toString(), WasbAuthorizationOperations.READ, "read", absolutePath.toString());
 
     String key = pathToKey(absolutePath);
     FileMetadata meta = null;
@@ -2508,12 +2550,18 @@ public class NativeAzureFileSystem extends FileSystem {
           + " through WASB that has colons in the name");
     }
 
-    Path absolutePath = makeAbsolute(src);
+    Path absoluteSrcPath = makeAbsolute(src);
+    Path srcParentFolder = absoluteSrcPath.getParent();
 
-    performAuthCheck(absolutePath.toString(),
-        WasbAuthorizationOperations.EXECUTE.toString(), "rename");
+    if (srcParentFolder == null) {
+      // Cannot rename root of file system
+      return false;
+    }
+
+    performAuthCheck(srcParentFolder.toString(), WasbAuthorizationOperations.WRITE, "rename",
+        absoluteSrcPath.toString());
 
-    String srcKey = pathToKey(absolutePath);
+    String srcKey = pathToKey(absoluteSrcPath);
 
     if (srcKey.length() == 0) {
       // Cannot rename root of file system
@@ -2521,8 +2569,13 @@ public class NativeAzureFileSystem extends FileSystem {
     }
 
     // Figure out the final destination
-    Path absoluteDst = makeAbsolute(dst);
-    String dstKey = pathToKey(absoluteDst);
+    Path absoluteDstPath = makeAbsolute(dst);
+    Path dstParentFolder = absoluteDstPath.getParent();
+
+    performAuthCheck(dstParentFolder.toString(), WasbAuthorizationOperations.WRITE, "rename",
+        absoluteDstPath.toString());
+
+    String dstKey = pathToKey(absoluteDstPath);
     FileMetadata dstMetadata = null;
     try {
       dstMetadata = store.retrieveMetadata(dstKey);
@@ -2530,14 +2583,14 @@ public class NativeAzureFileSystem extends FileSystem {
 
       Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
 
-      // A BlobNotFound storage exception in only thrown from retrieveMetdata API when
+      // A BlobNotFound storage exception is only thrown from retrieveMetadata API when
       // there is a race condition. If there is another thread which deletes the destination
       // file or folder, then this thread calling rename should be able to continue with
       // rename gracefully. Hence the StorageException is swallowed here.
       if (innerException instanceof StorageException) {
         if (NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
           LOG.debug("BlobNotFound exception encountered for Destination key : {}. "
-              + "Swallowin the exception to handle race condition gracefully", dstKey);
+              + "Swallowing the exception to handle race condition gracefully", dstKey);
         }
       } else {
         throw ex;
@@ -2558,7 +2611,7 @@ public class NativeAzureFileSystem extends FileSystem {
       // Check that the parent directory exists.
       FileMetadata parentOfDestMetadata = null;
       try {
-        parentOfDestMetadata = store.retrieveMetadata(pathToKey(absoluteDst.getParent()));
+        parentOfDestMetadata = store.retrieveMetadata(pathToKey(absoluteDstPath.getParent()));
       } catch (IOException ex) {
 
         Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
@@ -2816,9 +2869,6 @@ public class NativeAzureFileSystem extends FileSystem {
   public void setPermission(Path p, FsPermission permission) throws FileNotFoundException, IOException {
     Path absolutePath = makeAbsolute(p);
 
-    performAuthCheck(absolutePath.toString(),
-        WasbAuthorizationOperations.EXECUTE.toString(), "setPermission");
-
     String key = pathToKey(absolutePath);
     FileMetadata metadata = null;
     try {
@@ -2858,9 +2908,6 @@ public class NativeAzureFileSystem extends FileSystem {
       throws IOException {
     Path absolutePath = makeAbsolute(p);
 
-    performAuthCheck(absolutePath.toString(),
-        WasbAuthorizationOperations.EXECUTE.toString(), "setOwner");
-
     String key = pathToKey(absolutePath);
     FileMetadata metadata = null;
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0cab5722/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/RemoteWasbAuthorizerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/RemoteWasbAuthorizerImpl.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/RemoteWasbAuthorizerImpl.java
index a2105c7..8576377 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/RemoteWasbAuthorizerImpl.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/RemoteWasbAuthorizerImpl.java
@@ -132,7 +132,14 @@ public class RemoteWasbAuthorizerImpl implements WasbAuthorizerInterface {
   @Override
   public boolean authorize(String wasbAbsolutePath, String accessType)
       throws WasbAuthorizationException, IOException {
+
       try {
+
+        /* Make an exception for the internal -RenamePending files */
+        if (wasbAbsolutePath.endsWith(NativeAzureFileSystem.FolderRenamePending.SUFFIX)) {
+          return true;
+        }
+
         URIBuilder uriBuilder = new URIBuilder(remoteAuthorizerServiceUrl);
         uriBuilder.setPath("/" + CHECK_AUTHORIZATION_OP);
         uriBuilder.addParameter(WASB_ABSOLUTE_PATH_QUERY_PARAM_NAME,
@@ -203,7 +210,7 @@ public class RemoteWasbAuthorizerImpl implements WasbAuthorizerInterface {
           return authorizerResponse.getAuthorizationResult();
         } else {
           throw new WasbAuthorizationException("Remote authorization"
-              + " serivce encountered an error "
+              + " service encountered an error "
               + authorizerResponse.getResponseMessage());
         }
       } catch (URISyntaxException | WasbRemoteCallException
@@ -220,7 +227,7 @@ public class RemoteWasbAuthorizerImpl implements WasbAuthorizerInterface {
  * response in the following JSON format
  * {
  *    "responseCode" : 0 or non-zero <int>,
- *    "responseMessage" : relavant message of failure <String>
+ *    "responseMessage" : relevant message of failure <String>
  *    "authorizationResult" : authorization result <boolean>
  *                            true - if auhorization allowed
  *                            false - otherwise.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0cab5722/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/WasbAuthorizationOperations.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/WasbAuthorizationOperations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/WasbAuthorizationOperations.java
index 41ca2b3..7c63d4b 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/WasbAuthorizationOperations.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/WasbAuthorizationOperations.java
@@ -34,8 +34,6 @@ public enum WasbAuthorizationOperations {
         return "read";
       case WRITE:
         return "write";
-      case EXECUTE:
-        return "execute";
       default:
         throw new IllegalArgumentException(
             "Invalid Authorization Operation");

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0cab5722/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockWasbAuthorizerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockWasbAuthorizerImpl.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockWasbAuthorizerImpl.java
index af5a537..445bfd8 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockWasbAuthorizerImpl.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockWasbAuthorizerImpl.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.fs.azure;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.regex.Pattern;
 
 import org.apache.hadoop.conf.Configuration;
 
@@ -38,8 +39,11 @@ public class MockWasbAuthorizerImpl implements WasbAuthorizerInterface {
 
   public void addAuthRule(String wasbAbsolutePath,
       String accessType, boolean access) {
-    AuthorizationComponent component =
-        new AuthorizationComponent(wasbAbsolutePath, accessType);
+
+    AuthorizationComponent component = wasbAbsolutePath.endsWith("*")
+        ? new AuthorizationComponent("^" + wasbAbsolutePath.replace("*", ".*"), accessType)
+        : new AuthorizationComponent(wasbAbsolutePath, accessType);
+
     this.authRules.put(component, access);
   }
 
@@ -47,12 +51,26 @@ public class MockWasbAuthorizerImpl implements WasbAuthorizerInterface {
   public boolean authorize(String wasbAbsolutePath, String accessType)
       throws WasbAuthorizationException {
 
+    if (wasbAbsolutePath.endsWith(NativeAzureFileSystem.FolderRenamePending.SUFFIX)) {
+      return true;
+    }
+
     AuthorizationComponent component =
         new AuthorizationComponent(wasbAbsolutePath, accessType);
 
     if (authRules.containsKey(component)) {
       return authRules.get(component);
     } else {
+      // Regex-pattern match if we don't have a straight match
+      for (Map.Entry<AuthorizationComponent, Boolean> entry : authRules.entrySet()) {
+        AuthorizationComponent key = entry.getKey();
+        String keyPath = key.getWasbAbsolutePath();
+        String keyAccess = key.getAccessType();
+
+        if (keyPath.endsWith("*") && Pattern.matches(keyPath, wasbAbsolutePath) && keyAccess.equals(accessType)) {
+          return entry.getValue();
+        }
+      }
       return false;
     }
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0cab5722/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestNativeAzureFileSystemAuthorization.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestNativeAzureFileSystemAuthorization.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestNativeAzureFileSystemAuthorization.java
index a2bbeb1..4e49622 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestNativeAzureFileSystemAuthorization.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestNativeAzureFileSystemAuthorization.java
@@ -28,11 +28,8 @@ import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 
-import com.sun.tools.javac.util.Assert;
 import org.junit.rules.ExpectedException;
 
-import java.io.Console;
-
 import static org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.KEY_USE_SECURE_MODE;
 
 /**
@@ -67,32 +64,154 @@ public class TestNativeAzureFileSystemAuthorization
   public ExpectedException expectedEx = ExpectedException.none();
 
   /**
-   * Positive test to verify Create and delete access check
+   * Set up permissions to allow a recursive delete for cleanup purposes.
+   */
+  private void allowRecursiveDelete(NativeAzureFileSystem fs, MockWasbAuthorizerImpl authorizer, String path) {
+
+    int index = path.lastIndexOf('/');
+    String parent = (index == 0) ? "/" : path.substring(0, index);
+
+    authorizer.init(null);
+    authorizer.addAuthRule(parent, WasbAuthorizationOperations.WRITE.toString(), true);
+    authorizer.addAuthRule((path.endsWith("*") ? path : path+"*"), WasbAuthorizationOperations.WRITE.toString(), true);
+    fs.updateWasbAuthorizer(authorizer);
+  }
+
+  /**
+   * Positive test to verify Create access check
+   * The file is created directly under an existing folder.
+   * No intermediate folders need to be created.
+   * @throws Throwable
+   */
+  @Test
+  public void testCreateAccessWithoutCreateIntermediateFoldersCheckPositive() throws Throwable {
+
+    AzureBlobStorageTestAccount testAccount = createTestAccount();
+    NativeAzureFileSystem fs = testAccount.getFileSystem();
+
+    Path parentDir = new Path("/");
+    Path testPath = new Path(parentDir, "test.dat");
+
+    MockWasbAuthorizerImpl authorizer = new MockWasbAuthorizerImpl();
+    authorizer.init(null);
+    authorizer.addAuthRule("/", WasbAuthorizationOperations.WRITE.toString(), true);
+    authorizer.addAuthRule(testPath.toString(), WasbAuthorizationOperations.READ.toString(), true);
+    fs.updateWasbAuthorizer(authorizer);
+
+    try {
+      fs.create(testPath);
+      ContractTestUtils.assertPathExists(fs, "testPath was not created", testPath);
+    }
+    finally {
+      fs.delete(testPath, false);
+    }
+  }
+
+  /**
+   * Positive test to verify Create access check
+   * The test tries to create a file whose parent is non-existent to ensure that
+   * the intermediate folders between ancestor and direct parent are being created
+   * when proper ranger policies are configured.
+   * @throws Throwable
+   */
+  @Test
+  public void testCreateAccessWithCreateIntermediateFoldersCheckPositive() throws Throwable {
+
+    AzureBlobStorageTestAccount testAccount = createTestAccount();
+    NativeAzureFileSystem fs = testAccount.getFileSystem();
+
+    Path parentDir = new Path("/testCreateAccessCheckPositive/1/2/3");
+    Path testPath = new Path(parentDir, "test.dat");
+
+    MockWasbAuthorizerImpl authorizer = new MockWasbAuthorizerImpl();
+    authorizer.init(null);
+    authorizer.addAuthRule("/", WasbAuthorizationOperations.WRITE.toString(), true);
+    authorizer.addAuthRule(testPath.toString(), WasbAuthorizationOperations.READ.toString(), true);
+    fs.updateWasbAuthorizer(authorizer);
+
+    try {
+      fs.create(testPath);
+      ContractTestUtils.assertPathExists(fs, "testPath was not created", testPath);
+    }
+    finally {
+      allowRecursiveDelete(fs, authorizer, "/testCreateAccessCheckPositive");
+      fs.delete(new Path("/testCreateAccessCheckPositive"), true);
+    }
+  }
+
+
+  /**
+   * Negative test to verify that create fails when trying to overwrite an existing file
+   * without proper write permissions on the file being overwritten.
+   * @throws Throwable
+   */
+  @Test // (expected=WasbAuthorizationException.class)
+  public void testCreateAccessWithOverwriteCheckNegative() throws Throwable {
+
+    expectedEx.expect(WasbAuthorizationException.class);
+    expectedEx.expectMessage("create operation for Path : /test.dat not allowed");
+
+    AzureBlobStorageTestAccount testAccount = createTestAccount();
+    NativeAzureFileSystem fs = testAccount.getFileSystem();
+
+    Path parentDir = new Path("/");
+    Path testPath = new Path(parentDir, "test.dat");
+
+    MockWasbAuthorizerImpl authorizer = new MockWasbAuthorizerImpl();
+    authorizer.init(null);
+    authorizer.addAuthRule("/", WasbAuthorizationOperations.WRITE.toString(), true);
+    authorizer.addAuthRule(testPath.toString(), WasbAuthorizationOperations.READ.toString(), true);
+    fs.updateWasbAuthorizer(authorizer);
+
+    boolean initialCreateSucceeded = false;
+    try {
+      fs.create(testPath);
+      ContractTestUtils.assertPathExists(fs, "testPath was not created", testPath);
+      initialCreateSucceeded = true;
+      fs.create(testPath, true);
+    }
+    finally {
+      ContractTestUtils.assertTrue(initialCreateSucceeded);
+      fs.delete(testPath, false);
+    }
+  }
+
+  /**
+   * Positive test to verify that create succeeds when trying to overwrite an existing file
+   * when proper write permissions on the file being overwritten are provided.
    * @throws Throwable
    */
   @Test
-  public void testCreateAccessCheckPositive() throws Throwable {
+  public void testCreateAccessWithOverwriteCheckPositive() throws Throwable {
 
     AzureBlobStorageTestAccount testAccount = createTestAccount();
     NativeAzureFileSystem fs = testAccount.getFileSystem();
 
-    Path parentDir = new Path("/testCreateAccessCheckPositive");
+    Path parentDir = new Path("/");
     Path testPath = new Path(parentDir, "test.dat");
 
     MockWasbAuthorizerImpl authorizer = new MockWasbAuthorizerImpl();
     authorizer.init(null);
+    authorizer.addAuthRule("/", WasbAuthorizationOperations.WRITE.toString(), true);
+    authorizer.addAuthRule(testPath.toString(), WasbAuthorizationOperations.READ.toString(), true);
     authorizer.addAuthRule(testPath.toString(), WasbAuthorizationOperations.WRITE.toString(), true);
-    authorizer.addAuthRule(testPath.toString(), WasbAuthorizationOperations.EXECUTE.toString(), true);
-    authorizer.addAuthRule(parentDir.toString(), WasbAuthorizationOperations.EXECUTE.toString(), true);
     fs.updateWasbAuthorizer(authorizer);
 
-    fs.create(testPath);
-    ContractTestUtils.assertPathExists(fs, "testPath was not created", testPath);
-    fs.delete(parentDir, true);
+    boolean initialCreateSucceeded = false;
+    try {
+      fs.create(testPath);
+      ContractTestUtils.assertPathExists(fs, "testPath was not created", testPath);
+      initialCreateSucceeded = true;
+      fs.create(testPath, true);
+    }
+    finally {
+      ContractTestUtils.assertTrue(initialCreateSucceeded);
+      fs.delete(testPath, false);
+    }
   }
 
   /**
-   * Negative test to verify Create access check
+   * Negative test to verify that Create fails when appropriate permissions are not provided.
    * @throws Throwable
    */
 
@@ -110,20 +229,21 @@ public class TestNativeAzureFileSystemAuthorization
 
     MockWasbAuthorizerImpl authorizer = new MockWasbAuthorizerImpl();
     authorizer.init(null);
-    authorizer.addAuthRule(testPath.toString(),WasbAuthorizationOperations.WRITE.toString(), false);
-    authorizer.addAuthRule(parentDir.toString(),WasbAuthorizationOperations.EXECUTE.toString(), true);
+    authorizer.addAuthRule("/", WasbAuthorizationOperations.WRITE.toString(), false);
     fs.updateWasbAuthorizer(authorizer);
 
     try {
       fs.create(testPath);
     }
     finally {
+      /* Provide permissions to clean up in case the file got created */
+      allowRecursiveDelete(fs, authorizer, parentDir.toString());
       fs.delete(parentDir, true);
     }
   }
 
   /**
-   * Positive test to verify Create and delete access check
+   * Positive test to verify listStatus access check
    * @throws Throwable
    */
   @Test
@@ -133,28 +253,27 @@ public class TestNativeAzureFileSystemAuthorization
     NativeAzureFileSystem fs = testAccount.getFileSystem();
 
     Path parentDir = new Path("/testListAccessCheckPositive");
-    Path testPath = new Path(parentDir, "test.dat");
+    Path intermediateFolders = new Path(parentDir, "1/2/3/");
+    Path testPath = new Path(intermediateFolders, "test.dat");
 
     MockWasbAuthorizerImpl authorizer = new MockWasbAuthorizerImpl();
     authorizer.init(null);
-    authorizer.addAuthRule(testPath.toString(), WasbAuthorizationOperations.WRITE.toString(), true);
-    authorizer.addAuthRule(testPath.toString(), WasbAuthorizationOperations.EXECUTE.toString(), true);
-    authorizer.addAuthRule(parentDir.toString(), WasbAuthorizationOperations.EXECUTE.toString(), true);
+    authorizer.addAuthRule("/", WasbAuthorizationOperations.WRITE.toString(), true);
+    authorizer.addAuthRule(testPath.toString(), WasbAuthorizationOperations.READ.toString(), true);
     fs.updateWasbAuthorizer(authorizer);
 
-    fs.create(testPath);
-    ContractTestUtils.assertPathExists(fs, "testPath does not exist", testPath);
-
     try {
+      fs.create(testPath);
       fs.listStatus(testPath);
     }
     finally {
+      allowRecursiveDelete(fs, authorizer, parentDir.toString());
       fs.delete(parentDir, true);
     }
   }
 
   /**
-   * Negative test to verify Create access check
+   * Negative test to verify listStatus access check
    * @throws Throwable
    */
 
@@ -162,7 +281,7 @@ public class TestNativeAzureFileSystemAuthorization
   public void testListAccessCheckNegative() throws Throwable {
 
     expectedEx.expect(WasbAuthorizationException.class);
-    expectedEx.expectMessage("getFileStatus operation for Path : /testListAccessCheckNegative/test.dat not allowed");
+    expectedEx.expectMessage("liststatus operation for Path : /testListAccessCheckNegative/test.dat not allowed");
 
     AzureBlobStorageTestAccount testAccount = createTestAccount();
     NativeAzureFileSystem fs = testAccount.getFileSystem();
@@ -172,18 +291,16 @@ public class TestNativeAzureFileSystemAuthorization
 
     MockWasbAuthorizerImpl authorizer = new MockWasbAuthorizerImpl();
     authorizer.init(null);
-    authorizer.addAuthRule(testPath.toString(), WasbAuthorizationOperations.WRITE.toString(), true);
-    authorizer.addAuthRule(testPath.toString(), WasbAuthorizationOperations.EXECUTE.toString(), false);
-    authorizer.addAuthRule(parentDir.toString(), WasbAuthorizationOperations.EXECUTE.toString(), true);
+    authorizer.addAuthRule("/", WasbAuthorizationOperations.WRITE.toString(), true);
+    authorizer.addAuthRule(testPath.toString(), WasbAuthorizationOperations.READ.toString(), false);
     fs.updateWasbAuthorizer(authorizer);
 
-    fs.create(testPath);
-    ContractTestUtils.assertPathExists(fs, "testPath does not exist", testPath);
-
     try {
+      fs.create(testPath);
       fs.listStatus(testPath);
     }
     finally {
+      allowRecursiveDelete(fs, authorizer, parentDir.toString());
       fs.delete(parentDir, true);
     }
   }
@@ -199,25 +316,26 @@ public class TestNativeAzureFileSystemAuthorization
     NativeAzureFileSystem fs = testAccount.getFileSystem();
 
     Path parentDir = new Path("/testRenameAccessCheckPositive");
-    Path testPath = new Path(parentDir, "test.dat");
-    Path renamePath = new Path(parentDir, "test2.dat");
+    Path srcPath = new Path(parentDir, "test1.dat");
+    Path dstPath = new Path(parentDir, "test2.dat");
 
     MockWasbAuthorizerImpl authorizer = new MockWasbAuthorizerImpl();
     authorizer.init(null);
-    authorizer.addAuthRule(testPath.toString(), WasbAuthorizationOperations.WRITE.toString(), true);
-    authorizer.addAuthRule(testPath.toString(), WasbAuthorizationOperations.EXECUTE.toString(), true);
-    authorizer.addAuthRule(renamePath.toString(), WasbAuthorizationOperations.EXECUTE.toString(), true);
-    authorizer.addAuthRule(parentDir.toString(), WasbAuthorizationOperations.EXECUTE.toString(), true);
+    authorizer.addAuthRule("/", WasbAuthorizationOperations.WRITE.toString(), true); /* to create parentDir */
+    authorizer.addAuthRule(parentDir.toString(), WasbAuthorizationOperations.WRITE.toString(), true); /* for rename */
+    authorizer.addAuthRule(srcPath.toString(), WasbAuthorizationOperations.READ.toString(), true); /* for exists */
+    authorizer.addAuthRule(dstPath.toString(), WasbAuthorizationOperations.READ.toString(), true); /* for exists */
     fs.updateWasbAuthorizer(authorizer);
 
-    fs.create(testPath);
-    ContractTestUtils.assertPathExists(fs, "sourcePath does not exist", testPath);
-
     try {
-      fs.rename(testPath, renamePath);
-      ContractTestUtils.assertPathExists(fs, "destPath does not exist", renamePath);
+      fs.create(srcPath);
+      ContractTestUtils.assertPathExists(fs, "sourcePath does not exist", srcPath);
+      fs.rename(srcPath, dstPath);
+      ContractTestUtils.assertPathExists(fs, "destPath does not exist", dstPath);
+      ContractTestUtils.assertPathDoesNotExist(fs, "sourcePath exists after rename!", srcPath);
     }
     finally {
+      allowRecursiveDelete(fs, authorizer, parentDir.toString());
       fs.delete(parentDir, true);
     }
   }
@@ -230,34 +348,109 @@ public class TestNativeAzureFileSystemAuthorization
   public void testRenameAccessCheckNegative() throws Throwable {
 
     expectedEx.expect(WasbAuthorizationException.class);
-    expectedEx.expectMessage("rename operation for Path : /testRenameAccessCheckNegative/test.dat not allowed");
+    expectedEx.expectMessage("rename operation for Path : /testRenameAccessCheckNegative/test1.dat not allowed");
 
     AzureBlobStorageTestAccount testAccount = createTestAccount();
     NativeAzureFileSystem fs = testAccount.getFileSystem();
     Path parentDir = new Path("/testRenameAccessCheckNegative");
-    Path testPath = new Path(parentDir, "test.dat");
-    Path renamePath = new Path(parentDir, "test2.dat");
+    Path srcPath = new Path(parentDir, "test1.dat");
+    Path dstPath = new Path(parentDir, "test2.dat");
 
     MockWasbAuthorizerImpl authorizer = new MockWasbAuthorizerImpl();
     authorizer.init(null);
-    authorizer.addAuthRule(testPath.toString(), WasbAuthorizationOperations.WRITE.toString(), true);
-    // set EXECUTE to true for initial assert right after creation.
-    authorizer.addAuthRule(testPath.toString(), WasbAuthorizationOperations.EXECUTE.toString(), true);
-    authorizer.addAuthRule(parentDir.toString(), WasbAuthorizationOperations.EXECUTE.toString(), true);
+    authorizer.addAuthRule("/", WasbAuthorizationOperations.WRITE.toString(), true); /* to create parent dir */
+    authorizer.addAuthRule(parentDir.toString(), WasbAuthorizationOperations.WRITE.toString(), false);
+    authorizer.addAuthRule(srcPath.toString(), WasbAuthorizationOperations.READ.toString(), true);
+    authorizer.addAuthRule(dstPath.toString(), WasbAuthorizationOperations.READ.toString(), true);
+    fs.updateWasbAuthorizer(authorizer);
+
+    try {
+      fs.create(srcPath);
+      ContractTestUtils.assertPathExists(fs, "sourcePath does not exist", srcPath);
+      fs.rename(srcPath, dstPath);
+      ContractTestUtils.assertPathExists(fs, "destPath does not exist", dstPath);
+    } finally {
+      ContractTestUtils.assertPathExists(fs, "sourcePath does not exist after rename failure!", srcPath);
+
+      allowRecursiveDelete(fs, authorizer, parentDir.toString());
+      fs.delete(parentDir, true);
+    }
+  }
+
+  /**
+   * Negative test to verify rename access check - the dstFolder disallows rename
+   * @throws Throwable
+   */
+  @Test //(expected=WasbAuthorizationException.class)
+  public void testRenameAccessCheckNegativeOnDstFolder() throws Throwable {
+
+    expectedEx.expect(WasbAuthorizationException.class);
+    expectedEx.expectMessage("rename operation for Path : /testRenameAccessCheckNegativeDst/test2.dat not allowed");
+
+    AzureBlobStorageTestAccount testAccount = createTestAccount();
+    NativeAzureFileSystem fs = testAccount.getFileSystem();
+    Path parentSrcDir = new Path("/testRenameAccessCheckNegativeSrc");
+    Path srcPath = new Path(parentSrcDir, "test1.dat");
+    Path parentDstDir = new Path("/testRenameAccessCheckNegativeDst");
+    Path dstPath = new Path(parentDstDir, "test2.dat");
+
+    MockWasbAuthorizerImpl authorizer = new MockWasbAuthorizerImpl();
+    authorizer.init(null);
+    authorizer.addAuthRule("/", WasbAuthorizationOperations.WRITE.toString(), true); /* to create parent dir */
+    authorizer.addAuthRule(parentSrcDir.toString(), WasbAuthorizationOperations.WRITE.toString(), true);
+    authorizer.addAuthRule(parentDstDir.toString(), WasbAuthorizationOperations.WRITE.toString(), false);
+    authorizer.addAuthRule(srcPath.toString(), WasbAuthorizationOperations.READ.toString(), true);
+    authorizer.addAuthRule(dstPath.toString(), WasbAuthorizationOperations.READ.toString(), true);
     fs.updateWasbAuthorizer(authorizer);
 
-    fs.create(testPath);
-    ContractTestUtils.assertPathExists(fs, "sourcePath does not exist", testPath);
+    try {
+      fs.create(srcPath);
+      ContractTestUtils.assertPathExists(fs, "sourcePath does not exist", srcPath);
+      fs.rename(srcPath, dstPath);
+      ContractTestUtils.assertPathDoesNotExist(fs, "destPath does not exist", dstPath);
+    } finally {
+      ContractTestUtils.assertPathExists(fs, "sourcePath does not exist after rename !", srcPath);
+      allowRecursiveDelete(fs, authorizer, parentSrcDir.toString());
+      fs.delete(parentSrcDir, true);
+    }
+  }
+
+  /**
+   * Positive test to verify rename access check - the dstFolder allows rename
+   * @throws Throwable
+   */
+  @Test
+  public void testRenameAccessCheckPositiveOnDstFolder() throws Throwable {
+
+    AzureBlobStorageTestAccount testAccount = createTestAccount();
+    NativeAzureFileSystem fs = testAccount.getFileSystem();
+    Path parentSrcDir = new Path("/testRenameAccessCheckPositiveSrc");
+    Path srcPath = new Path(parentSrcDir, "test1.dat");
+    Path parentDstDir = new Path("/testRenameAccessCheckPositiveDst");
+    Path dstPath = new Path(parentDstDir, "test2.dat");
 
-    // Set EXECUTE to false for actual rename-failure test
-    authorizer.addAuthRule(testPath.toString(), WasbAuthorizationOperations.EXECUTE.toString(), false);
+    MockWasbAuthorizerImpl authorizer = new MockWasbAuthorizerImpl();
+    authorizer.init(null);
+    authorizer.addAuthRule("/", WasbAuthorizationOperations.WRITE.toString(), true); /* to create parent dirs */
+    authorizer.addAuthRule(parentSrcDir.toString(), WasbAuthorizationOperations.WRITE.toString(), true);
+    authorizer.addAuthRule(parentDstDir.toString(), WasbAuthorizationOperations.WRITE.toString(), true);
+    authorizer.addAuthRule(srcPath.toString(), WasbAuthorizationOperations.READ.toString(), true);
+    authorizer.addAuthRule(dstPath.toString(), WasbAuthorizationOperations.READ.toString(), true);
     fs.updateWasbAuthorizer(authorizer);
 
     try {
-      fs.rename(testPath, renamePath);
-      ContractTestUtils.assertPathExists(fs, "destPath does not exist", renamePath);
+      fs.create(srcPath);
+      ContractTestUtils.assertPathExists(fs, "sourcePath does not exist", srcPath);
+      fs.mkdirs(parentDstDir);
+      fs.rename(srcPath, dstPath);
+      ContractTestUtils.assertPathDoesNotExist(fs, "sourcePath does not exist", srcPath);
+      ContractTestUtils.assertPathExists(fs, "destPath does not exist", dstPath);
     } finally {
-      fs.delete(parentDir, true);
+      allowRecursiveDelete(fs, authorizer, parentSrcDir.toString());
+      fs.delete(parentSrcDir, true);
+
+      allowRecursiveDelete(fs, authorizer, parentDstDir.toString());
+      fs.delete(parentDstDir, true);
     }
   }
 
@@ -275,27 +468,30 @@ public class TestNativeAzureFileSystemAuthorization
 
     MockWasbAuthorizerImpl authorizer = new MockWasbAuthorizerImpl();
     authorizer.init(null);
-    authorizer.addAuthRule(testPath.toString(), WasbAuthorizationOperations.WRITE.toString(), true);
-    authorizer.addAuthRule(testPath.toString(), WasbAuthorizationOperations.EXECUTE.toString(), true);
+    authorizer.addAuthRule("/", WasbAuthorizationOperations.WRITE.toString(), true);
     authorizer.addAuthRule(testPath.toString(), WasbAuthorizationOperations.READ.toString(), true);
-    authorizer.addAuthRule(parentDir.toString(), WasbAuthorizationOperations.EXECUTE.toString(), true);
     fs.updateWasbAuthorizer(authorizer);
 
-    FSDataOutputStream fso = fs.create(testPath);
-    String data = "Hello World";
-    fso.writeBytes(data);
-    fso.close();
-    ContractTestUtils.assertPathExists(fs, "testPath does not exist", testPath);
-
     FSDataInputStream inputStream = null;
+    FSDataOutputStream fso = null;
+
     try {
+      fso = fs.create(testPath);
+      String data = "Hello World";
+      fso.writeBytes(data);
+      fso.close();
+
       inputStream = fs.open(testPath);
       ContractTestUtils.verifyRead(inputStream, data.getBytes(), 0, data.length());
     }
     finally {
+      if (fso != null) {
+        fso.close();
+      }
       if(inputStream != null) {
         inputStream.close();
       }
+      allowRecursiveDelete(fs, authorizer, parentDir.toString());
       fs.delete(parentDir, true);
     }
   }
@@ -318,27 +514,239 @@ public class TestNativeAzureFileSystemAuthorization
 
     MockWasbAuthorizerImpl authorizer = new MockWasbAuthorizerImpl();
     authorizer.init(null);
-    authorizer.addAuthRule(testPath.toString(), WasbAuthorizationOperations.WRITE.toString(), true);
-    authorizer.addAuthRule(testPath.toString(), WasbAuthorizationOperations.EXECUTE.toString(), true);
+    authorizer.addAuthRule("/", WasbAuthorizationOperations.WRITE.toString(), true);
     authorizer.addAuthRule(testPath.toString(), WasbAuthorizationOperations.READ.toString(), false);
-    authorizer.addAuthRule(parentDir.toString(), WasbAuthorizationOperations.EXECUTE.toString(), true);
     fs.updateWasbAuthorizer(authorizer);
 
-    FSDataOutputStream fso = fs.create(testPath);
-    String data = "Hello World";
-    fso.writeBytes(data);
-    fso.close();
-    ContractTestUtils.assertPathExists(fs, "testPath does not exist", testPath);
-
     FSDataInputStream inputStream = null;
+    FSDataOutputStream fso = null;
+
     try {
+      fso = fs.create(testPath);
+      String data = "Hello World";
+      fso.writeBytes(data);
+      fso.close();
+
       inputStream = fs.open(testPath);
       ContractTestUtils.verifyRead(inputStream, data.getBytes(), 0, data.length());
     } finally {
+      if (fso != null) {
+        fso.close();
+      }
       if (inputStream != null) {
         inputStream.close();
       }
+      allowRecursiveDelete(fs, authorizer, parentDir.toString());
+      fs.delete(parentDir, true);
+    }
+  }
+
+  /**
+   * Positive test to verify file delete access check
+   * @throws Throwable
+   */
+  @Test
+  public void testFileDeleteAccessCheckPositive() throws Throwable {
+
+    AzureBlobStorageTestAccount testAccount = createTestAccount();
+    NativeAzureFileSystem fs = testAccount.getFileSystem();
+
+    Path parentDir = new Path("/");
+    Path testPath = new Path(parentDir, "test.dat");
+
+    MockWasbAuthorizerImpl authorizer = new MockWasbAuthorizerImpl();
+    authorizer.init(null);
+    authorizer.addAuthRule("/", WasbAuthorizationOperations.WRITE.toString(), true);
+    authorizer.addAuthRule(testPath.toString(), WasbAuthorizationOperations.READ.toString(), true);
+    fs.updateWasbAuthorizer(authorizer);
+    try {
+      fs.create(testPath);
+      ContractTestUtils.assertPathExists(fs, "testPath was not created", testPath);
+    }
+    finally {
+      fs.delete(testPath, false);
+      ContractTestUtils.assertPathDoesNotExist(fs, "testPath exists after deletion!", testPath);
+    }
+  }
+
+  /**
+   * Negative test to verify file delete access check
+   * @throws Throwable
+   */
+  @Test //(expected=WasbAuthorizationException.class)
+  public void testFileDeleteAccessCheckNegative() throws Throwable {
+
+    expectedEx.expect(WasbAuthorizationException.class);
+    expectedEx.expectMessage("delete operation for Path : /test.dat not allowed");
+
+    AzureBlobStorageTestAccount testAccount = createTestAccount();
+    NativeAzureFileSystem fs = testAccount.getFileSystem();
+
+    Path parentDir = new Path("/");
+    Path testPath = new Path(parentDir, "test.dat");
+
+    MockWasbAuthorizerImpl authorizer = new MockWasbAuthorizerImpl();
+    authorizer.init(null);
+    authorizer.addAuthRule("/", WasbAuthorizationOperations.WRITE.toString(), true);
+    authorizer.addAuthRule(testPath.toString(), WasbAuthorizationOperations.READ.toString(), true);
+    fs.updateWasbAuthorizer(authorizer);
+    try {
+      fs.create(testPath);
+      ContractTestUtils.assertPathExists(fs, "testPath was not created", testPath);
+
+
+      /* Remove permissions for delete to force failure */
+      authorizer.init(null);
+      authorizer.addAuthRule("/", WasbAuthorizationOperations.WRITE.toString(), false);
+      fs.updateWasbAuthorizer(authorizer);
+
+      fs.delete(testPath, false);
+    }
+    finally {
+      /* Restore permissions to force a successful delete */
+      authorizer.init(null);
+      authorizer.addAuthRule("/", WasbAuthorizationOperations.WRITE.toString(), true);
+      authorizer.addAuthRule(testPath.toString(), WasbAuthorizationOperations.READ.toString(), true);
+      fs.updateWasbAuthorizer(authorizer);
+
+      fs.delete(testPath, false);
+      ContractTestUtils.assertPathDoesNotExist(fs, "testPath exists after deletion!", testPath);
+    }
+  }
+
+  /**
+   * Positive test to verify file delete access check, with intermediate folders.
+   * Uses wildcard recursive permissions.
+   * @throws Throwable
+   */
+  @Test
+  public void testFileDeleteAccessWithIntermediateFoldersCheckPositive() throws Throwable {
+
+    AzureBlobStorageTestAccount testAccount = createTestAccount();
+    NativeAzureFileSystem fs = testAccount.getFileSystem();
+
+    Path parentDir = new Path("/testDeleteIntermediateFolder");
+    Path testPath = new Path(parentDir, "1/2/test.dat");
+
+    MockWasbAuthorizerImpl authorizer = new MockWasbAuthorizerImpl();
+    authorizer.init(null);
+    authorizer.addAuthRule("/", WasbAuthorizationOperations.WRITE.toString(), true); // for create and delete
+    authorizer.addAuthRule("/testDeleteIntermediateFolder*",
+        WasbAuthorizationOperations.WRITE.toString(), true); // for recursive delete
+    authorizer.addAuthRule("/*", WasbAuthorizationOperations.READ.toString(), true);
+    fs.updateWasbAuthorizer(authorizer);
+
+    try {
+      fs.create(testPath);
+      ContractTestUtils.assertPathExists(fs, "testPath was not created", testPath);
       fs.delete(parentDir, true);
+      ContractTestUtils.assertPathDoesNotExist(fs, "testPath exists after deletion!", parentDir);
+    }
+    finally {
+      allowRecursiveDelete(fs, authorizer, parentDir.toString());
+      fs.delete(parentDir, true);
+    }
+  }
+
+  /**
+   * Positive test for getFileStatus
+   * @throws Throwable
+   */
+  @Test
+  public void testGetFileStatusPositive() throws Throwable {
+
+    AzureBlobStorageTestAccount testAccount = createTestAccount();
+    NativeAzureFileSystem fs = testAccount.getFileSystem();
+
+    Path testPath = new Path("/");
+
+    MockWasbAuthorizerImpl authorizer = new MockWasbAuthorizerImpl();
+    authorizer.init(null);
+    authorizer.addAuthRule("/", WasbAuthorizationOperations.READ.toString(), true);
+    fs.updateWasbAuthorizer(authorizer);
+
+    ContractTestUtils.assertIsDirectory(fs, testPath);
+  }
+
+  /**
+   * Negative test for getFileStatus
+   * @throws Throwable
+   */
+  @Test //(expected=WasbAuthorizationException.class)
+  public void testGetFileStatusNegative() throws Throwable {
+
+    expectedEx.expect(WasbAuthorizationException.class);
+    expectedEx.expectMessage("getFileStatus operation for Path : / not allowed");
+
+    AzureBlobStorageTestAccount testAccount = createTestAccount();
+    NativeAzureFileSystem fs = testAccount.getFileSystem();
+
+    Path testPath = new Path("/");
+
+    MockWasbAuthorizerImpl authorizer = new MockWasbAuthorizerImpl();
+    authorizer.init(null);
+    authorizer.addAuthRule("/", WasbAuthorizationOperations.READ.toString(), false);
+    fs.updateWasbAuthorizer(authorizer);
+
+    ContractTestUtils.assertIsDirectory(fs, testPath);
+  }
+
+  /**
+   * Positive test for mkdirs access check
+   * @throws Throwable
+   */
+  @Test
+  public void testMkdirsCheckPositive() throws Throwable {
+
+    AzureBlobStorageTestAccount testAccount = createTestAccount();
+    NativeAzureFileSystem fs = testAccount.getFileSystem();
+
+    Path testPath = new Path("/testMkdirsAccessCheckPositive/1/2/3");
+
+    MockWasbAuthorizerImpl authorizer = new MockWasbAuthorizerImpl();
+    authorizer.init(null);
+    authorizer.addAuthRule("/", WasbAuthorizationOperations.WRITE.toString(), true);
+    authorizer.addAuthRule(testPath.toString(), WasbAuthorizationOperations.READ.toString(), true);
+    fs.updateWasbAuthorizer(authorizer);
+
+    try {
+      fs.mkdirs(testPath);
+      ContractTestUtils.assertIsDirectory(fs, testPath);
+    }
+    finally {
+      allowRecursiveDelete(fs, authorizer, "/testMkdirsAccessCheckPositive");
+      fs.delete(new Path("/testMkdirsAccessCheckPositive"), true);
+    }
+  }
+
+  /**
+   * Negative test for mkdirs access check
+   * @throws Throwable
+   */
+  @Test //(expected=WasbAuthorizationException.class)
+  public void testMkdirsCheckNegative() throws Throwable {
+
+    expectedEx.expect(WasbAuthorizationException.class);
+    expectedEx.expectMessage("mkdirs operation for Path : /testMkdirsAccessCheckNegative/1/2/3 not allowed");
+
+    AzureBlobStorageTestAccount testAccount = createTestAccount();
+    NativeAzureFileSystem fs = testAccount.getFileSystem();
+
+    Path testPath = new Path("/testMkdirsAccessCheckNegative/1/2/3");
+
+    MockWasbAuthorizerImpl authorizer = new MockWasbAuthorizerImpl();
+    authorizer.init(null);
+    authorizer.addAuthRule("/", WasbAuthorizationOperations.WRITE.toString(), false);
+    authorizer.addAuthRule(testPath.toString(), WasbAuthorizationOperations.READ.toString(), true);
+    fs.updateWasbAuthorizer(authorizer);
+
+    try {
+      fs.mkdirs(testPath);
+      ContractTestUtils.assertPathDoesNotExist(fs, "testPath was not created", testPath);
+    }
+    finally {
+      allowRecursiveDelete(fs, authorizer, "/testMkdirsAccessCheckNegative");
+      fs.delete(new Path("/testMkdirsAccessCheckNegative"), true);
     }
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0cab5722/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestWasbRemoteCallHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestWasbRemoteCallHelper.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestWasbRemoteCallHelper.java
index b13e5e9..d7c40b9 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestWasbRemoteCallHelper.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestWasbRemoteCallHelper.java
@@ -263,9 +263,9 @@ public class TestWasbRemoteCallHelper
 
   private void setupExpectations() {
     expectedEx.expect(WasbAuthorizationException.class);
-    expectedEx.expectMessage("org.apache.hadoop.fs.azure.WasbRemoteCallException: " +
-        "http://localhost/CHECK_AUTHORIZATION?wasb_absolute_path=%2Ftest.dat&" +
-        "operation_type=write&delegation_token:Encountered IOException while making remote call");
+    expectedEx.expectMessage("org.apache.hadoop.fs.azure.WasbRemoteCallException: "
+        + "http://localhost/CHECK_AUTHORIZATION?wasb_absolute_path=%2F&"
+        + "operation_type=write:Encountered IOException while making remote call");
   }
 
   private void performop(HttpClient mockHttpClient) throws Throwable {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0cab5722/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/metrics/TestAzureFileSystemInstrumentation.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/metrics/TestAzureFileSystemInstrumentation.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/metrics/TestAzureFileSystemInstrumentation.java
index 0c9126c..a1a021b 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/metrics/TestAzureFileSystemInstrumentation.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/metrics/TestAzureFileSystemInstrumentation.java
@@ -117,7 +117,8 @@ public class TestAzureFileSystemInstrumentation {
     // levels, and then 2 requests for checking/stamping the version of AS,
     // totaling 11.
     // Also, there's the initial 1 request for container check so total is 12.
-    base = assertWebResponsesInRange(base, 1, 12);
+    // The getAncestor call at the very beginning adds another 4 calls, totaling 16.
+    base = assertWebResponsesInRange(base, 1, 16);
     assertEquals(1,
         AzureMetricsTestUtil.getLongCounterValue(getInstrumentation(), WASB_DIRECTORIES_CREATED));
 


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org