You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2017/09/15 16:39:33 UTC
[08/20] hadoop git commit: HADOOP-14553. Add (parallelized)
integration tests to hadoop-azure Contributed by Steve Loughran
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d2d97fa/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestFileSystemOperationsWithThreads.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestFileSystemOperationsWithThreads.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestFileSystemOperationsWithThreads.java
new file mode 100644
index 0000000..4389fda
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestFileSystemOperationsWithThreads.java
@@ -0,0 +1,821 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azure.NativeAzureFileSystem.FolderRenamePending;
+import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+/**
+ * Tests the Native Azure file system (WASB) using parallel threads for rename and delete operations.
+ */
+public class ITestFileSystemOperationsWithThreads extends AbstractWasbTestBase {
+
+ private final int renameThreads = 10;
+ private final int deleteThreads = 20;
+ private int iterations = 1;
+ private LogCapturer logs = null;
+
+ @Rule
+ public ExpectedException exception = ExpectedException.none();
+
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+
+ // Every test starts with parallel rename and delete threads switched on,
+ // together with flat listing of blobs for these operations.
+ Configuration configuration = fs.getConf();
+ configuration.setInt(NativeAzureFileSystem.AZURE_RENAME_THREADS, renameThreads);
+ configuration.setInt(NativeAzureFileSystem.AZURE_DELETE_THREADS, deleteThreads);
+ configuration.setBoolean(AzureNativeFileSystemStore.KEY_ENABLE_FLAT_LISTING, true);
+
+ // Re-initialize the file system with the modified configuration.
+ fs.initialize(fs.getUri(), configuration);
+
+ // Capture the output of the root logger for the log assertions.
+ org.apache.log4j.Logger rootLogger = org.apache.log4j.Logger.getRootLogger();
+ logs = LogCapturer.captureLogs(new Log4JLogger(rootLogger));
+ }
+
+ /*
+ * Helper method to create sub directory and different types of files
+ * for multiple iterations.
+ */
+ private void createFolder(FileSystem fs, String root) throws Exception {
+ fs.mkdirs(new Path(root));
+ for (int i = 0; i < this.iterations; i++) {
+ fs.mkdirs(new Path(root + "/" + i));
+ fs.createNewFile(new Path(root + "/" + i + "/fileToRename"));
+ fs.createNewFile(new Path(root + "/" + i + "/file/to/rename"));
+ fs.createNewFile(new Path(root + "/" + i + "/file+to%rename"));
+ fs.createNewFile(new Path(root + "/fileToRename" + i));
+ }
+ }
+
+ /*
+ * Helper method to do rename operation and validate all files in source folder
+ * doesn't exists and similar files exists in new folder.
+ */
+ private void validateRenameFolder(FileSystem fs, String source, String dest) throws Exception {
+ // Create source folder with files.
+ createFolder(fs, source);
+ Path sourceFolder = new Path(source);
+ Path destFolder = new Path(dest);
+
+ // rename operation
+ assertTrue(fs.rename(sourceFolder, destFolder));
+ assertTrue(fs.exists(destFolder));
+
+ for (int i = 0; i < this.iterations; i++) {
+ // Check destination folder and files exists.
+ assertTrue(fs.exists(new Path(dest + "/" + i)));
+ assertTrue(fs.exists(new Path(dest + "/" + i + "/fileToRename")));
+ assertTrue(fs.exists(new Path(dest + "/" + i + "/file/to/rename")));
+ assertTrue(fs.exists(new Path(dest + "/" + i + "/file+to%rename")));
+ assertTrue(fs.exists(new Path(dest + "/fileToRename" + i)));
+
+ // Check source folder and files doesn't exists.
+ assertFalse(fs.exists(new Path(source + "/" + i)));
+ assertFalse(fs.exists(new Path(source + "/" + i + "/fileToRename")));
+ assertFalse(fs.exists(new Path(source + "/" + i + "/file/to/rename")));
+ assertFalse(fs.exists(new Path(source + "/" + i + "/file+to%rename")));
+ assertFalse(fs.exists(new Path(source + "/fileToRename" + i)));
+ }
+ }
+
+ /*
+ * Test case for renaming a small folder with multiple threads and flat listing enabled.
+ */
+ @Test
+ public void testRenameSmallFolderWithThreads() throws Exception {
+
+ validateRenameFolder(fs, "root", "rootnew");
+
+ // With single iteration, we would have created 7 blobs.
+ final int usedThreads = Math.min(7, renameThreads);
+ final String threadPrefix =
+ "AzureBlobRenameThread-" + Thread.currentThread().getName() + "-";
+
+ // Validate from the captured logs that the expected number of threads
+ // was reported.
+ final String logOutput = logs.getOutput();
+ assertInLog(logOutput, "ms with threads: " + usedThreads);
+
+ // Each thread index below the expected count must appear in the log.
+ for (int index = 0; index < usedThreads; index++) {
+ assertInLog(logOutput, threadPrefix + index);
+ }
+
+ // None of the remaining configured threads may have been spawned. The loop
+ // bound alone covers the case where all configured threads were used.
+ for (int index = usedThreads; index < renameThreads; index++) {
+ assertNotInLog(logOutput, threadPrefix + index);
+ }
+ }
+
+ /*
+ * Test case for renaming a large folder with multiple threads and flat listing enabled.
+ */
+ @Test
+ public void testRenameLargeFolderWithThreads() throws Exception {
+
+ // Use ten iterations so the source folder holds many files and directories.
+ this.iterations = 10;
+ validateRenameFolder(fs, "root", "rootnew");
+
+ // The log must report that all configured rename threads were used.
+ final String logOutput = logs.getOutput();
+ assertInLog(logOutput, "ms with threads: " + renameThreads);
+
+ // Each configured thread index must appear in the log.
+ final String threadPrefix = "AzureBlobRenameThread-" + Thread.currentThread().getName() + "-";
+ for (int index = 0; index < renameThreads; index++) {
+ assertInLog(logOutput, threadPrefix + index);
+ }
+ }
+
+ /*
+ * Test case for renaming a large folder with threads disabled and flat listing enabled.
+ */
+ @Test
+ public void testRenameLargeFolderDisableThreads() throws Exception {
+
+ // Number of threads set to 0 or 1 disables threads.
+ final Configuration configuration = fs.getConf();
+ configuration.setInt(NativeAzureFileSystem.AZURE_RENAME_THREADS, 0);
+
+ // Re-initialize the file system with the modified configuration.
+ fs.initialize(fs.getUri(), configuration);
+
+ // Use ten iterations so the source folder holds many files and directories.
+ this.iterations = 10;
+ validateRenameFolder(fs, "root", "rootnew");
+
+ // The log must state that threads were disabled for the rename.
+ final String logOutput = logs.getOutput();
+ assertInLog(logOutput,
+ "Disabling threads for Rename operation as thread count 0");
+
+ // No rename thread may have been executed.
+ final String threadPrefix =
+ "AzureBlobRenameThread-" + Thread.currentThread().getName() + "-";
+ for (int index = 0; index < renameThreads; index++) {
+ assertNotInLog(logOutput, threadPrefix + index);
+ }
+ }
+
+ /**
+ * Assert that a log contains the given term.
+ * @param content log output
+ * @param term search term
+ */
+ protected void assertInLog(String content, String term) {
+ assertTrue("Empty log", !content.isEmpty());
+ if (!content.contains(term)) {
+ String message = "No " + term + " found in logs";
+ LOG.error(message);
+ System.err.println(content);
+ fail(message);
+ }
+ }
+
+ /**
+ * Assert that a log does not contain the given term.
+ * @param content log output
+ * @param term search term
+ */
+ protected void assertNotInLog(String content, String term) {
+ assertTrue("Empty log", !content.isEmpty());
+ if (content.contains(term)) {
+ String message = term + " found in logs";
+ LOG.error(message);
+ System.err.println(content);
+ fail(message);
+ }
+ }
+
+ /*
+ * Test case for rename operation with threads and flat listing disabled.
+ */
+ @Test
+ public void testRenameSmallFolderDisableThreadsDisableFlatListing() throws Exception {
+ Configuration conf = fs.getConf();
+
+ // Number of threads set to 0 or 1 disables threads.
+ conf.setInt(NativeAzureFileSystem.AZURE_RENAME_THREADS, 1);
+ // Flat listing of blobs is switched off as well for this test.
+ conf.setBoolean(AzureNativeFileSystemStore.KEY_ENABLE_FLAT_LISTING, false);
+ URI uri = fs.getUri();
+ fs.initialize(uri, conf);
+
+ validateRenameFolder(fs, "root", "rootnew");
+
+ // Validate from logs that threads are disabled.
+ String content = logs.getOutput();
+ assertInLog(content,
+ "Disabling threads for Rename operation as thread count 1");
+
+ // Validate no thread executions
+ for (int i = 0; i < renameThreads; i++) {
+ assertNotInLog(content,
+ "AzureBlobRenameThread-" + Thread.currentThread().getName() + "-" + i);
+ }
+ }
+
+ /*
+ * Helper method that populates the source folder, deletes it recursively and
+ * validates that none of its entries exist afterwards.
+ */
+ private void validateDeleteFolder(FileSystem fs, String source) throws Exception {
+ // Create the folder that is about to be deleted, with files.
+ createFolder(fs, source);
+ Path sourceFolder = new Path(source);
+
+ // Delete operation
+ assertTrue(fs.delete(sourceFolder, true));
+ assertFalse(fs.exists(sourceFolder));
+
+ for (int i = 0; i < this.iterations; i++) {
+ // check that source folder and files no longer exist
+ assertFalse(fs.exists(new Path(source + "/" + i)));
+ assertFalse(fs.exists(new Path(source + "/" + i + "/fileToRename")));
+ assertFalse(fs.exists(new Path(source + "/" + i + "/file/to/rename")));
+ assertFalse(fs.exists(new Path(source + "/" + i + "/file+to%rename")));
+ assertFalse(fs.exists(new Path(source + "/fileToRename" + i)));
+ }
+ }
+
+ /*
+ * Test case for deleting a small folder with multiple threads and flat listing enabled.
+ */
+ @Test
+ public void testDeleteSmallFolderWithThreads() throws Exception {
+
+ validateDeleteFolder(fs, "root");
+
+ // With single iteration, we would have created 7 blobs.
+ final int usedThreads = Math.min(7, deleteThreads);
+ final String threadPrefix =
+ "AzureBlobDeleteThread-" + Thread.currentThread().getName() + "-";
+
+ // Validate from the captured logs that the expected number of threads
+ // was reported.
+ final String logOutput = logs.getOutput();
+ assertInLog(logOutput, "ms with threads: " + usedThreads);
+
+ // Each thread index below the expected count must appear in the log.
+ for (int index = 0; index < usedThreads; index++) {
+ assertInLog(logOutput, threadPrefix + index);
+ }
+
+ // None of the remaining configured threads may have been spawned. The loop
+ // bound alone covers the case where all configured threads were used.
+ for (int index = usedThreads; index < deleteThreads; index++) {
+ assertNotInLog(logOutput, threadPrefix + index);
+ }
+ }
+
+ /*
+ * Test case for deleting a large folder with multiple threads and flat listing enabled.
+ */
+ @Test
+ public void testDeleteLargeFolderWithThreads() throws Exception {
+ // Use ten iterations so the source folder holds many files and directories.
+ this.iterations = 10;
+ validateDeleteFolder(fs, "root");
+
+ // The log must report that all configured delete threads were used.
+ final String logOutput = logs.getOutput();
+ assertInLog(logOutput, "ms with threads: " + deleteThreads);
+
+ // Each configured thread index must appear in the log.
+ final String threadPrefix = "AzureBlobDeleteThread-" + Thread.currentThread().getName() + "-";
+ for (int index = 0; index < deleteThreads; index++) {
+ assertInLog(logOutput, threadPrefix + index);
+ }
+ }
+
+ /*
+ * Test case for deleting a large folder with threads disabled and flat listing enabled.
+ */
+ @Test
+ public void testDeleteLargeFolderDisableThreads() throws Exception {
+ // Set the delete thread count to 0 and re-initialize the file system with it.
+ final Configuration configuration = fs.getConf();
+ configuration.setInt(NativeAzureFileSystem.AZURE_DELETE_THREADS, 0);
+ fs.initialize(fs.getUri(), configuration);
+
+ // Use ten iterations so the source folder holds many files and directories.
+ this.iterations = 10;
+ validateDeleteFolder(fs, "root");
+
+ // The log must state that threads were disabled for the delete.
+ final String logOutput = logs.getOutput();
+ assertInLog(logOutput,
+ "Disabling threads for Delete operation as thread count 0");
+
+ // No delete thread may have been executed.
+ final String threadPrefix = "AzureBlobDeleteThread-" + Thread.currentThread().getName() + "-";
+ for (int index = 0; index < deleteThreads; index++) {
+ assertNotInLog(logOutput, threadPrefix + index);
+ }
+ }
+
+ /*
+ * Test case for delete operation with threads and flat listing disabled.
+ */
+ @Test
+ public void testDeleteSmallFolderDisableThreadsDisableFlatListing() throws Exception {
+ // Number of threads set to 0 or 1 disables threads; flat listing is
+ // switched off as well.
+ final Configuration configuration = fs.getConf();
+ configuration.setInt(NativeAzureFileSystem.AZURE_DELETE_THREADS, 1);
+ configuration.setBoolean(AzureNativeFileSystemStore.KEY_ENABLE_FLAT_LISTING, false);
+ fs.initialize(fs.getUri(), configuration);
+
+ validateDeleteFolder(fs, "root");
+
+ // The log must state that threads were disabled for the delete.
+ final String logOutput = logs.getOutput();
+ assertInLog(logOutput,
+ "Disabling threads for Delete operation as thread count 1");
+
+ // No delete thread may have been executed.
+ final String threadPrefix =
+ "AzureBlobDeleteThread-" + Thread.currentThread().getName() + "-";
+ for (int index = 0; index < deleteThreads; index++) {
+ assertNotInLog(logOutput, threadPrefix + index);
+ }
+ }
+
+ /*
+ * Test case for delete operation when creating the thread pool throws an exception.
+ */
+ @Test
+ public void testDeleteThreadPoolExceptionFailure() throws Exception {
+
+ // Spy azure file system object and raise exception for new thread pool
+ NativeAzureFileSystem mockFs = Mockito.spy((NativeAzureFileSystem) fs);
+
+ String path = mockFs.pathToKey(mockFs.makeAbsolute(new Path("root")));
+
+ AzureFileSystemThreadPoolExecutor mockThreadPoolExecutor = Mockito.spy(
+ mockFs.getThreadPoolExecutor(deleteThreads, "AzureBlobDeleteThread", "Delete",
+ path, NativeAzureFileSystem.AZURE_DELETE_THREADS));
+ Mockito.when(mockThreadPoolExecutor.getThreadPool(7)).thenThrow(new Exception());
+
+ // With single iteration, we would have created 7 blobs resulting in 7 threads.
+ Mockito.when(mockFs.getThreadPoolExecutor(deleteThreads, "AzureBlobDeleteThread", "Delete",
+ path, NativeAzureFileSystem.AZURE_DELETE_THREADS)).thenReturn(mockThreadPoolExecutor);
+
+ validateDeleteFolder(mockFs, "root");
+
+ // Validate from logs that thread pool creation failed and the delete was serialized.
+ String content = logs.getOutput();
+ assertInLog(content, "Failed to create thread pool with threads");
+ assertInLog(content, "Serializing the Delete operation");
+ }
+
+ /*
+ * Test case for delete operation when the thread pool rejects every submitted task.
+ */
+ @Test
+ public void testDeleteThreadPoolExecuteFailure() throws Exception {
+
+ // Mock thread pool executor to throw exception for all requests.
+ ThreadPoolExecutor mockThreadExecutor = Mockito.mock(ThreadPoolExecutor.class);
+ Mockito.doThrow(new RejectedExecutionException()).when(mockThreadExecutor).execute(Mockito.any(Runnable.class));
+
+ // Spy azure file system object and return mocked thread pool
+ NativeAzureFileSystem mockFs = Mockito.spy((NativeAzureFileSystem) fs);
+
+ String path = mockFs.pathToKey(mockFs.makeAbsolute(new Path("root")));
+
+ AzureFileSystemThreadPoolExecutor mockThreadPoolExecutor = Mockito.spy(
+ mockFs.getThreadPoolExecutor(deleteThreads, "AzureBlobDeleteThread", "Delete",
+ path, NativeAzureFileSystem.AZURE_DELETE_THREADS));
+ Mockito.when(mockThreadPoolExecutor.getThreadPool(7)).thenReturn(mockThreadExecutor);
+
+ // With single iteration, we would have created 7 blobs resulting in 7 threads.
+ Mockito.when(mockFs.getThreadPoolExecutor(deleteThreads, "AzureBlobDeleteThread", "Delete",
+ path, NativeAzureFileSystem.AZURE_DELETE_THREADS)).thenReturn(mockThreadPoolExecutor);
+
+ validateDeleteFolder(mockFs, "root");
+
+ // Validate from logs that execution was rejected and the delete was serialized.
+ String content = logs.getOutput();
+ assertInLog(content,
+ "Rejected execution of thread for Delete operation on blob");
+ assertInLog(content, "Serializing the Delete operation");
+ }
+
+ /*
+ * Test case for delete operation when the thread pool rejects every task after the first.
+ */
+ @Test
+ public void testDeleteThreadPoolExecuteSingleThreadFailure() throws Exception {
+
+ // Spy azure file system object and return mocked thread pool
+ NativeAzureFileSystem mockFs = Mockito.spy((NativeAzureFileSystem) fs);
+
+ // Spy a thread pool executor and link it to azure file system object.
+ String path = mockFs.pathToKey(mockFs.makeAbsolute(new Path("root")));
+ AzureFileSystemThreadPoolExecutor mockThreadPoolExecutor = Mockito.spy(
+ mockFs.getThreadPoolExecutor(deleteThreads, "AzureBlobDeleteThread", "Delete",
+ path, NativeAzureFileSystem.AZURE_DELETE_THREADS));
+
+ // With single iteration, we would have created 7 blobs resulting in 7 threads.
+ Mockito.when(mockFs.getThreadPoolExecutor(deleteThreads, "AzureBlobDeleteThread", "Delete",
+ path, NativeAzureFileSystem.AZURE_DELETE_THREADS)).thenReturn(mockThreadPoolExecutor);
+
+ // Create a thread executor and link it to mocked thread pool executor object.
+ ThreadPoolExecutor mockThreadExecutor = Mockito.spy(mockThreadPoolExecutor.getThreadPool(7));
+ Mockito.when(mockThreadPoolExecutor.getThreadPool(7)).thenReturn(mockThreadExecutor);
+
+ // Let the first execute() call through and reject every later one.
+ Mockito.doCallRealMethod().doThrow(new RejectedExecutionException()).when(mockThreadExecutor).execute(Mockito.any(Runnable.class));
+
+ validateDeleteFolder(mockFs, "root");
+
+ // Validate from logs that threads are enabled and that 6 threads were not used.
+ String content = logs.getOutput();
+ assertInLog(content,
+ "Using thread pool for Delete operation with threads 7");
+ assertInLog(content,
+ "6 threads not used for Delete operation on blob");
+ }
+
+ /*
+ * Test case for delete operation when waiting for thread pool termination is interrupted.
+ */
+ @Test
+ public void testDeleteThreadPoolTerminationFailure() throws Exception {
+
+ // Spy azure file system object and return mocked thread pool
+ NativeAzureFileSystem mockFs = Mockito.spy((NativeAzureFileSystem) fs);
+
+ // Spy a thread pool executor and link it to azure file system object.
+ String path = mockFs.pathToKey(mockFs.makeAbsolute(new Path("root")));
+ AzureFileSystemThreadPoolExecutor mockThreadPoolExecutor = Mockito.spy(
+ ((NativeAzureFileSystem) fs).getThreadPoolExecutor(deleteThreads, "AzureBlobDeleteThread", "Delete",
+ path, NativeAzureFileSystem.AZURE_DELETE_THREADS));
+
+ // Create a mocked thread executor whose execute() does nothing and whose
+ // awaitTermination() throws InterruptedException, and link it to the spied pool.
+ ThreadPoolExecutor mockThreadExecutor = Mockito.mock(ThreadPoolExecutor.class);
+ Mockito.doNothing().when(mockThreadExecutor).execute(Mockito.any(Runnable.class));
+ Mockito.when(mockThreadExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS)).thenThrow(new InterruptedException());
+
+ Mockito.when(mockThreadPoolExecutor.getThreadPool(7)).thenReturn(mockThreadExecutor);
+
+ // With single iteration, we would have created 7 blobs resulting in 7 threads.
+ Mockito.when(mockFs.getThreadPoolExecutor(deleteThreads, "AzureBlobDeleteThread", "Delete",
+ path, NativeAzureFileSystem.AZURE_DELETE_THREADS)).thenReturn(mockThreadPoolExecutor);
+
+ createFolder(mockFs, "root");
+ Path sourceFolder = new Path("root");
+ boolean exception = false;
+ try {
+ mockFs.delete(sourceFolder, true);
+ } catch (IOException e){
+ exception = true;
+ }
+
+ assertTrue(exception);
+ assertTrue(mockFs.exists(sourceFolder));
+
+ // Validate from logs that threads are enabled and the delete operation failed.
+ String content = logs.getOutput();
+ assertInLog(content,
+ "Using thread pool for Delete operation with threads");
+ assertInLog(content, "Threads got interrupted Delete blob operation");
+ assertInLog(content,
+ "Delete failed as operation on subfolders and files failed.");
+ }
+
+ /*
+ * Validate that a recursive directory delete succeeds even if deleting a child
+ * directory fails because it no longer exists, e.g. after an external agent
+ * removed it while the parent delete was in progress.
+ */
+ @Test
+ public void testRecursiveDirectoryDeleteWhenChildDirectoryDeleted()
+ throws Exception {
+ testRecursiveDirectoryDelete(true);
+ }
+
+ /*
+ * Validate that a recursive directory delete succeeds even if deleting a child
+ * file fails because it no longer exists, e.g. after an external agent
+ * removed it while the parent delete was in progress.
+ */
+ @Test
+ public void testRecursiveDirectoryDeleteWhenDeletingChildFileReturnsFalse()
+ throws Exception {
+ testRecursiveDirectoryDelete(false);
+ }
+
+ // Shared body of the two tests above: useDir selects whether the child that
+ // reports a failed delete is the directory "root/0" or the file in it.
+ private void testRecursiveDirectoryDelete(boolean useDir) throws Exception {
+ String childPathToBeDeletedByExternalAgent = (useDir)
+ ? "root/0"
+ : "root/0/fileToRename";
+ // Spy azure file system object and return false for deleting one child
+ NativeAzureFileSystem mockFs = Mockito.spy((NativeAzureFileSystem) fs);
+ String path = mockFs.pathToKey(mockFs.makeAbsolute(new Path(
+ childPathToBeDeletedByExternalAgent)));
+
+ Answer<Boolean> answer = new Answer<Boolean>() {
+ @Override
+ public Boolean answer(InvocationOnMock invocation) throws Throwable {
+ String key = (String) invocation.getArguments()[0];
+ boolean isDir = (boolean) invocation.getArguments()[1];
+ // Really delete the child, then report failure as if it was already gone.
+ assertTrue(fs.deleteFile(key, isDir));
+ return false;
+ }
+ };
+
+ Mockito.when(mockFs.deleteFile(path, useDir)).thenAnswer(answer);
+
+ createFolder(mockFs, "root");
+ Path sourceFolder = new Path("root");
+
+ assertTrue(mockFs.delete(sourceFolder, true));
+ assertFalse(mockFs.exists(sourceFolder));
+
+ // Validate from logs that threads are enabled, that a child was
+ // deleted by an external caller, and the parent delete operation still
+ // succeeds.
+ String content = logs.getOutput();
+ assertInLog(content,
+ "Using thread pool for Delete operation with threads");
+ assertInLog(content, String.format("Attempt to delete non-existent %s %s",
+ useDir ? "directory" : "file", path));
+ }
+
+ /*
+ * Test case for delete operation when deleting one sub directory throws an exception.
+ */
+ @Test
+ public void testDeleteSingleDeleteException() throws Exception {
+
+ // Spy azure file system object and raise exception for deleting the directory root/0
+ NativeAzureFileSystem mockFs = Mockito.spy((NativeAzureFileSystem) fs);
+ String path = mockFs.pathToKey(mockFs.makeAbsolute(new Path("root/0")));
+ Mockito.doThrow(new IOException()).when(mockFs).deleteFile(path, true);
+
+ createFolder(mockFs, "root");
+ Path sourceFolder = new Path("root");
+
+ boolean exception = false;
+ try {
+ mockFs.delete(sourceFolder, true);
+ } catch (IOException e){
+ exception = true;
+ }
+
+ assertTrue(exception);
+ assertTrue(mockFs.exists(sourceFolder));
+
+ // Validate from logs that threads are enabled and delete operation failed.
+ String content = logs.getOutput();
+ assertInLog(content,
+ "Using thread pool for Delete operation with threads");
+ assertInLog(content,
+ "Encountered Exception for Delete operation for file " + path);
+ assertInLog(content,
+ "Terminating execution of Delete operation now as some other thread already got exception or operation failed");
+ }
+
+ /*
+ * Test case for rename operation when creating the thread pool throws an exception.
+ */
+ @Test
+ public void testRenameThreadPoolExceptionFailure() throws Exception {
+
+ // Spy azure file system object and raise exception for new thread pool
+ NativeAzureFileSystem mockFs = Mockito.spy((NativeAzureFileSystem) fs);
+
+ String path = mockFs.pathToKey(mockFs.makeAbsolute(new Path("root")));
+ AzureFileSystemThreadPoolExecutor mockThreadPoolExecutor = Mockito.spy(
+ ((NativeAzureFileSystem) fs).getThreadPoolExecutor(renameThreads, "AzureBlobRenameThread", "Rename",
+ path, NativeAzureFileSystem.AZURE_RENAME_THREADS));
+ Mockito.when(mockThreadPoolExecutor.getThreadPool(7)).thenThrow(new Exception());
+
+ // With single iteration, we would have created 7 blobs resulting in 7 threads.
+ Mockito.doReturn(mockThreadPoolExecutor).when(mockFs).getThreadPoolExecutor(renameThreads, "AzureBlobRenameThread", "Rename",
+ path, NativeAzureFileSystem.AZURE_RENAME_THREADS);
+
+ validateRenameFolder(mockFs, "root", "rootnew");
+
+ // Validate from logs that thread pool creation failed and the rename was serialized.
+ String content = logs.getOutput();
+ assertInLog(content, "Failed to create thread pool with threads");
+ assertInLog(content, "Serializing the Rename operation");
+ }
+
+ /*
+ * Test case for rename operation when the thread pool rejects every submitted task.
+ */
+ @Test
+ public void testRenameThreadPoolExecuteFailure() throws Exception {
+
+ // Mock thread pool executor to throw exception for all requests.
+ ThreadPoolExecutor mockThreadExecutor = Mockito.mock(ThreadPoolExecutor.class);
+ Mockito.doThrow(new RejectedExecutionException()).when(mockThreadExecutor).execute(Mockito.any(Runnable.class));
+
+ // Spy azure file system object and return mocked thread pool
+ NativeAzureFileSystem mockFs = Mockito.spy((NativeAzureFileSystem) fs);
+
+ String path = mockFs.pathToKey(mockFs.makeAbsolute(new Path("root")));
+ AzureFileSystemThreadPoolExecutor mockThreadPoolExecutor = Mockito.spy(
+ mockFs.getThreadPoolExecutor(renameThreads, "AzureBlobRenameThread", "Rename",
+ path, NativeAzureFileSystem.AZURE_RENAME_THREADS));
+ Mockito.when(mockThreadPoolExecutor.getThreadPool(7)).thenReturn(mockThreadExecutor);
+
+ // With single iteration, we would have created 7 blobs resulting in 7 threads.
+ Mockito.when(mockFs.getThreadPoolExecutor(renameThreads, "AzureBlobRenameThread", "Rename",
+ path, NativeAzureFileSystem.AZURE_RENAME_THREADS)).thenReturn(mockThreadPoolExecutor);
+
+ validateRenameFolder(mockFs, "root", "rootnew");
+
+ // Validate from logs that execution was rejected and the rename was serialized.
+ String content = logs.getOutput();
+ assertInLog(content,
+ "Rejected execution of thread for Rename operation on blob");
+ assertInLog(content, "Serializing the Rename operation");
+ }
+
+ /*
+ * Test case for rename operation when the thread pool rejects every task after the first.
+ */
+ @Test
+ public void testRenameThreadPoolExecuteSingleThreadFailure() throws Exception {
+
+ // Spy azure file system object and return mocked thread pool
+ NativeAzureFileSystem mockFs = Mockito.spy((NativeAzureFileSystem) fs);
+
+ // Spy a thread pool executor and link it to azure file system object.
+ String path = mockFs.pathToKey(mockFs.makeAbsolute(new Path("root")));
+ AzureFileSystemThreadPoolExecutor mockThreadPoolExecutor = Mockito.spy(
+ mockFs.getThreadPoolExecutor(renameThreads, "AzureBlobRenameThread", "Rename",
+ path, NativeAzureFileSystem.AZURE_RENAME_THREADS));
+
+ // With single iteration, we would have created 7 blobs resulting in 7 threads.
+ Mockito.when(mockFs.getThreadPoolExecutor(renameThreads, "AzureBlobRenameThread", "Rename",
+ path, NativeAzureFileSystem.AZURE_RENAME_THREADS)).thenReturn(mockThreadPoolExecutor);
+
+ // Create a thread executor and link it to mocked thread pool executor object.
+ ThreadPoolExecutor mockThreadExecutor = Mockito.spy(mockThreadPoolExecutor.getThreadPool(7));
+ Mockito.when(mockThreadPoolExecutor.getThreadPool(7)).thenReturn(mockThreadExecutor);
+
+ // Let the first execute() call through and reject every later one.
+ Mockito.doCallRealMethod().doThrow(new RejectedExecutionException()).when(mockThreadExecutor).execute(Mockito.any(Runnable.class));
+
+ validateRenameFolder(mockFs, "root", "rootnew");
+
+ // Validate from logs that threads are enabled and that 6 threads were not used.
+ String content = logs.getOutput();
+ assertInLog(content,
+ "Using thread pool for Rename operation with threads 7");
+ assertInLog(content,
+ "6 threads not used for Rename operation on blob");
+ }
+
+ /*
+ * Test case for rename operation when waiting for thread pool termination is interrupted.
+ */
+ @Test
+ public void testRenameThreadPoolTerminationFailure() throws Exception {
+
+ // Spy azure file system object and return mocked thread pool
+ NativeAzureFileSystem mockFs = Mockito.spy((NativeAzureFileSystem) fs);
+
+ // Spy a thread pool executor and link it to azure file system object.
+ String path = mockFs.pathToKey(mockFs.makeAbsolute(new Path("root")));
+ AzureFileSystemThreadPoolExecutor mockThreadPoolExecutor = Mockito.spy(
+ mockFs.getThreadPoolExecutor(renameThreads, "AzureBlobRenameThread", "Rename",
+ path, NativeAzureFileSystem.AZURE_RENAME_THREADS));
+
+ // With single iteration, we would have created 7 blobs resulting in 7 threads.
+ Mockito.when(mockFs.getThreadPoolExecutor(renameThreads, "AzureBlobRenameThread", "Rename",
+ path, NativeAzureFileSystem.AZURE_RENAME_THREADS)).thenReturn(mockThreadPoolExecutor);
+
+ // Mock a thread executor whose execute() does nothing and whose awaitTermination() throws InterruptedException.
+ ThreadPoolExecutor mockThreadExecutor = Mockito.mock(ThreadPoolExecutor.class);
+ Mockito.doNothing().when(mockThreadExecutor).execute(Mockito.any(Runnable.class));
+ Mockito.when(mockThreadExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS)).thenThrow(new InterruptedException());
+ Mockito.when(mockThreadPoolExecutor.getThreadPool(7)).thenReturn(mockThreadExecutor);
+
+
+ createFolder(mockFs, "root");
+ Path sourceFolder = new Path("root");
+ Path destFolder = new Path("rootnew");
+ boolean exception = false;
+ try {
+ mockFs.rename(sourceFolder, destFolder);
+ } catch (IOException e){
+ exception = true;
+ }
+
+ assertTrue(exception);
+ assertTrue(mockFs.exists(sourceFolder));
+
+ // Validate from logs that threads are enabled and the rename operation failed.
+ String content = logs.getOutput();
+ assertInLog(content,
+ "Using thread pool for Rename operation with threads");
+ assertInLog(content, "Threads got interrupted Rename blob operation");
+ assertInLog(content,
+ "Rename failed as operation on subfolders and files failed.");
+ }
+
+ /*
+ * Test case for rename operation when renaming a file throws an exception.
+ */
+ @Test
+ public void testRenameSingleRenameException() throws Exception {
+
+ // Source and destination folders of the rename under test.
+ Path sourceFolder = new Path("root");
+ Path destFolder = new Path("rootnew");
+
+ // Spy azure file system object and populate rename pending spy object.
+ NativeAzureFileSystem mockFs = Mockito.spy((NativeAzureFileSystem) fs);
+
+ // Populate data now only such that rename pending spy object would see this data.
+ createFolder(mockFs, "root");
+
+ String srcKey = mockFs.pathToKey(mockFs.makeAbsolute(sourceFolder));
+ String dstKey = mockFs.pathToKey(mockFs.makeAbsolute(destFolder));
+
+ FolderRenamePending mockRenameFs = Mockito.spy(mockFs.prepareAtomicFolderRename(srcKey, dstKey));
+ Mockito.when(mockFs.prepareAtomicFolderRename(srcKey, dstKey)).thenReturn(mockRenameFs);
+ String path = mockFs.pathToKey(mockFs.makeAbsolute(new Path("root/0")));
+ Mockito.doThrow(new IOException()).when(mockRenameFs).renameFile(Mockito.any(FileMetadata.class));
+
+ boolean exception = false;
+ try {
+ mockFs.rename(sourceFolder, destFolder);
+ } catch (IOException e){
+ exception = true;
+ }
+
+ assertTrue(exception);
+ assertTrue(mockFs.exists(sourceFolder));
+
+ // Validate from logs that threads are enabled and rename operation failed.
+ String content = logs.getOutput();
+ assertInLog(content,
+ "Using thread pool for Rename operation with threads");
+ assertInLog(content,
+ "Encountered Exception for Rename operation for file " + path);
+ assertInLog(content,
+ "Terminating execution of Rename operation now as some other thread already got exception or operation failed");
+ }
+
+ @Override
+ protected AzureBlobStorageTestAccount createTestAccount() throws Exception {
+ return AzureBlobStorageTestAccount.create(); // account from the no-argument factory
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d2d97fa/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFSAuthWithBlobSpecificKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFSAuthWithBlobSpecificKeys.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFSAuthWithBlobSpecificKeys.java
new file mode 100644
index 0000000..d7e4831
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFSAuthWithBlobSpecificKeys.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import org.apache.hadoop.conf.Configuration;
+
+import static org.apache.hadoop.fs.azure.SecureStorageInterfaceImpl.KEY_USE_CONTAINER_SASKEY_FOR_ALL_ACCESS;
+
+/**
+ * Test class to hold all WASB authorization tests that use blob-specific keys
+ * to access storage.
+ */
+public class ITestNativeAzureFSAuthWithBlobSpecificKeys
+    extends ITestNativeAzureFileSystemAuthorizationWithOwner {
+
+  /**
+   * Starts from the parent test configuration and sets
+   * {@code KEY_USE_CONTAINER_SASKEY_FOR_ALL_ACCESS} to "false".
+   *
+   * @return the adjusted configuration
+   */
+  @Override
+  public Configuration createConfiguration() {
+    final Configuration blobKeyConf = super.createConfiguration();
+    blobKeyConf.set(KEY_USE_CONTAINER_SASKEY_FOR_ALL_ACCESS, "false");
+    return blobKeyConf;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d2d97fa/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFSAuthorizationCaching.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFSAuthorizationCaching.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFSAuthorizationCaching.java
new file mode 100644
index 0000000..c73b1cc
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFSAuthorizationCaching.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Test;
+
+import static org.apache.hadoop.fs.azure.CachingAuthorizer.KEY_AUTH_SERVICE_CACHING_ENABLE;
+
+/**
+ * Test class to hold all WASB authorization caching related tests.
+ */
+public class ITestNativeAzureFSAuthorizationCaching
+    extends ITestNativeAzureFileSystemAuthorizationWithOwner {
+
+  // TTL handed to the CachingAuthorizer constructor in testCachePut.
+  private static final int DUMMY_TTL_VALUE = 5000;
+
+  /**
+   * Starts from the parent test configuration and sets
+   * {@code KEY_AUTH_SERVICE_CACHING_ENABLE} to "true".
+   *
+   * @return the adjusted configuration
+   */
+  @Override
+  public Configuration createConfiguration() {
+    final Configuration cachingConf = super.createConfiguration();
+    cachingConf.set(KEY_AUTH_SERVICE_CACHING_ENABLE, "true");
+    return cachingConf;
+  }
+
+  /**
+   * Test to verify cache behavior -- a second put() for a key that is
+   * already present must replace the stored value.
+   */
+  @Test
+  public void testCachePut() throws Throwable {
+    final String key = "TEST";
+    CachingAuthorizer<String, Integer> authorizerCache =
+        new CachingAuthorizer<>(DUMMY_TTL_VALUE, "TEST");
+    authorizerCache.init(createConfiguration());
+
+    authorizerCache.put(key, 1);
+    authorizerCache.put(key, 3);
+
+    int cachedValue = authorizerCache.get(key);
+    assertEquals("Cache returned unexpected result", 3, cachedValue);
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d2d97fa/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFSPageBlobLive.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFSPageBlobLive.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFSPageBlobLive.java
new file mode 100644
index 0000000..a4d8729
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFSPageBlobLive.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.azure;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Run the base Azure file system tests strictly on page blobs to make sure fundamental
+ * operations on page blob files and folders work as expected.
+ * These operations include create, delete, rename, list, and so on.
+ */
+public class ITestNativeAzureFSPageBlobLive extends
+    NativeAzureFileSystemBaseTest {
+
+  /**
+   * Creates a test account from a fresh configuration in which both the
+   * page blob directories key and the atomic rename directories key are
+   * set to "/".
+   *
+   * @return the test account built from that configuration
+   * @throws Exception if the account cannot be created
+   */
+  @Override
+  protected AzureBlobStorageTestAccount createTestAccount()
+      throws Exception {
+    final Configuration pageBlobConf = new Configuration();
+
+    // "/" as the page blob directory: every file created is a page blob.
+    pageBlobConf.set(AzureNativeFileSystemStore.KEY_PAGE_BLOB_DIRECTORIES, "/");
+
+    // "/" as the atomic rename directory: every folder gets atomic rename.
+    pageBlobConf.set(AzureNativeFileSystemStore.KEY_ATOMIC_RENAME_DIRECTORIES, "/");
+
+    return AzureBlobStorageTestAccount.create(pageBlobConf);
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d2d97fa/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemAppend.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemAppend.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemAppend.java
new file mode 100644
index 0000000..29611bf
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemAppend.java
@@ -0,0 +1,350 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Arrays;
+
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.test.GenericTestUtils;
+
+import org.junit.Test;
+
+/**
+ * Test append operations.
+ */
+public class ITestNativeAzureFileSystemAppend extends AbstractWasbTestBase {
+
+ private Path testPath;
+
+  /**
+   * Returns the parent configuration with the append-support property
+   * set to true.
+   */
+  @Override
+  public Configuration createConfiguration() {
+    final Configuration appendConf = super.createConfiguration();
+    appendConf.setBoolean(
+        NativeAzureFileSystem.APPEND_SUPPORT_ENABLE_PROPERTY_NAME, true);
+    return appendConf;
+  }
+
+  /**
+   * Runs the base set-up, then stores the value of methodPath() as the
+   * path the tests of this class operate on.
+   */
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+    this.testPath = methodPath();
+  }
+
+
+ @Override
+ protected AzureBlobStorageTestAccount createTestAccount() throws Exception {
+ return AzureBlobStorageTestAccount.create(createConfiguration());
+ }
+
+ /*
+ * Helper method that creates test data of size provided by the
+ * "size" parameter.
+ */
+ private static byte[] getTestData(int size) {
+ byte[] testData = new byte[size];
+ System.arraycopy(RandomStringUtils.randomAlphabetic(size).getBytes(), 0, testData, 0, size);
+ return testData;
+ }
+
+  /**
+   * Helper method that creates the file at testPath and, when fileSize is
+   * non-zero, writes fileSize bytes of random test data into it.
+   *
+   * @param fileSize number of bytes to write; 0 leaves the file empty
+   * @param testPath file to create
+   * @return the bytes written, or null when fileSize is 0
+   */
+  private byte[] createBaseFileWithData(int fileSize, Path testPath) throws Throwable {
+
+    try (FSDataOutputStream out = fs.create(testPath)) {
+      if (fileSize == 0) {
+        // Nothing to write; the empty file has been created by fs.create().
+        return null;
+      }
+      byte[] payload = getTestData(fileSize);
+      out.write(payload);
+      return payload;
+    }
+  }
+
+  /**
+   * Helper method that reads the next "dataLength" bytes from the stream
+   * and compares them with the matching slice of the expected data.
+   *
+   * @param dataLength number of bytes to read and compare
+   * @param testData expected content of the whole file
+   * @param testDataIndex offset in testData where the slice starts
+   * @param srcStream stream positioned at the bytes to verify
+   * @return true if the bytes read equal the expected slice; false on a
+   *         mismatch, a premature end of stream, or any other exception
+   */
+  private boolean verifyFileData(int dataLength, byte[] testData, int testDataIndex,
+      FSDataInputStream srcStream) {
+
+    try {
+
+      byte[] fileBuffer = new byte[dataLength];
+      byte[] testDataBuffer = new byte[dataLength];
+
+      // A single read(byte[]) is allowed to return fewer bytes than asked
+      // for even when more data follows, which would be misreported as a
+      // mismatch. readFully() keeps reading until the buffer is filled and
+      // throws EOFException if the stream ends first; that is turned into
+      // "false" by the catch below.
+      srcStream.readFully(fileBuffer);
+
+      System.arraycopy(testData, testDataIndex, testDataBuffer, 0, dataLength);
+
+      return Arrays.equals(fileBuffer, testDataBuffer);
+
+    } catch (Exception ex) {
+      return false;
+    }
+
+  }
+
+ /*
+ * Helper method to verify Append on a testFile.
+ */
+ private boolean verifyAppend(byte[] testData, Path testFile) {
+
+ try(FSDataInputStream srcStream = fs.open(testFile)) {
+
+ int baseBufferSize = 2048;
+ int testDataSize = testData.length;
+ int testDataIndex = 0;
+
+ while (testDataSize > baseBufferSize) {
+
+ if (!verifyFileData(baseBufferSize, testData, testDataIndex, srcStream)) {
+ return false;
+ }
+ testDataIndex += baseBufferSize;
+ testDataSize -= baseBufferSize;
+ }
+
+ if (!verifyFileData(testDataSize, testData, testDataIndex, srcStream)) {
+ return false;
+ }
+
+ return true;
+ } catch(Exception ex) {
+ return false;
+ }
+ }
+
+  /**
+   * Test case to verify if an append on small size data works. This tests
+   * append E2E: a 50 byte file is created, 20 bytes are appended, and the
+   * file is read back and compared with the concatenation of both buffers.
+   */
+  @Test
+  public void testSingleAppend() throws Throwable {
+
+    int baseDataSize = 50;
+    byte[] baseDataBuffer = createBaseFileWithData(baseDataSize, testPath);
+
+    int appendDataSize = 20;
+    byte[] appendDataBuffer = getTestData(appendDataSize);
+
+    // try-with-resources closes the append stream exactly once, whether or
+    // not the write succeeds (the manual finally used to close it twice).
+    try (FSDataOutputStream appendStream = fs.append(testPath, 10)) {
+      appendStream.write(appendDataBuffer);
+    }
+
+    byte[] testData = new byte[baseDataSize + appendDataSize];
+    System.arraycopy(baseDataBuffer, 0, testData, 0, baseDataSize);
+    System.arraycopy(appendDataBuffer, 0, testData, baseDataSize, appendDataSize);
+
+    assertTrue(verifyAppend(testData, testPath));
+  }
+
+  /**
+   * Test case to verify append to an empty file: the file is created with
+   * no content, 20 bytes are appended, and the file must then contain
+   * exactly those bytes.
+   */
+  @Test
+  public void testSingleAppendOnEmptyFile() throws Throwable {
+
+    createBaseFileWithData(0, testPath);
+
+    int appendDataSize = 20;
+    byte[] appendDataBuffer = getTestData(appendDataSize);
+
+    // try-with-resources closes the append stream exactly once, whether or
+    // not the write succeeds (the manual finally used to close it twice).
+    try (FSDataOutputStream appendStream = fs.append(testPath, 10)) {
+      appendStream.write(appendDataBuffer);
+    }
+
+    assertTrue(verifyAppend(appendDataBuffer, testPath));
+  }
+
+  /**
+   * Test to verify that we can open only one Append stream on a File.
+   * While the first append stream is still open, a second append on the
+   * same path is expected to fail with an IOException whose message
+   * mentions the append lease.
+   */
+  @Test
+  public void testSingleAppenderScenario() throws Throwable {
+
+    FSDataOutputStream firstAppender = null;
+    FSDataOutputStream secondAppender = null;
+    IOException leaseFailure = null;
+    try {
+      createBaseFileWithData(0, testPath);
+      firstAppender = fs.append(testPath, 10);
+      try {
+        // Must be attempted while firstAppender is still open.
+        secondAppender = fs.append(testPath, 10);
+      } catch (IOException ex) {
+        leaseFailure = ex;
+      }
+
+      firstAppender.close();
+
+      assertTrue(leaseFailure != null);
+      GenericTestUtils.assertExceptionContains("Unable to set Append lease on the Blob", leaseFailure);
+    } finally {
+      if (firstAppender != null) {
+        firstAppender.close();
+      }
+
+      if (secondAppender != null) {
+        secondAppender.close();
+      }
+    }
+  }
+
+  /**
+   * Tests to verify multiple appends on a Blob: after creating a 50 byte
+   * file, 50 separate append streams are opened one after another, each
+   * writing 100 bytes, and the final file content is compared with
+   * everything that was written.
+   */
+  @Test
+  public void testMultipleAppends() throws Throwable {
+
+    int baseDataSize = 50;
+    byte[] baseDataBuffer = createBaseFileWithData(baseDataSize, testPath);
+
+    int appendDataSize = 100;
+    int targetAppendCount = 50;
+    byte[] testData = new byte[baseDataSize + (appendDataSize*targetAppendCount)];
+    int testDataIndex = 0;
+    System.arraycopy(baseDataBuffer, 0, testData, testDataIndex, baseDataSize);
+    testDataIndex += baseDataSize;
+
+    for (int appendCount = 0; appendCount < targetAppendCount; appendCount++) {
+
+      byte[] appendDataBuffer = getTestData(appendDataSize);
+
+      // One stream per append; try-with-resources closes it exactly once
+      // even if the write fails.
+      try (FSDataOutputStream appendStream = fs.append(testPath, 30)) {
+        appendStream.write(appendDataBuffer);
+      }
+
+      // Record what was appended so the file can be verified at the end.
+      System.arraycopy(appendDataBuffer, 0, testData, testDataIndex, appendDataSize);
+      testDataIndex += appendDataSize;
+    }
+
+    assertTrue(verifyAppend(testData, testPath));
+  }
+
+  /**
+   * Test to verify multiple writes on the same append stream: after
+   * creating a 50 byte file, 50 append streams are opened one after
+   * another, and each receives 100 bytes as five separate 20 byte writes.
+   * The final file content is compared with everything that was written.
+   */
+  @Test
+  public void testMultipleAppendsOnSameStream() throws Throwable {
+
+    int baseDataSize = 50;
+    byte[] baseDataBuffer = createBaseFileWithData(baseDataSize, testPath);
+    int appendDataSize = 100;
+    int targetAppendCount = 50;
+    byte[] testData = new byte[baseDataSize + (appendDataSize*targetAppendCount)];
+    int testDataIndex = 0;
+    System.arraycopy(baseDataBuffer, 0, testData, testDataIndex, baseDataSize);
+    testDataIndex += baseDataSize;
+
+    int singleAppendChunkSize = 20;
+
+    for (int appendCount = 0; appendCount < targetAppendCount; appendCount++) {
+
+      // try-with-resources closes the stream exactly once even if one of
+      // the chunk writes fails.
+      try (FSDataOutputStream appendStream = fs.append(testPath, 50)) {
+
+        int appendRunSize = 0;
+        while (appendRunSize < appendDataSize) {
+
+          byte[] appendDataBuffer = getTestData(singleAppendChunkSize);
+          appendStream.write(appendDataBuffer);
+          System.arraycopy(appendDataBuffer, 0, testData,
+              testDataIndex + appendRunSize, singleAppendChunkSize);
+
+          appendRunSize += singleAppendChunkSize;
+        }
+      }
+
+      testDataIndex += appendDataSize;
+    }
+
+    assertTrue(verifyAppend(testData, testPath));
+  }
+
+  /**
+   * Test to verify the behavior when Append Support configuration flag is
+   * set to false: the file system is re-initialized with the flag off and
+   * the test passes only if an UnsupportedOperationException is thrown.
+   */
+  @Test(expected=UnsupportedOperationException.class)
+  public void testFalseConfigurationFlagBehavior() throws Throwable {
+
+    fs = testAccount.getFileSystem();
+    Configuration appendDisabledConf = fs.getConf();
+    appendDisabledConf.setBoolean(
+        NativeAzureFileSystem.APPEND_SUPPORT_ENABLE_PROPERTY_NAME, false);
+    URI fsUri = fs.getUri();
+    fs.initialize(fsUri, appendDisabledConf);
+
+    FSDataOutputStream stream = null;
+
+    try {
+      createBaseFileWithData(0, testPath);
+      stream = fs.append(testPath, 10);
+    } finally {
+      // Only reached with a non-null stream if append unexpectedly worked.
+      if (stream != null) {
+        stream.close();
+      }
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d2d97fa/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemAtomicRenameDirList.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemAtomicRenameDirList.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemAtomicRenameDirList.java
new file mode 100644
index 0000000..869a31c
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemAtomicRenameDirList.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.hadoop.conf.Configuration;
+
+import org.junit.Test;
+
+/**
+ * Test atomic renaming.
+ */
+public class ITestNativeAzureFileSystemAtomicRenameDirList
+    extends AbstractWasbTestBase {
+
+  // HBase-site config controlling HBase root dir
+  private static final String HBASE_ROOT_DIR_CONF_STRING = "hbase.rootdir";
+  // A root dir that lives on a different file system than the one under test.
+  private static final String HBASE_ROOT_DIR_VALUE_ON_DIFFERENT_FS =
+      "wasb://somedifferentfilesystem.blob.core.windows.net/hbase";
+
+  /**
+   * Creates the test account through the no-argument
+   * {@code AzureBlobStorageTestAccount.create()} factory.
+   */
+  @Override
+  protected AzureBlobStorageTestAccount createTestAccount() throws Exception {
+    final AzureBlobStorageTestAccount account = AzureBlobStorageTestAccount.create();
+    return account;
+  }
+
+  /**
+   * Re-initializes the file system with hbase.rootdir pointing at a
+   * different file system and then calls isAtomicRenameKey() on the store.
+   * There is no assertion; the test passes as long as nothing is thrown.
+   */
+  @Test
+  public void testAtomicRenameKeyDoesntNPEOnInitializingWithNonDefaultURI()
+      throws IOException {
+    // Grab the store before the file system is re-initialized.
+    final AzureNativeFileSystemStore store = fs.getStore();
+
+    final Configuration hbaseConf = fs.getConf();
+    hbaseConf.set(HBASE_ROOT_DIR_CONF_STRING, HBASE_ROOT_DIR_VALUE_ON_DIFFERENT_FS);
+
+    final URI fsUri = fs.getUri();
+    fs.initialize(fsUri, hbaseConf);
+
+    store.isAtomicRenameKey("anyrandomkey");
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d2d97fa/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemAuthorizationWithOwner.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemAuthorizationWithOwner.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemAuthorizationWithOwner.java
new file mode 100644
index 0000000..3ec42f0
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemAuthorizationWithOwner.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import java.security.PrivilegedExceptionAction;
+
+import org.apache.hadoop.fs.Path;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test class that runs wasb authorization tests with owner check enabled.
+ */
+public class ITestNativeAzureFileSystemAuthorizationWithOwner
+ extends TestNativeAzureFileSystemAuthorization {
+
+  /**
+   * Runs the base set-up, then re-initializes the authorizer with the file
+   * system configuration and a {@code true} flag (presumably the switch
+   * that turns the owner check on -- confirm against the authorizer's
+   * init signature).
+   */
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+    this.authorizer.init(this.fs.getConf(), true);
+  }
+
+  /**
+   * Test case when owner matches current user: the current user creates a
+   * directory and must then be able to create and stat a file inside it.
+   * The directory is deleted again in the finally block.
+   */
+  @Test
+  public void testOwnerPermissionPositive() throws Throwable {
+
+    Path parentDir = new Path("/testOwnerPermissionPositive");
+    Path testPath = new Path(parentDir, "test.data");
+
+    authorizer.addAuthRule("/", WasbAuthorizationOperations.WRITE.toString(), true);
+    authorizer.addAuthRule(testPath.toString(), WasbAuthorizationOperations.READ.toString(), true);
+    authorizer.addAuthRule(parentDir.toString(), WasbAuthorizationOperations.WRITE.toString(), true);
+    // additional rule used for assertPathExists
+    authorizer.addAuthRule(parentDir.toString(), WasbAuthorizationOperations.READ.toString(), true);
+    fs.updateWasbAuthorizer(authorizer);
+
+    try {
+      // creates parentDir with owner as current user
+      fs.mkdirs(parentDir);
+      ContractTestUtils.assertPathExists(fs, "parentDir does not exist", parentDir);
+
+      // create() hands back an open output stream; close it straight away
+      // so it is not leaked for the remainder of the test.
+      fs.create(testPath).close();
+      fs.getFileStatus(testPath);
+      ContractTestUtils.assertPathExists(fs, "testPath does not exist", testPath);
+
+    } finally {
+      allowRecursiveDelete(fs, parentDir.toString());
+      fs.delete(parentDir, true);
+    }
+  }
+
+  /**
+   * Negative test case for owner does not match current user: the current
+   * user creates the parent directory, then a different test user tries to
+   * create a child directory in it, which is expected to fail with a
+   * WasbAuthorizationException.
+   */
+  @Test
+  public void testOwnerPermissionNegative() throws Throwable {
+    expectedEx.expect(WasbAuthorizationException.class);
+
+    final Path parentDir = new Path("/testOwnerPermissionNegative");
+    final Path childDir = new Path(parentDir, "childDir");
+
+    setExpectedFailureMessage("mkdirs", childDir);
+
+    final String writeOperation = WasbAuthorizationOperations.WRITE.toString();
+    authorizer.addAuthRule("/", writeOperation, true);
+    authorizer.addAuthRule(parentDir.toString(), writeOperation, true);
+
+    fs.updateWasbAuthorizer(authorizer);
+
+    try {
+      // parentDir is created, and therefore owned, by the current user.
+      fs.mkdirs(parentDir);
+
+      UserGroupInformation otherUser = UserGroupInformation.createUserForTesting(
+          "testuser", new String[] {});
+
+      // Attempt the mkdirs as the other user.
+      otherUser.doAs(new PrivilegedExceptionAction<Void>() {
+        @Override
+        public Void run() throws Exception {
+          fs.mkdirs(childDir);
+          return null;
+        }
+      });
+
+    } finally {
+      allowRecursiveDelete(fs, parentDir.toString());
+      fs.delete(parentDir, true);
+    }
+  }
+
+  /**
+   * Test to verify that retrieving owner information does not
+   * throw when file/folder does not exist; an empty owner string is
+   * expected instead.
+   */
+  @Test
+  public void testRetrievingOwnerDoesNotFailWhenFileDoesNotExist() throws Throwable {
+
+    final Path missingPath = new Path("/testDirectory123454565");
+
+    final String reportedOwner = fs.getOwnerForPath(missingPath);
+    assertEquals("", reportedOwner);
+  }
+}
+
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d2d97fa/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemClientLogging.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemClientLogging.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemClientLogging.java
new file mode 100644
index 0000000..f73a763
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemClientLogging.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import java.net.URI;
+import java.util.StringTokenizer;
+
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
+import org.apache.log4j.Logger;
+import org.junit.Test;
+
+/**
+ * Test to validate Azure storage client side logging. Tests work only when
+ * testing with Live Azure storage because Emulator does not have support for
+ * client-side logging.
+ *
+ * <I>Important: </I> Do not attempt to move off commons-logging.
+ * The tests will fail.
+ */
+public class ITestNativeAzureFileSystemClientLogging
+ extends AbstractWasbTestBase {
+
+ // Core-site config controlling Azure Storage Client logging
+ private static final String KEY_LOGGING_CONF_STRING = "fs.azure.storage.client.logging";
+
+ // Temporary directory created using WASB.
+ private static final String TEMP_DIR = "tempDir";
+
+  /**
+   * Helper method to verify the client logging is working. The captured
+   * output is scanned line by line for the string
+   * {@code <blob endpoint>/<container>/<entity>}, i.e. a log line that
+   * refers to the entity created during the test run.
+   *
+   * @param capturedLogs log text to scan
+   * @param entity name of the entity expected to show up in the logs
+   * @return true if some line of capturedLogs contains that string
+   */
+  private boolean verifyStorageClientLogs(String capturedLogs, String entity)
+      throws Exception {
+
+    URI blobEndpoint = testAccount.getRealAccount().getBlobEndpoint();
+    String containerName = testAccount.getRealContainer().getName();
+    String expectedFragment = blobEndpoint + Path.SEPARATOR + containerName
+        + Path.SEPARATOR + entity;
+
+    StringTokenizer logLines = new StringTokenizer(capturedLogs, "\n");
+    while (logLines.hasMoreTokens()) {
+      if (logLines.nextToken().contains(expectedFragment)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+ /*
+ * Helper method that updates the core-site config to enable/disable logging.
+ */
+ private void updateFileSystemConfiguration(Boolean loggingFlag)
+ throws Exception {
+
+ Configuration conf = fs.getConf();
+ conf.set(KEY_LOGGING_CONF_STRING, loggingFlag.toString());
+ URI uri = fs.getUri();
+ fs.initialize(uri, conf);
+ }
+
+ // Using WASB code to communicate with Azure Storage.
+ private void performWASBOperations() throws Exception {
+
+ Path tempDir = new Path(Path.SEPARATOR + TEMP_DIR);
+ fs.mkdirs(tempDir);
+ fs.delete(tempDir, true);
+ }
+
+  /**
+   * With client logging switched on, the captured root-logger output must
+   * contain a line referring to the temporary directory.
+   */
+  @Test
+  public void testLoggingEnabled() throws Exception {
+
+    LogCapturer capturer = LogCapturer.captureLogs(
+        new Log4JLogger(Logger.getRootLogger()));
+
+    // Update configuration based on the Test.
+    updateFileSystemConfiguration(true);
+
+    performWASBOperations();
+
+    String captured = getLogOutput(capturer);
+    assertTrue("Log entry " + TEMP_DIR + " not found in " + captured,
+        verifyStorageClientLogs(captured, TEMP_DIR));
+  }
+
+ protected String getLogOutput(LogCapturer logs) {
+ String output = logs.getOutput();
+ assertTrue("No log created/captured", !output.isEmpty());
+ return output;
+ }
+
+  /**
+   * With client logging switched off, the captured root-logger output must
+   * not contain a line referring to the temporary directory.
+   */
+  @Test
+  public void testLoggingDisabled() throws Exception {
+
+    LogCapturer capturer = LogCapturer.captureLogs(
+        new Log4JLogger(Logger.getRootLogger()));
+
+    // Update configuration based on the Test.
+    updateFileSystemConfiguration(false);
+
+    performWASBOperations();
+
+    String captured = getLogOutput(capturer);
+    assertFalse("Log entry " + TEMP_DIR + " found in " + captured,
+        verifyStorageClientLogs(captured, TEMP_DIR));
+  }
+
+ @Override
+ protected AzureBlobStorageTestAccount createTestAccount() throws Exception {
+ return AzureBlobStorageTestAccount.create();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d2d97fa/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemConcurrencyLive.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemConcurrencyLive.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemConcurrencyLive.java
new file mode 100644
index 0000000..87cac15
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemConcurrencyLive.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+/***
+ * Test class to hold all Live Azure storage concurrency tests.
+ */
+public class ITestNativeAzureFileSystemConcurrencyLive
+ extends AbstractWasbTestBase {
+
+ private static final int THREAD_COUNT = 102;
+ private static final int TEST_EXECUTION_TIMEOUT = 5000;
+
+ @Override
+ protected AzureBlobStorageTestAccount createTestAccount() throws Exception {
+ return AzureBlobStorageTestAccount.create();
+ }
+
+ /**
+ * Validate contract for FileSystem.create when overwrite is true and there
+ * are concurrent callers of FileSystem.delete. An existing file should be
+ * overwritten, even if the original destination exists but is deleted by an
+ * external agent during the create operation.
+ */
+ @Test(timeout = TEST_EXECUTION_TIMEOUT)
+ public void testConcurrentCreateDeleteFile() throws Exception {
+ Path testFile = methodPath();
+
+ List<CreateFileTask> tasks = new ArrayList<>(THREAD_COUNT);
+
+ for (int i = 0; i < THREAD_COUNT; i++) {
+ tasks.add(new CreateFileTask(fs, testFile));
+ }
+
+ ExecutorService es = null;
+
+ try {
+ es = Executors.newFixedThreadPool(THREAD_COUNT);
+
+ List<Future<Void>> futures = es.invokeAll(tasks);
+
+ for (Future<Void> future : futures) {
+ Assert.assertTrue(future.isDone());
+
+ // we are using Callable<V>, so if an exception
+ // occurred during the operation, it will be thrown
+ // when we call get
+ Assert.assertEquals(null, future.get());
+ }
+ } finally {
+ if (es != null) {
+ es.shutdownNow();
+ }
+ }
+ }
+
+ /**
+ * Validate contract for FileSystem.delete when invoked concurrently.
+ * One of the threads should successfully delete the file and return true;
+ * all other threads should return false.
+ */
+ @Test(timeout = TEST_EXECUTION_TIMEOUT)
+ public void testConcurrentDeleteFile() throws Exception {
+ Path testFile = new Path("test.dat");
+ fs.create(testFile).close();
+
+ List<DeleteFileTask> tasks = new ArrayList<>(THREAD_COUNT);
+
+ for (int i = 0; i < THREAD_COUNT; i++) {
+ tasks.add(new DeleteFileTask(fs, testFile));
+ }
+
+ ExecutorService es = null;
+ try {
+ es = Executors.newFixedThreadPool(THREAD_COUNT);
+
+ List<Future<Boolean>> futures = es.invokeAll(tasks);
+
+ int successCount = 0;
+ for (Future<Boolean> future : futures) {
+ Assert.assertTrue(future.isDone());
+
+ // we are using Callable<V>, so if an exception
+ // occurred during the operation, it will be thrown
+ // when we call get
+ Boolean success = future.get();
+ if (success) {
+ successCount++;
+ }
+ }
+
+ Assert.assertEquals(
+ "Exactly one delete operation should return true.",
+ 1,
+ successCount);
+ } finally {
+ if (es != null) {
+ es.shutdownNow();
+ }
+ }
+ }
+
+ abstract class FileSystemTask<V> implements Callable<V> {
+ private final FileSystem fileSystem;
+ private final Path path;
+
+ protected FileSystem getFileSystem() {
+ return this.fileSystem;
+ }
+
+ protected Path getFilePath() {
+ return this.path;
+ }
+
+ FileSystemTask(FileSystem fs, Path p) {
+ this.fileSystem = fs;
+ this.path = p;
+ }
+
+ public abstract V call() throws Exception;
+ }
+
+ class DeleteFileTask extends FileSystemTask<Boolean> {
+
+ DeleteFileTask(FileSystem fs, Path p) {
+ super(fs, p);
+ }
+
+ @Override
+ public Boolean call() throws Exception {
+ return this.getFileSystem().delete(this.getFilePath(), false);
+ }
+ }
+
+ class CreateFileTask extends FileSystemTask<Void> {
+ CreateFileTask(FileSystem fs, Path p) {
+ super(fs, p);
+ }
+
+ public Void call() throws Exception {
+ FileSystem fs = getFileSystem();
+ Path p = getFilePath();
+
+ // Create an empty file and close the stream.
+ FSDataOutputStream stream = fs.create(p, true);
+ stream.close();
+
+ // Delete the file. We don't care if delete returns true or false.
+ // We just want to ensure the file does not exist.
+ this.getFileSystem().delete(this.getFilePath(), false);
+
+ return null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d2d97fa/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemContractEmulator.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemContractEmulator.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemContractEmulator.java
new file mode 100644
index 0000000..4836fc4
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemContractEmulator.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import static org.junit.Assume.assumeNotNull;
+
+import org.apache.hadoop.fs.FileSystemContractBaseTest;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azure.integration.AzureTestUtils;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.rules.TestName;
+
+/**
+ * Run the {@code FileSystemContractBaseTest} tests against the emulator.
+ */
+public class ITestNativeAzureFileSystemContractEmulator extends
+    FileSystemContractBaseTest {
+  private AzureBlobStorageTestAccount testAccount;
+  private Path basePath;
+
+  @Rule
+  public TestName methodName = new TestName();
+
+  /** Name the current thread after the test method being run. */
+  private void nameThread() {
+    Thread.currentThread().setName("JUnit-" + methodName.getMethodName());
+  }
+
+  /**
+   * Bind {@code fs} to the emulator test account and compute the qualified
+   * base path for this suite. The test is skipped (via a JUnit assumption)
+   * when no filesystem could be obtained.
+   *
+   * @throws Exception if the test account cannot be created
+   */
+  @Before
+  public void setUp() throws Exception {
+    nameThread();
+    testAccount = AzureBlobStorageTestAccount.createForEmulator();
+    if (testAccount != null) {
+      fs = testAccount.getFileSystem();
+    }
+    assumeNotNull(fs);
+    basePath = fs.makeQualified(
+        AzureTestUtils.createTestPath(
+            new Path("ITestNativeAzureFileSystemContractEmulator")));
+  }
+
+  /**
+   * Run the base-class teardown, then release the test account and clear
+   * the filesystem reference.
+   *
+   * @throws Exception if cleanup fails
+   */
+  @Override
+  public void tearDown() throws Exception {
+    super.tearDown();
+    testAccount = AzureTestUtils.cleanup(testAccount);
+    fs = null;
+  }
+
+  /**
+   * Expose the base path computed in {@link #setUp()}, as the other
+   * WASB contract suites do; without this override the field would be
+   * assigned but never used.
+   *
+   * @return the qualified base directory for this suite's test files
+   */
+  @Override
+  public Path getTestBaseDir() {
+    return basePath;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d2d97fa/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemContractLive.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemContractLive.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemContractLive.java
new file mode 100644
index 0000000..d3d1bd8
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemContractLive.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import static org.junit.Assume.assumeNotNull;
+
+import org.apache.hadoop.fs.FileSystemContractBaseTest;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azure.integration.AzureTestConstants;
+import org.apache.hadoop.fs.azure.integration.AzureTestUtils;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+/**
+ * Runs the {@link FileSystemContractBaseTest} suite against Azure storage.
+ */
+public class ITestNativeAzureFileSystemContractLive extends
+    FileSystemContractBaseTest {
+
+  @Rule
+  public TestName methodName = new TestName();
+
+  private AzureBlobStorageTestAccount testAccount;
+  private Path basePath;
+
+  /** Label the current thread with the name of the running test method. */
+  private void nameThread() {
+    final String threadName = "JUnit-" + methodName.getMethodName();
+    Thread.currentThread().setName(threadName);
+  }
+
+  /**
+   * Creates the test account, binds {@code fs} to its filesystem and works
+   * out the qualified base path. The test is skipped through a JUnit
+   * assumption when no filesystem is available.
+   *
+   * @throws Exception if the test account cannot be created
+   */
+  @Before
+  public void setUp() throws Exception {
+    nameThread();
+    testAccount = AzureBlobStorageTestAccount.create();
+    if (testAccount != null) {
+      fs = testAccount.getFileSystem();
+    }
+    assumeNotNull(fs);
+    final Path suiteDir = new Path("NativeAzureFileSystemContractLive");
+    final Path testPath = AzureTestUtils.createTestPath(suiteDir);
+    basePath = fs.makeQualified(testPath);
+  }
+
+  /**
+   * Runs the base-class teardown, then hands the account to
+   * {@code AzureTestUtils.cleanup} and drops the filesystem reference.
+   *
+   * @throws Exception if cleanup fails
+   */
+  @Override
+  public void tearDown() throws Exception {
+    super.tearDown();
+    testAccount = AzureTestUtils.cleanup(testAccount);
+    fs = null;
+  }
+
+  /**
+   * @return {@code AzureTestConstants.AZURE_TEST_TIMEOUT}
+   */
+  protected int getGlobalTimeout() {
+    return AzureTestConstants.AZURE_TEST_TIMEOUT;
+  }
+
+  /**
+   * @return the base path computed in {@link #setUp()}
+   */
+  @Override
+  public Path getTestBaseDir() {
+    return basePath;
+  }
+
+  /*
+   * The tests below fail on Azure; the Azure file system code needs to be
+   * modified to make them pass. A separate work item has been opened for
+   * this. Each is declared with an empty body and marked @Ignore.
+   */
+
+  @Ignore
+  @Test
+  public void testMoveFileUnderParent() throws Throwable {
+  }
+
+  @Ignore
+  @Test
+  public void testRenameFileToSelf() throws Throwable {
+  }
+
+  @Ignore
+  @Test
+  public void testRenameChildDirForbidden() throws Exception {
+  }
+
+  @Ignore
+  @Test
+  public void testMoveDirUnderParent() throws Throwable {
+  }
+
+  @Ignore
+  @Test
+  public void testRenameDirToSelf() throws Throwable {
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d2d97fa/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemContractPageBlobLive.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemContractPageBlobLive.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemContractPageBlobLive.java
new file mode 100644
index 0000000..03e90aa
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemContractPageBlobLive.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystemContractBaseTest;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azure.integration.AzureTestConstants;
+import org.apache.hadoop.fs.azure.integration.AzureTestUtils;
+
+import static org.junit.Assume.assumeNotNull;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+/**
+ * Run the {@link FileSystemContractBaseTest} test suite against azure
+ * storage, after switching the FS using page blobs everywhere.
+ */
+public class ITestNativeAzureFileSystemContractPageBlobLive extends
+    FileSystemContractBaseTest {
+  private AzureBlobStorageTestAccount testAccount;
+  private Path basePath;
+
+  @Rule
+  public TestName methodName = new TestName();
+
+  /** Name the current thread after the test method being run. */
+  private void nameThread() {
+    Thread.currentThread().setName("JUnit-" + methodName.getMethodName());
+  }
+
+  /**
+   * Build a test account whose configuration sets both the page blob
+   * directories key and the atomic rename directories key to {@code "/"}.
+   *
+   * @return the account returned by
+   *         {@code AzureBlobStorageTestAccount.create(conf)}
+   * @throws Exception if the account cannot be created
+   */
+  private AzureBlobStorageTestAccount createTestAccount()
+      throws Exception {
+    Configuration conf = new Configuration();
+
+    // Configure the page blob directories key so every file created is a
+    // page blob.
+    conf.set(AzureNativeFileSystemStore.KEY_PAGE_BLOB_DIRECTORIES, "/");
+
+    // Configure the atomic rename directories key so every folder will have
+    // atomic rename applied.
+    conf.set(AzureNativeFileSystemStore.KEY_ATOMIC_RENAME_DIRECTORIES, "/");
+    return AzureBlobStorageTestAccount.create(conf);
+  }
+
+  /**
+   * Name the thread, create the page-blob test account and bind {@code fs}
+   * and the base path to it. The test is skipped (via a JUnit assumption)
+   * when no test account is available.
+   *
+   * @throws Exception if the test account cannot be created
+   */
+  @Before
+  public void setUp() throws Exception {
+    // Previously nameThread() was defined but never invoked here, unlike
+    // in the other WASB contract suites.
+    nameThread();
+    testAccount = createTestAccount();
+    assumeNotNull(testAccount);
+    fs = testAccount.getFileSystem();
+    basePath = AzureTestUtils.pathForTests(fs, "filesystemcontractpageblob");
+  }
+
+  /**
+   * Release the test account and clear the filesystem reference.
+   *
+   * @throws Exception if cleanup fails
+   */
+  @Override
+  public void tearDown() throws Exception {
+    // NOTE(review): unlike the other WASB contract suites this does not
+    // call super.tearDown() - confirm whether that is intentional.
+    testAccount = AzureTestUtils.cleanup(testAccount);
+    fs = null;
+  }
+
+  /**
+   * @return {@code AzureTestConstants.AZURE_TEST_TIMEOUT}
+   */
+  protected int getGlobalTimeout() {
+    return AzureTestConstants.AZURE_TEST_TIMEOUT;
+  }
+
+  /**
+   * @return the base path computed in {@link #setUp()}
+   */
+  @Override
+  public Path getTestBaseDir() {
+    return basePath;
+  }
+
+  /**
+   * The following tests are failing on Azure and the Azure
+   * file system code needs to be modified to make them pass.
+   * A separate work item has been opened for this.
+   */
+  @Ignore
+  @Test
+  public void testMoveFileUnderParent() throws Throwable {
+  }
+
+  @Ignore
+  @Test
+  public void testRenameFileToSelf() throws Throwable {
+  }
+
+  @Ignore
+  @Test
+  public void testRenameChildDirForbidden() throws Exception {
+  }
+
+  @Ignore
+  @Test
+  public void testMoveDirUnderParent() throws Throwable {
+  }
+
+  @Ignore
+  @Test
+  public void testRenameDirToSelf() throws Throwable {
+  }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org