Posted to common-commits@hadoop.apache.org by st...@apache.org on 2017/09/15 16:39:32 UTC
[07/20] hadoop git commit: HADOOP-14553. Add (parallelized)
integration tests to hadoop-azure Contributed by Steve Loughran
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d2d97fa/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemLive.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemLive.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemLive.java
new file mode 100644
index 0000000..f969968
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemLive.java
@@ -0,0 +1,236 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+
+import org.junit.Test;
+
+import com.microsoft.azure.storage.StorageException;
+
+/**
+ * Tests the Native Azure file system (WASB) against an actual blob store.
+ */
+public class ITestNativeAzureFileSystemLive extends
+ NativeAzureFileSystemBaseTest {
+
+ @Override
+ protected AzureBlobStorageTestAccount createTestAccount() throws Exception {
+ return AzureBlobStorageTestAccount.create();
+ }
+
+ @Test
+ public void testLazyRenamePendingCanOverwriteExistingFile()
+ throws Exception {
+ final String srcFile = "srcFile";
+ final String dstFile = "dstFile";
+ Path srcPath = path(srcFile);
+ FSDataOutputStream srcStream = fs.create(srcPath);
+ assertTrue(fs.exists(srcPath));
+ Path dstPath = path(dstFile);
+ FSDataOutputStream dstStream = fs.create(dstPath);
+ assertTrue(fs.exists(dstPath));
+ NativeAzureFileSystem nfs = fs;
+ final String fullSrcKey = nfs.pathToKey(nfs.makeAbsolute(srcPath));
+ final String fullDstKey = nfs.pathToKey(nfs.makeAbsolute(dstPath));
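+ // Rename directly through the store interface with overwrite=true; the destination
+ // blob should be replaced and the source removed, as a lazy rename-pending redo would do.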
+ nfs.getStoreInterface().rename(fullSrcKey, fullDstKey, true, null);
+ assertTrue(fs.exists(dstPath));
+ assertFalse(fs.exists(srcPath));
+ IOUtils.cleanupWithLogger(null, srcStream);
+ IOUtils.cleanupWithLogger(null, dstStream);
+ }
+
+ /**
+ * Tests fs.delete() on a blob while another process holds a lease on it.
+ * A delete issued without the lease should fail with the appropriate
+ * lease-exists exception until the lease can be acquired.
+ * This is the scenario hit during HMaster startup, when it cleans up temp
+ * dirs while an earlier, killed HMaster process still holds a lease on the
+ * blob from a DDL operation.
+ */
+ @Test
+ public void testDeleteThrowsExceptionWithLeaseExistsErrorMessage()
+ throws Exception {
+ LOG.info("Starting test");
+ // Create the file
+ Path path = methodPath();
+ fs.create(path);
+ assertPathExists("test file", path);
+ NativeAzureFileSystem nfs = fs;
+ final String fullKey = nfs.pathToKey(nfs.makeAbsolute(path));
+ final AzureNativeFileSystemStore store = nfs.getStore();
+
+ // Acquire the lease on the file in a background thread
+ final CountDownLatch leaseAttemptComplete = new CountDownLatch(1);
+ final CountDownLatch beginningDeleteAttempt = new CountDownLatch(1);
+ Thread t = new Thread() {
+ @Override
+ public void run() {
+ // Acquire the lease and then signal the main test thread.
+ SelfRenewingLease lease = null;
+ try {
+ lease = store.acquireLease(fullKey);
+ LOG.info("Lease acquired: " + lease.getLeaseID());
+ } catch (AzureException e) {
+ LOG.warn("Lease acqusition thread unable to acquire lease", e);
+ } finally {
+ leaseAttemptComplete.countDown();
+ }
+
+ // Wait for the main test thread to signal it will attempt the delete.
+ try {
+ beginningDeleteAttempt.await();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+
+ // Keep holding the lease past the lease acquisition retry interval, so
+ // the test covers the case of delete retrying to acquire the lease.
+ try {
+ Thread.sleep(SelfRenewingLease.LEASE_ACQUIRE_RETRY_INTERVAL * 3);
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ }
+
+ try {
+ if (lease != null){
+ LOG.info("Freeing lease");
+ lease.free();
+ }
+ } catch (StorageException se) {
+ LOG.warn("Unable to free lease.", se);
+ }
+ }
+ };
+
+ // Start the background thread and wait for it to signal the lease is held.
+ t.start();
+ try {
+ leaseAttemptComplete.await();
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ }
+
+ // Try to delete the same file
+ beginningDeleteAttempt.countDown();
+ store.delete(fullKey);
+
+ // At this point the file should have been deleted.
+ assertPathDoesNotExist("Leased path", path);
+ }
+
+ /**
+ * Check that isPageBlobKey works as expected. This assumes that
+ * in the test configuration, the list of supported page blob directories
+ * only includes "pageBlobs". That's why this test is made specific
+ * to this subclass.
+ */
+ @Test
+ public void testIsPageBlobKey() {
+ AzureNativeFileSystemStore store = fs.getStore();
+
+ // Use literal strings so it's easier to understand the tests.
+ // In case the constant changes, we want to know about it so we can update this test.
+ assertEquals(AzureBlobStorageTestAccount.DEFAULT_PAGE_BLOB_DIRECTORY, "pageBlobs");
+
+ // URI prefix for test environment.
+ String uriPrefix = "file:///";
+
+ // negative tests
+ String[] negativeKeys = { "", "/", "bar", "bar/", "bar/pageBlobs", "bar/pageBlobs/foo",
+ "bar/pageBlobs/foo/", "/pageBlobs/", "/pageBlobs", "pageBlobs", "pageBlobsxyz/" };
+ for (String s : negativeKeys) {
+ assertFalse(store.isPageBlobKey(s));
+ assertFalse(store.isPageBlobKey(uriPrefix + s));
+ }
+
+ // positive tests
+ String[] positiveKeys = { "pageBlobs/", "pageBlobs/foo/", "pageBlobs/foo/bar/" };
+ for (String s : positiveKeys) {
+ assertTrue(store.isPageBlobKey(s));
+ assertTrue(store.isPageBlobKey(uriPrefix + s));
+ }
+ }
+
+ /**
+ * Test that isAtomicRenameKey() works as expected.
+ */
+ @Test
+ public void testIsAtomicRenameKey() {
+
+ AzureNativeFileSystemStore store = fs.getStore();
+
+ // We want to know if the default configuration changes so we can fix
+ // this test.
+ assertEquals(AzureBlobStorageTestAccount.DEFAULT_ATOMIC_RENAME_DIRECTORIES,
+ "/atomicRenameDir1,/atomicRenameDir2");
+
+ // URI prefix for test environment.
+ String uriPrefix = "file:///";
+
+ // negative tests
+ String[] negativeKeys = { "", "/", "bar", "bar/", "bar/hbase",
+ "bar/hbase/foo", "bar/hbase/foo/", "/hbase/", "/hbase", "hbase",
+ "hbasexyz/", "foo/atomicRenameDir1/"};
+ for (String s : negativeKeys) {
+ assertFalse(store.isAtomicRenameKey(s));
+ assertFalse(store.isAtomicRenameKey(uriPrefix + s));
+ }
+
+ // Positive tests. The directories for atomic rename are /hbase
+ // plus the ones in the configuration (DEFAULT_ATOMIC_RENAME_DIRECTORIES
+ // for this test).
+ String[] positiveKeys = { "hbase/", "hbase/foo/", "hbase/foo/bar/",
+ "atomicRenameDir1/foo/", "atomicRenameDir2/bar/"};
+ for (String s : positiveKeys) {
+ assertTrue(store.isAtomicRenameKey(s));
+ assertTrue(store.isAtomicRenameKey(uriPrefix + s));
+ }
+ }
+
+ /**
+ * Tests fs.mkdir() function to create a target blob while another thread
+ * is holding the lease on the blob. mkdir should not fail since the blob
+ * already exists.
+ * This is a scenario that would happen in HBase distributed log splitting.
+ * Multiple threads will try to create and update "recovered.edits" folder
+ * under the same path.
+ */
+ @Test
+ public void testMkdirOnExistingFolderWithLease() throws Exception {
+ SelfRenewingLease lease;
+ // Create the folder
+ Path path = methodPath();
+ fs.mkdirs(path);
+ NativeAzureFileSystem nfs = fs;
+ String fullKey = nfs.pathToKey(nfs.makeAbsolute(path));
+ AzureNativeFileSystemStore store = nfs.getStore();
+ // Acquire the lease on the folder
+ lease = store.acquireLease(fullKey);
+ assertNotNull("lease ID", lease.getLeaseID() != null);
+ // Try to create the same folder
+ store.storeEmptyFolder(fullKey,
+ nfs.createPermissionStatus(FsPermission.getDirDefault()));
+ lease.free();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d2d97fa/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestOutOfBandAzureBlobOperationsLive.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestOutOfBandAzureBlobOperationsLive.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestOutOfBandAzureBlobOperationsLive.java
new file mode 100644
index 0000000..b63aaf0
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestOutOfBandAzureBlobOperationsLive.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.Test;
+
+import com.microsoft.azure.storage.blob.BlobOutputStream;
+import com.microsoft.azure.storage.blob.CloudBlockBlob;
+
+/**
+ * Live blob operations.
+ */
+public class ITestOutOfBandAzureBlobOperationsLive extends AbstractWasbTestBase {
+
+ @Override
+ protected AzureBlobStorageTestAccount createTestAccount() throws Exception {
+ return AzureBlobStorageTestAccount.create();
+ }
+
+ // scenario for this particular test described at MONARCH-HADOOP-764
+ // creating a file out-of-band would confuse mkdirs("<oobfilesUncleFolder>")
+ // eg oob creation of "user/<name>/testFolder/a/input/file"
+ // Then wasb creation of "user/<name>/testFolder/a/output" fails
+ @Test
+ public void outOfBandFolder_uncleMkdirs() throws Exception {
+
+ // NOTE: manual use of CloudBlockBlob targets working directory explicitly.
+ // WASB driver methods prepend working directory implicitly.
+ String workingDir = "user/"
+ + UserGroupInformation.getCurrentUser().getShortUserName() + "/";
+
+ CloudBlockBlob blob = testAccount.getBlobReference(workingDir
+ + "testFolder1/a/input/file");
+ BlobOutputStream s = blob.openOutputStream();
+ s.close();
+ assertTrue(fs.exists(new Path("testFolder1/a/input/file")));
+
+ Path targetFolder = new Path("testFolder1/a/output");
+ assertTrue(fs.mkdirs(targetFolder));
+ }
+
+ // scenario for this particular test described at MONARCH-HADOOP-764
+ @Test
+ public void outOfBandFolder_parentDelete() throws Exception {
+
+ // NOTE: manual use of CloudBlockBlob targets working directory explicitly.
+ // WASB driver methods prepend working directory implicitly.
+ String workingDir = "user/"
+ + UserGroupInformation.getCurrentUser().getShortUserName() + "/";
+ CloudBlockBlob blob = testAccount.getBlobReference(workingDir
+ + "testFolder2/a/input/file");
+ BlobOutputStream s = blob.openOutputStream();
+ s.close();
+ assertTrue(fs.exists(new Path("testFolder2/a/input/file")));
+
+ Path targetFolder = new Path("testFolder2/a/input");
+ assertTrue(fs.delete(targetFolder, true));
+ }
+
+ @Test
+ public void outOfBandFolder_rootFileDelete() throws Exception {
+
+ CloudBlockBlob blob = testAccount.getBlobReference("fileY");
+ BlobOutputStream s = blob.openOutputStream();
+ s.close();
+ assertTrue(fs.exists(new Path("/fileY")));
+ assertTrue(fs.delete(new Path("/fileY"), true));
+ }
+
+ @Test
+ public void outOfBandFolder_firstLevelFolderDelete() throws Exception {
+
+ CloudBlockBlob blob = testAccount.getBlobReference("folderW/file");
+ BlobOutputStream s = blob.openOutputStream();
+ s.close();
+ assertTrue(fs.exists(new Path("/folderW")));
+ assertTrue(fs.exists(new Path("/folderW/file")));
+ assertTrue(fs.delete(new Path("/folderW"), true));
+ }
+
+ // scenario for this particular test described at MONARCH-HADOOP-764
+ @Test
+ public void outOfBandFolder_siblingCreate() throws Exception {
+
+ // NOTE: manual use of CloudBlockBlob targets working directory explicitly.
+ // WASB driver methods prepend working directory implicitly.
+ String workingDir = "user/"
+ + UserGroupInformation.getCurrentUser().getShortUserName() + "/";
+ CloudBlockBlob blob = testAccount.getBlobReference(workingDir
+ + "testFolder3/a/input/file");
+ BlobOutputStream s = blob.openOutputStream();
+ s.close();
+ assertTrue(fs.exists(new Path("testFolder3/a/input/file")));
+
+ Path targetFile = new Path("testFolder3/a/input/file2");
+ FSDataOutputStream s2 = fs.create(targetFile);
+ s2.close();
+ }
+
+ // scenario for this particular test described at MONARCH-HADOOP-764
+ // creating a new file in the root folder
+ @Test
+ public void outOfBandFolder_create_rootDir() throws Exception {
+ Path targetFile = new Path("/newInRoot");
+ FSDataOutputStream s2 = fs.create(targetFile);
+ s2.close();
+ }
+
+ // scenario for this particular test described at MONARCH-HADOOP-764
+ @Test
+ public void outOfBandFolder_rename() throws Exception {
+
+ // NOTE: manual use of CloudBlockBlob targets working directory explicitly.
+ // WASB driver methods prepend working directory implicitly.
+ String workingDir = "user/"
+ + UserGroupInformation.getCurrentUser().getShortUserName() + "/";
+ CloudBlockBlob blob = testAccount.getBlobReference(workingDir
+ + "testFolder4/a/input/file");
+ BlobOutputStream s = blob.openOutputStream();
+ s.close();
+
+ Path srcFilePath = new Path("testFolder4/a/input/file");
+ assertTrue(fs.exists(srcFilePath));
+
+ Path destFilePath = new Path("testFolder4/a/input/file2");
+ fs.rename(srcFilePath, destFilePath);
+ }
+
+ // Verify that you can rename a file which is the only file in an implicit folder in the
+ // WASB file system.
+ // scenario for this particular test described at MONARCH-HADOOP-892
+ @Test
+ public void outOfBandSingleFile_rename() throws Exception {
+
+ // NOTE: manual use of CloudBlockBlob targets working directory explicitly.
+ // WASB driver methods prepend working directory implicitly.
+ String workingDir = "user/" + UserGroupInformation.getCurrentUser().getShortUserName() + "/";
+ CloudBlockBlob blob = testAccount.getBlobReference(workingDir + "testFolder5/a/input/file");
+ BlobOutputStream s = blob.openOutputStream();
+ s.close();
+
+ Path srcFilePath = new Path("testFolder5/a/input/file");
+ assertTrue(fs.exists(srcFilePath));
+
+ Path destFilePath = new Path("testFolder5/file2");
+ fs.rename(srcFilePath, destFilePath);
+ }
+
+ // WASB must force explicit parent directories in create, delete, mkdirs, rename.
+ // scenario for this particular test described at MONARCH-HADOOP-764
+ @Test
+ public void outOfBandFolder_rename_rootLevelFiles() throws Exception {
+
+ // NOTE: manual use of CloudBlockBlob targets working directory explicitly.
+ // WASB driver methods prepend working directory implicitly.
+ CloudBlockBlob blob = testAccount.getBlobReference("fileX");
+ BlobOutputStream s = blob.openOutputStream();
+ s.close();
+
+ Path srcFilePath = new Path("/fileX");
+ assertTrue(fs.exists(srcFilePath));
+
+ Path destFilePath = new Path("/fileXrename");
+ fs.rename(srcFilePath, destFilePath);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d2d97fa/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestReadAndSeekPageBlobAfterWrite.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestReadAndSeekPageBlobAfterWrite.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestReadAndSeekPageBlobAfterWrite.java
new file mode 100644
index 0000000..f2af116
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestReadAndSeekPageBlobAfterWrite.java
@@ -0,0 +1,341 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.Random;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azure.integration.AbstractAzureScaleTest;
+import org.apache.hadoop.util.Time;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.hadoop.fs.azure.integration.AzureTestUtils.*;
+
+/**
+ * Write data into a page blob and verify you can read back all of it
+ * or just a part of it.
+ */
+public class ITestReadAndSeekPageBlobAfterWrite extends AbstractAzureScaleTest {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ITestReadAndSeekPageBlobAfterWrite.class);
+
+ private FileSystem fs;
+ private byte[] randomData;
+
+ // Page blob physical page size
+ private static final int PAGE_SIZE = PageBlobFormatHelpers.PAGE_SIZE;
+
+ // Size of data on page (excluding header)
+ private static final int PAGE_DATA_SIZE = PAGE_SIZE - PageBlobFormatHelpers.PAGE_HEADER_SIZE;
+ private static final int MAX_BYTES = 33554432; // maximum bytes in a file that we'll test
+ private static final int MAX_PAGES = MAX_BYTES / PAGE_SIZE; // maximum number of pages we'll test
+ private Random rand = new Random();
+
+ // A key with a prefix under /pageBlobs, which for the test file system will
+ // force use of a page blob.
+ private static final String KEY = "/pageBlobs/file.dat";
+
+ // path of page blob file to read and write
+ private Path blobPath;
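+
+ // Illustrative sketch only (not used by the tests, and assuming the layout described
+ // above): map a logical data offset to its physical offset in the page blob, where each
+ // PAGE_SIZE page holds a PAGE_HEADER_SIZE header followed by PAGE_DATA_SIZE data bytes.
+ private static long toPhysicalOffset(long dataOffset) {
+ long page = dataOffset / PAGE_DATA_SIZE; // physical page holding this data byte
+ long withinPage = dataOffset % PAGE_DATA_SIZE; // offset within that page's data region
+ return page * PAGE_SIZE + PageBlobFormatHelpers.PAGE_HEADER_SIZE + withinPage;
+ }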
+
+ @Override
+ public void setUp() throws Exception {
+ super.setUp();
+ fs = getTestAccount().getFileSystem();
+ // Make sure we are using an integral number of pages.
+ assertEquals(0, MAX_BYTES % PAGE_SIZE);
+
+ // load an in-memory array of random data
+ randomData = new byte[PAGE_SIZE * MAX_PAGES];
+ rand.nextBytes(randomData);
+
+ blobPath = blobPath("ITestReadAndSeekPageBlobAfterWrite");
+ }
+
+ @Override
+ public void tearDown() throws Exception {
+ deleteQuietly(fs, blobPath, true);
+ super.tearDown();
+ }
+
+ /**
+ * Make sure the file name (key) is a page blob file name. If anybody changes that,
+ * we need to come back and update this test class.
+ */
+ @Test
+ public void testIsPageBlobFileName() {
+ AzureNativeFileSystemStore store = ((NativeAzureFileSystem) fs).getStore();
+ String[] a = blobPath.toUri().getPath().split("/");
+ String key2 = a[1] + "/";
+ assertTrue("Not a page blob: " + blobPath, store.isPageBlobKey(key2));
+ }
+
+ /**
+ * For a set of different file sizes, write some random data to a page blob,
+ * read it back, and verify that what was read matches what was written.
+ */
+ @Test
+ public void testReadAfterWriteRandomData() throws IOException {
+
+ // local shorthand
+ final int pds = PAGE_DATA_SIZE;
+
+ // Test for sizes at and near page boundaries
+ int[] dataSizes = {
+
+ // on first page
+ 0, 1, 2, 3,
+
+ // Near first physical page boundary (because the implementation
+ // stores PDS + the page header size bytes on each page).
+ pds - 1, pds, pds + 1, pds + 2, pds + 3,
+
+ // near second physical page boundary
+ (2 * pds) - 1, (2 * pds), (2 * pds) + 1, (2 * pds) + 2, (2 * pds) + 3,
+
+ // near tenth physical page boundary
+ (10 * pds) - 1, (10 * pds), (10 * pds) + 1, (10 * pds) + 2, (10 * pds) + 3,
+
+ // test one big size, >> 4MB (an internal buffer size in the code)
+ MAX_BYTES
+ };
+
+ for (int i : dataSizes) {
+ testReadAfterWriteRandomData(i);
+ }
+ }
+
+ private void testReadAfterWriteRandomData(int size) throws IOException {
+ writeRandomData(size);
+ readRandomDataAndVerify(size);
+ }
+
+ /**
+ * Read "size" bytes of data and verify that what was read and what was written
+ * are the same.
+ */
+ private void readRandomDataAndVerify(int size) throws AzureException, IOException {
+ byte[] b = new byte[size];
+ FSDataInputStream stream = fs.open(blobPath);
+ int bytesRead = stream.read(b);
+ stream.close();
+ assertEquals(size, bytesRead);
+
+ // compare the data read to the data written
+ assertTrue(comparePrefix(randomData, b, size));
+ }
+
+ // return true if the beginning "size" values of the arrays are the same
+ private boolean comparePrefix(byte[] a, byte[] b, int size) {
+ if (a.length < size || b.length < size) {
+ return false;
+ }
+ for (int i = 0; i < size; i++) {
+ if (a[i] != b[i]) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ // Write a specified amount of random data to the file path for this test class.
+ private void writeRandomData(int size) throws IOException {
+ OutputStream output = fs.create(blobPath);
+ output.write(randomData, 0, size);
+ output.close();
+ }
+
+ /**
+ * Write data to a page blob, open it, seek, and then read a range of data.
+ * Then compare that the data read from that range is the same as the data originally written.
+ */
+ @Test
+ public void testPageBlobSeekAndReadAfterWrite() throws IOException {
+ writeRandomData(PAGE_SIZE * MAX_PAGES);
+ int recordSize = 100;
+ byte[] b = new byte[recordSize];
+
+
+ try(FSDataInputStream stream = fs.open(blobPath)) {
+ // Seek to a boundary around the middle of the 6th page
+ int seekPosition = 5 * PAGE_SIZE + 250;
+ stream.seek(seekPosition);
+
+ // Read a record's worth of bytes and verify results
+ int bytesRead = stream.read(b);
+ verifyReadRandomData(b, bytesRead, seekPosition, recordSize);
+
+ // Seek to another spot and read a record greater than a page
+ seekPosition = 10 * PAGE_SIZE + 250;
+ stream.seek(seekPosition);
+ recordSize = 1000;
+ b = new byte[recordSize];
+ bytesRead = stream.read(b);
+ verifyReadRandomData(b, bytesRead, seekPosition, recordSize);
+
+ // Read the last 100 bytes of the file
+ recordSize = 100;
+ seekPosition = PAGE_SIZE * MAX_PAGES - recordSize;
+ stream.seek(seekPosition);
+ b = new byte[recordSize];
+ bytesRead = stream.read(b);
+ verifyReadRandomData(b, bytesRead, seekPosition, recordSize);
+
+ // Read past the end of the file and we should get only partial data.
+ recordSize = 100;
+ seekPosition = PAGE_SIZE * MAX_PAGES - recordSize + 50;
+ stream.seek(seekPosition);
+ b = new byte[recordSize];
+ bytesRead = stream.read(b);
+ assertEquals(50, bytesRead);
+
+ // compare last 50 bytes written with those read
+ byte[] tail = Arrays.copyOfRange(randomData, seekPosition, randomData.length);
+ assertTrue(comparePrefix(tail, b, 50));
+ }
+ }
+
+ // Verify that reading a record of data after seeking gives the expected data.
+ private void verifyReadRandomData(byte[] b, int bytesRead, int seekPosition, int recordSize) {
+ byte[] originalRecordData =
+ Arrays.copyOfRange(randomData, seekPosition, seekPosition + recordSize + 1);
+ assertEquals(recordSize, bytesRead);
+ assertTrue(comparePrefix(originalRecordData, b, recordSize));
+ }
+
+ // Test many small flushed writes interspersed with periodic hflush calls.
+ // For manual testing, increase NUM_WRITES to a large number.
+ // The goal for a long-running manual test is to make sure that it finishes
+ // and the close() call does not time out. It also facilitates debugging into
+ // hflush/hsync.
+ @Test
+ public void testManySmallWritesWithHFlush() throws IOException {
+ writeAndReadOneFile(50, 100, 20);
+ }
+
+ /**
+ * Write a total of numWrites * recordLength data to a file, read it back,
+ * and check to make sure what was read is the same as what was written.
+ * The syncInterval is the number of writes after which to call hflush to
+ * force the data to storage.
+ */
+ private void writeAndReadOneFile(int numWrites,
+ int recordLength, int syncInterval) throws IOException {
+
+ // A lower bound on the minimum time we think it will take to do
+ // a write to Azure storage.
+ final long MINIMUM_EXPECTED_TIME = 20;
+ LOG.info("Writing " + numWrites * recordLength + " bytes to " + blobPath.getName());
+ FSDataOutputStream output = fs.create(blobPath);
+ int writesSinceHFlush = 0;
+ try {
+
+ // Do a flush and hflush to exercise the empty-write-queue case in PageBlobOutputStream,
+ // testing its concurrent execution gates.
+ output.flush();
+ output.hflush();
+ for (int i = 0; i < numWrites; i++) {
+ output.write(randomData, i * recordLength, recordLength);
+ writesSinceHFlush++;
+ output.flush();
+ if ((i % syncInterval) == 0) {
+ output.hflush();
+ writesSinceHFlush = 0;
+ }
+ }
+ } finally {
+ long start = Time.monotonicNow();
+ output.close();
+ long end = Time.monotonicNow();
+ LOG.debug("close duration = " + (end - start) + " msec.");
+ if (writesSinceHFlush > 0) {
+ assertTrue(String.format(
+ "close duration with >= 1 pending write is %d, less than minimum expected of %d",
+ end - start, MINIMUM_EXPECTED_TIME),
+ end - start >= MINIMUM_EXPECTED_TIME);
+ }
+ }
+
+ // Read the data back and check it.
+ FSDataInputStream stream = fs.open(blobPath);
+ int SIZE = numWrites * recordLength;
+ byte[] b = new byte[SIZE];
+ try {
+ stream.seek(0);
+ stream.read(b, 0, SIZE);
+ verifyReadRandomData(b, SIZE, 0, SIZE);
+ } finally {
+ stream.close();
+ }
+
+ // delete the file
+ fs.delete(blobPath, false);
+ }
+
+ // Test writing to a large file repeatedly as a stress test.
+ // Set the repetitions to a larger number for manual testing
+ // for a longer stress run.
+ @Test
+ public void testLargeFileStress() throws IOException {
+ int numWrites = 32;
+ int recordSize = 1024 * 1024;
+ int syncInterval = 10;
+ int repetitions = 1;
+ for (int i = 0; i < repetitions; i++) {
+ writeAndReadOneFile(numWrites, recordSize, syncInterval);
+ }
+ }
+
+ // Write to a file repeatedly to verify that it extends.
+ // The page blob file should start out at 128MB and finish at 256MB.
+ public void testFileSizeExtension() throws IOException {
+ final int writeSize = 1024 * 1024;
+ final int numWrites = 129;
+ final byte dataByte = 5;
+ byte[] data = new byte[writeSize];
+ Arrays.fill(data, dataByte);
+ try (FSDataOutputStream output = fs.create(blobPath)) {
+ for (int i = 0; i < numWrites; i++) {
+ output.write(data);
+ output.hflush();
+ LOG.debug("total writes = " + (i + 1));
+ }
+ }
+
+ // Show that we wrote more than the default page blob file size.
+ assertTrue(numWrites * writeSize > PageBlobOutputStream.PAGE_BLOB_MIN_SIZE);
+
+ // Verify we can list the new size. That will prove we expanded the file.
+ FileStatus[] status = fs.listStatus(blobPath);
+ assertEquals("File size hasn't changed " + status,
+ numWrites * writeSize, status[0].getLen());
+ LOG.debug("Total bytes written to " + blobPath + " = " + status[0].getLen());
+ fs.delete(blobPath, false);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d2d97fa/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestWasbRemoteCallHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestWasbRemoteCallHelper.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestWasbRemoteCallHelper.java
new file mode 100644
index 0000000..062bc36
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestWasbRemoteCallHelper.java
@@ -0,0 +1,568 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.io.retry.RetryUtils;
+import org.apache.http.Header;
+import org.apache.http.HttpResponse;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpStatus;
+import org.apache.http.StatusLine;
+import org.apache.http.ProtocolVersion;
+import org.apache.http.ParseException;
+import org.apache.http.HeaderElement;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpGet;
+import org.hamcrest.Description;
+import org.hamcrest.TypeSafeMatcher;
+import org.junit.Assume;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.mockito.ArgumentMatcher;
+import org.mockito.Mockito;
+
+import java.io.ByteArrayInputStream;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.nio.charset.StandardCharsets;
+
+import static org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.KEY_USE_SECURE_MODE;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.times;
+
+/**
+ * Test class to hold all WasbRemoteCallHelper tests.
+ */
+public class ITestWasbRemoteCallHelper
+ extends AbstractWasbTestBase {
+ public static final String EMPTY_STRING = "";
+ private static final int INVALID_HTTP_STATUS_CODE_999 = 999;
+
+ @Override
+ protected AzureBlobStorageTestAccount createTestAccount() throws Exception {
+ Configuration conf = new Configuration();
+ conf.set(NativeAzureFileSystem.KEY_AZURE_AUTHORIZATION, "true");
+ conf.set(RemoteWasbAuthorizerImpl.KEY_REMOTE_AUTH_SERVICE_URLS, "http://localhost1/,http://localhost2/,http://localhost:8080");
+ return AzureBlobStorageTestAccount.create(conf);
+ }
+
+ @Override
+ public void setUp() throws Exception {
+ super.setUp();
+ boolean useSecureMode = fs.getConf().getBoolean(KEY_USE_SECURE_MODE, false);
+ boolean useAuthorization = fs.getConf()
+ .getBoolean(NativeAzureFileSystem.KEY_AZURE_AUTHORIZATION, false);
+ Assume.assumeTrue("Test valid when both SecureMode and Authorization are enabled .. skipping",
+ useSecureMode && useAuthorization);
+ }
+
+ @Rule
+ public ExpectedException expectedEx = ExpectedException.none();
+
+ /**
+ * Test invalid status-code.
+ * @throws Throwable
+ */
+ @Test // (expected = WasbAuthorizationException.class)
+ public void testInvalidStatusCode() throws Throwable {
+
+ setupExpectations();
+
+ // set up mocks
+ HttpClient mockHttpClient = Mockito.mock(HttpClient.class);
+ HttpResponse mockHttpResponse = Mockito.mock(HttpResponse.class);
+ Mockito.when(mockHttpClient.execute(Mockito.<HttpGet>any()))
+ .thenReturn(mockHttpResponse);
+ Mockito.when(mockHttpResponse.getStatusLine())
+ .thenReturn(newStatusLine(INVALID_HTTP_STATUS_CODE_999));
+ // finished setting up mocks
+
+ performop(mockHttpClient);
+ }
+
+ /**
+ * Test invalid Content-Type.
+ * @throws Throwable
+ */
+ @Test // (expected = WasbAuthorizationException.class)
+ public void testInvalidContentType() throws Throwable {
+
+ setupExpectations();
+
+ // set up mocks
+ HttpClient mockHttpClient = Mockito.mock(HttpClient.class);
+ HttpResponse mockHttpResponse = Mockito.mock(HttpResponse.class);
+ Mockito.when(mockHttpClient.execute(Mockito.<HttpGet>any())).thenReturn(mockHttpResponse);
+ Mockito.when(mockHttpResponse.getStatusLine()).thenReturn(newStatusLine(HttpStatus.SC_OK));
+ Mockito.when(mockHttpResponse.getFirstHeader("Content-Type"))
+ .thenReturn(newHeader("Content-Type", "text/plain"));
+ // finished setting up mocks
+
+ performop(mockHttpClient);
+ }
+
+ /**
+ * Test missing Content-Length.
+ * @throws Throwable
+ */
+ @Test // (expected = WasbAuthorizationException.class)
+ public void testMissingContentLength() throws Throwable {
+
+ setupExpectations();
+
+ // set up mocks
+ HttpClient mockHttpClient = Mockito.mock(HttpClient.class);
+ HttpResponse mockHttpResponse = Mockito.mock(HttpResponse.class);
+ Mockito.when(mockHttpClient.execute(Mockito.<HttpGet>any())).thenReturn(mockHttpResponse);
+ Mockito.when(mockHttpResponse.getStatusLine()).thenReturn(newStatusLine(HttpStatus.SC_OK));
+ Mockito.when(mockHttpResponse.getFirstHeader("Content-Type"))
+ .thenReturn(newHeader("Content-Type", "application/json"));
+ // finished setting up mocks
+
+ performop(mockHttpClient);
+ }
+
+ /**
+ * Test Content-Length exceeds max.
+ * @throws Throwable
+ */
+ @Test // (expected = WasbAuthorizationException.class)
+ public void testContentLengthExceedsMax() throws Throwable {
+
+ setupExpectations();
+
+ // set up mocks
+ HttpClient mockHttpClient = Mockito.mock(HttpClient.class);
+ HttpResponse mockHttpResponse = Mockito.mock(HttpResponse.class);
+ Mockito.when(mockHttpClient.execute(Mockito.<HttpGet>any())).thenReturn(mockHttpResponse);
+ Mockito.when(mockHttpResponse.getStatusLine()).thenReturn(newStatusLine(HttpStatus.SC_OK));
+ Mockito.when(mockHttpResponse.getFirstHeader("Content-Type"))
+ .thenReturn(newHeader("Content-Type", "application/json"));
+ Mockito.when(mockHttpResponse.getFirstHeader("Content-Length"))
+ .thenReturn(newHeader("Content-Length", "2048"));
+ // finished setting up mocks
+
+ performop(mockHttpClient);
+ }
+
+ /**
+ * Test invalid Content-Length value
+ * @throws Throwable
+ */
+ @Test // (expected = WasbAuthorizationException.class)
+ public void testInvalidContentLengthValue() throws Throwable {
+
+ setupExpectations();
+
+ // set up mocks
+ HttpClient mockHttpClient = Mockito.mock(HttpClient.class);
+ HttpResponse mockHttpResponse = Mockito.mock(HttpResponse.class);
+ Mockito.when(mockHttpClient.execute(Mockito.<HttpGet>any())).thenReturn(mockHttpResponse);
+ Mockito.when(mockHttpResponse.getStatusLine()).thenReturn(newStatusLine(HttpStatus.SC_OK));
+ Mockito.when(mockHttpResponse.getFirstHeader("Content-Type"))
+ .thenReturn(newHeader("Content-Type", "application/json"));
+ Mockito.when(mockHttpResponse.getFirstHeader("Content-Length"))
+ .thenReturn(newHeader("Content-Length", "20abc48"));
+ // finished setting up mocks
+
+ performop(mockHttpClient);
+ }
+
+ /**
+ * Test valid JSON response.
+ * @throws Throwable
+ */
+ @Test
+ public void testValidJSONResponse() throws Throwable {
+
+ // set up mocks
+ HttpClient mockHttpClient = Mockito.mock(HttpClient.class);
+
+ HttpResponse mockHttpResponse = Mockito.mock(HttpResponse.class);
+ HttpEntity mockHttpEntity = Mockito.mock(HttpEntity.class);
+
+ Mockito.when(mockHttpClient.execute(Mockito.<HttpGet>any())).thenReturn(mockHttpResponse);
+ Mockito.when(mockHttpResponse.getStatusLine()).thenReturn(newStatusLine(HttpStatus.SC_OK));
+ Mockito.when(mockHttpResponse.getFirstHeader("Content-Type"))
+ .thenReturn(newHeader("Content-Type", "application/json"));
+ Mockito.when(mockHttpResponse.getFirstHeader("Content-Length"))
+ .thenReturn(newHeader("Content-Length", "1024"));
+ Mockito.when(mockHttpResponse.getEntity()).thenReturn(mockHttpEntity);
+ Mockito.when(mockHttpEntity.getContent())
+ .thenReturn(new ByteArrayInputStream(validJsonResponse().getBytes(StandardCharsets.UTF_8)))
+ .thenReturn(new ByteArrayInputStream(validJsonResponse().getBytes(StandardCharsets.UTF_8)))
+ .thenReturn(new ByteArrayInputStream(validJsonResponse().getBytes(StandardCharsets.UTF_8)));
+ // finished setting up mocks
+
+ performop(mockHttpClient);
+ }
+
+ /**
+ * Test malformed JSON response.
+ * @throws Throwable
+ */
+ @Test // (expected = WasbAuthorizationException.class)
+ public void testMalFormedJSONResponse() throws Throwable {
+
+ expectedEx.expect(WasbAuthorizationException.class);
+ expectedEx.expectMessage("com.fasterxml.jackson.core.JsonParseException: Unexpected end-of-input in FIELD_NAME");
+
+ // set up mocks
+ HttpClient mockHttpClient = Mockito.mock(HttpClient.class);
+
+ HttpResponse mockHttpResponse = Mockito.mock(HttpResponse.class);
+ HttpEntity mockHttpEntity = Mockito.mock(HttpEntity.class);
+
+ Mockito.when(mockHttpClient.execute(Mockito.<HttpGet>any())).thenReturn(mockHttpResponse);
+ Mockito.when(mockHttpResponse.getStatusLine()).thenReturn(newStatusLine(HttpStatus.SC_OK));
+ Mockito.when(mockHttpResponse.getFirstHeader("Content-Type"))
+ .thenReturn(newHeader("Content-Type", "application/json"));
+ Mockito.when(mockHttpResponse.getFirstHeader("Content-Length"))
+ .thenReturn(newHeader("Content-Length", "1024"));
+ Mockito.when(mockHttpResponse.getEntity()).thenReturn(mockHttpEntity);
+ Mockito.when(mockHttpEntity.getContent())
+ .thenReturn(new ByteArrayInputStream(malformedJsonResponse().getBytes(StandardCharsets.UTF_8)));
+ // finished setting up mocks
+
+ performop(mockHttpClient);
+ }
+
+ /**
+ * Test a valid JSON response that carries a failure response code.
+ * @throws Throwable
+ */
+ @Test // (expected = WasbAuthorizationException.class)
+ public void testFailureCodeJSONResponse() throws Throwable {
+
+ expectedEx.expect(WasbAuthorizationException.class);
+ expectedEx.expectMessage("Remote authorization service encountered an error Unauthorized");
+
+ // set up mocks
+ HttpClient mockHttpClient = Mockito.mock(HttpClient.class);
+
+ HttpResponse mockHttpResponse = Mockito.mock(HttpResponse.class);
+ HttpEntity mockHttpEntity = Mockito.mock(HttpEntity.class);
+
+ Mockito.when(mockHttpClient.execute(Mockito.<HttpGet>any())).thenReturn(mockHttpResponse);
+ Mockito.when(mockHttpResponse.getStatusLine()).thenReturn(newStatusLine(HttpStatus.SC_OK));
+ Mockito.when(mockHttpResponse.getFirstHeader("Content-Type"))
+ .thenReturn(newHeader("Content-Type", "application/json"));
+ Mockito.when(mockHttpResponse.getFirstHeader("Content-Length"))
+ .thenReturn(newHeader("Content-Length", "1024"));
+ Mockito.when(mockHttpResponse.getEntity()).thenReturn(mockHttpEntity);
+ Mockito.when(mockHttpEntity.getContent())
+ .thenReturn(new ByteArrayInputStream(failureCodeJsonResponse().getBytes(StandardCharsets.UTF_8)));
+ // finished setting up mocks
+
+ performop(mockHttpClient);
+ }
+
+ @Test
+ public void testWhenOneInstanceIsDown() throws Throwable {
+
+ boolean isAuthorizationCachingEnabled = fs.getConf().getBoolean(CachingAuthorizer.KEY_AUTH_SERVICE_CACHING_ENABLE, false);
+
+ // set up mocks
+ HttpClient mockHttpClient = Mockito.mock(HttpClient.class);
+ HttpEntity mockHttpEntity = Mockito.mock(HttpEntity.class);
+
+ HttpResponse mockHttpResponseService1 = Mockito.mock(HttpResponse.class);
+ Mockito.when(mockHttpResponseService1.getStatusLine())
+ .thenReturn(newStatusLine(HttpStatus.SC_INTERNAL_SERVER_ERROR));
+ Mockito.when(mockHttpResponseService1.getFirstHeader("Content-Type"))
+ .thenReturn(newHeader("Content-Type", "application/json"));
+ Mockito.when(mockHttpResponseService1.getFirstHeader("Content-Length"))
+ .thenReturn(newHeader("Content-Length", "1024"));
+ Mockito.when(mockHttpResponseService1.getEntity())
+ .thenReturn(mockHttpEntity);
+
+ HttpResponse mockHttpResponseService2 = Mockito.mock(HttpResponse.class);
+ Mockito.when(mockHttpResponseService2.getStatusLine())
+ .thenReturn(newStatusLine(HttpStatus.SC_OK));
+ Mockito.when(mockHttpResponseService2.getFirstHeader("Content-Type"))
+ .thenReturn(newHeader("Content-Type", "application/json"));
+ Mockito.when(mockHttpResponseService2.getFirstHeader("Content-Length"))
+ .thenReturn(newHeader("Content-Length", "1024"));
+ Mockito.when(mockHttpResponseService2.getEntity())
+ .thenReturn(mockHttpEntity);
+
+ HttpResponse mockHttpResponseServiceLocal = Mockito.mock(HttpResponse.class);
+ Mockito.when(mockHttpResponseServiceLocal.getStatusLine())
+ .thenReturn(newStatusLine(HttpStatus.SC_INTERNAL_SERVER_ERROR));
+ Mockito.when(mockHttpResponseServiceLocal.getFirstHeader("Content-Type"))
+ .thenReturn(newHeader("Content-Type", "application/json"));
+ Mockito.when(mockHttpResponseServiceLocal.getFirstHeader("Content-Length"))
+ .thenReturn(newHeader("Content-Length", "1024"));
+ Mockito.when(mockHttpResponseServiceLocal.getEntity())
+ .thenReturn(mockHttpEntity);
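+ // Scenario: the localhost1 and local-hostname services are mocked to return HTTP 500
+ // while localhost2 returns 200, so authorization calls should fail over to the healthy
+ // instance (and caching, if enabled, reduces the expected number of calls).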
+
+
+
+ class HttpGetForService1 extends ArgumentMatcher<HttpGet>{
+ @Override public boolean matches(Object o) {
+ return checkHttpGetMatchHost((HttpGet) o, "localhost1");
+ }
+ }
+ class HttpGetForService2 extends ArgumentMatcher<HttpGet>{
+ @Override public boolean matches(Object o) {
+ return checkHttpGetMatchHost((HttpGet) o, "localhost2");
+ }
+ }
+ class HttpGetForServiceLocal extends ArgumentMatcher<HttpGet>{
+ @Override public boolean matches(Object o) {
+ try {
+ return checkHttpGetMatchHost((HttpGet) o, InetAddress.getLocalHost().getCanonicalHostName());
+ } catch (UnknownHostException e) {
+ return checkHttpGetMatchHost((HttpGet) o, "localhost");
+ }
+ }
+ }
+ Mockito.when(mockHttpClient.execute(argThat(new HttpGetForService1())))
+ .thenReturn(mockHttpResponseService1);
+ Mockito.when(mockHttpClient.execute(argThat(new HttpGetForService2())))
+ .thenReturn(mockHttpResponseService2);
+ Mockito.when(mockHttpClient.execute(argThat(new HttpGetForServiceLocal())))
+ .thenReturn(mockHttpResponseServiceLocal);
+
+ // Need 3 times because performop() does 3 fs operations.
+ Mockito.when(mockHttpEntity.getContent())
+ .thenReturn(new ByteArrayInputStream(validJsonResponse()
+ .getBytes(StandardCharsets.UTF_8)))
+ .thenReturn(new ByteArrayInputStream(validJsonResponse()
+ .getBytes(StandardCharsets.UTF_8)))
+ .thenReturn(new ByteArrayInputStream(validJsonResponse()
+ .getBytes(StandardCharsets.UTF_8)));
+ // finished setting up mocks
+
+ performop(mockHttpClient);
+
+ int expectedNumberOfInvocations = isAuthorizationCachingEnabled ? 1 : 2;
+ Mockito.verify(mockHttpClient, times(expectedNumberOfInvocations)).execute(Mockito.argThat(new HttpGetForServiceLocal()));
+ Mockito.verify(mockHttpClient, times(expectedNumberOfInvocations)).execute(Mockito.argThat(new HttpGetForService2()));
+ }
+
+ @Test
+ public void testWhenServiceInstancesAreDown() throws Throwable {
+ //expectedEx.expect(WasbAuthorizationException.class);
+ // set up mocks
+ HttpClient mockHttpClient = Mockito.mock(HttpClient.class);
+ HttpEntity mockHttpEntity = Mockito.mock(HttpEntity.class);
+
+ HttpResponse mockHttpResponseService1 = Mockito.mock(HttpResponse.class);
+ Mockito.when(mockHttpResponseService1.getStatusLine())
+ .thenReturn(newStatusLine(HttpStatus.SC_INTERNAL_SERVER_ERROR));
+ Mockito.when(mockHttpResponseService1.getFirstHeader("Content-Type"))
+ .thenReturn(newHeader("Content-Type", "application/json"));
+ Mockito.when(mockHttpResponseService1.getFirstHeader("Content-Length"))
+ .thenReturn(newHeader("Content-Length", "1024"));
+ Mockito.when(mockHttpResponseService1.getEntity())
+ .thenReturn(mockHttpEntity);
+
+ HttpResponse mockHttpResponseService2 = Mockito.mock(HttpResponse.class);
+ Mockito.when(mockHttpResponseService2.getStatusLine())
+ .thenReturn(newStatusLine(
+ HttpStatus.SC_INTERNAL_SERVER_ERROR));
+ Mockito.when(mockHttpResponseService2.getFirstHeader("Content-Type"))
+ .thenReturn(newHeader("Content-Type", "application/json"));
+ Mockito.when(mockHttpResponseService2.getFirstHeader("Content-Length"))
+ .thenReturn(newHeader("Content-Length", "1024"));
+ Mockito.when(mockHttpResponseService2.getEntity())
+ .thenReturn(mockHttpEntity);
+
+ HttpResponse mockHttpResponseService3 = Mockito.mock(HttpResponse.class);
+ Mockito.when(mockHttpResponseService3.getStatusLine())
+ .thenReturn(newStatusLine(
+ HttpStatus.SC_INTERNAL_SERVER_ERROR));
+ Mockito.when(mockHttpResponseService3.getFirstHeader("Content-Type"))
+ .thenReturn(newHeader("Content-Type", "application/json"));
+ Mockito.when(mockHttpResponseService3.getFirstHeader("Content-Length"))
+ .thenReturn(newHeader("Content-Length", "1024"));
+ Mockito.when(mockHttpResponseService3.getEntity())
+ .thenReturn(mockHttpEntity);
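+ // Scenario: every mocked service instance returns HTTP 500, so the authorizer is
+ // expected to exhaust its retries across all instances and raise a
+ // WasbAuthorizationException, which the catch block below verifies.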
+
+ class HttpGetForService1 extends ArgumentMatcher<HttpGet>{
+ @Override public boolean matches(Object o) {
+ return checkHttpGetMatchHost((HttpGet) o, "localhost1");
+ }
+ }
+ class HttpGetForService2 extends ArgumentMatcher<HttpGet>{
+ @Override public boolean matches(Object o) {
+ return checkHttpGetMatchHost((HttpGet) o, "localhost2");
+ }
+ }
+ class HttpGetForService3 extends ArgumentMatcher<HttpGet> {
+ @Override public boolean matches(Object o){
+ try {
+ return checkHttpGetMatchHost((HttpGet) o, InetAddress.getLocalHost().getCanonicalHostName());
+ } catch (UnknownHostException e) {
+ return checkHttpGetMatchHost((HttpGet) o, "localhost");
+ }
+ }
+ }
+ Mockito.when(mockHttpClient.execute(argThat(new HttpGetForService1())))
+ .thenReturn(mockHttpResponseService1);
+ Mockito.when(mockHttpClient.execute(argThat(new HttpGetForService2())))
+ .thenReturn(mockHttpResponseService2);
+ Mockito.when(mockHttpClient.execute(argThat(new HttpGetForService3())))
+ .thenReturn(mockHttpResponseService3);
+
+ //Need 3 times because performop() does 3 fs operations.
+ Mockito.when(mockHttpEntity.getContent())
+ .thenReturn(new ByteArrayInputStream(
+ validJsonResponse().getBytes(StandardCharsets.UTF_8)))
+ .thenReturn(new ByteArrayInputStream(
+ validJsonResponse().getBytes(StandardCharsets.UTF_8)))
+ .thenReturn(new ByteArrayInputStream(
+ validJsonResponse().getBytes(StandardCharsets.UTF_8)));
+ // finished setting up mocks
+ try {
+ performop(mockHttpClient);
+ }catch (WasbAuthorizationException e){
+ e.printStackTrace();
+ Mockito.verify(mockHttpClient, atLeast(2))
+ .execute(argThat(new HttpGetForService1()));
+ Mockito.verify(mockHttpClient, atLeast(2))
+ .execute(argThat(new HttpGetForService2()));
+ Mockito.verify(mockHttpClient, atLeast(3))
+ .execute(argThat(new HttpGetForService3()));
+ Mockito.verify(mockHttpClient, times(7)).execute(Mockito.<HttpGet>any());
+ }
+ }
+
+ private void setupExpectations() {
+ expectedEx.expect(WasbAuthorizationException.class);
+
+ class MatchesPattern extends TypeSafeMatcher<String> {
+ private String pattern;
+
+ MatchesPattern(String pattern) {
+ this.pattern = pattern;
+ }
+
+ @Override protected boolean matchesSafely(String item) {
+ return item.matches(pattern);
+ }
+
+ @Override public void describeTo(Description description) {
+ description.appendText("matches pattern ").appendValue(pattern);
+ }
+
+ @Override protected void describeMismatchSafely(String item,
+ Description mismatchDescription) {
+ mismatchDescription.appendText("does not match");
+ }
+ }
+
+ expectedEx.expectMessage(new MatchesPattern(
+ "org\\.apache\\.hadoop\\.fs\\.azure\\.WasbRemoteCallException: "
+ + "Encountered error while making remote call to "
+ + "http:\\/\\/localhost1\\/,http:\\/\\/localhost2\\/,http:\\/\\/localhost:8080 retried 6 time\\(s\\)\\."));
+ }
+
+ private void performop(HttpClient mockHttpClient) throws Throwable {
+
+ Path testPath = new Path("/", "test.dat");
+
+ RemoteWasbAuthorizerImpl authorizer = new RemoteWasbAuthorizerImpl();
+ authorizer.init(fs.getConf());
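+ // The retry spec below is assumed to follow MultipleLinearRandomRetry's
+ // "sleep-millis,retry-count,..." pair format: "1000,3,10000,2" means three ~1s
+ // retries followed by two ~10s retries.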
+ WasbRemoteCallHelper mockWasbRemoteCallHelper = new WasbRemoteCallHelper(
+ RetryUtils.getMultipleLinearRandomRetry(new Configuration(),
+ EMPTY_STRING, true,
+ EMPTY_STRING, "1000,3,10000,2"));
+ mockWasbRemoteCallHelper.updateHttpClient(mockHttpClient);
+ authorizer.updateWasbRemoteCallHelper(mockWasbRemoteCallHelper);
+ fs.updateWasbAuthorizer(authorizer);
+
+ fs.create(testPath);
+ ContractTestUtils.assertPathExists(fs, "testPath was not created", testPath);
+ fs.delete(testPath, false);
+ }
+
+ private String validJsonResponse() {
+ return "{"
+ + "\"responseCode\": 0,"
+ + "\"authorizationResult\": true,"
+ + "\"responseMessage\": \"Authorized\""
+ + "}";
+ }
+
+ private String malformedJsonResponse() {
+ return "{"
+ + "\"responseCode\": 0,"
+ + "\"authorizationResult\": true,"
+ + "\"responseMessage\":";
+ }
+
+ private String failureCodeJsonResponse() {
+ return "{"
+ + "\"responseCode\": 1,"
+ + "\"authorizationResult\": false,"
+ + "\"responseMessage\": \"Unauthorized\""
+ + "}";
+ }
+
+ private StatusLine newStatusLine(int statusCode) {
+ return new StatusLine() {
+ @Override
+ public ProtocolVersion getProtocolVersion() {
+ return new ProtocolVersion("HTTP", 1, 1);
+ }
+
+ @Override
+ public int getStatusCode() {
+ return statusCode;
+ }
+
+ @Override
+ public String getReasonPhrase() {
+ return "Reason Phrase";
+ }
+ };
+ }
+
+ private Header newHeader(String name, String value) {
+ return new Header() {
+ @Override
+ public String getName() {
+ return name;
+ }
+
+ @Override
+ public String getValue() {
+ return value;
+ }
+
+ @Override
+ public HeaderElement[] getElements() throws ParseException {
+ return new HeaderElement[0];
+ }
+ };
+ }
+
+ /** Check whether an HttpGet request targets the given remote host. */
+ private static boolean checkHttpGetMatchHost(HttpGet g, String h) {
+ return g != null && g.getURI().getHost().equals(h);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d2d97fa/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestWasbUriAndConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestWasbUriAndConfiguration.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestWasbUriAndConfiguration.java
new file mode 100644
index 0000000..bee0220
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestWasbUriAndConfiguration.java
@@ -0,0 +1,610 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY;
+import static org.junit.Assume.assumeNotNull;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+import java.util.Date;
+import java.util.EnumSet;
+import java.io.File;
+
+import org.apache.hadoop.fs.azure.integration.AzureTestUtils;
+import org.apache.hadoop.security.ProviderUtils;
+import org.apache.hadoop.security.alias.CredentialProvider;
+import org.apache.hadoop.security.alias.CredentialProviderFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.AbstractFileSystem;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
+import org.apache.hadoop.fs.azure.AzureBlobStorageTestAccount.CreateOptions;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import com.microsoft.azure.storage.blob.CloudBlobContainer;
+import com.microsoft.azure.storage.blob.CloudBlockBlob;
+
+public class ITestWasbUriAndConfiguration extends AbstractWasbTestWithTimeout {
+
+ private static final int FILE_SIZE = 4096;
+ private static final String PATH_DELIMITER = "/";
+
+ protected String accountName;
+ protected String accountKey;
+ protected static Configuration conf = null;
+ private boolean runningInSASMode = false;
+ @Rule
+ public final TemporaryFolder tempDir = new TemporaryFolder();
+
+ private AzureBlobStorageTestAccount testAccount;
+
+ @After
+ public void tearDown() throws Exception {
+ testAccount = AzureTestUtils.cleanupTestAccount(testAccount);
+ }
+
+ @Before
+ public void setMode() {
+ runningInSASMode = AzureBlobStorageTestAccount.createTestConfiguration().
+ getBoolean(AzureNativeFileSystemStore.KEY_USE_SECURE_MODE, false);
+ }
+
+ private boolean validateIOStreams(Path filePath) throws IOException {
+ // Capture the file system from the test account.
+ FileSystem fs = testAccount.getFileSystem();
+ return validateIOStreams(fs, filePath);
+ }
+
+ private boolean validateIOStreams(FileSystem fs, Path filePath)
+ throws IOException {
+
+ // Create and write a file
+ OutputStream outputStream = fs.create(filePath);
+ outputStream.write(new byte[FILE_SIZE]);
+ outputStream.close();
+
+ // Return true if the count read back equals the file size.
+ return (FILE_SIZE == readInputStream(fs, filePath));
+ }
+
+ private int readInputStream(Path filePath) throws IOException {
+ // Capture the file system from the test account.
+ FileSystem fs = testAccount.getFileSystem();
+ return readInputStream(fs, filePath);
+ }
+
+ private int readInputStream(FileSystem fs, Path filePath) throws IOException {
+ // Read the file
+ InputStream inputStream = fs.open(filePath);
+ int count = 0;
+ while (inputStream.read() >= 0) {
+ count++;
+ }
+ inputStream.close();
+
+ // Return the number of bytes read from the file.
+ return count;
+ }
+
+ // Positive test to exercise making a connection to an Azure account using an
+ // account key.
+ @Test
+ public void testConnectUsingKey() throws Exception {
+
+ testAccount = AzureBlobStorageTestAccount.create();
+ assumeNotNull(testAccount);
+
+ // Validate input and output on the connection.
+ assertTrue(validateIOStreams(new Path("/wasb_scheme")));
+ }
+
+ @Test
+ public void testConnectUsingSAS() throws Exception {
+
+ Assume.assumeFalse(runningInSASMode);
+ // Create the test account with SAS credentials.
+ testAccount = AzureBlobStorageTestAccount.create("",
+ EnumSet.of(CreateOptions.UseSas, CreateOptions.CreateContainer));
+ assumeNotNull(testAccount);
+ // Validate input and output on the connection.
+ // NOTE: As of 4/15/2013, Azure Storage has a deficiency that prevents the
+ // full scenario from working (CopyFromBlob doesn't work with SAS), so
+ // just do a minor check until that is corrected.
+ assertFalse(testAccount.getFileSystem().exists(new Path("/IDontExist")));
+ //assertTrue(validateIOStreams(new Path("/sastest.txt")));
+ }
+
+ @Test
+ public void testConnectUsingSASReadonly() throws Exception {
+
+ Assume.assumeFalse(runningInSASMode);
+ // Create the test account with SAS credentials.
+ testAccount = AzureBlobStorageTestAccount.create("", EnumSet.of(
+ CreateOptions.UseSas, CreateOptions.CreateContainer,
+ CreateOptions.Readonly));
+ assumeNotNull(testAccount);
+
+ // Create a blob in there
+ final String blobKey = "blobForReadonly";
+ CloudBlobContainer container = testAccount.getRealContainer();
+ CloudBlockBlob blob = container.getBlockBlobReference(blobKey);
+ ByteArrayInputStream inputStream = new ByteArrayInputStream(new byte[] { 1,
+ 2, 3 });
+ blob.upload(inputStream, 3);
+ inputStream.close();
+
+ // Make sure we can read it from the file system
+ Path filePath = new Path("/" + blobKey);
+ FileSystem fs = testAccount.getFileSystem();
+ assertTrue(fs.exists(filePath));
+ byte[] obtained = new byte[3];
+ DataInputStream obtainedInputStream = fs.open(filePath);
+ obtainedInputStream.readFully(obtained);
+ obtainedInputStream.close();
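+ // The uploaded blob contained bytes {1, 2, 3}, so the last byte read back should be 3.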
+ assertEquals(3, obtained[2]);
+ }
+
+ @Test
+ public void testConnectUsingAnonymous() throws Exception {
+
+ // Create test account with anonymous credentials
+ testAccount = AzureBlobStorageTestAccount.createAnonymous("testWasb.txt",
+ FILE_SIZE);
+ assumeNotNull(testAccount);
+
+ // Read the file from the public folder using anonymous credentials.
+ assertEquals(FILE_SIZE, readInputStream(new Path("/testWasb.txt")));
+ }
+
+ @Test
+ public void testConnectToEmulator() throws Exception {
+ testAccount = AzureBlobStorageTestAccount.createForEmulator();
+ assumeNotNull(testAccount);
+ assertTrue(validateIOStreams(new Path("/testFile")));
+ }
+
+ /**
+ * Tests that we can connect to fully qualified accounts outside of
+ * blob.core.windows.net
+ */
+ @Test
+ public void testConnectToFullyQualifiedAccountMock() throws Exception {
+ Configuration conf = new Configuration();
+ AzureBlobStorageTestAccount.setMockAccountKey(conf,
+ "mockAccount.mock.authority.net");
+ AzureNativeFileSystemStore store = new AzureNativeFileSystemStore();
+ MockStorageInterface mockStorage = new MockStorageInterface();
+ store.setAzureStorageInteractionLayer(mockStorage);
+ NativeAzureFileSystem fs = new NativeAzureFileSystem(store);
+ fs.initialize(
+ new URI("wasb://mockContainer@mockAccount.mock.authority.net"), conf);
+ fs.createNewFile(new Path("/x"));
+ assertTrue(mockStorage.getBackingStore().exists(
+ "http://mockAccount.mock.authority.net/mockContainer/x"));
+ fs.close();
+ }
+
+ public void testConnectToRoot() throws Exception {
+
+ // Set up blob names.
+ final String blobPrefix = String.format("wasbtests-%s-%tQ-blob",
+ System.getProperty("user.name"), new Date());
+ final String inblobName = blobPrefix + "_In" + ".txt";
+ final String outblobName = blobPrefix + "_Out" + ".txt";
+
+ // Create test account with default root access.
+ testAccount = AzureBlobStorageTestAccount.createRoot(inblobName, FILE_SIZE);
+ assumeNotNull(testAccount);
+
+ // Read the file from the default container.
+ assertEquals(FILE_SIZE, readInputStream(new Path(PATH_DELIMITER
+ + inblobName)));
+
+ try {
+ // Capture file system.
+ FileSystem fs = testAccount.getFileSystem();
+
+ // Create output path and open an output stream to the root folder.
+ Path outputPath = new Path(PATH_DELIMITER + outblobName);
+ OutputStream outputStream = fs.create(outputPath);
+ fail("Expected an AzureException when writing to root folder.");
+ outputStream.write(new byte[FILE_SIZE]);
+ outputStream.close();
+ } catch (AzureException e) {
+ assertTrue(true);
+ } catch (Exception e) {
+ String errMsg = String.format(
+ "Expected AzureException but got %s instead.", e);
+ assertTrue(errMsg, false);
+ }
+ }
+
+ // Positive test to exercise the throttling I/O path. Connections are made to
+ // an Azure account using an account key.
+ //
+ public void testConnectWithThrottling() throws Exception {
+
+ testAccount = AzureBlobStorageTestAccount.createThrottled();
+
+ // Validate input and output on the connection.
+ assertTrue(validateIOStreams(new Path("/wasb_scheme")));
+ }
+
+ /**
+ * Creates a file and writes a single byte with the given value in it.
+ */
+ private static void writeSingleByte(FileSystem fs, Path testFile, int toWrite)
+ throws Exception {
+ OutputStream outputStream = fs.create(testFile);
+ outputStream.write(toWrite);
+ outputStream.close();
+ }
+
+ /**
+ * Reads the file given and makes sure that it's a single-byte file with the
+ * given value in it.
+ */
+ private static void assertSingleByteValue(FileSystem fs, Path testFile,
+ int expectedValue) throws Exception {
+ InputStream inputStream = fs.open(testFile);
+ int byteRead = inputStream.read();
+ assertTrue("File unexpectedly empty: " + testFile, byteRead >= 0);
+ assertTrue("File has more than a single byte: " + testFile,
+ inputStream.read() < 0);
+ inputStream.close();
+ assertEquals("Unxpected content in: " + testFile, expectedValue, byteRead);
+ }
+
+ @Test
+ public void testMultipleContainers() throws Exception {
+ AzureBlobStorageTestAccount firstAccount = AzureBlobStorageTestAccount
+ .create("first"), secondAccount = AzureBlobStorageTestAccount
+ .create("second");
+ assumeNotNull(firstAccount);
+ assumeNotNull(secondAccount);
+ try {
+ FileSystem firstFs = firstAccount.getFileSystem(),
+ secondFs = secondAccount.getFileSystem();
+ Path testFile = new Path("/testWasb");
+ assertTrue(validateIOStreams(firstFs, testFile));
+ assertTrue(validateIOStreams(secondFs, testFile));
+ // Make sure that we're really dealing with two file systems here.
+ writeSingleByte(firstFs, testFile, 5);
+ writeSingleByte(secondFs, testFile, 7);
+ assertSingleByteValue(firstFs, testFile, 5);
+ assertSingleByteValue(secondFs, testFile, 7);
+ } finally {
+ firstAccount.cleanup();
+ secondAccount.cleanup();
+ }
+ }
+
+ @Test
+ public void testDefaultKeyProvider() throws Exception {
+ Configuration conf = new Configuration();
+ String account = "testacct";
+ String key = "testkey";
+
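+ // With no explicit key provider configured, the key stored directly in the
+ // configuration should be returned (via the default SimpleKeyProvider).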
+ conf.set(SimpleKeyProvider.KEY_ACCOUNT_KEY_PREFIX + account, key);
+
+ String result = AzureNativeFileSystemStore.getAccountKeyFromConfiguration(
+ account, conf);
+ assertEquals(key, result);
+ }
+
+ @Test
+ public void testCredsFromCredentialProvider() throws Exception {
+
+ Assume.assumeFalse(runningInSASMode);
+ String account = "testacct";
+ String key = "testkey";
+ // set up conf to have a cred provider
+ final Configuration conf = new Configuration();
+ final File file = tempDir.newFile("test.jks");
+ final URI jks = ProviderUtils.nestURIForLocalJavaKeyStoreProvider(
+ file.toURI());
+ conf.set(CredentialProviderFactory.CREDENTIAL_PROVIDER_PATH,
+ jks.toString());
+
+ provisionAccountKey(conf, account, key);
+
+ // also add to configuration as clear text that should be overridden
+ conf.set(SimpleKeyProvider.KEY_ACCOUNT_KEY_PREFIX + account,
+ key + "cleartext");
+
+ String result = AzureNativeFileSystemStore.getAccountKeyFromConfiguration(
+ account, conf);
+ // result should contain the credential provider key not the config key
+ assertEquals("AccountKey incorrect.", key, result);
+ }
+
+ void provisionAccountKey(
+ final Configuration conf, String account, String key) throws Exception {
+ // add our creds to the provider
+ final CredentialProvider provider =
+ CredentialProviderFactory.getProviders(conf).get(0);
+ provider.createCredentialEntry(
+ SimpleKeyProvider.KEY_ACCOUNT_KEY_PREFIX + account, key.toCharArray());
+ provider.flush();
+ }
+
+ @Test
+ public void testValidKeyProvider() throws Exception {
+ Configuration conf = new Configuration();
+ String account = "testacct";
+ String key = "testkey";
+
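+ // Explicitly registering SimpleKeyProvider for the account should resolve
+ // the same key from the configuration.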
+ conf.set(SimpleKeyProvider.KEY_ACCOUNT_KEY_PREFIX + account, key);
+ conf.setClass("fs.azure.account.keyprovider." + account,
+ SimpleKeyProvider.class, KeyProvider.class);
+ String result = AzureNativeFileSystemStore.getAccountKeyFromConfiguration(
+ account, conf);
+ assertEquals(key, result);
+ }
+
+ @Test
+ public void testInvalidKeyProviderNonexistantClass() throws Exception {
+ Configuration conf = new Configuration();
+ String account = "testacct";
+
+ conf.set("fs.azure.account.keyprovider." + account,
+ "org.apache.Nonexistant.Class");
+ try {
+ AzureNativeFileSystemStore.getAccountKeyFromConfiguration(account, conf);
+ Assert.fail("Nonexistant key provider class should have thrown a "
+ + "KeyProviderException");
+ } catch (KeyProviderException e) {
+ }
+ }
+
+ @Test
+ public void testInvalidKeyProviderWrongClass() throws Exception {
+ Configuration conf = new Configuration();
+ String account = "testacct";
+
+ conf.set("fs.azure.account.keyprovider." + account, "java.lang.String");
+ try {
+ AzureNativeFileSystemStore.getAccountKeyFromConfiguration(account, conf);
+ Assert.fail("Key provider class that doesn't implement KeyProvider "
+ + "should have thrown a KeyProviderException");
+ } catch (KeyProviderException e) {
+ }
+ }
+
+ /**
+ * Tests the cases when the URI is specified with no authority, i.e.
+ * wasb:///path/to/file.
+ */
+ @Test
+ public void testNoUriAuthority() throws Exception {
+ // For any combination of default FS being asv(s)/wasb(s)://c@a/ and
+ // the actual URI being asv(s)/wasb(s):///, it should work.
+
+ String[] wasbAliases = new String[] { "wasb", "wasbs" };
+ for (String defaultScheme : wasbAliases) {
+ for (String wantedScheme : wasbAliases) {
+ testAccount = AzureBlobStorageTestAccount.createMock();
+ Configuration conf = testAccount.getFileSystem().getConf();
+ String authority = testAccount.getFileSystem().getUri().getAuthority();
+ URI defaultUri = new URI(defaultScheme, authority, null, null, null);
+ conf.set(FS_DEFAULT_NAME_KEY, defaultUri.toString());
+ // Add references to file system implementations for wasb and wasbs.
+ conf.addResource("azure-test.xml");
+ URI wantedUri = new URI(wantedScheme + ":///random/path");
+ NativeAzureFileSystem obtained = (NativeAzureFileSystem) FileSystem
+ .get(wantedUri, conf);
+ assertNotNull(obtained);
+ assertEquals(new URI(wantedScheme, authority, null, null, null),
+ obtained.getUri());
+ // Make sure makeQualified works as expected
+ Path qualified = obtained.makeQualified(new Path(wantedUri));
+ assertEquals(new URI(wantedScheme, authority, wantedUri.getPath(),
+ null, null), qualified.toUri());
+ // Clean up so the next iteration does not pick up a cached file system instance.
+ testAccount.cleanup();
+ FileSystem.closeAll();
+ }
+ }
+ // If the default FS is not a WASB FS, then specifying a URI without
+ // authority for the Azure file system should throw.
+ testAccount = AzureBlobStorageTestAccount.createMock();
+ Configuration conf = testAccount.getFileSystem().getConf();
+ conf.set(FS_DEFAULT_NAME_KEY, "file:///");
+ try {
+ FileSystem.get(new URI("wasb:///random/path"), conf);
+ fail("Should've thrown.");
+ } catch (IllegalArgumentException e) {
+ }
+ }
+
+ @Test
+ public void testWasbAsDefaultFileSystemHasNoPort() throws Exception {
+ try {
+ testAccount = AzureBlobStorageTestAccount.createMock();
+ Configuration conf = testAccount.getFileSystem().getConf();
+ String authority = testAccount.getFileSystem().getUri().getAuthority();
+ URI defaultUri = new URI("wasb", authority, null, null, null);
+ conf.set(FS_DEFAULT_NAME_KEY, defaultUri.toString());
+ conf.addResource("azure-test.xml");
+
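+ // A wasb URI carries no port, so both the FileSystem and the
+ // AbstractFileSystem views should report -1.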
+ FileSystem fs = FileSystem.get(conf);
+ assertTrue(fs instanceof NativeAzureFileSystem);
+ assertEquals(-1, fs.getUri().getPort());
+
+ AbstractFileSystem afs = FileContext.getFileContext(conf)
+ .getDefaultFileSystem();
+ assertTrue(afs instanceof Wasb);
+ assertEquals(-1, afs.getUri().getPort());
+ } finally {
+ testAccount.cleanup();
+ FileSystem.closeAll();
+ }
+ }
+
+ /**
+ * Tests the cases when the scheme specified is 'wasbs'.
+ */
+ @Test
+ public void testAbstractFileSystemImplementationForWasbsScheme() throws Exception {
+ try {
+ testAccount = AzureBlobStorageTestAccount.createMock();
+ Configuration conf = testAccount.getFileSystem().getConf();
+ String authority = testAccount.getFileSystem().getUri().getAuthority();
+ URI defaultUri = new URI("wasbs", authority, null, null, null);
+ conf.set(FS_DEFAULT_NAME_KEY, defaultUri.toString());
+ conf.set("fs.AbstractFileSystem.wasbs.impl", "org.apache.hadoop.fs.azure.Wasbs");
+ conf.addResource("azure-test.xml");
+
+ FileSystem fs = FileSystem.get(conf);
+ assertTrue(fs instanceof NativeAzureFileSystem);
+ assertEquals("wasbs", fs.getScheme());
+
+ AbstractFileSystem afs = FileContext.getFileContext(conf)
+ .getDefaultFileSystem();
+ assertTrue(afs instanceof Wasbs);
+ assertEquals(-1, afs.getUri().getPort());
+ assertEquals("wasbs", afs.getUri().getScheme());
+ } finally {
+ testAccount.cleanup();
+ FileSystem.closeAll();
+ }
+ }
+
+ @Test
+ public void testNoAbstractFileSystemImplementationSpecifiedForWasbsScheme() throws Exception {
+ try {
+ testAccount = AzureBlobStorageTestAccount.createMock();
+ Configuration conf = testAccount.getFileSystem().getConf();
+ String authority = testAccount.getFileSystem().getUri().getAuthority();
+ URI defaultUri = new URI("wasbs", authority, null, null, null);
+ conf.set(FS_DEFAULT_NAME_KEY, defaultUri.toString());
+
+ FileSystem fs = FileSystem.get(conf);
+ assertTrue(fs instanceof NativeAzureFileSystem);
+ assertEquals("wasbs", fs.getScheme());
+
+ // Should throw if 'fs.AbstractFileSystem.wasbs.impl' is not specified.
+ try {
+ FileContext.getFileContext(conf).getDefaultFileSystem();
+ fail("Should've thrown.");
+ } catch (UnsupportedFileSystemException e) {
+ }
+
+ } finally {
+ testAccount.cleanup();
+ FileSystem.closeAll();
+ }
+ }
+
+ @Test
+ public void testCredentialProviderPathExclusions() throws Exception {
+ String providerPath =
+ "user:///,jceks://wasb/user/hrt_qa/sqoopdbpasswd.jceks," +
+ "jceks://hdfs@nn1.example.com/my/path/test.jceks";
+ Configuration config = new Configuration();
+ config.set(CredentialProviderFactory.CREDENTIAL_PROVIDER_PATH,
+ providerPath);
+ String newPath = "user:///,jceks://hdfs@nn1.example.com/my/path/test.jceks";
+
+ excludeAndTestExpectations(config, newPath);
+ }
+
+ @Test
+ public void testExcludeAllProviderTypesFromConfig() throws Exception {
+ String providerPath =
+ "jceks://wasb/tmp/test.jceks," +
+ "jceks://wasb@/my/path/test.jceks";
+ Configuration config = new Configuration();
+ config.set(CredentialProviderFactory.CREDENTIAL_PROVIDER_PATH,
+ providerPath);
+ String newPath = null;
+
+ excludeAndTestExpectations(config, newPath);
+ }
+
+ void excludeAndTestExpectations(Configuration config, String newPath)
+ throws Exception {
+ Configuration conf = ProviderUtils.excludeIncompatibleCredentialProviders(
+ config, NativeAzureFileSystem.class);
+ String effectivePath = conf.get(
+ CredentialProviderFactory.CREDENTIAL_PROVIDER_PATH, null);
+ assertEquals(newPath, effectivePath);
+ }
+
+ @Test
+ public void testUserAgentConfig() throws Exception {
+ // Set the user agent
+ try {
+ testAccount = AzureBlobStorageTestAccount.createMock();
+ Configuration conf = testAccount.getFileSystem().getConf();
+ String authority = testAccount.getFileSystem().getUri().getAuthority();
+ URI defaultUri = new URI("wasbs", authority, null, null, null);
+ conf.set(FS_DEFAULT_NAME_KEY, defaultUri.toString());
+ conf.set("fs.AbstractFileSystem.wasbs.impl", "org.apache.hadoop.fs.azure.Wasbs");
+
+ conf.set(AzureNativeFileSystemStore.USER_AGENT_ID_KEY, "TestClient");
+
+ FileSystem fs = FileSystem.get(conf);
+ AbstractFileSystem afs = FileContext.getFileContext(conf).getDefaultFileSystem();
+
+ assertTrue(afs instanceof Wasbs);
+ assertEquals(-1, afs.getUri().getPort());
+ assertEquals("wasbs", afs.getUri().getScheme());
+
+ } finally {
+ testAccount.cleanup();
+ FileSystem.closeAll();
+ }
+
+ // Unset the user agent
+ try {
+ testAccount = AzureBlobStorageTestAccount.createMock();
+ Configuration conf = testAccount.getFileSystem().getConf();
+ String authority = testAccount.getFileSystem().getUri().getAuthority();
+ URI defaultUri = new URI("wasbs", authority, null, null, null);
+ conf.set(FS_DEFAULT_NAME_KEY, defaultUri.toString());
+ conf.set("fs.AbstractFileSystem.wasbs.impl", "org.apache.hadoop.fs.azure.Wasbs");
+
+ conf.unset(AzureNativeFileSystemStore.USER_AGENT_ID_KEY);
+
+ FileSystem fs = FileSystem.get(conf);
+ AbstractFileSystem afs = FileContext.getFileContext(conf).getDefaultFileSystem();
+ assertTrue(afs instanceof Wasbs);
+ assertEquals(-1, afs.getUri().getPort());
+ assertEquals("wasbs", afs.getUri().getScheme());
+
+ } finally {
+ testAccount.cleanup();
+ FileSystem.closeAll();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d2d97fa/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockWasbAuthorizerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockWasbAuthorizerImpl.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockWasbAuthorizerImpl.java
index 9fbab49..7354499 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockWasbAuthorizerImpl.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockWasbAuthorizerImpl.java
@@ -38,11 +38,12 @@ public class MockWasbAuthorizerImpl implements WasbAuthorizerInterface {
private boolean performOwnerMatch;
private CachingAuthorizer<CachedAuthorizerEntry, Boolean> cache;
- // The full qualified URL to the root directory
+ // The full qualified URL to the root directory
private String qualifiedPrefixUrl;
public MockWasbAuthorizerImpl(NativeAzureFileSystem fs) {
- qualifiedPrefixUrl = new Path("/").makeQualified(fs.getUri(), fs.getWorkingDirectory())
+ qualifiedPrefixUrl = new Path("/").makeQualified(fs.getUri(),
+ fs.getWorkingDirectory())
.toString().replaceAll("/$", "");
cache = new CachingAuthorizer<>(TimeUnit.MINUTES.convert(5L, TimeUnit.MINUTES), "AUTHORIZATION");
}
@@ -64,19 +65,23 @@ public class MockWasbAuthorizerImpl implements WasbAuthorizerInterface {
public void addAuthRule(String wasbAbsolutePath,
String accessType, boolean access) {
- wasbAbsolutePath = qualifiedPrefixUrl + wasbAbsolutePath;
- AuthorizationComponent component = wasbAbsolutePath.endsWith("*")
- ? new AuthorizationComponent("^" + wasbAbsolutePath.replace("*", ".*"), accessType)
+ wasbAbsolutePath = qualifiedPrefixUrl + wasbAbsolutePath;
+ AuthorizationComponent component = wasbAbsolutePath.endsWith("*")
+ ? new AuthorizationComponent("^" + wasbAbsolutePath.replace("*", ".*"),
+ accessType)
: new AuthorizationComponent(wasbAbsolutePath, accessType);
this.authRules.put(component, access);
}
@Override
- public boolean authorize(String wasbAbsolutePath, String accessType, String owner)
+ public boolean authorize(String wasbAbsolutePath,
+ String accessType,
+ String owner)
throws WasbAuthorizationException {
- if (wasbAbsolutePath.endsWith(NativeAzureFileSystem.FolderRenamePending.SUFFIX)) {
+ if (wasbAbsolutePath.endsWith(
+ NativeAzureFileSystem.FolderRenamePending.SUFFIX)) {
return true;
}
@@ -108,20 +113,23 @@ public class MockWasbAuthorizerImpl implements WasbAuthorizerInterface {
// In case of root("/"), owner match does not happen because owner is returned as empty string.
// we try to force owner match just for purpose of tests to make sure all operations work seemlessly with owner.
if (this.performOwnerMatch
- && StringUtils.equalsIgnoreCase(wasbAbsolutePath, qualifiedPrefixUrl + "/")) {
+ && StringUtils.equalsIgnoreCase(wasbAbsolutePath,
+ qualifiedPrefixUrl + "/")) {
owner = currentUserShortName;
}
boolean shouldEvaluateOwnerAccess = owner != null && !owner.isEmpty()
- && this.performOwnerMatch;
+ && this.performOwnerMatch;
- boolean isOwnerMatch = StringUtils.equalsIgnoreCase(currentUserShortName, owner);
+ boolean isOwnerMatch = StringUtils.equalsIgnoreCase(currentUserShortName,
+ owner);
AuthorizationComponent component =
new AuthorizationComponent(wasbAbsolutePath, accessType);
if (authRules.containsKey(component)) {
- return shouldEvaluateOwnerAccess ? isOwnerMatch && authRules.get(component) : authRules.get(component);
+ return shouldEvaluateOwnerAccess ? isOwnerMatch && authRules.get(
+ component) : authRules.get(component);
} else {
// Regex-pattern match if we don't have a straight match
for (Map.Entry<AuthorizationComponent, Boolean> entry : authRules.entrySet()) {
@@ -129,8 +137,11 @@ public class MockWasbAuthorizerImpl implements WasbAuthorizerInterface {
String keyPath = key.getWasbAbsolutePath();
String keyAccess = key.getAccessType();
- if (keyPath.endsWith("*") && Pattern.matches(keyPath, wasbAbsolutePath) && keyAccess.equals(accessType)) {
- return shouldEvaluateOwnerAccess ? isOwnerMatch && entry.getValue() : entry.getValue();
+ if (keyPath.endsWith("*") && Pattern.matches(keyPath, wasbAbsolutePath)
+ && keyAccess.equals(accessType)) {
+ return shouldEvaluateOwnerAccess
+ ? isOwnerMatch && entry.getValue()
+ : entry.getValue();
}
}
return false;
@@ -141,47 +152,47 @@ public class MockWasbAuthorizerImpl implements WasbAuthorizerInterface {
authRules.clear();
cache.clear();
}
-}
-class AuthorizationComponent {
+ private static class AuthorizationComponent {
- private String wasbAbsolutePath;
- private String accessType;
+ private final String wasbAbsolutePath;
+ private final String accessType;
- public AuthorizationComponent(String wasbAbsolutePath,
- String accessType) {
- this.wasbAbsolutePath = wasbAbsolutePath;
- this.accessType = accessType;
- }
+ AuthorizationComponent(String wasbAbsolutePath,
+ String accessType) {
+ this.wasbAbsolutePath = wasbAbsolutePath;
+ this.accessType = accessType;
+ }
- @Override
- public int hashCode() {
- return this.wasbAbsolutePath.hashCode() ^ this.accessType.hashCode();
- }
+ @Override
+ public int hashCode() {
+ return this.wasbAbsolutePath.hashCode() ^ this.accessType.hashCode();
+ }
- @Override
- public boolean equals(Object obj) {
+ @Override
+ public boolean equals(Object obj) {
- if (obj == this) {
- return true;
- }
+ if (obj == this) {
+ return true;
+ }
- if (obj == null
- || !(obj instanceof AuthorizationComponent)) {
- return false;
- }
+ if (obj == null
+ || !(obj instanceof AuthorizationComponent)) {
+ return false;
+ }
- return ((AuthorizationComponent)obj).
- getWasbAbsolutePath().equals(this.wasbAbsolutePath)
- && ((AuthorizationComponent)obj).
- getAccessType().equals(this.accessType);
- }
+ return ((AuthorizationComponent) obj).
+ getWasbAbsolutePath().equals(this.wasbAbsolutePath)
+ && ((AuthorizationComponent) obj).
+ getAccessType().equals(this.accessType);
+ }
- public String getWasbAbsolutePath() {
- return this.wasbAbsolutePath;
- }
+ public String getWasbAbsolutePath() {
+ return this.wasbAbsolutePath;
+ }
- public String getAccessType() {
- return accessType;
+ public String getAccessType() {
+ return accessType;
+ }
}
-}
\ No newline at end of file
+}