You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2017/09/15 16:39:33 UTC

[08/20] hadoop git commit: HADOOP-14553. Add (parallelized) integration tests to hadoop-azure Contributed by Steve Loughran

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d2d97fa/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestFileSystemOperationsWithThreads.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestFileSystemOperationsWithThreads.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestFileSystemOperationsWithThreads.java
new file mode 100644
index 0000000..4389fda
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestFileSystemOperationsWithThreads.java
@@ -0,0 +1,821 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azure.NativeAzureFileSystem.FolderRenamePending;
+import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+/**
+ * Tests the Native Azure file system (WASB) using parallel threads for rename and delete operations.
+ */
+public class ITestFileSystemOperationsWithThreads extends AbstractWasbTestBase {
+
+  private final int renameThreads = 10;
+  private final int deleteThreads = 20;
+  private int iterations = 1;
+  private LogCapturer logs = null;
+
+  @Rule
+  public ExpectedException exception = ExpectedException.none();
+
+  @Before
+  public void setUp() throws Exception {
+    super.setUp();
+    Configuration conf = fs.getConf();
+
+    // By default enable parallel threads for rename and delete operations.
+    // Also enable flat listing of blobs for these operations.
+    conf.setInt(NativeAzureFileSystem.AZURE_RENAME_THREADS, renameThreads);
+    conf.setInt(NativeAzureFileSystem.AZURE_DELETE_THREADS, deleteThreads);
+    conf.setBoolean(AzureNativeFileSystemStore.KEY_ENABLE_FLAT_LISTING, true);
+
+    URI uri = fs.getUri();
+    fs.initialize(uri, conf);
+
+    // Capture logs
+    logs = LogCapturer.captureLogs(new Log4JLogger(org.apache.log4j.Logger
+        .getRootLogger()));
+  }
+
+  /*
+   * Helper method to create sub directory and different types of files
+   * for multiple iterations.
+   */
+  private void createFolder(FileSystem fs, String root) throws Exception {
+    fs.mkdirs(new Path(root));
+    for (int i = 0; i < this.iterations; i++) {
+      fs.mkdirs(new Path(root + "/" + i));
+      fs.createNewFile(new Path(root + "/" + i + "/fileToRename"));
+      fs.createNewFile(new Path(root + "/" + i + "/file/to/rename"));
+      fs.createNewFile(new Path(root + "/" + i + "/file+to%rename"));
+      fs.createNewFile(new Path(root + "/fileToRename" + i));
+    }
+  }
+
+  /*
+   * Helper method to do rename operation and validate all files in source folder
+   * doesn't exists and similar files exists in new folder.
+   */
+  private void validateRenameFolder(FileSystem fs, String source, String dest) throws Exception {
+    // Create source folder with files.
+    createFolder(fs, source);
+    Path sourceFolder = new Path(source);
+    Path destFolder = new Path(dest);
+
+    // rename operation
+    assertTrue(fs.rename(sourceFolder, destFolder));
+    assertTrue(fs.exists(destFolder));
+
+    for (int i = 0; i < this.iterations; i++) {
+      // Check destination folder and files exists.
+      assertTrue(fs.exists(new Path(dest + "/" + i)));
+      assertTrue(fs.exists(new Path(dest + "/" + i + "/fileToRename")));
+      assertTrue(fs.exists(new Path(dest + "/" + i + "/file/to/rename")));
+      assertTrue(fs.exists(new Path(dest + "/" + i + "/file+to%rename")));
+      assertTrue(fs.exists(new Path(dest + "/fileToRename" + i)));
+
+      // Check source folder and files doesn't exists.
+      assertFalse(fs.exists(new Path(source + "/" + i)));
+      assertFalse(fs.exists(new Path(source + "/" + i + "/fileToRename")));
+      assertFalse(fs.exists(new Path(source + "/" + i + "/file/to/rename")));
+      assertFalse(fs.exists(new Path(source + "/" + i + "/file+to%rename")));
+      assertFalse(fs.exists(new Path(source + "/fileToRename" + i)));
+    }
+  }
+
+  /*
+   * Test case for rename operation with multiple threads and flat listing enabled.
+   */
+  @Test
+  public void testRenameSmallFolderWithThreads() throws Exception {
+
+    validateRenameFolder(fs, "root", "rootnew");
+
+    // With single iteration, we would have created 7 blobs.
+    int expectedThreadsCreated = Math.min(7, renameThreads);
+
+    // Validate from logs that threads are created.
+    String content = logs.getOutput();
+    assertInLog(content, "ms with threads: " + expectedThreadsCreated);
+
+    // Validate thread executions
+    for (int i = 0; i < expectedThreadsCreated; i++) {
+      assertInLog(content,
+          "AzureBlobRenameThread-" + Thread.currentThread().getName() + "-" + i);
+    }
+
+    // Also ensure that we haven't spawned extra threads.
+    if (expectedThreadsCreated < renameThreads) {
+      for (int i = expectedThreadsCreated; i < renameThreads; i++) {
+        assertNotInLog(content,
+            "AzureBlobRenameThread-" + Thread.currentThread().getName() + "-" + i);
+      }
+    }
+  }
+
+  /*
+   * Test case for rename operation with multiple threads and flat listing enabled.
+   */
+  @Test
+  public void testRenameLargeFolderWithThreads() throws Exception {
+
+    // Populate source folder with large number of files and directories.
+    this.iterations = 10;
+    validateRenameFolder(fs, "root", "rootnew");
+
+    // Validate from logs that threads are created.
+    String content = logs.getOutput();
+    assertInLog(content, "ms with threads: " + renameThreads);
+
+    // Validate thread executions
+    for (int i = 0; i < renameThreads; i++) {
+      assertInLog(content,
+          "AzureBlobRenameThread-" + Thread.currentThread().getName() + "-" + i);
+    }
+  }
+
+  /*
+   * Test case for rename operation with threads disabled and flat listing enabled.
+   */
+  @Test
+  public void testRenameLargeFolderDisableThreads() throws Exception {
+    Configuration conf = fs.getConf();
+
+    // Number of threads set to 0 or 1 disables threads.
+    conf.setInt(NativeAzureFileSystem.AZURE_RENAME_THREADS, 0);
+    URI uri = fs.getUri();
+    fs.initialize(uri, conf);
+
+    // Populate source folder with large number of files and directories.
+    this.iterations = 10;
+    validateRenameFolder(fs, "root", "rootnew");
+
+    // Validate from logs that threads are disabled.
+    String content = logs.getOutput();
+    assertInLog(content,
+        "Disabling threads for Rename operation as thread count 0");
+
+    // Validate no thread executions
+    for (int i = 0; i < renameThreads; i++) {
+      String term = "AzureBlobRenameThread-"
+          + Thread.currentThread().getName()
+          + "-" + i;
+      assertNotInLog(content, term);
+    }
+  }
+
+  /**
+   * Assert that a log contains the given term.
+   * @param content log output
+   * @param term search term
+   */
+  protected void assertInLog(String content, String term) {
+    assertTrue("Empty log", !content.isEmpty());
+    if (!content.contains(term)) {
+      String message = "No " + term + " found in logs";
+      LOG.error(message);
+      System.err.println(content);
+      fail(message);
+    }
+  }
+
+  /**
+   * Assert that a log does not contain the given term.
+   * @param content log output
+   * @param term search term
+   */
+  protected void assertNotInLog(String content, String term) {
+    assertTrue("Empty log", !content.isEmpty());
+    if (content.contains(term)) {
+      String message = term + " found in logs";
+      LOG.error(message);
+      System.err.println(content);
+      fail(message);
+    }
+  }
+
+  /*
+   * Test case for rename operation with threads and flat listing disabled.
+   */
+  @Test
+  public void testRenameSmallFolderDisableThreadsDisableFlatListing() throws Exception {
+    Configuration conf = fs.getConf();
+    conf = fs.getConf();
+
+    // Number of threads set to 0 or 1 disables threads.
+    conf.setInt(NativeAzureFileSystem.AZURE_RENAME_THREADS, 1);
+    conf.setBoolean(AzureNativeFileSystemStore.KEY_ENABLE_FLAT_LISTING, false);
+    URI uri = fs.getUri();
+    fs.initialize(uri, conf);
+
+    validateRenameFolder(fs, "root", "rootnew");
+
+    // Validate from logs that threads are disabled.
+    String content = logs.getOutput();
+    assertInLog(content,
+        "Disabling threads for Rename operation as thread count 1");
+
+    // Validate no thread executions
+    for (int i = 0; i < renameThreads; i++) {
+      assertNotInLog(content,
+          "AzureBlobRenameThread-" + Thread.currentThread().getName() + "-" + i);
+    }
+  }
+
+  /*
+   * Helper method to do delete operation and validate all files in source folder
+   * doesn't exists after delete operation.
+   */
+  private void validateDeleteFolder(FileSystem fs, String source)  throws Exception {
+    // Create folder with files.
+    createFolder(fs, "root");
+    Path sourceFolder = new Path(source);
+
+    // Delete operation
+    assertTrue(fs.delete(sourceFolder, true));
+    assertFalse(fs.exists(sourceFolder));
+
+    for (int i = 0; i < this.iterations; i++) {
+      // check that source folder and files doesn't exists
+      assertFalse(fs.exists(new Path(source + "/" + i)));
+      assertFalse(fs.exists(new Path(source + "/" + i + "/fileToRename")));
+      assertFalse(fs.exists(new Path(source + "/" + i + "/file/to/rename")));
+      assertFalse(fs.exists(new Path(source + "/" + i + "/file+to%rename")));
+      assertFalse(fs.exists(new Path(source + "/fileToRename" + i)));
+    }
+  }
+
+  /*
+   * Test case for delete operation with multiple threads and flat listing enabled.
+   */
+  @Test
+  public void testDeleteSmallFolderWithThreads() throws Exception {
+
+    validateDeleteFolder(fs, "root");
+
+    // With single iteration, we would have created 7 blobs.
+    int expectedThreadsCreated = Math.min(7, deleteThreads);
+
+    // Validate from logs that threads are enabled.
+    String content = logs.getOutput();
+    assertInLog(content, "ms with threads: " + expectedThreadsCreated);
+
+    // Validate thread executions
+    for (int i = 0; i < expectedThreadsCreated; i++) {
+      assertInLog(content,
+          "AzureBlobDeleteThread-" + Thread.currentThread().getName() + "-" + i);
+    }
+
+    // Also ensure that we haven't spawned extra threads.
+    if (expectedThreadsCreated < deleteThreads) {
+      for (int i = expectedThreadsCreated; i < deleteThreads; i++) {
+        assertNotInLog(content,
+            "AzureBlobDeleteThread-" + Thread.currentThread().getName() + "-" + i);
+      }
+    }
+  }
+
+  /*
+   * Test case for delete operation with multiple threads and flat listing enabled.
+   */
+  @Test
+  public void testDeleteLargeFolderWithThreads() throws Exception {
+    // Populate source folder with large number of files and directories.
+    this.iterations = 10;
+    validateDeleteFolder(fs, "root");
+
+    // Validate from logs that threads are enabled.
+    String content = logs.getOutput();
+    assertInLog(content, "ms with threads: " + deleteThreads);
+
+    // Validate thread executions
+    for (int i = 0; i < deleteThreads; i++) {
+      assertInLog(content,
+          "AzureBlobDeleteThread-" + Thread.currentThread().getName() + "-" + i);
+    }
+  }
+
+  /*
+   * Test case for delete operation with threads disabled and flat listing enabled.
+   */
+  @Test
+  public void testDeleteLargeFolderDisableThreads() throws Exception {
+    Configuration conf = fs.getConf();
+    conf.setInt(NativeAzureFileSystem.AZURE_DELETE_THREADS, 0);
+    URI uri = fs.getUri();
+    fs.initialize(uri, conf);
+
+    // Populate source folder with large number of files and directories.
+    this.iterations = 10;
+    validateDeleteFolder(fs, "root");
+
+    // Validate from logs that threads are disabled.
+    String content = logs.getOutput();
+    assertInLog(content,
+        "Disabling threads for Delete operation as thread count 0");
+
+    // Validate no thread executions
+    for (int i = 0; i < deleteThreads; i++) {
+      assertNotInLog(content,
+          "AzureBlobDeleteThread-" + Thread.currentThread().getName() + "-" + i);
+    }
+  }
+
+  /*
+   * Test case for rename operation with threads and flat listing disabled.
+   */
+  @Test
+  public void testDeleteSmallFolderDisableThreadsDisableFlatListing() throws Exception {
+    Configuration conf = fs.getConf();
+
+    // Number of threads set to 0 or 1 disables threads.
+    conf.setInt(NativeAzureFileSystem.AZURE_DELETE_THREADS, 1);
+    conf.setBoolean(AzureNativeFileSystemStore.KEY_ENABLE_FLAT_LISTING, false);
+    URI uri = fs.getUri();
+    fs.initialize(uri, conf);
+
+    validateDeleteFolder(fs, "root");
+
+    // Validate from logs that threads are disabled.
+    String content = logs.getOutput();
+    assertInLog(content,
+        "Disabling threads for Delete operation as thread count 1");
+
+    // Validate no thread executions
+    for (int i = 0; i < deleteThreads; i++) {
+      assertNotInLog(content,
+          "AzureBlobDeleteThread-" + Thread.currentThread().getName() + "-" + i);
+    }
+  }
+
+  /*
+   * Test case for delete operation with multiple threads and flat listing enabled.
+   */
+  @Test
+  public void testDeleteThreadPoolExceptionFailure() throws Exception {
+
+    // Spy azure file system object and raise exception for new thread pool
+    NativeAzureFileSystem mockFs = Mockito.spy((NativeAzureFileSystem) fs);
+
+    String path = mockFs.pathToKey(mockFs.makeAbsolute(new Path("root")));
+
+    AzureFileSystemThreadPoolExecutor mockThreadPoolExecutor = Mockito.spy(
+        mockFs.getThreadPoolExecutor(deleteThreads, "AzureBlobDeleteThread", "Delete",
+            path, NativeAzureFileSystem.AZURE_DELETE_THREADS));
+    Mockito.when(mockThreadPoolExecutor.getThreadPool(7)).thenThrow(new Exception());
+
+    // With single iteration, we would have created 7 blobs resulting 7 threads.
+    Mockito.when(mockFs.getThreadPoolExecutor(deleteThreads, "AzureBlobDeleteThread", "Delete",
+        path, NativeAzureFileSystem.AZURE_DELETE_THREADS)).thenReturn(mockThreadPoolExecutor);
+
+    validateDeleteFolder(mockFs, "root");
+
+    // Validate from logs that threads are disabled.
+    String content = logs.getOutput();
+    assertInLog(content, "Failed to create thread pool with threads");
+    assertInLog(content, "Serializing the Delete operation");
+  }
+
+  /*
+   * Test case for delete operation with multiple threads and flat listing enabled.
+   */
+  @Test
+  public void testDeleteThreadPoolExecuteFailure() throws Exception {
+
+    // Mock thread pool executor to throw exception for all requests.
+    ThreadPoolExecutor mockThreadExecutor = Mockito.mock(ThreadPoolExecutor.class);
+    Mockito.doThrow(new RejectedExecutionException()).when(mockThreadExecutor).execute(Mockito.any(Runnable.class));
+
+    // Spy azure file system object and return mocked thread pool
+    NativeAzureFileSystem mockFs = Mockito.spy((NativeAzureFileSystem) fs);
+
+    String path = mockFs.pathToKey(mockFs.makeAbsolute(new Path("root")));
+
+    AzureFileSystemThreadPoolExecutor mockThreadPoolExecutor = Mockito.spy(
+        mockFs.getThreadPoolExecutor(deleteThreads, "AzureBlobDeleteThread", "Delete",
+            path, NativeAzureFileSystem.AZURE_DELETE_THREADS));
+    Mockito.when(mockThreadPoolExecutor.getThreadPool(7)).thenReturn(mockThreadExecutor);
+
+    // With single iteration, we would have created 7 blobs resulting 7 threads.
+    Mockito.when(mockFs.getThreadPoolExecutor(deleteThreads, "AzureBlobDeleteThread", "Delete",
+        path, NativeAzureFileSystem.AZURE_DELETE_THREADS)).thenReturn(mockThreadPoolExecutor);
+
+    validateDeleteFolder(mockFs, "root");
+
+    // Validate from logs that threads are disabled.
+    String content = logs.getOutput();
+    assertInLog(content,
+        "Rejected execution of thread for Delete operation on blob");
+    assertInLog(content, "Serializing the Delete operation");
+  }
+
+  /*
+   * Test case for delete operation with multiple threads and flat listing enabled.
+   */
+  @Test
+  public void testDeleteThreadPoolExecuteSingleThreadFailure() throws Exception {
+
+    // Spy azure file system object and return mocked thread pool
+    NativeAzureFileSystem mockFs = Mockito.spy((NativeAzureFileSystem) fs);
+
+    // Spy a thread pool executor and link it to azure file system object.
+    String path = mockFs.pathToKey(mockFs.makeAbsolute(new Path("root")));
+    AzureFileSystemThreadPoolExecutor mockThreadPoolExecutor = Mockito.spy(
+        mockFs.getThreadPoolExecutor(deleteThreads, "AzureBlobDeleteThread", "Delete",
+            path, NativeAzureFileSystem.AZURE_DELETE_THREADS));
+
+    // With single iteration, we would have created 7 blobs resulting 7 threads.
+    Mockito.when(mockFs.getThreadPoolExecutor(deleteThreads, "AzureBlobDeleteThread", "Delete",
+        path, NativeAzureFileSystem.AZURE_DELETE_THREADS)).thenReturn(mockThreadPoolExecutor);
+
+    // Create a thread executor and link it to mocked thread pool executor object.
+    ThreadPoolExecutor mockThreadExecutor = Mockito.spy(mockThreadPoolExecutor.getThreadPool(7));
+    Mockito.when(mockThreadPoolExecutor.getThreadPool(7)).thenReturn(mockThreadExecutor);
+
+    // Mock thread executor to throw exception for all requests.
+    Mockito.doCallRealMethod().doThrow(new RejectedExecutionException()).when(mockThreadExecutor).execute(Mockito.any(Runnable.class));
+
+    validateDeleteFolder(mockFs, "root");
+
+    // Validate from logs that threads are enabled and unused threads.
+    String content = logs.getOutput();
+    assertInLog(content,
+        "Using thread pool for Delete operation with threads 7");
+    assertInLog(content,
+        "6 threads not used for Delete operation on blob");
+  }
+
+  /*
+   * Test case for delete operation with multiple threads and flat listing enabled.
+   */
+  @Test
+  public void testDeleteThreadPoolTerminationFailure() throws Exception {
+
+    // Spy azure file system object and return mocked thread pool
+    NativeAzureFileSystem mockFs = Mockito.spy((NativeAzureFileSystem) fs);
+
+    // Spy a thread pool executor and link it to azure file system object.
+    String path = mockFs.pathToKey(mockFs.makeAbsolute(new Path("root")));
+    AzureFileSystemThreadPoolExecutor mockThreadPoolExecutor = Mockito.spy(
+        ((NativeAzureFileSystem) fs).getThreadPoolExecutor(deleteThreads, "AzureBlobDeleteThread", "Delete",
+            path, NativeAzureFileSystem.AZURE_DELETE_THREADS));
+
+    // Create a thread executor and link it to mocked thread pool executor object.
+    // Mock thread executor to throw exception for terminating threads.
+    ThreadPoolExecutor mockThreadExecutor = Mockito.mock(ThreadPoolExecutor.class);
+    Mockito.doNothing().when(mockThreadExecutor).execute(Mockito.any(Runnable.class));
+    Mockito.when(mockThreadExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS)).thenThrow(new InterruptedException());
+
+    Mockito.when(mockThreadPoolExecutor.getThreadPool(7)).thenReturn(mockThreadExecutor);
+
+    // With single iteration, we would have created 7 blobs resulting 7 threads.
+    Mockito.when(mockFs.getThreadPoolExecutor(deleteThreads, "AzureBlobDeleteThread", "Delete",
+        path, NativeAzureFileSystem.AZURE_DELETE_THREADS)).thenReturn(mockThreadPoolExecutor);
+
+    createFolder(mockFs, "root");
+    Path sourceFolder = new Path("root");
+    boolean exception = false;
+    try {
+      mockFs.delete(sourceFolder, true);
+    } catch (IOException e){
+      exception = true;
+    }
+
+    assertTrue(exception);
+    assertTrue(mockFs.exists(sourceFolder));
+
+    // Validate from logs that threads are enabled and delete operation is failed.
+    String content = logs.getOutput();
+    assertInLog(content,
+        "Using thread pool for Delete operation with threads");
+    assertInLog(content, "Threads got interrupted Delete blob operation");
+    assertInLog(content,
+        "Delete failed as operation on subfolders and files failed.");
+  }
+
+  /*
+   * Validate that when a directory is deleted recursively, the operation succeeds
+   * even if a child directory delete fails because the directory does not exist.
+   * This can happen if a child directory is deleted by an external agent while
+   * the parent is in progress of being deleted recursively.
+   */
+  @Test
+  public void testRecursiveDirectoryDeleteWhenChildDirectoryDeleted()
+      throws Exception {
+    testRecusiveDirectoryDelete(true);
+  }
+
+  /*
+   * Validate that when a directory is deleted recursively, the operation succeeds
+   * even if a file delete fails because it does not exist.
+   * This can happen if a file is deleted by an external agent while
+   * the parent directory is in progress of being deleted.
+   */
+  @Test
+  public void testRecursiveDirectoryDeleteWhenDeletingChildFileReturnsFalse()
+      throws Exception {
+    testRecusiveDirectoryDelete(false);
+  }
+
+  private void testRecusiveDirectoryDelete(boolean useDir) throws Exception {
+    String childPathToBeDeletedByExternalAgent = (useDir)
+        ? "root/0"
+        : "root/0/fileToRename";
+    // Spy azure file system object and return false for deleting one file
+    NativeAzureFileSystem mockFs = Mockito.spy((NativeAzureFileSystem) fs);
+    String path = mockFs.pathToKey(mockFs.makeAbsolute(new Path(
+        childPathToBeDeletedByExternalAgent)));
+
+    Answer<Boolean> answer = new Answer<Boolean>() {
+      public Boolean answer(InvocationOnMock invocation) throws Throwable {
+        String path = (String) invocation.getArguments()[0];
+        boolean isDir = (boolean) invocation.getArguments()[1];
+        boolean realResult = fs.deleteFile(path, isDir);
+        assertTrue(realResult);
+        boolean fakeResult = false;
+        return fakeResult;
+      }
+    };
+
+    Mockito.when(mockFs.deleteFile(path, useDir)).thenAnswer(answer);
+
+    createFolder(mockFs, "root");
+    Path sourceFolder = new Path("root");
+
+    assertTrue(mockFs.delete(sourceFolder, true));
+    assertFalse(mockFs.exists(sourceFolder));
+
+    // Validate from logs that threads are enabled, that a child directory was
+    // deleted by an external caller, and the parent delete operation still
+    // succeeds.
+    String content = logs.getOutput();
+    assertInLog(content,
+        "Using thread pool for Delete operation with threads");
+    assertInLog(content, String.format("Attempt to delete non-existent %s %s",
+        useDir ? "directory" : "file", path));
+  }
+
+  /*
+   * Test case for delete operation with multiple threads and flat listing enabled.
+   */
+  @Test
+  public void testDeleteSingleDeleteException() throws Exception {
+
+    // Spy azure file system object and raise exception for deleting one file
+    NativeAzureFileSystem mockFs = Mockito.spy((NativeAzureFileSystem) fs);
+    String path = mockFs.pathToKey(mockFs.makeAbsolute(new Path("root/0")));
+    Mockito.doThrow(new IOException()).when(mockFs).deleteFile(path, true);
+
+    createFolder(mockFs, "root");
+    Path sourceFolder = new Path("root");
+
+    boolean exception = false;
+    try {
+      mockFs.delete(sourceFolder, true);
+    } catch (IOException e){
+      exception = true;
+    }
+
+    assertTrue(exception);
+    assertTrue(mockFs.exists(sourceFolder));
+
+    // Validate from logs that threads are enabled and delete operation failed.
+    String content = logs.getOutput();
+    assertInLog(content,
+        "Using thread pool for Delete operation with threads");
+    assertInLog(content,
+        "Encountered Exception for Delete operation for file " + path);
+    assertInLog(content,
+        "Terminating execution of Delete operation now as some other thread already got exception or operation failed");
+  }
+
+  /*
+   * Test case for rename operation with multiple threads and flat listing enabled.
+   */
+  @Test
+  public void testRenameThreadPoolExceptionFailure() throws Exception {
+
+    // Spy azure file system object and raise exception for new thread pool
+    NativeAzureFileSystem mockFs = Mockito.spy((NativeAzureFileSystem) fs);
+
+    String path = mockFs.pathToKey(mockFs.makeAbsolute(new Path("root")));
+    AzureFileSystemThreadPoolExecutor mockThreadPoolExecutor = Mockito.spy(
+        ((NativeAzureFileSystem) fs).getThreadPoolExecutor(renameThreads, "AzureBlobRenameThread", "Rename",
+            path, NativeAzureFileSystem.AZURE_RENAME_THREADS));
+    Mockito.when(mockThreadPoolExecutor.getThreadPool(7)).thenThrow(new Exception());
+
+    // With single iteration, we would have created 7 blobs resulting 7 threads.
+    Mockito.doReturn(mockThreadPoolExecutor).when(mockFs).getThreadPoolExecutor(renameThreads, "AzureBlobRenameThread", "Rename",
+        path, NativeAzureFileSystem.AZURE_RENAME_THREADS);
+
+    validateRenameFolder(mockFs, "root", "rootnew");
+
+    // Validate from logs that threads are disabled.
+    String content = logs.getOutput();
+    assertInLog(content, "Failed to create thread pool with threads");
+    assertInLog(content, "Serializing the Rename operation");
+  }
+
+  /*
+   * Test case for rename operation with multiple threads and flat listing enabled.
+   */
+  @Test
+  public void testRenameThreadPoolExecuteFailure() throws Exception {
+
+    // Mock thread pool executor to throw exception for all requests.
+    ThreadPoolExecutor mockThreadExecutor = Mockito.mock(ThreadPoolExecutor.class);
+    Mockito.doThrow(new RejectedExecutionException()).when(mockThreadExecutor).execute(Mockito.any(Runnable.class));
+
+    // Spy azure file system object and return mocked thread pool
+    NativeAzureFileSystem mockFs = Mockito.spy((NativeAzureFileSystem) fs);
+
+    String path = mockFs.pathToKey(mockFs.makeAbsolute(new Path("root")));
+    AzureFileSystemThreadPoolExecutor mockThreadPoolExecutor = Mockito.spy(
+        mockFs.getThreadPoolExecutor(renameThreads, "AzureBlobRenameThread", "Rename",
+            path, NativeAzureFileSystem.AZURE_RENAME_THREADS));
+    Mockito.when(mockThreadPoolExecutor.getThreadPool(7)).thenReturn(mockThreadExecutor);
+
+    // With single iteration, we would have created 7 blobs resulting 7 threads.
+    Mockito.when(mockFs.getThreadPoolExecutor(renameThreads, "AzureBlobRenameThread", "Rename",
+        path, NativeAzureFileSystem.AZURE_RENAME_THREADS)).thenReturn(mockThreadPoolExecutor);
+
+    validateRenameFolder(mockFs, "root", "rootnew");
+
+    // Validate from logs that threads are disabled.
+    String content = logs.getOutput();
+    assertInLog(content,
+        "Rejected execution of thread for Rename operation on blob");
+    assertInLog(content, "Serializing the Rename operation");
+  }
+
+  /*
+   * Test case for rename operation with multiple threads and flat listing enabled.
+   */
+  @Test
+  public void testRenameThreadPoolExecuteSingleThreadFailure() throws Exception {
+
+    // Spy azure file system object and return mocked thread pool
+    NativeAzureFileSystem mockFs = Mockito.spy((NativeAzureFileSystem) fs);
+
+    // Spy a thread pool executor and link it to azure file system object.
+    String path = mockFs.pathToKey(mockFs.makeAbsolute(new Path("root")));
+    AzureFileSystemThreadPoolExecutor mockThreadPoolExecutor = Mockito.spy(
+        mockFs.getThreadPoolExecutor(renameThreads, "AzureBlobRenameThread", "Rename",
+            path, NativeAzureFileSystem.AZURE_RENAME_THREADS));
+
+    // With single iteration, we would have created 7 blobs resulting 7 threads.
+    Mockito.when(mockFs.getThreadPoolExecutor(renameThreads, "AzureBlobRenameThread", "Rename",
+        path, NativeAzureFileSystem.AZURE_RENAME_THREADS)).thenReturn(mockThreadPoolExecutor);
+
+    // Create a thread executor and link it to mocked thread pool executor object.
+    ThreadPoolExecutor mockThreadExecutor = Mockito.spy(mockThreadPoolExecutor.getThreadPool(7));
+    Mockito.when(mockThreadPoolExecutor.getThreadPool(7)).thenReturn(mockThreadExecutor);
+
+    // Mock thread executor to throw exception for all requests.
+    Mockito.doCallRealMethod().doThrow(new RejectedExecutionException()).when(mockThreadExecutor).execute(Mockito.any(Runnable.class));
+
+    validateRenameFolder(mockFs, "root", "rootnew");
+
+    // Validate from logs that threads are enabled and unused threads exists.
+    String content = logs.getOutput();
+    assertInLog(content,
+        "Using thread pool for Rename operation with threads 7");
+    assertInLog(content,
+        "6 threads not used for Rename operation on blob");
+  }
+
+  /*
+   * Test case for rename operation with multiple threads and flat listing enabled.
+   */
+  @Test
+  public void testRenameThreadPoolTerminationFailure() throws Exception {
+
+    // Spy azure file system object and return mocked thread pool
+    NativeAzureFileSystem mockFs = Mockito.spy((NativeAzureFileSystem) fs);
+
+    // Spy a thread pool executor and link it to azure file system object.
+    String path = mockFs.pathToKey(mockFs.makeAbsolute(new Path("root")));
+    AzureFileSystemThreadPoolExecutor mockThreadPoolExecutor = Mockito.spy(
+        mockFs.getThreadPoolExecutor(renameThreads, "AzureBlobRenameThread", "Rename",
+            path, NativeAzureFileSystem.AZURE_RENAME_THREADS));
+
+    // With single iteration, we would have created 7 blobs resulting 7 threads.
+    Mockito.when(mockFs.getThreadPoolExecutor(renameThreads, "AzureBlobRenameThread", "Rename",
+        path, NativeAzureFileSystem.AZURE_RENAME_THREADS)).thenReturn(mockThreadPoolExecutor);
+
+    // Mock thread executor to throw exception for all requests.
+    ThreadPoolExecutor mockThreadExecutor = Mockito.mock(ThreadPoolExecutor.class);
+    Mockito.doNothing().when(mockThreadExecutor).execute(Mockito.any(Runnable.class));
+    Mockito.when(mockThreadExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS)).thenThrow(new InterruptedException());
+    Mockito.when(mockThreadPoolExecutor.getThreadPool(7)).thenReturn(mockThreadExecutor);
+
+
+    createFolder(mockFs, "root");
+    Path sourceFolder = new Path("root");
+    Path destFolder = new Path("rootnew");
+    boolean exception = false;
+    try {
+      mockFs.rename(sourceFolder, destFolder);
+    } catch (IOException e){
+      exception = true;
+    }
+
+    assertTrue(exception);
+    assertTrue(mockFs.exists(sourceFolder));
+
+    // Validate from logs that threads are enabled and rename operation is failed.
+    String content = logs.getOutput();
+    assertInLog(content,
+        "Using thread pool for Rename operation with threads");
+    assertInLog(content, "Threads got interrupted Rename blob operation");
+    assertInLog(content,
+        "Rename failed as operation on subfolders and files failed.");
+  }
+
+  /*
+   * Test case for rename operation with multiple threads and flat listing enabled.
+   */
+  @Test
+  public void testRenameSingleRenameException() throws Exception {
+
+    // Spy azure file system object and raise exception for deleting one file
+    Path sourceFolder = new Path("root");
+    Path destFolder = new Path("rootnew");
+
+    // Spy azure file system object and populate rename pending spy object.
+    NativeAzureFileSystem mockFs = Mockito.spy((NativeAzureFileSystem) fs);
+
+    // Populate data now only such that rename pending spy object would see this data.
+    createFolder(mockFs, "root");
+
+    String srcKey = mockFs.pathToKey(mockFs.makeAbsolute(sourceFolder));
+    String dstKey = mockFs.pathToKey(mockFs.makeAbsolute(destFolder));
+
+    FolderRenamePending mockRenameFs = Mockito.spy(mockFs.prepareAtomicFolderRename(srcKey, dstKey));
+    Mockito.when(mockFs.prepareAtomicFolderRename(srcKey, dstKey)).thenReturn(mockRenameFs);
+    String path = mockFs.pathToKey(mockFs.makeAbsolute(new Path("root/0")));
+    Mockito.doThrow(new IOException()).when(mockRenameFs).renameFile(Mockito.any(FileMetadata.class));
+
+    boolean exception = false;
+    try {
+      mockFs.rename(sourceFolder, destFolder);
+    } catch (IOException e){
+      exception = true;
+    }
+
+    assertTrue(exception);
+    assertTrue(mockFs.exists(sourceFolder));
+
+    // Validate from logs that threads are enabled and delete operation failed.
+    String content = logs.getOutput();
+    assertInLog(content,
+        "Using thread pool for Rename operation with threads");
+    assertInLog(content,
+        "Encountered Exception for Rename operation for file " + path);
+    assertInLog(content,
+        "Terminating execution of Rename operation now as some other thread already got exception or operation failed");
+  }
+
+  @Override
+  protected AzureBlobStorageTestAccount createTestAccount() throws Exception {
+    return AzureBlobStorageTestAccount.create();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d2d97fa/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFSAuthWithBlobSpecificKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFSAuthWithBlobSpecificKeys.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFSAuthWithBlobSpecificKeys.java
new file mode 100644
index 0000000..d7e4831
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFSAuthWithBlobSpecificKeys.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import org.apache.hadoop.conf.Configuration;
+
+import static org.apache.hadoop.fs.azure.SecureStorageInterfaceImpl.KEY_USE_CONTAINER_SASKEY_FOR_ALL_ACCESS;
+
+/**
+ * Test class to hold all WASB authorization tests that use blob-specific keys
+ * to access storage.
+ */
+public class ITestNativeAzureFSAuthWithBlobSpecificKeys
+    extends ITestNativeAzureFileSystemAuthorizationWithOwner {
+
+
+  @Override
+  public Configuration createConfiguration() {
+    Configuration conf = super.createConfiguration();
+    conf.set(KEY_USE_CONTAINER_SASKEY_FOR_ALL_ACCESS, "false");
+    return conf;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d2d97fa/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFSAuthorizationCaching.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFSAuthorizationCaching.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFSAuthorizationCaching.java
new file mode 100644
index 0000000..c73b1cc
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFSAuthorizationCaching.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Test;
+
+import static org.apache.hadoop.fs.azure.CachingAuthorizer.KEY_AUTH_SERVICE_CACHING_ENABLE;
+
+/**
+ * Test class to hold all WASB authorization caching related tests.
+ */
+public class ITestNativeAzureFSAuthorizationCaching
+    extends ITestNativeAzureFileSystemAuthorizationWithOwner {
+
+  private static final int DUMMY_TTL_VALUE = 5000;
+
+  @Override
+  public Configuration createConfiguration() {
+    Configuration conf = super.createConfiguration();
+    conf.set(KEY_AUTH_SERVICE_CACHING_ENABLE, "true");
+    return conf;
+  }
+
+  /**
+   * Test to verify cache behavior -- assert that PUT overwrites value if present
+   */
+  @Test
+  public void testCachePut() throws Throwable {
+    CachingAuthorizer<String, Integer> cache = new CachingAuthorizer<>(DUMMY_TTL_VALUE, "TEST");
+    cache.init(createConfiguration());
+    cache.put("TEST", 1);
+    cache.put("TEST", 3);
+    int result = cache.get("TEST");
+    assertEquals("Cache returned unexpected result", 3, result);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d2d97fa/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFSPageBlobLive.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFSPageBlobLive.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFSPageBlobLive.java
new file mode 100644
index 0000000..a4d8729
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFSPageBlobLive.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.azure;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Run the base Azure file system tests strictly on page blobs to make sure fundamental
+ * operations on page blob files and folders work as expected.
+ * These operations include create, delete, rename, list, and so on.
+ */
+public class ITestNativeAzureFSPageBlobLive extends
+    NativeAzureFileSystemBaseTest {
+
+  @Override
+  protected AzureBlobStorageTestAccount createTestAccount()
+      throws Exception {
+    Configuration conf = new Configuration();
+
+    // Configure the page blob directories key so every file created is a page blob.
+    conf.set(AzureNativeFileSystemStore.KEY_PAGE_BLOB_DIRECTORIES, "/");
+
+    // Configure the atomic rename directories key so every folder will have
+    // atomic rename applied.
+    conf.set(AzureNativeFileSystemStore.KEY_ATOMIC_RENAME_DIRECTORIES, "/");
+    return AzureBlobStorageTestAccount.create(conf);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d2d97fa/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemAppend.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemAppend.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemAppend.java
new file mode 100644
index 0000000..29611bf
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemAppend.java
@@ -0,0 +1,350 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Arrays;
+
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.test.GenericTestUtils;
+
+import org.junit.Test;
+
+/**
+ * Test append operations.
+ */
+public class ITestNativeAzureFileSystemAppend extends AbstractWasbTestBase {
+
+  private Path testPath;
+
+  @Override
+  public Configuration createConfiguration() {
+    Configuration conf = super.createConfiguration();
+    conf.setBoolean(NativeAzureFileSystem.APPEND_SUPPORT_ENABLE_PROPERTY_NAME,
+        true);
+    return conf;
+  }
+
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+    testPath = methodPath();
+  }
+
+
+  @Override
+  protected AzureBlobStorageTestAccount createTestAccount() throws Exception {
+    return AzureBlobStorageTestAccount.create(createConfiguration());
+  }
+
+  /*
+   * Helper method that creates test data of size provided by the
+   * "size" parameter.
+   */
+  private static byte[] getTestData(int size) {
+    byte[] testData = new byte[size];
+    System.arraycopy(RandomStringUtils.randomAlphabetic(size).getBytes(), 0, testData, 0, size);
+    return testData;
+  }
+
+  // Helper method to create file and write fileSize bytes of data on it.
+  private byte[] createBaseFileWithData(int fileSize, Path testPath) throws Throwable {
+
+    try(FSDataOutputStream createStream = fs.create(testPath)) {
+      byte[] fileData = null;
+
+      if (fileSize != 0) {
+        fileData = getTestData(fileSize);
+        createStream.write(fileData);
+      }
+      return fileData;
+    }
+  }
+
+  /*
+   * Helper method to verify a file data equal to "dataLength" parameter
+   */
+  private boolean verifyFileData(int dataLength, byte[] testData, int testDataIndex,
+      FSDataInputStream srcStream) {
+
+    try {
+
+      byte[] fileBuffer = new byte[dataLength];
+      byte[] testDataBuffer = new byte[dataLength];
+
+      int fileBytesRead = srcStream.read(fileBuffer);
+
+      if (fileBytesRead < dataLength) {
+        return false;
+      }
+
+      System.arraycopy(testData, testDataIndex, testDataBuffer, 0, dataLength);
+
+      if (!Arrays.equals(fileBuffer, testDataBuffer)) {
+        return false;
+      }
+
+      return true;
+
+    } catch (Exception ex) {
+      return false;
+    }
+
+  }
+
+  /*
+   * Helper method to verify Append on a testFile.
+   */
+  private boolean verifyAppend(byte[] testData, Path testFile) {
+
+    try(FSDataInputStream srcStream = fs.open(testFile)) {
+
+      int baseBufferSize = 2048;
+      int testDataSize = testData.length;
+      int testDataIndex = 0;
+
+      while (testDataSize > baseBufferSize) {
+
+        if (!verifyFileData(baseBufferSize, testData, testDataIndex, srcStream)) {
+          return false;
+        }
+        testDataIndex += baseBufferSize;
+        testDataSize -= baseBufferSize;
+      }
+
+      if (!verifyFileData(testDataSize, testData, testDataIndex, srcStream)) {
+        return false;
+      }
+
+      return true;
+    } catch(Exception ex) {
+      return false;
+    }
+  }
+
+  /*
+   * Test case to verify if an append on small size data works. This tests
+   * append E2E
+   */
+  @Test
+  public void testSingleAppend() throws Throwable{
+
+    FSDataOutputStream appendStream = null;
+    try {
+      int baseDataSize = 50;
+      byte[] baseDataBuffer = createBaseFileWithData(baseDataSize, testPath);
+
+      int appendDataSize = 20;
+      byte[] appendDataBuffer = getTestData(appendDataSize);
+      appendStream = fs.append(testPath, 10);
+      appendStream.write(appendDataBuffer);
+      appendStream.close();
+      byte[] testData = new byte[baseDataSize + appendDataSize];
+      System.arraycopy(baseDataBuffer, 0, testData, 0, baseDataSize);
+      System.arraycopy(appendDataBuffer, 0, testData, baseDataSize, appendDataSize);
+
+      assertTrue(verifyAppend(testData, testPath));
+    } finally {
+      if (appendStream != null) {
+        appendStream.close();
+      }
+    }
+  }
+
+  /*
+   * Test case to verify append to an empty file.
+   */
+  @Test
+  public void testSingleAppendOnEmptyFile() throws Throwable {
+
+    FSDataOutputStream appendStream = null;
+
+    try {
+      createBaseFileWithData(0, testPath);
+
+      int appendDataSize = 20;
+      byte[] appendDataBuffer = getTestData(appendDataSize);
+      appendStream = fs.append(testPath, 10);
+      appendStream.write(appendDataBuffer);
+      appendStream.close();
+
+      assertTrue(verifyAppend(appendDataBuffer, testPath));
+    } finally {
+      if (appendStream != null) {
+        appendStream.close();
+      }
+    }
+  }
+
+  /*
+   * Test to verify that we can open only one Append stream on a File.
+   */
+  @Test
+  public void testSingleAppenderScenario() throws Throwable {
+
+    FSDataOutputStream appendStream1 = null;
+    FSDataOutputStream appendStream2 = null;
+    IOException ioe = null;
+    try {
+      createBaseFileWithData(0, testPath);
+      appendStream1 = fs.append(testPath, 10);
+      boolean encounteredException = false;
+      try {
+        appendStream2 = fs.append(testPath, 10);
+      } catch(IOException ex) {
+        encounteredException = true;
+        ioe = ex;
+      }
+
+      appendStream1.close();
+
+      assertTrue(encounteredException);
+      GenericTestUtils.assertExceptionContains("Unable to set Append lease on the Blob", ioe);
+    } finally {
+      if (appendStream1 != null) {
+        appendStream1.close();
+      }
+
+      if (appendStream2 != null) {
+        appendStream2.close();
+      }
+    }
+  }
+
+  /*
+   * Tests to verify multiple appends on a Blob.
+   */
+  @Test
+  public void testMultipleAppends() throws Throwable {
+
+    int baseDataSize = 50;
+    byte[] baseDataBuffer = createBaseFileWithData(baseDataSize, testPath);
+
+    int appendDataSize = 100;
+    int targetAppendCount = 50;
+    byte[] testData = new byte[baseDataSize + (appendDataSize*targetAppendCount)];
+    int testDataIndex = 0;
+    System.arraycopy(baseDataBuffer, 0, testData, testDataIndex, baseDataSize);
+    testDataIndex += baseDataSize;
+
+    int appendCount = 0;
+
+    FSDataOutputStream appendStream = null;
+
+    try {
+      while (appendCount < targetAppendCount) {
+
+        byte[] appendDataBuffer = getTestData(appendDataSize);
+        appendStream = fs.append(testPath, 30);
+        appendStream.write(appendDataBuffer);
+        appendStream.close();
+
+        System.arraycopy(appendDataBuffer, 0, testData, testDataIndex, appendDataSize);
+        testDataIndex += appendDataSize;
+        appendCount++;
+      }
+
+      assertTrue(verifyAppend(testData, testPath));
+
+    } finally {
+      if (appendStream != null) {
+        appendStream.close();
+      }
+    }
+  }
+
+  /*
+   * Test to verify we multiple appends on the same stream.
+   */
+  @Test
+  public void testMultipleAppendsOnSameStream() throws Throwable {
+
+    int baseDataSize = 50;
+    byte[] baseDataBuffer = createBaseFileWithData(baseDataSize, testPath);
+    int appendDataSize = 100;
+    int targetAppendCount = 50;
+    byte[] testData = new byte[baseDataSize + (appendDataSize*targetAppendCount)];
+    int testDataIndex = 0;
+    System.arraycopy(baseDataBuffer, 0, testData, testDataIndex, baseDataSize);
+    testDataIndex += baseDataSize;
+    int appendCount = 0;
+
+    FSDataOutputStream appendStream = null;
+
+    try {
+
+      while (appendCount < targetAppendCount) {
+
+        appendStream = fs.append(testPath, 50);
+
+        int singleAppendChunkSize = 20;
+        int appendRunSize = 0;
+        while (appendRunSize < appendDataSize) {
+
+          byte[] appendDataBuffer = getTestData(singleAppendChunkSize);
+          appendStream.write(appendDataBuffer);
+          System.arraycopy(appendDataBuffer, 0, testData,
+              testDataIndex + appendRunSize, singleAppendChunkSize);
+
+          appendRunSize += singleAppendChunkSize;
+        }
+
+        appendStream.close();
+        testDataIndex += appendDataSize;
+        appendCount++;
+      }
+
+      assertTrue(verifyAppend(testData, testPath));
+    } finally {
+      if (appendStream != null) {
+        appendStream.close();
+      }
+    }
+  }
+
+  @Test(expected=UnsupportedOperationException.class)
+  /*
+   * Test to verify the behavior when Append Support configuration flag is set to false
+   */
+  public void testFalseConfigurationFlagBehavior() throws Throwable {
+
+    fs = testAccount.getFileSystem();
+    Configuration conf = fs.getConf();
+    conf.setBoolean(NativeAzureFileSystem.APPEND_SUPPORT_ENABLE_PROPERTY_NAME, false);
+    URI uri = fs.getUri();
+    fs.initialize(uri, conf);
+
+    FSDataOutputStream appendStream = null;
+
+    try {
+      createBaseFileWithData(0, testPath);
+      appendStream = fs.append(testPath, 10);
+    } finally {
+      if (appendStream != null) {
+        appendStream.close();
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d2d97fa/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemAtomicRenameDirList.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemAtomicRenameDirList.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemAtomicRenameDirList.java
new file mode 100644
index 0000000..869a31c
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemAtomicRenameDirList.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.hadoop.conf.Configuration;
+
+import org.junit.Test;
+
+/**
+ * Test atomic renaming.
+ */
+public class ITestNativeAzureFileSystemAtomicRenameDirList
+    extends AbstractWasbTestBase {
+
+  // HBase-site config controlling HBase root dir
+  private static final String HBASE_ROOT_DIR_CONF_STRING = "hbase.rootdir";
+  private static final String HBASE_ROOT_DIR_VALUE_ON_DIFFERENT_FS =
+      "wasb://somedifferentfilesystem.blob.core.windows.net/hbase";
+
+  @Override
+  protected AzureBlobStorageTestAccount createTestAccount() throws Exception {
+    return AzureBlobStorageTestAccount.create();
+  }
+
+  @Test
+  public void testAtomicRenameKeyDoesntNPEOnInitializingWithNonDefaultURI()
+      throws IOException {
+    NativeAzureFileSystem azureFs = fs;
+    AzureNativeFileSystemStore azureStore = azureFs.getStore();
+    Configuration conf = fs.getConf();
+    conf.set(HBASE_ROOT_DIR_CONF_STRING, HBASE_ROOT_DIR_VALUE_ON_DIFFERENT_FS);
+    URI uri = fs.getUri();
+    fs.initialize(uri, conf);
+    azureStore.isAtomicRenameKey("anyrandomkey");
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d2d97fa/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemAuthorizationWithOwner.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemAuthorizationWithOwner.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemAuthorizationWithOwner.java
new file mode 100644
index 0000000..3ec42f0
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemAuthorizationWithOwner.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import java.security.PrivilegedExceptionAction;
+
+import org.apache.hadoop.fs.Path;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test class that runs wasb authorization tests with owner check enabled.
+ */
+public class ITestNativeAzureFileSystemAuthorizationWithOwner
+  extends TestNativeAzureFileSystemAuthorization {
+
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+    authorizer.init(fs.getConf(), true);
+  }
+
+  /**
+   * Test case when owner matches current user.
+   */
+  @Test
+  public void testOwnerPermissionPositive() throws Throwable {
+
+    Path parentDir = new Path("/testOwnerPermissionPositive");
+    Path testPath = new Path(parentDir, "test.data");
+
+    authorizer.addAuthRule("/", WasbAuthorizationOperations.WRITE.toString(), true);
+    authorizer.addAuthRule(testPath.toString(), WasbAuthorizationOperations.READ.toString(), true);
+    authorizer.addAuthRule(parentDir.toString(), WasbAuthorizationOperations.WRITE.toString(), true);
+    // additional rule used for assertPathExists
+    authorizer.addAuthRule(parentDir.toString(), WasbAuthorizationOperations.READ.toString(), true);
+    fs.updateWasbAuthorizer(authorizer);
+
+    try {
+      // creates parentDir with owner as current user
+      fs.mkdirs(parentDir);
+      ContractTestUtils.assertPathExists(fs, "parentDir does not exist", parentDir);
+
+      fs.create(testPath);
+      fs.getFileStatus(testPath);
+      ContractTestUtils.assertPathExists(fs, "testPath does not exist", testPath);
+
+    } finally {
+      allowRecursiveDelete(fs, parentDir.toString());
+      fs.delete(parentDir, true);
+    }
+  }
+
+  /**
+   * Negative test case for owner does not match current user.
+   */
+  @Test
+  public void testOwnerPermissionNegative() throws Throwable {
+    expectedEx.expect(WasbAuthorizationException.class);
+
+    Path parentDir = new Path("/testOwnerPermissionNegative");
+    Path childDir = new Path(parentDir, "childDir");
+
+    setExpectedFailureMessage("mkdirs", childDir);
+
+    authorizer.addAuthRule("/", WasbAuthorizationOperations.WRITE.toString(), true);
+    authorizer.addAuthRule(parentDir.toString(), WasbAuthorizationOperations.WRITE.toString(), true);
+
+    fs.updateWasbAuthorizer(authorizer);
+
+    try{
+      fs.mkdirs(parentDir);
+      UserGroupInformation ugiSuperUser = UserGroupInformation.createUserForTesting(
+          "testuser", new String[] {});
+
+      ugiSuperUser.doAs(new PrivilegedExceptionAction<Void>() {
+      @Override
+      public Void run() throws Exception {
+          fs.mkdirs(childDir);
+          return null;
+        }
+      });
+
+    } finally {
+       allowRecursiveDelete(fs, parentDir.toString());
+       fs.delete(parentDir, true);
+    }
+  }
+
+  /**
+   * Test to verify that retrieving owner information does not
+   * throw when file/folder does not exist.
+   */
+  @Test
+  public void testRetrievingOwnerDoesNotFailWhenFileDoesNotExist() throws Throwable {
+
+    Path testdirectory = new Path("/testDirectory123454565");
+
+    String owner = fs.getOwnerForPath(testdirectory);
+    assertEquals("", owner);
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d2d97fa/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemClientLogging.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemClientLogging.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemClientLogging.java
new file mode 100644
index 0000000..f73a763
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemClientLogging.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import java.net.URI;
+import java.util.StringTokenizer;
+
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
+import org.apache.log4j.Logger;
+import org.junit.Test;
+
+/**
+ * Test to validate Azure storage client side logging. Tests works only when
+ * testing with Live Azure storage because Emulator does not have support for
+ * client-side logging.
+ *
+ * <I>Important: </I> Do not attempt to move off commons-logging.
+ * The tests will fail.
+ */
+public class ITestNativeAzureFileSystemClientLogging
+    extends AbstractWasbTestBase {
+
+  // Core-site config controlling Azure Storage Client logging
+  private static final String KEY_LOGGING_CONF_STRING = "fs.azure.storage.client.logging";
+
+  // Temporary directory created using WASB.
+  private static final String TEMP_DIR = "tempDir";
+
+  /*
+   * Helper method to verify the client logging is working. This check primarily
+   * checks to make sure we see a line in the logs corresponding to the entity
+   * that is created during test run.
+   */
+  private boolean verifyStorageClientLogs(String capturedLogs, String entity)
+      throws Exception {
+
+    URI uri = testAccount.getRealAccount().getBlobEndpoint();
+    String container = testAccount.getRealContainer().getName();
+    String validateString = uri + Path.SEPARATOR + container + Path.SEPARATOR
+        + entity;
+    boolean entityFound = false;
+
+    StringTokenizer tokenizer = new StringTokenizer(capturedLogs, "\n");
+
+    while (tokenizer.hasMoreTokens()) {
+      String token = tokenizer.nextToken();
+      if (token.contains(validateString)) {
+        entityFound = true;
+        break;
+      }
+    }
+    return entityFound;
+  }
+
+  /*
+   * Helper method that updates the core-site config to enable/disable logging.
+   */
+  private void updateFileSystemConfiguration(Boolean loggingFlag)
+      throws Exception {
+
+    Configuration conf = fs.getConf();
+    conf.set(KEY_LOGGING_CONF_STRING, loggingFlag.toString());
+    URI uri = fs.getUri();
+    fs.initialize(uri, conf);
+  }
+
+  // Using WASB code to communicate with Azure Storage.
+  private void performWASBOperations() throws Exception {
+
+    Path tempDir = new Path(Path.SEPARATOR + TEMP_DIR);
+    fs.mkdirs(tempDir);
+    fs.delete(tempDir, true);
+  }
+
+  @Test
+  public void testLoggingEnabled() throws Exception {
+
+    LogCapturer logs = LogCapturer.captureLogs(new Log4JLogger(Logger
+        .getRootLogger()));
+
+    // Update configuration based on the Test.
+    updateFileSystemConfiguration(true);
+
+    performWASBOperations();
+
+    String output = getLogOutput(logs);
+    assertTrue("Log entry " + TEMP_DIR + " not found  in " + output,
+        verifyStorageClientLogs(output, TEMP_DIR));
+  }
+
+  protected String getLogOutput(LogCapturer logs) {
+    String output = logs.getOutput();
+    assertTrue("No log created/captured", !output.isEmpty());
+    return output;
+  }
+
+  @Test
+  public void testLoggingDisabled() throws Exception {
+
+    LogCapturer logs = LogCapturer.captureLogs(new Log4JLogger(Logger
+        .getRootLogger()));
+
+    // Update configuration based on the Test.
+    updateFileSystemConfiguration(false);
+
+    performWASBOperations();
+    String output = getLogOutput(logs);
+
+    assertFalse("Log entry " + TEMP_DIR + " found  in " + output,
+        verifyStorageClientLogs(output, TEMP_DIR));
+  }
+
+  @Override
+  protected AzureBlobStorageTestAccount createTestAccount() throws Exception {
+    return AzureBlobStorageTestAccount.create();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d2d97fa/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemConcurrencyLive.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemConcurrencyLive.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemConcurrencyLive.java
new file mode 100644
index 0000000..87cac15
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemConcurrencyLive.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+/***
+ * Test class to hold all Live Azure storage concurrency tests.
+ */
+public class ITestNativeAzureFileSystemConcurrencyLive
+    extends AbstractWasbTestBase {
+
+  private static final int THREAD_COUNT = 102;
+  private static final int TEST_EXECUTION_TIMEOUT = 5000;
+
+  @Override
+  protected AzureBlobStorageTestAccount createTestAccount() throws Exception {
+    return AzureBlobStorageTestAccount.create();
+  }
+
+  /**
+   * Validate contract for FileSystem.create when overwrite is true and there
+   * are concurrent callers of FileSystem.delete.  An existing file should be
+   * overwritten, even if the original destination exists but is deleted by an
+   * external agent during the create operation.
+   */
+  @Test(timeout = TEST_EXECUTION_TIMEOUT)
+  public void testConcurrentCreateDeleteFile() throws Exception {
+    Path testFile = methodPath();
+
+    List<CreateFileTask> tasks = new ArrayList<>(THREAD_COUNT);
+
+    for (int i = 0; i < THREAD_COUNT; i++) {
+      tasks.add(new CreateFileTask(fs, testFile));
+    }
+
+    ExecutorService es = null;
+
+    try {
+      es = Executors.newFixedThreadPool(THREAD_COUNT);
+
+      List<Future<Void>> futures = es.invokeAll(tasks);
+
+      for (Future<Void> future : futures) {
+        Assert.assertTrue(future.isDone());
+
+        // we are using Callable<V>, so if an exception
+        // occurred during the operation, it will be thrown
+        // when we call get
+        Assert.assertEquals(null, future.get());
+      }
+    } finally {
+      if (es != null) {
+        es.shutdownNow();
+      }
+    }
+  }
+
+  /**
+   * Validate contract for FileSystem.delete when invoked concurrently.
+   * One of the threads should successfully delete the file and return true;
+   * all other threads should return false.
+   */
+  @Test(timeout = TEST_EXECUTION_TIMEOUT)
+  public void testConcurrentDeleteFile() throws Exception {
+    Path testFile = new Path("test.dat");
+    fs.create(testFile).close();
+
+    List<DeleteFileTask> tasks = new ArrayList<>(THREAD_COUNT);
+
+    for (int i = 0; i < THREAD_COUNT; i++) {
+      tasks.add(new DeleteFileTask(fs, testFile));
+    }
+
+    ExecutorService es = null;
+    try {
+      es = Executors.newFixedThreadPool(THREAD_COUNT);
+
+      List<Future<Boolean>> futures = es.invokeAll(tasks);
+
+      int successCount = 0;
+      for (Future<Boolean> future : futures) {
+        Assert.assertTrue(future.isDone());
+
+        // we are using Callable<V>, so if an exception
+        // occurred during the operation, it will be thrown
+        // when we call get
+        Boolean success = future.get();
+        if (success) {
+          successCount++;
+        }
+      }
+
+      Assert.assertEquals(
+          "Exactly one delete operation should return true.",
+          1,
+          successCount);
+    } finally {
+      if (es != null) {
+        es.shutdownNow();
+      }
+    }
+  }
+
+  abstract class FileSystemTask<V> implements Callable<V> {
+    private final FileSystem fileSystem;
+    private final Path path;
+
+    protected FileSystem getFileSystem() {
+      return this.fileSystem;
+    }
+
+    protected Path getFilePath() {
+      return this.path;
+    }
+
+    FileSystemTask(FileSystem fs, Path p) {
+      this.fileSystem = fs;
+      this.path = p;
+    }
+
+    public abstract V call() throws Exception;
+  }
+
+  class DeleteFileTask extends FileSystemTask<Boolean> {
+
+    DeleteFileTask(FileSystem fs, Path p) {
+      super(fs, p);
+    }
+
+    @Override
+    public Boolean call() throws Exception {
+      return this.getFileSystem().delete(this.getFilePath(), false);
+    }
+  }
+
+  class CreateFileTask extends FileSystemTask<Void> {
+    CreateFileTask(FileSystem fs, Path p) {
+      super(fs, p);
+    }
+
+    public Void call() throws Exception {
+      FileSystem fs = getFileSystem();
+      Path p = getFilePath();
+
+      // Create an empty file and close the stream.
+      FSDataOutputStream stream = fs.create(p, true);
+      stream.close();
+
+      // Delete the file.  We don't care if delete returns true or false.
+      // We just want to ensure the file does not exist.
+      this.getFileSystem().delete(this.getFilePath(), false);
+
+      return null;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d2d97fa/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemContractEmulator.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemContractEmulator.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemContractEmulator.java
new file mode 100644
index 0000000..4836fc4
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemContractEmulator.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import static org.junit.Assume.assumeNotNull;
+
+import org.apache.hadoop.fs.FileSystemContractBaseTest;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azure.integration.AzureTestUtils;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.rules.TestName;
+
+/**
+ * Run the {@code FileSystemContractBaseTest} tests against the emulator
+ */
+public class ITestNativeAzureFileSystemContractEmulator extends
+    FileSystemContractBaseTest {
+  private AzureBlobStorageTestAccount testAccount;
+  private Path basePath;
+
+  @Rule
+  public TestName methodName = new TestName();
+
+  private void nameThread() {
+    Thread.currentThread().setName("JUnit-" + methodName.getMethodName());
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    nameThread();
+    testAccount = AzureBlobStorageTestAccount.createForEmulator();
+    if (testAccount != null) {
+      fs = testAccount.getFileSystem();
+    }
+    assumeNotNull(fs);
+    basePath = fs.makeQualified(
+        AzureTestUtils.createTestPath(
+            new Path("ITestNativeAzureFileSystemContractEmulator")));
+  }
+
+  @Override
+  public void tearDown() throws Exception {
+    super.tearDown();
+    testAccount = AzureTestUtils.cleanup(testAccount);
+    fs = null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d2d97fa/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemContractLive.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemContractLive.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemContractLive.java
new file mode 100644
index 0000000..d3d1bd8
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemContractLive.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import static org.junit.Assume.assumeNotNull;
+
+import org.apache.hadoop.fs.FileSystemContractBaseTest;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azure.integration.AzureTestConstants;
+import org.apache.hadoop.fs.azure.integration.AzureTestUtils;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+/**
+ * Run the {@link FileSystemContractBaseTest} test suite against azure storage.
+ */
+public class ITestNativeAzureFileSystemContractLive extends
+    FileSystemContractBaseTest {
+  private AzureBlobStorageTestAccount testAccount;
+  private Path basePath;
+
+  @Rule
+  public TestName methodName = new TestName();
+
+  private void nameThread() {
+    Thread.currentThread().setName("JUnit-" + methodName.getMethodName());
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    nameThread();
+    testAccount = AzureBlobStorageTestAccount.create();
+    if (testAccount != null) {
+      fs = testAccount.getFileSystem();
+    }
+    assumeNotNull(fs);
+    basePath = fs.makeQualified(
+        AzureTestUtils.createTestPath(
+            new Path("NativeAzureFileSystemContractLive")));
+  }
+
+  @Override
+  public void tearDown() throws Exception {
+    super.tearDown();
+    testAccount = AzureTestUtils.cleanup(testAccount);
+    fs = null;
+  }
+
+  @Override
+  public Path getTestBaseDir() {
+    return basePath;
+  }
+
+  protected int getGlobalTimeout() {
+    return AzureTestConstants.AZURE_TEST_TIMEOUT;
+  }
+
+  /**
+   * The following tests are failing on Azure and the Azure 
+   * file system code needs to be modified to make them pass.
+   * A separate work item has been opened for this.
+   */
+  @Ignore
+  @Test
+  public void testMoveFileUnderParent() throws Throwable {
+  }
+
+  @Ignore
+  @Test
+  public void testRenameFileToSelf() throws Throwable {
+  }
+
+  @Ignore
+  @Test
+  public void testRenameChildDirForbidden() throws Exception {
+  }
+
+  @Ignore
+  @Test
+  public void testMoveDirUnderParent() throws Throwable {
+  }
+
+  @Ignore
+  @Test
+  public void testRenameDirToSelf() throws Throwable {
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d2d97fa/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemContractPageBlobLive.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemContractPageBlobLive.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemContractPageBlobLive.java
new file mode 100644
index 0000000..03e90aa
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemContractPageBlobLive.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystemContractBaseTest;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azure.integration.AzureTestConstants;
+import org.apache.hadoop.fs.azure.integration.AzureTestUtils;
+
+import static org.junit.Assume.assumeNotNull;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+/**
+ * Run the {@link FileSystemContractBaseTest} test suite against azure
+ * storage, after switching the FS using page blobs everywhere.
+ */
+public class ITestNativeAzureFileSystemContractPageBlobLive extends
+    FileSystemContractBaseTest {
+  private AzureBlobStorageTestAccount testAccount;
+  private Path basePath;
+  @Rule
+  public TestName methodName = new TestName();
+
+  private void nameThread() {
+    Thread.currentThread().setName("JUnit-" + methodName.getMethodName());
+  }
+
+  private AzureBlobStorageTestAccount createTestAccount()
+      throws Exception {
+    Configuration conf = new Configuration();
+
+    // Configure the page blob directories key so every file created is a page blob.
+    conf.set(AzureNativeFileSystemStore.KEY_PAGE_BLOB_DIRECTORIES, "/");
+
+    // Configure the atomic rename directories key so every folder will have
+    // atomic rename applied.
+    conf.set(AzureNativeFileSystemStore.KEY_ATOMIC_RENAME_DIRECTORIES, "/");
+    return AzureBlobStorageTestAccount.create(conf);
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    testAccount = createTestAccount();
+    assumeNotNull(testAccount);
+    fs = testAccount.getFileSystem();
+    basePath = AzureTestUtils.pathForTests(fs, "filesystemcontractpageblob");
+  }
+
+  @Override
+  public void tearDown() throws Exception {
+    testAccount = AzureTestUtils.cleanup(testAccount);
+    fs = null;
+  }
+
+  protected int getGlobalTimeout() {
+    return AzureTestConstants.AZURE_TEST_TIMEOUT;
+  }
+
+  @Override
+  public Path getTestBaseDir() {
+    return basePath;
+  }
+
+  /**
+   * The following tests are failing on Azure and the Azure 
+   * file system code needs to be modified to make them pass.
+   * A separate work item has been opened for this.
+   */
+  @Ignore
+  @Test
+  public void testMoveFileUnderParent() throws Throwable {
+  }
+
+  @Ignore
+  @Test
+  public void testRenameFileToSelf() throws Throwable {
+  }
+  
+  @Ignore
+  @Test
+  public void testRenameChildDirForbidden() throws Exception {
+  }
+  
+  @Ignore
+  @Test
+  public void testMoveDirUnderParent() throws Throwable {
+  }
+  
+  @Ignore
+  @Test
+  public void testRenameDirToSelf() throws Throwable {
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org