You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2017/09/15 16:39:32 UTC

[07/20] hadoop git commit: HADOOP-14553. Add (parallelized) integration tests to hadoop-azure Contributed by Steve Loughran

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d2d97fa/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemLive.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemLive.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemLive.java
new file mode 100644
index 0000000..f969968
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemLive.java
@@ -0,0 +1,236 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+
+import org.junit.Test;
+
+import com.microsoft.azure.storage.StorageException;
+
+/**
+ * Tests the Native Azure file system (WASB) against an actual blob store.
+ */
+public class ITestNativeAzureFileSystemLive extends
+    NativeAzureFileSystemBaseTest {
+
+  @Override
+  protected AzureBlobStorageTestAccount createTestAccount() throws Exception {
+    return AzureBlobStorageTestAccount.create();
+  }
+
+  /**
+   * Creates a source and a destination file (both streams are kept open),
+   * renames the source key onto the destination key directly through the
+   * store interface, and then asserts that the destination exists and the
+   * source no longer does.
+   * The streams are released in a {@code finally} block so that a failed
+   * assertion or a failing rename does not leak them.
+   *
+   * @throws Exception on any failure
+   */
+  @Test
+  public void testLazyRenamePendingCanOverwriteExistingFile()
+    throws Exception {
+    final String srcFile = "srcFile";
+    final String dstFile = "dstFile";
+    FSDataOutputStream srcStream = null;
+    FSDataOutputStream dstStream = null;
+    try {
+      Path srcPath = path(srcFile);
+      srcStream = fs.create(srcPath);
+      assertTrue(fs.exists(srcPath));
+      Path dstPath = path(dstFile);
+      dstStream = fs.create(dstPath);
+      assertTrue(fs.exists(dstPath));
+      NativeAzureFileSystem nfs = fs;
+      final String fullSrcKey = nfs.pathToKey(nfs.makeAbsolute(srcPath));
+      final String fullDstKey = nfs.pathToKey(nfs.makeAbsolute(dstPath));
+      // NOTE(review): the meaning of the trailing (true, null) arguments is
+      // defined by the store interface, which is not visible here.
+      nfs.getStoreInterface().rename(fullSrcKey, fullDstKey, true, null);
+      assertTrue(fs.exists(dstPath));
+      assertFalse(fs.exists(srcPath));
+    } finally {
+      // Always release the streams, even when an assertion above failed.
+      IOUtils.cleanupWithLogger(null, srcStream);
+      IOUtils.cleanupWithLogger(null, dstStream);
+    }
+  }
+  /**
+   * Tests a store-level delete of a blob while another thread is holding a
+   * lease on it. A background thread acquires the lease, keeps holding it for
+   * three lease-acquisition retry intervals after the delete attempt begins,
+   * and then frees it. The test asserts that the path no longer exists once
+   * the delete call returns (despite the method name, no exception is
+   * expected by the assertions below).
+   * This is a scenario that would happen in HMaster startup when it tries to
+   * clean up the temp dirs while the HMaster process which was killed earlier
+   * held lease on the blob when doing some DDL operation.
+   *
+   * @throws Exception on any failure
+   */
+  @Test
+  public void testDeleteThrowsExceptionWithLeaseExistsErrorMessage()
+      throws Exception {
+    LOG.info("Starting test");
+    // Create the file; close the stream immediately so it is not leaked.
+    Path path = methodPath();
+    fs.create(path).close();
+    assertPathExists("test file", path);
+    NativeAzureFileSystem nfs = fs;
+    final String fullKey = nfs.pathToKey(nfs.makeAbsolute(path));
+    final AzureNativeFileSystemStore store = nfs.getStore();
+
+    // Acquire the lease on the file in a background thread
+    final CountDownLatch leaseAttemptComplete = new CountDownLatch(1);
+    final CountDownLatch beginningDeleteAttempt = new CountDownLatch(1);
+    Thread t = new Thread() {
+      @Override
+      public void run() {
+        // Acquire the lease and then signal the main test thread.
+        SelfRenewingLease lease = null;
+        try {
+          lease = store.acquireLease(fullKey);
+          LOG.info("Lease acquired: " + lease.getLeaseID());
+        } catch (AzureException e) {
+          LOG.warn("Lease acqusition thread unable to acquire lease", e);
+        } finally {
+          // Signal in all cases so the main thread never waits forever.
+          leaseAttemptComplete.countDown();
+        }
+
+        // Wait for the main test thread to signal it will attempt the delete.
+        try {
+          beginningDeleteAttempt.await();
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+        }
+
+        // Keep holding the lease past the lease acquisition retry interval, so
+        // the test covers the case of delete retrying to acquire the lease.
+        try {
+          Thread.sleep(SelfRenewingLease.LEASE_ACQUIRE_RETRY_INTERVAL * 3);
+        } catch (InterruptedException ex) {
+          Thread.currentThread().interrupt();
+        }
+
+        try {
+          if (lease != null){
+            LOG.info("Freeing lease");
+            lease.free();
+          }
+        } catch (StorageException se) {
+          LOG.warn("Unable to free lease.", se);
+        }
+      }
+    };
+
+    // Start the background thread and wait for it to signal the lease is held.
+    t.start();
+    try {
+      leaseAttemptComplete.await();
+    } catch (InterruptedException ex) {
+      Thread.currentThread().interrupt();
+    }
+
+    // Try to delete the same file
+    beginningDeleteAttempt.countDown();
+    try {
+      store.delete(fullKey);
+
+      // At this point file SHOULD BE DELETED
+      assertPathDoesNotExist("Leased path", path);
+    } finally {
+      // Do not let the lease-holding thread outlive the test method.
+      t.join();
+    }
+  }
+
+  /**
+   * Check that isPageBlobKey works as expected. This assumes that
+   * in the test configuration, the list of supported page blob directories
+   * only includes "pageBlobs". That's why this test is made specific
+   * to this subclass.
+   * Every key is checked twice: as given, and with the "file:///" prefix.
+   */
+  @Test
+  public void testIsPageBlobKey() {
+    AzureNativeFileSystemStore store = fs.getStore();
+
+    // Use literal strings so it's easier to understand the tests.
+    // In case the constant changes, we want to know about it so we can update this test.
+    // The literal is the expected value, so it goes first.
+    assertEquals("pageBlobs",
+        AzureBlobStorageTestAccount.DEFAULT_PAGE_BLOB_DIRECTORY);
+
+    // URI prefix for test environment.
+    String uriPrefix = "file:///";
+
+    // negative tests
+    String[] negativeKeys = { "", "/", "bar", "bar/", "bar/pageBlobs", "bar/pageBlobs/foo",
+        "bar/pageBlobs/foo/", "/pageBlobs/", "/pageBlobs", "pageBlobs", "pageBlobsxyz/" };
+    for (String s : negativeKeys) {
+      // Name the offending key so a failure can be diagnosed from the report.
+      assertFalse("Should not be a page blob key: \"" + s + "\"",
+          store.isPageBlobKey(s));
+      assertFalse("Should not be a page blob key: \"" + uriPrefix + s + "\"",
+          store.isPageBlobKey(uriPrefix + s));
+    }
+
+    // positive tests
+    String[] positiveKeys = { "pageBlobs/", "pageBlobs/foo/", "pageBlobs/foo/bar/" };
+    for (String s : positiveKeys) {
+      assertTrue("Should be a page blob key: \"" + s + "\"",
+          store.isPageBlobKey(s));
+      assertTrue("Should be a page blob key: \"" + uriPrefix + s + "\"",
+          store.isPageBlobKey(uriPrefix + s));
+    }
+  }
+
+  /**
+   * Test that isAtomicRenameKey() works as expected.
+   * Every key is checked twice: as given, and with the "file:///" prefix.
+   */
+  @Test
+  public void testIsAtomicRenameKey() {
+
+    AzureNativeFileSystemStore store = fs.getStore();
+
+    // We want to know if the default configuration changes so we can fix
+    // this test. The literal is the expected value, so it goes first.
+    assertEquals("/atomicRenameDir1,/atomicRenameDir2",
+        AzureBlobStorageTestAccount.DEFAULT_ATOMIC_RENAME_DIRECTORIES);
+
+    // URI prefix for test environment.
+    String uriPrefix = "file:///";
+
+    // negative tests
+    String[] negativeKeys = { "", "/", "bar", "bar/", "bar/hbase",
+        "bar/hbase/foo", "bar/hbase/foo/", "/hbase/", "/hbase", "hbase",
+        "hbasexyz/", "foo/atomicRenameDir1/"};
+    for (String s : negativeKeys) {
+      // Name the offending key so a failure can be diagnosed from the report.
+      assertFalse("Should not be an atomic rename key: \"" + s + "\"",
+          store.isAtomicRenameKey(s));
+      assertFalse("Should not be an atomic rename key: \"" + uriPrefix + s + "\"",
+          store.isAtomicRenameKey(uriPrefix + s));
+    }
+
+    // Positive tests. The directories for atomic rename are /hbase
+    // plus the ones in the configuration (DEFAULT_ATOMIC_RENAME_DIRECTORIES
+    // for this test).
+    String[] positiveKeys = { "hbase/", "hbase/foo/", "hbase/foo/bar/",
+        "atomicRenameDir1/foo/", "atomicRenameDir2/bar/"};
+    for (String s : positiveKeys) {
+      assertTrue("Should be an atomic rename key: \"" + s + "\"",
+          store.isAtomicRenameKey(s));
+      assertTrue("Should be an atomic rename key: \"" + uriPrefix + s + "\"",
+          store.isAtomicRenameKey(uriPrefix + s));
+    }
+  }
+
+  /**
+   * Tests fs.mkdir() function to create a target blob while another thread
+   * is holding the lease on the blob. mkdir should not fail since the blob
+   * already exists.
+   * This is a scenario that would happen in HBase distributed log splitting.
+   * Multiple threads will try to create and update "recovered.edits" folder
+   * under the same path.
+   * In this test the lease is acquired and freed on the test thread itself.
+   *
+   * @throws Exception on any failure
+   */
+  @Test
+  public void testMkdirOnExistingFolderWithLease() throws Exception {
+    // Create the folder
+    Path path = methodPath();
+    fs.mkdirs(path);
+    NativeAzureFileSystem nfs = fs;
+    String fullKey = nfs.pathToKey(nfs.makeAbsolute(path));
+    AzureNativeFileSystemStore store = nfs.getStore();
+    // Acquire the lease on the folder
+    SelfRenewingLease lease = store.acquireLease(fullKey);
+    try {
+      // Assert on the ID itself: asserting on the boolean
+      // "getLeaseID() != null" would box it to a non-null Boolean and
+      // therefore could never fail.
+      assertNotNull("lease ID", lease.getLeaseID());
+      // Try to create the same folder
+      store.storeEmptyFolder(fullKey,
+        nfs.createPermissionStatus(FsPermission.getDirDefault()));
+    } finally {
+      // Free the lease even if the assertion or the folder creation failed.
+      lease.free();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d2d97fa/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestOutOfBandAzureBlobOperationsLive.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestOutOfBandAzureBlobOperationsLive.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestOutOfBandAzureBlobOperationsLive.java
new file mode 100644
index 0000000..b63aaf0
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestOutOfBandAzureBlobOperationsLive.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.Test;
+
+import com.microsoft.azure.storage.blob.BlobOutputStream;
+import com.microsoft.azure.storage.blob.CloudBlockBlob;
+
+/**
+ * Live blob operations.
+ */
+public class ITestOutOfBandAzureBlobOperationsLive extends AbstractWasbTestBase {
+
+  @Override
+  protected AzureBlobStorageTestAccount createTestAccount() throws Exception {
+    return AzureBlobStorageTestAccount.create();
+  }
+
+  // Scenario described at MONARCH-HADOOP-764: a file created out-of-band
+  // would confuse mkdirs("<oobfilesUncleFolder>"), e.g. after out-of-band
+  // creation of "user/<name>/testFolder/a/input/file" the WASB creation of
+  // "user/<name>/testFolder/a/output" fails.
+  // The test writes such a blob through the blob client and asserts that
+  // mkdirs on the neighbouring "output" folder succeeds.
+  @Test
+  public void outOfBandFolder_uncleMkdirs() throws Exception {
+
+    // NOTE: the CloudBlockBlob name spells out the working directory, while
+    // WASB driver methods prepend the working directory implicitly.
+    final String userName =
+        UserGroupInformation.getCurrentUser().getShortUserName();
+    final String homePrefix = "user/" + userName + "/";
+
+    final CloudBlockBlob oobBlob =
+        testAccount.getBlobReference(homePrefix + "testFolder1/a/input/file");
+    // Open and close the stream without writing any data.
+    oobBlob.openOutputStream().close();
+    assertTrue(fs.exists(new Path("testFolder1/a/input/file")));
+
+    final boolean created = fs.mkdirs(new Path("testFolder1/a/output"));
+    assertTrue(created);
+  }
+
+  // Scenario described at MONARCH-HADOOP-764: a blob is created out-of-band
+  // and the test asserts that WASB can recursively delete the folder that
+  // contains it.
+  @Test
+  public void outOfBandFolder_parentDelete() throws Exception {
+
+    // NOTE: the CloudBlockBlob name spells out the working directory, while
+    // WASB driver methods prepend the working directory implicitly.
+    final String userName =
+        UserGroupInformation.getCurrentUser().getShortUserName();
+    final String homePrefix = "user/" + userName + "/";
+    final CloudBlockBlob oobBlob =
+        testAccount.getBlobReference(homePrefix + "testFolder2/a/input/file");
+    // Open and close the stream without writing any data.
+    oobBlob.openOutputStream().close();
+    assertTrue(fs.exists(new Path("testFolder2/a/input/file")));
+
+    final boolean deleted = fs.delete(new Path("testFolder2/a/input"), true);
+    assertTrue(deleted);
+  }
+
+  // Creates the blob "fileY" out-of-band, then asserts that WASB sees it as
+  // "/fileY" and can delete it.
+  @Test
+  public void outOfBandFolder_rootFileDelete() throws Exception {
+
+    final CloudBlockBlob oobBlob = testAccount.getBlobReference("fileY");
+    // Open and close the stream without writing any data.
+    oobBlob.openOutputStream().close();
+
+    final Path rootFile = new Path("/fileY");
+    assertTrue(fs.exists(rootFile));
+    assertTrue(fs.delete(new Path("/fileY"), true));
+  }
+
+  // Creates the blob "folderW/file" out-of-band, then asserts that WASB sees
+  // both "/folderW" and the file inside it, and can delete the folder
+  // recursively.
+  @Test
+  public void outOfBandFolder_firstLevelFolderDelete() throws Exception {
+
+    final CloudBlockBlob oobBlob = testAccount.getBlobReference("folderW/file");
+    // Open and close the stream without writing any data.
+    oobBlob.openOutputStream().close();
+
+    final boolean folderVisible = fs.exists(new Path("/folderW"));
+    assertTrue(folderVisible);
+    final boolean fileVisible = fs.exists(new Path("/folderW/file"));
+    assertTrue(fileVisible);
+    assertTrue(fs.delete(new Path("/folderW"), true));
+  }
+
+  // Scenario described at MONARCH-HADOOP-764: a blob is created out-of-band
+  // and the test then creates a second file next to it through WASB.
+  @Test
+  public void outOfBandFolder_siblingCreate() throws Exception {
+
+    // NOTE: the CloudBlockBlob name spells out the working directory, while
+    // WASB driver methods prepend the working directory implicitly.
+    final String userName =
+        UserGroupInformation.getCurrentUser().getShortUserName();
+    final String homePrefix = "user/" + userName + "/";
+    final CloudBlockBlob oobBlob =
+        testAccount.getBlobReference(homePrefix + "testFolder3/a/input/file");
+    // Open and close the stream without writing any data.
+    oobBlob.openOutputStream().close();
+    assertTrue(fs.exists(new Path("testFolder3/a/input/file")));
+
+    // Create the sibling through the file system and close it right away.
+    final Path sibling = new Path("testFolder3/a/input/file2");
+    fs.create(sibling).close();
+  }
+
+  // Scenario described at MONARCH-HADOOP-764: creating a new file in the
+  // root folder. The file "/newInRoot" is created and closed immediately.
+  @Test
+  public void outOfBandFolder_create_rootDir() throws Exception {
+    final Path rootLevelFile = new Path("/newInRoot");
+    fs.create(rootLevelFile).close();
+  }
+
+  // Scenario described at MONARCH-HADOOP-764: a blob is created out-of-band
+  // and then renamed within the same folder through WASB. The result of the
+  // rename call is not checked.
+  @Test
+  public void outOfBandFolder_rename() throws Exception {
+
+    // NOTE: the CloudBlockBlob name spells out the working directory, while
+    // WASB driver methods prepend the working directory implicitly.
+    final String userName =
+        UserGroupInformation.getCurrentUser().getShortUserName();
+    final String homePrefix = "user/" + userName + "/";
+    final CloudBlockBlob oobBlob =
+        testAccount.getBlobReference(homePrefix + "testFolder4/a/input/file");
+    // Open and close the stream without writing any data.
+    oobBlob.openOutputStream().close();
+
+    final Path source = new Path("testFolder4/a/input/file");
+    assertTrue(fs.exists(source));
+
+    final Path target = new Path("testFolder4/a/input/file2");
+    fs.rename(source, target);
+  }
+
+  // Verify that you can rename a file which is the only file in an implicit
+  // folder in the WASB file system.
+  // Scenario described at MONARCH-HADOOP-892. The result of the rename call
+  // is not checked.
+  @Test
+  public void outOfBandSingleFile_rename() throws Exception {
+
+    // NOTE: the CloudBlockBlob name spells out the working directory, while
+    // WASB driver methods prepend the working directory implicitly.
+    final String userName =
+        UserGroupInformation.getCurrentUser().getShortUserName();
+    final String homePrefix = "user/" + userName + "/";
+    final CloudBlockBlob oobBlob =
+        testAccount.getBlobReference(homePrefix + "testFolder5/a/input/file");
+    // Open and close the stream without writing any data.
+    oobBlob.openOutputStream().close();
+
+    final Path source = new Path("testFolder5/a/input/file");
+    assertTrue(fs.exists(source));
+
+    final Path target = new Path("testFolder5/file2");
+    fs.rename(source, target);
+  }
+
+  // WASB must force explicit parent directories in create, delete, mkdirs, rename.
+  // Scenario described at MONARCH-HADOOP-764. The blob "fileX" is created
+  // out-of-band (its name carries no working-directory prefix) and is then
+  // renamed from "/fileX" to "/fileXrename" through WASB. The result of the
+  // rename call is not checked.
+  @Test
+  public void outOfBandFolder_rename_rootLevelFiles() throws Exception {
+
+    final CloudBlockBlob oobBlob = testAccount.getBlobReference("fileX");
+    // Open and close the stream without writing any data.
+    oobBlob.openOutputStream().close();
+
+    final Path source = new Path("/fileX");
+    assertTrue(fs.exists(source));
+
+    final Path target = new Path("/fileXrename");
+    fs.rename(source, target);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d2d97fa/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestReadAndSeekPageBlobAfterWrite.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestReadAndSeekPageBlobAfterWrite.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestReadAndSeekPageBlobAfterWrite.java
new file mode 100644
index 0000000..f2af116
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestReadAndSeekPageBlobAfterWrite.java
@@ -0,0 +1,341 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.Random;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azure.integration.AbstractAzureScaleTest;
+import org.apache.hadoop.util.Time;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.hadoop.fs.azure.integration.AzureTestUtils .*;
+
+/**
+ * Write data into a page blob and verify you can read back all of it
+ * or just a part of it.
+ */
+public class ITestReadAndSeekPageBlobAfterWrite extends AbstractAzureScaleTest {
+  // Logger for this test class.
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ITestReadAndSeekPageBlobAfterWrite.class);
+
+  // File system of the test account; assigned in setUp().
+  private FileSystem fs;
+  // In-memory random payload (PAGE_SIZE * MAX_PAGES bytes); filled in setUp().
+  private byte[] randomData;
+
+  // Page blob physical page size
+  private static final int PAGE_SIZE = PageBlobFormatHelpers.PAGE_SIZE;
+
+  // Size of data on page (excluding header)
+  private static final int PAGE_DATA_SIZE = PAGE_SIZE - PageBlobFormatHelpers.PAGE_HEADER_SIZE;
+  // 33554432 = 32 * 1024 * 1024
+  private static final int MAX_BYTES = 33554432; // maximum bytes in a file that we'll test
+  private static final int MAX_PAGES = MAX_BYTES / PAGE_SIZE; // maximum number of pages we'll test
+  // Source of the random payload; unseeded, so the data differs per instance.
+  private Random rand = new Random();
+
+  // A key with a prefix under /pageBlobs, which for the test file system will
+  // force use of a page blob.
+  // NOTE(review): not referenced by the methods visible in this class, which
+  // use blobPath instead -- confirm whether it is still needed.
+  private static final String KEY = "/pageBlobs/file.dat";
+
+  // path of page blob file to read and write
+  private Path blobPath;
+
+  /**
+   * Fixture setup: runs the base-class setup, fetches the file system of the
+   * test account, fills {@code randomData} with random bytes and resolves
+   * the blob path that this class reads and writes.
+   *
+   * @throws Exception if any setup step fails
+   */
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+    fs = getTestAccount().getFileSystem();
+
+    // MAX_BYTES has to be a whole number of pages.
+    assertEquals(0, MAX_BYTES % PAGE_SIZE);
+
+    // Random payload, held in memory, that the tests write and compare with.
+    final byte[] payload = new byte[PAGE_SIZE * MAX_PAGES];
+    rand.nextBytes(payload);
+    randomData = payload;
+
+    blobPath = blobPath("ITestReadAndSeekPageBlobAfterWrite");
+  }
+
+  /**
+   * Fixture teardown: deletes the test blob, then runs the base-class
+   * teardown. The base-class teardown runs in a {@code finally} block so it
+   * is not skipped if the delete call throws.
+   *
+   * @throws Exception if cleanup or the base-class teardown fails
+   */
+  @Override
+  public void tearDown() throws Exception {
+    try {
+      deleteQuietly(fs, blobPath, true);
+    } finally {
+      super.tearDown();
+    }
+  }
+
+  /**
+   * Checks that the top-level directory of the test blob path is recognised
+   * by the store as a page blob key. If anybody changes where the test blob
+   * lives, this test class needs to be revisited.
+   */
+  @Test
+  public void testIsPageBlobFileName() {
+    final NativeAzureFileSystem nativeFs = (NativeAzureFileSystem) fs;
+    final AzureNativeFileSystemStore store = nativeFs.getStore();
+    // The text before the first "/" lands in element 0 of the split; element 1
+    // is the first path component.
+    final String[] segments = blobPath.toUri().getPath().split("/");
+    final String topLevelKey = segments[1] + "/";
+    final boolean isPageBlob = store.isPageBlobKey(topLevelKey);
+    assertTrue("Not a page blob: " + blobPath, isPageBlob);
+  }
+
+  /**
+   * Writes random data of several sizes to the page blob, reads each one
+   * back and compares it with what was written. The sizes are, in order:
+   * 0 to 3; then, for 1, 2 and 10 times the per-page data size, the values
+   * from one byte below to three bytes above that multiple; and finally
+   * MAX_BYTES.
+   */
+  @Test
+  public void testReadAfterWriteRandomData() throws IOException {
+
+    // Multiples of the page data size around which boundary sizes are tested.
+    final int[] pageMultiples = {1, 2, 10};
+    final int smallSizes = 4;
+    final int sizesPerBoundary = 5;
+    final int[] sizes =
+        new int[smallSizes + pageMultiples.length * sizesPerBoundary + 1];
+    int next = 0;
+
+    // sizes that stay on the first page: 0, 1, 2, 3
+    for (int small = 0; small < smallSizes; small++) {
+      sizes[next++] = small;
+    }
+
+    // sizes near page boundaries (the implementation stores the page data
+    // size plus the page header size bytes on each page)
+    for (int multiple : pageMultiples) {
+      for (int delta = -1; delta <= 3; delta++) {
+        sizes[next++] = (multiple * PAGE_DATA_SIZE) + delta;
+      }
+    }
+
+    // one big size, >> 4MB (an internal buffer size in the code)
+    sizes[next] = MAX_BYTES;
+
+    for (int size : sizes) {
+      testReadAfterWriteRandomData(size);
+    }
+  }
+
+  private void testReadAfterWriteRandomData(int size) throws IOException {
+    writeRandomData(size);
+    readRandomDataAndVerify(size);
+  }
+
+  /**
+   * Read "size" bytes of data and verify that what was read and what was written
+   * are the same. The data is fetched with a single {@code read(byte[])}
+   * call, whose return value must equal {@code size}.
+   *
+   * @param size number of bytes expected in the test blob
+   * @throws AzureException declared by the original signature
+   * @throws IOException if opening or reading the blob fails
+   */
+  private void readRandomDataAndVerify(int size) throws AzureException, IOException {
+    byte[] b = new byte[size];
+    int bytesRead;
+    // try-with-resources: the stream is closed even when read() throws.
+    try (FSDataInputStream stream = fs.open(blobPath)) {
+      bytesRead = stream.read(b);
+    }
+    // expected value first, so a failure report reads correctly
+    assertEquals("Number of bytes read from " + blobPath, size, bytesRead);
+
+    // compare the data read to the data written
+    assertTrue("Data read from " + blobPath + " differs from the " + size
+        + " bytes written", comparePrefix(randomData, b, size));
+  }
+
+  // return true if the beginning "size" values of the arrays are the same
+  private boolean comparePrefix(byte[] a, byte[] b, int size) {
+    if (a.length < size || b.length < size) {
+      return false;
+    }
+    for (int i = 0; i < size; i++) {
+      if (a[i] != b[i]) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Write a specified amount of random data to the file path for this test
+   * class. The bytes are the first {@code size} bytes of {@code randomData}.
+   *
+   * @param size number of bytes to write
+   * @throws IOException if creating, writing or closing the file fails
+   */
+  private void writeRandomData(int size) throws IOException {
+    // try-with-resources: the stream is closed even when write() throws.
+    try (OutputStream output = fs.create(blobPath)) {
+      output.write(randomData, 0, size);
+    }
+  }
+
+  /**
+   * Write data to a page blob, open it, seek, and then read a range of data.
+   * Then compare that the data read from that range is the same as the data originally written.
+   */
+  @Test
+  public void testPageBlobSeekAndReadAfterWrite() throws IOException {
+    writeRandomData(PAGE_SIZE * MAX_PAGES);
+    int recordSize = 100;
+    byte[] b = new byte[recordSize];
+
+
+    try(FSDataInputStream stream = fs.open(blobPath)) {
+      // Seek to a boundary around the middle of the 6th page
+      int seekPosition = 5 * PAGE_SIZE + 250;
+      stream.seek(seekPosition);
+
+      // Read a record's worth of bytes and verify results
+      int bytesRead = stream.read(b);
+      verifyReadRandomData(b, bytesRead, seekPosition, recordSize);
+
+      // Seek to another spot and read a record greater than a page
+      seekPosition = 10 * PAGE_SIZE + 250;
+      stream.seek(seekPosition);
+      recordSize = 1000;
+      b = new byte[recordSize];
+      bytesRead = stream.read(b);
+      verifyReadRandomData(b, bytesRead, seekPosition, recordSize);
+
+      // Read the last 100 bytes of the file
+      recordSize = 100;
+      seekPosition = PAGE_SIZE * MAX_PAGES - recordSize;
+      stream.seek(seekPosition);
+      b = new byte[recordSize];
+      bytesRead = stream.read(b);
+      verifyReadRandomData(b, bytesRead, seekPosition, recordSize);
+
+      // Read past the end of the file and we should get only partial data.
+      recordSize = 100;
+      seekPosition = PAGE_SIZE * MAX_PAGES - recordSize + 50;
+      stream.seek(seekPosition);
+      b = new byte[recordSize];
+      bytesRead = stream.read(b);
+      assertEquals(50, bytesRead);
+
+      // compare last 50 bytes written with those read
+      byte[] tail = Arrays.copyOfRange(randomData, seekPosition, randomData.length);
+      assertTrue(comparePrefix(tail, b, 50));
+    }
+  }
+
+  /**
+   * Verify that reading a record of data after seeking gives the expected data.
+   *
+   * @param b buffer holding the bytes that were read
+   * @param bytesRead number of bytes the read reported
+   * @param seekPosition offset in {@code randomData} where the record starts
+   * @param recordSize expected number of bytes in the record
+   */
+  private void verifyReadRandomData(byte[] b, int bytesRead, int seekPosition, int recordSize) {
+    // Copy exactly recordSize bytes; the upper bound of copyOfRange is
+    // exclusive, so no "+ 1" is needed.
+    byte[] originalRecordData =
+        Arrays.copyOfRange(randomData, seekPosition, seekPosition + recordSize);
+    assertEquals("Bytes read at position " + seekPosition,
+        recordSize, bytesRead);
+    assertTrue("Data mismatch for " + recordSize + " bytes at position "
+        + seekPosition, comparePrefix(originalRecordData, b, recordSize));
+  }
+
+  // Exercises many small writes, each followed by flush(), with hflush()
+  // called periodically. For a long-running manual test, raise the write
+  // count: the goal then is to check that the run finishes and the close()
+  // call does not time out. It also facilitates debugging into hflush/hsync.
+  @Test
+  public void testManySmallWritesWithHFlush() throws IOException {
+    final int writeCount = 50;
+    final int bytesPerWrite = 100;
+    final int hflushEvery = 20;
+    writeAndReadOneFile(writeCount, bytesPerWrite, hflushEvery);
+  }
+
+  /**
+   * Write a total of numWrites * recordLength data to a file, read it back,
+   * and check to make sure what was read is the same as what was written.
+   * The syncInterval is the number of writes after which to call hflush to
+   * force the data to storage.
+   *
+   * @param numWrites number of records to write
+   * @param recordLength size of each record in bytes
+   * @param syncInterval hflush is called when the write index is a multiple
+   *        of this value
+   * @throws IOException if writing, reading or deleting the file fails
+   */
+  private void writeAndReadOneFile(int numWrites,
+      int recordLength, int syncInterval) throws IOException {
+
+    // A lower bound on the minimum time we think it will take to do
+    // a write to Azure storage.
+    final long MINIMUM_EXPECTED_TIME = 20;
+    LOG.info("Writing " + numWrites * recordLength + " bytes to " + blobPath.getName());
+    FSDataOutputStream output = fs.create(blobPath);
+    int writesSinceHFlush = 0;
+    try {
+
+      // Do a flush and hflush to exercise case for empty write queue in PageBlobOutputStream,
+      // to test concurrent execution gates.
+      output.flush();
+      output.hflush();
+      for (int i = 0; i < numWrites; i++) {
+        output.write(randomData, i * recordLength, recordLength);
+        writesSinceHFlush++;
+        output.flush();
+        if ((i % syncInterval) == 0) {
+          output.hflush();
+          writesSinceHFlush = 0;
+        }
+      }
+    } finally {
+      long start = Time.monotonicNow();
+      output.close();
+      long end = Time.monotonicNow();
+      LOG.debug("close duration = " + (end - start) + " msec.");
+      // With writes still pending, close() is expected to take at least the
+      // minimum time of a write to storage.
+      if (writesSinceHFlush > 0) {
+        assertTrue(String.format(
+            "close duration with >= 1 pending write is %d, less than minimum expected of %d",
+            end - start, MINIMUM_EXPECTED_TIME),
+            end - start >= MINIMUM_EXPECTED_TIME);
+      }
+    }
+
+    // Read the data back and check it.
+    final int totalBytes = numWrites * recordLength;
+    final byte[] b = new byte[totalBytes];
+    try (FSDataInputStream stream = fs.open(blobPath)) {
+      stream.seek(0);
+      // readFully either fills the whole buffer or throws, so the byte count
+      // passed to the verification below is accurate; a plain read() may
+      // legally return fewer bytes than requested.
+      stream.readFully(b, 0, totalBytes);
+      verifyReadRandomData(b, totalBytes, 0, totalBytes);
+    }
+
+    // delete the file
+    fs.delete(blobPath, false);
+  }
+
+  // Stress test: writes a large file (32 records of 1 MB each, hflush
+  // interval 10) and reads it back, once per round.
+  // Raise the number of rounds for a longer manual stress run.
+  @Test
+  public void testLargeFileStress() throws IOException {
+    final int recordCount = 32;
+    final int bytesPerRecord = 1024 * 1024;
+    final int hflushEvery = 10;
+    final int rounds = 1;
+    int round = 0;
+    while (round < rounds) {
+      writeAndReadOneFile(recordCount, bytesPerRecord, hflushEvery);
+      round++;
+    }
+  }
+  
+  // Write to a file repeatedly to verify that it extends.
+  // The page blob file should start out at 128MB and finish at 256MB.
+  // The test writes 129 records of 1 MB each, calling hflush after every
+  // record, and then checks the listed file length.
+  // The @Test annotation is required: without it JUnit 4 never runs this
+  // method, whatever its name.
+  @Test
+  public void testFileSizeExtension() throws IOException {
+    final int writeSize = 1024 * 1024;
+    final int numWrites = 129;
+    final byte dataByte = 5;
+    byte[] data = new byte[writeSize];
+    Arrays.fill(data, dataByte);
+    try (FSDataOutputStream output = fs.create(blobPath)) {
+      for (int i = 0; i < numWrites; i++) {
+        output.write(data);
+        output.hflush();
+        LOG.debug("total writes = " + (i + 1));
+      }
+    }
+
+    // Show that we wrote more than the default page blob file size.
+    assertTrue(numWrites * writeSize > PageBlobOutputStream.PAGE_BLOB_MIN_SIZE);
+
+    // Verify we can list the new size. That will prove we expanded the file.
+    FileStatus[] status = fs.listStatus(blobPath);
+    // Arrays.toString prints the entries rather than the array reference.
+    assertEquals("File size hasn't changed " + Arrays.toString(status),
+        numWrites * writeSize, status[0].getLen());
+    LOG.debug("Total bytes written to " + blobPath + " = " + status[0].getLen());
+    fs.delete(blobPath, false);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d2d97fa/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestWasbRemoteCallHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestWasbRemoteCallHelper.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestWasbRemoteCallHelper.java
new file mode 100644
index 0000000..062bc36
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestWasbRemoteCallHelper.java
@@ -0,0 +1,568 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.io.retry.RetryUtils;
+import org.apache.http.Header;
+import org.apache.http.HttpResponse;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpStatus;
+import org.apache.http.StatusLine;
+import org.apache.http.ProtocolVersion;
+import org.apache.http.ParseException;
+import org.apache.http.HeaderElement;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpGet;
+import org.hamcrest.Description;
+import org.hamcrest.TypeSafeMatcher;
+import org.junit.Assume;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.mockito.ArgumentMatcher;
+import org.mockito.Mockito;
+
+import java.io.ByteArrayInputStream;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.nio.charset.StandardCharsets;
+
+import static org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.KEY_USE_SECURE_MODE;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.times;
+
+/**
+ * Test class to hold all WasbRemoteCallHelper tests.
+ */
+public class ITestWasbRemoteCallHelper
+    extends AbstractWasbTestBase {
+  /** Empty string passed for both string arguments of the retry-policy factory in performop(). */
+  public static final String EMPTY_STRING = "";
+  /** Status code (999) carried by the mocked response in testInvalidStatusCode(). */
+  private static final int INVALID_HTTP_STATUS_CODE_999 = 999;
+
+  @Override
+  protected AzureBlobStorageTestAccount createTestAccount() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(NativeAzureFileSystem.KEY_AZURE_AUTHORIZATION, "true");
+    conf.set(RemoteWasbAuthorizerImpl.KEY_REMOTE_AUTH_SERVICE_URLS, "http://localhost1/,http://localhost2/,http://localhost:8080");
+    return AzureBlobStorageTestAccount.create(conf);
+  }
+
+  /**
+   * Runs the base-class setup, then skips the test (via a JUnit assumption)
+   * unless both secure mode and authorization are enabled in the file
+   * system's configuration.
+   */
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+    final Configuration fsConf = fs.getConf();
+    final boolean secureModeOn = fsConf.getBoolean(KEY_USE_SECURE_MODE, false);
+    final boolean authorizationOn =
+        fsConf.getBoolean(NativeAzureFileSystem.KEY_AZURE_AUTHORIZATION, false);
+    final boolean bothEnabled = secureModeOn && authorizationOn;
+    Assume.assumeTrue("Test valid when both SecureMode and Authorization are enabled .. skipping",
+        bothEnabled);
+  }
+
+  /** JUnit rule through which individual tests declare the exception type and message they expect. */
+  @Rule
+  public ExpectedException expectedEx = ExpectedException.none();
+
+  /**
+   * Feeds the helper a response whose status code is 999 and expects the
+   * failure armed by {@code setupExpectations()}.
+   * @throws Throwable on any failure other than the expected one
+   */
+  @Test
+  public void testInvalidStatusCode() throws Throwable {
+
+    setupExpectations();
+
+    // Canned response carrying the bogus status code.
+    final StatusLine bogusStatus = newStatusLine(INVALID_HTTP_STATUS_CODE_999);
+    final HttpResponse response = Mockito.mock(HttpResponse.class);
+    Mockito.when(response.getStatusLine()).thenReturn(bogusStatus);
+
+    // Every GET issued through the client yields that response.
+    final HttpClient client = Mockito.mock(HttpClient.class);
+    Mockito.when(client.execute(Mockito.<HttpGet>any())).thenReturn(response);
+
+    performop(client);
+  }
+
+  /**
+   * Feeds the helper an HTTP 200 response whose Content-Type header is
+   * "text/plain" and expects the failure armed by
+   * {@code setupExpectations()}.
+   * @throws Throwable on any failure other than the expected one
+   */
+  @Test
+  public void testInvalidContentType() throws Throwable {
+
+    setupExpectations();
+
+    // Canned response: HTTP 200 with a plain-text content type.
+    final StatusLine okStatus = newStatusLine(HttpStatus.SC_OK);
+    final Header plainTextType = newHeader("Content-Type", "text/plain");
+    final HttpResponse response = Mockito.mock(HttpResponse.class);
+    Mockito.when(response.getStatusLine()).thenReturn(okStatus);
+    Mockito.when(response.getFirstHeader("Content-Type"))
+        .thenReturn(plainTextType);
+
+    // Every GET issued through the client yields that response.
+    final HttpClient client = Mockito.mock(HttpClient.class);
+    Mockito.when(client.execute(Mockito.<HttpGet>any())).thenReturn(response);
+
+    performop(client);
+  }
+
+  /**
+   * Feeds the helper an HTTP 200 JSON response for which no Content-Length
+   * header is stubbed, and expects the failure armed by
+   * {@code setupExpectations()}.
+   * @throws Throwable on any failure other than the expected one
+   */
+  @Test
+  public void testMissingContentLength() throws Throwable {
+
+    setupExpectations();
+
+    // Canned response: HTTP 200, JSON content type, Content-Length left
+    // unstubbed on the mock.
+    final StatusLine okStatus = newStatusLine(HttpStatus.SC_OK);
+    final Header jsonType = newHeader("Content-Type", "application/json");
+    final HttpResponse response = Mockito.mock(HttpResponse.class);
+    Mockito.when(response.getStatusLine()).thenReturn(okStatus);
+    Mockito.when(response.getFirstHeader("Content-Type")).thenReturn(jsonType);
+
+    // Every GET issued through the client yields that response.
+    final HttpClient client = Mockito.mock(HttpClient.class);
+    Mockito.when(client.execute(Mockito.<HttpGet>any())).thenReturn(response);
+
+    performop(client);
+  }
+
+  /**
+   * Feeds the helper an HTTP 200 JSON response advertising a Content-Length
+   * of 2048 and expects the failure armed by {@code setupExpectations()}.
+   * @throws Throwable on any failure other than the expected one
+   */
+  @Test
+  public void testContentLengthExceedsMax() throws Throwable {
+
+    setupExpectations();
+
+    // Canned response: HTTP 200, JSON content type, Content-Length 2048.
+    final StatusLine okStatus = newStatusLine(HttpStatus.SC_OK);
+    final Header jsonType = newHeader("Content-Type", "application/json");
+    final Header oversizedLength = newHeader("Content-Length", "2048");
+    final HttpResponse response = Mockito.mock(HttpResponse.class);
+    Mockito.when(response.getStatusLine()).thenReturn(okStatus);
+    Mockito.when(response.getFirstHeader("Content-Type")).thenReturn(jsonType);
+    Mockito.when(response.getFirstHeader("Content-Length"))
+        .thenReturn(oversizedLength);
+
+    // Every GET issued through the client yields that response.
+    final HttpClient client = Mockito.mock(HttpClient.class);
+    Mockito.when(client.execute(Mockito.<HttpGet>any())).thenReturn(response);
+
+    performop(client);
+  }
+
+  /**
+   * Feeds the helper an HTTP 200 JSON response whose Content-Length header
+   * value ("20abc48") is not a number, and expects the failure armed by
+   * {@code setupExpectations()}.
+   * @throws Throwable on any failure other than the expected one
+   */
+  @Test
+  public void testInvalidContentLengthValue() throws Throwable {
+
+    setupExpectations();
+
+    // Canned response: HTTP 200, JSON content type, non-numeric length.
+    final StatusLine okStatus = newStatusLine(HttpStatus.SC_OK);
+    final Header jsonType = newHeader("Content-Type", "application/json");
+    final Header garbledLength = newHeader("Content-Length", "20abc48");
+    final HttpResponse response = Mockito.mock(HttpResponse.class);
+    Mockito.when(response.getStatusLine()).thenReturn(okStatus);
+    Mockito.when(response.getFirstHeader("Content-Type")).thenReturn(jsonType);
+    Mockito.when(response.getFirstHeader("Content-Length"))
+        .thenReturn(garbledLength);
+
+    // Every GET issued through the client yields that response.
+    final HttpClient client = Mockito.mock(HttpClient.class);
+    Mockito.when(client.execute(Mockito.<HttpGet>any())).thenReturn(response);
+
+    performop(client);
+  }
+
+  /**
+   * Runs performop() against a client that always answers HTTP 200 with a
+   * well-formed "Authorized" JSON body; no exception is expected.
+   * @throws Throwable on any failure
+   */
+  @Test
+  public void testValidJSONResponse() throws Throwable {
+
+    // Entity handing out a fresh stream over the valid JSON body on each of
+    // its first three getContent() calls (the last one repeats afterwards).
+    final byte[] body = validJsonResponse().getBytes(StandardCharsets.UTF_8);
+    final HttpEntity entity = Mockito.mock(HttpEntity.class);
+    Mockito.when(entity.getContent()).thenReturn(
+        new ByteArrayInputStream(body),
+        new ByteArrayInputStream(body),
+        new ByteArrayInputStream(body));
+
+    // Canned response: HTTP 200, JSON content type, Content-Length 1024.
+    final StatusLine okStatus = newStatusLine(HttpStatus.SC_OK);
+    final Header jsonType = newHeader("Content-Type", "application/json");
+    final Header length = newHeader("Content-Length", "1024");
+    final HttpResponse response = Mockito.mock(HttpResponse.class);
+    Mockito.when(response.getStatusLine()).thenReturn(okStatus);
+    Mockito.when(response.getFirstHeader("Content-Type")).thenReturn(jsonType);
+    Mockito.when(response.getFirstHeader("Content-Length")).thenReturn(length);
+    Mockito.when(response.getEntity()).thenReturn(entity);
+
+    // Every GET issued through the client yields that response.
+    final HttpClient client = Mockito.mock(HttpClient.class);
+    Mockito.when(client.execute(Mockito.<HttpGet>any())).thenReturn(response);
+
+    performop(client);
+  }
+
+  /**
+   * Runs performop() against a client whose HTTP 200 response carries a
+   * truncated JSON body, and expects a WasbAuthorizationException whose
+   * message reports the Jackson parse error.
+   * @throws Throwable on any failure other than the expected one
+   */
+  @Test
+  public void testMalFormedJSONResponse() throws Throwable {
+
+    expectedEx.expect(WasbAuthorizationException.class);
+    expectedEx.expectMessage("com.fasterxml.jackson.core.JsonParseException: Unexpected end-of-input in FIELD_NAME");
+
+    // Entity whose content is the truncated JSON document.
+    final byte[] body = malformedJsonResponse().getBytes(StandardCharsets.UTF_8);
+    final HttpEntity entity = Mockito.mock(HttpEntity.class);
+    Mockito.when(entity.getContent()).thenReturn(new ByteArrayInputStream(body));
+
+    // Canned response: HTTP 200, JSON content type, Content-Length 1024.
+    final StatusLine okStatus = newStatusLine(HttpStatus.SC_OK);
+    final Header jsonType = newHeader("Content-Type", "application/json");
+    final Header length = newHeader("Content-Length", "1024");
+    final HttpResponse response = Mockito.mock(HttpResponse.class);
+    Mockito.when(response.getStatusLine()).thenReturn(okStatus);
+    Mockito.when(response.getFirstHeader("Content-Type")).thenReturn(jsonType);
+    Mockito.when(response.getFirstHeader("Content-Length")).thenReturn(length);
+    Mockito.when(response.getEntity()).thenReturn(entity);
+
+    // Every GET issued through the client yields that response.
+    final HttpClient client = Mockito.mock(HttpClient.class);
+    Mockito.when(client.execute(Mockito.<HttpGet>any())).thenReturn(response);
+
+    performop(client);
+  }
+
+  /**
+   * Runs performop() against a client whose HTTP 200 response carries a
+   * well-formed JSON body with responseCode 1 and message "Unauthorized",
+   * and expects a WasbAuthorizationException quoting that message.
+   * @throws Throwable on any failure other than the expected one
+   */
+  @Test
+  public void testFailureCodeJSONResponse() throws Throwable {
+
+    expectedEx.expect(WasbAuthorizationException.class);
+    expectedEx.expectMessage("Remote authorization service encountered an error Unauthorized");
+
+    // Entity whose content is the failure-code JSON document.
+    final byte[] body = failureCodeJsonResponse().getBytes(StandardCharsets.UTF_8);
+    final HttpEntity entity = Mockito.mock(HttpEntity.class);
+    Mockito.when(entity.getContent()).thenReturn(new ByteArrayInputStream(body));
+
+    // Canned response: HTTP 200, JSON content type, Content-Length 1024.
+    final StatusLine okStatus = newStatusLine(HttpStatus.SC_OK);
+    final Header jsonType = newHeader("Content-Type", "application/json");
+    final Header length = newHeader("Content-Length", "1024");
+    final HttpResponse response = Mockito.mock(HttpResponse.class);
+    Mockito.when(response.getStatusLine()).thenReturn(okStatus);
+    Mockito.when(response.getFirstHeader("Content-Type")).thenReturn(jsonType);
+    Mockito.when(response.getFirstHeader("Content-Length")).thenReturn(length);
+    Mockito.when(response.getEntity()).thenReturn(entity);
+
+    // Every GET issued through the client yields that response.
+    final HttpClient client = Mockito.mock(HttpClient.class);
+    Mockito.when(client.execute(Mockito.<HttpGet>any())).thenReturn(response);
+
+    performop(client);
+  }
+
+  /**
+   * Runs performop() against a mocked client that answers per target host:
+   * requests to "localhost1" and to the local host get HTTP 500, requests
+   * to "localhost2" get HTTP 200. All three responses share one entity that
+   * serves the valid JSON body. Afterwards the test verifies that execute()
+   * saw a local-host request and a "localhost2" request the same number of
+   * times: once when CachingAuthorizer.KEY_AUTH_SERVICE_CACHING_ENABLE is
+   * true in the file system configuration, twice otherwise.
+   * @throws Throwable on any failure
+   */
+  @Test
+  public void testWhenOneInstanceIsDown() throws Throwable {
+
+    boolean isAuthorizationCachingEnabled = fs.getConf().getBoolean(CachingAuthorizer.KEY_AUTH_SERVICE_CACHING_ENABLE, false);
+
+    // set up mocks
+    HttpClient mockHttpClient = Mockito.mock(HttpClient.class);
+    HttpEntity mockHttpEntity = Mockito.mock(HttpEntity.class);
+
+    HttpResponse mockHttpResponseService1 = Mockito.mock(HttpResponse.class);
+    Mockito.when(mockHttpResponseService1.getStatusLine())
+        .thenReturn(newStatusLine(HttpStatus.SC_INTERNAL_SERVER_ERROR));
+    Mockito.when(mockHttpResponseService1.getFirstHeader("Content-Type"))
+        .thenReturn(newHeader("Content-Type", "application/json"));
+    Mockito.when(mockHttpResponseService1.getFirstHeader("Content-Length"))
+        .thenReturn(newHeader("Content-Length", "1024"));
+    Mockito.when(mockHttpResponseService1.getEntity())
+        .thenReturn(mockHttpEntity);
+
+    HttpResponse mockHttpResponseService2 = Mockito.mock(HttpResponse.class);
+    Mockito.when(mockHttpResponseService2.getStatusLine())
+        .thenReturn(newStatusLine(HttpStatus.SC_OK));
+    Mockito.when(mockHttpResponseService2.getFirstHeader("Content-Type"))
+        .thenReturn(newHeader("Content-Type", "application/json"));
+    Mockito.when(mockHttpResponseService2.getFirstHeader("Content-Length"))
+        .thenReturn(newHeader("Content-Length", "1024"));
+    Mockito.when(mockHttpResponseService2.getEntity())
+        .thenReturn(mockHttpEntity);
+
+    HttpResponse mockHttpResponseServiceLocal = Mockito.mock(HttpResponse.class);
+    Mockito.when(mockHttpResponseServiceLocal.getStatusLine())
+        .thenReturn(newStatusLine(HttpStatus.SC_INTERNAL_SERVER_ERROR));
+    Mockito.when(mockHttpResponseServiceLocal.getFirstHeader("Content-Type"))
+        .thenReturn(newHeader("Content-Type", "application/json"));
+    Mockito.when(mockHttpResponseServiceLocal.getFirstHeader("Content-Length"))
+        .thenReturn(newHeader("Content-Length", "1024"));
+    Mockito.when(mockHttpResponseServiceLocal.getEntity())
+        .thenReturn(mockHttpEntity);
+
+
+
+    class HttpGetForService1 extends ArgumentMatcher<HttpGet>{
+      @Override public boolean matches(Object o) {
+        return checkHttpGetMatchHost((HttpGet) o, "localhost1");
+      }
+    }
+    class HttpGetForService2 extends ArgumentMatcher<HttpGet>{
+      @Override public boolean matches(Object o) {
+        return checkHttpGetMatchHost((HttpGet) o, "localhost2");
+      }
+    }
+    class HttpGetForServiceLocal extends ArgumentMatcher<HttpGet>{
+      @Override public boolean matches(Object o) {
+        try {
+          return checkHttpGetMatchHost((HttpGet) o, InetAddress.getLocalHost().getCanonicalHostName());
+        } catch (UnknownHostException e) {
+          return checkHttpGetMatchHost((HttpGet) o, "localhost");
+        }
+      }
+    }
+    Mockito.when(mockHttpClient.execute(argThat(new HttpGetForService1())))
+        .thenReturn(mockHttpResponseService1);
+    Mockito.when(mockHttpClient.execute(argThat(new HttpGetForService2())))
+        .thenReturn(mockHttpResponseService2);
+    Mockito.when(mockHttpClient.execute(argThat(new HttpGetForServiceLocal())))
+        .thenReturn(mockHttpResponseServiceLocal);
+
+    // Three content streams are queued; performop() issues a create, an existence check and a delete. NOTE(review): confirm how many of those actually read the entity.
+    Mockito.when(mockHttpEntity.getContent())
+        .thenReturn(new ByteArrayInputStream(validJsonResponse()
+            .getBytes(StandardCharsets.UTF_8)))
+        .thenReturn(new ByteArrayInputStream(validJsonResponse()
+            .getBytes(StandardCharsets.UTF_8)))
+        .thenReturn(new ByteArrayInputStream(validJsonResponse()
+            .getBytes(StandardCharsets.UTF_8)));
+    // finished setting up mocks
+
+    performop(mockHttpClient);
+
+    int expectedNumberOfInvocations = isAuthorizationCachingEnabled ? 1 : 2;
+    Mockito.verify(mockHttpClient, times(expectedNumberOfInvocations)).execute(Mockito.argThat(new HttpGetForServiceLocal()));
+    Mockito.verify(mockHttpClient, times(expectedNumberOfInvocations)).execute(Mockito.argThat(new HttpGetForService2()));
+  }
+
+  /**
+   * Runs performop() against a mocked client that answers HTTP 500 for
+   * requests to "localhost1", "localhost2" and the local host alike. If a
+   * WasbAuthorizationException is thrown, the test verifies that execute()
+   * saw at least 2 requests for each of the first two hosts, at least 3 for
+   * the local host, and exactly 7 requests in total.
+   * NOTE(review): the verifications live inside the catch block, so the
+   * test also passes when no exception is thrown -- confirm that is
+   * intended.
+   * @throws Throwable on any failure other than WasbAuthorizationException
+   */
+  @Test
+  public void testWhenServiceInstancesAreDown() throws Throwable {
+    //expectedEx.expect(WasbAuthorizationException.class);
+    // set up mocks
+    HttpClient mockHttpClient = Mockito.mock(HttpClient.class);
+    HttpEntity mockHttpEntity = Mockito.mock(HttpEntity.class);
+
+    HttpResponse mockHttpResponseService1 = Mockito.mock(HttpResponse.class);
+    Mockito.when(mockHttpResponseService1.getStatusLine())
+        .thenReturn(newStatusLine(HttpStatus.SC_INTERNAL_SERVER_ERROR));
+    Mockito.when(mockHttpResponseService1.getFirstHeader("Content-Type"))
+        .thenReturn(newHeader("Content-Type", "application/json"));
+    Mockito.when(mockHttpResponseService1.getFirstHeader("Content-Length"))
+        .thenReturn(newHeader("Content-Length", "1024"));
+    Mockito.when(mockHttpResponseService1.getEntity())
+        .thenReturn(mockHttpEntity);
+
+    HttpResponse mockHttpResponseService2 = Mockito.mock(HttpResponse.class);
+    Mockito.when(mockHttpResponseService2.getStatusLine())
+        .thenReturn(newStatusLine(
+        HttpStatus.SC_INTERNAL_SERVER_ERROR));
+    Mockito.when(mockHttpResponseService2.getFirstHeader("Content-Type"))
+        .thenReturn(newHeader("Content-Type", "application/json"));
+    Mockito.when(mockHttpResponseService2.getFirstHeader("Content-Length"))
+        .thenReturn(newHeader("Content-Length", "1024"));
+    Mockito.when(mockHttpResponseService2.getEntity())
+        .thenReturn(mockHttpEntity);
+
+    HttpResponse mockHttpResponseService3 = Mockito.mock(HttpResponse.class);
+    Mockito.when(mockHttpResponseService3.getStatusLine())
+        .thenReturn(newStatusLine(
+            HttpStatus.SC_INTERNAL_SERVER_ERROR));
+    Mockito.when(mockHttpResponseService3.getFirstHeader("Content-Type"))
+        .thenReturn(newHeader("Content-Type", "application/json"));
+    Mockito.when(mockHttpResponseService3.getFirstHeader("Content-Length"))
+        .thenReturn(newHeader("Content-Length", "1024"));
+    Mockito.when(mockHttpResponseService3.getEntity())
+        .thenReturn(mockHttpEntity);
+
+    class HttpGetForService1 extends ArgumentMatcher<HttpGet>{
+      @Override public boolean matches(Object o) {
+        return checkHttpGetMatchHost((HttpGet) o, "localhost1");
+      }
+    }
+    class HttpGetForService2 extends ArgumentMatcher<HttpGet>{
+      @Override public boolean matches(Object o) {
+        return checkHttpGetMatchHost((HttpGet) o, "localhost2");
+      }
+    }
+    class HttpGetForService3 extends ArgumentMatcher<HttpGet> {
+      @Override public boolean matches(Object o){
+        try {
+          return checkHttpGetMatchHost((HttpGet) o, InetAddress.getLocalHost().getCanonicalHostName());
+        } catch (UnknownHostException e) {
+          return checkHttpGetMatchHost((HttpGet) o, "localhost");
+        }
+      }
+    }
+    Mockito.when(mockHttpClient.execute(argThat(new HttpGetForService1())))
+        .thenReturn(mockHttpResponseService1);
+    Mockito.when(mockHttpClient.execute(argThat(new HttpGetForService2())))
+        .thenReturn(mockHttpResponseService2);
+    Mockito.when(mockHttpClient.execute(argThat(new HttpGetForService3())))
+        .thenReturn(mockHttpResponseService3);
+
+    //Need 3 times because performop()  does 3 fs operations.
+    Mockito.when(mockHttpEntity.getContent())
+        .thenReturn(new ByteArrayInputStream(
+            validJsonResponse().getBytes(StandardCharsets.UTF_8)))
+        .thenReturn(new ByteArrayInputStream(
+            validJsonResponse().getBytes(StandardCharsets.UTF_8)))
+        .thenReturn(new ByteArrayInputStream(
+            validJsonResponse().getBytes(StandardCharsets.UTF_8)));
+    // finished setting up mocks
+    try {
+      performop(mockHttpClient);
+    }catch (WasbAuthorizationException e){
+      e.printStackTrace();
+      Mockito.verify(mockHttpClient, atLeast(2))
+          .execute(argThat(new HttpGetForService1()));
+      Mockito.verify(mockHttpClient, atLeast(2))
+          .execute(argThat(new HttpGetForService2()));
+      Mockito.verify(mockHttpClient, atLeast(3))
+          .execute(argThat(new HttpGetForService3()));
+      Mockito.verify(mockHttpClient, times(7)).execute(Mockito.<HttpGet>any());
+    }
+  }
+
+  private void setupExpectations() {
+    expectedEx.expect(WasbAuthorizationException.class);
+
+    class MatchesPattern extends TypeSafeMatcher<String> {
+      private String pattern;
+
+      MatchesPattern(String pattern) {
+        this.pattern = pattern;
+      }
+
+      @Override protected boolean matchesSafely(String item) {
+        return item.matches(pattern);
+      }
+
+      @Override public void describeTo(Description description) {
+        description.appendText("matches pattern ").appendValue(pattern);
+      }
+
+      @Override protected void describeMismatchSafely(String item,
+          Description mismatchDescription) {
+        mismatchDescription.appendText("does not match");
+      }
+    }
+
+    expectedEx.expectMessage(new MatchesPattern(
+        "org\\.apache\\.hadoop\\.fs\\.azure\\.WasbRemoteCallException: "
+            + "Encountered error while making remote call to "
+            + "http:\\/\\/localhost1\\/,http:\\/\\/localhost2\\/,http:\\/\\/localhost:8080 retried 6 time\\(s\\)\\."));
+  }
+
+  private void performop(HttpClient mockHttpClient) throws Throwable {
+
+    Path testPath = new Path("/", "test.dat");
+
+    RemoteWasbAuthorizerImpl authorizer = new RemoteWasbAuthorizerImpl();
+    authorizer.init(fs.getConf());
+    WasbRemoteCallHelper mockWasbRemoteCallHelper = new WasbRemoteCallHelper(
+        RetryUtils.getMultipleLinearRandomRetry(new Configuration(),
+            EMPTY_STRING, true,
+            EMPTY_STRING, "1000,3,10000,2"));
+    mockWasbRemoteCallHelper.updateHttpClient(mockHttpClient);
+    authorizer.updateWasbRemoteCallHelper(mockWasbRemoteCallHelper);
+    fs.updateWasbAuthorizer(authorizer);
+
+    fs.create(testPath);
+    ContractTestUtils.assertPathExists(fs, "testPath was not created", testPath);
+    fs.delete(testPath, false);
+  }
+
+  private String validJsonResponse() {
+    return "{"
+        + "\"responseCode\": 0,"
+        + "\"authorizationResult\": true,"
+        + "\"responseMessage\": \"Authorized\""
+        + "}";
+  }
+
+  private String malformedJsonResponse() {
+    return "{"
+        + "\"responseCode\": 0,"
+        + "\"authorizationResult\": true,"
+        + "\"responseMessage\":";
+  }
+
+  private String failureCodeJsonResponse() {
+    return "{"
+        + "\"responseCode\": 1,"
+        + "\"authorizationResult\": false,"
+        + "\"responseMessage\": \"Unauthorized\""
+        + "}";
+  }
+
+  private StatusLine newStatusLine(int statusCode) {
+    return new StatusLine() {
+      @Override
+      public ProtocolVersion getProtocolVersion() {
+        return new ProtocolVersion("HTTP", 1, 1);
+      }
+
+      @Override
+      public int getStatusCode() {
+        return statusCode;
+      }
+
+      @Override
+      public String getReasonPhrase() {
+        return "Reason Phrase";
+      }
+    };
+  }
+
+  private Header newHeader(String name, String value) {
+    return new Header() {
+      @Override
+      public String getName() {
+        return name;
+      }
+
+      @Override
+      public String getValue() {
+        return value;
+      }
+
+      @Override
+      public HeaderElement[] getElements() throws ParseException {
+        return new HeaderElement[0];
+      }
+    };
+  }
+
+  /**
+   * Check that a HttpGet request is with given remote host.
+   *
+   * @param g request to inspect; may be null
+   * @param h expected host name; may be null
+   * @return true only if the request has a URI whose host equals {@code h};
+   *         false when the request, its URI, its host or {@code h} is null
+   */
+  private static boolean checkHttpGetMatchHost(HttpGet g, String h) {
+    if (g == null || h == null || g.getURI() == null) {
+      return false;
+    }
+    // URI.getHost() is null when the URI has no host component; comparing
+    // from the expected side keeps a matcher from throwing an NPE.
+    return h.equals(g.getURI().getHost());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d2d97fa/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestWasbUriAndConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestWasbUriAndConfiguration.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestWasbUriAndConfiguration.java
new file mode 100644
index 0000000..bee0220
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestWasbUriAndConfiguration.java
@@ -0,0 +1,610 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY;
+import static org.junit.Assume.assumeNotNull;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+import java.util.Date;
+import java.util.EnumSet;
+import java.io.File;
+
+import org.apache.hadoop.fs.azure.integration.AzureTestUtils;
+import org.apache.hadoop.security.ProviderUtils;
+import org.apache.hadoop.security.alias.CredentialProvider;
+import org.apache.hadoop.security.alias.CredentialProviderFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.AbstractFileSystem;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
+import org.apache.hadoop.fs.azure.AzureBlobStorageTestAccount.CreateOptions;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import com.microsoft.azure.storage.blob.CloudBlobContainer;
+import com.microsoft.azure.storage.blob.CloudBlockBlob;
+
+public class ITestWasbUriAndConfiguration extends AbstractWasbTestWithTimeout {
+
+  /** Number of bytes written by validateIOStreams() and expected back when reading. */
+  private static final int FILE_SIZE = 4096;
+  /** Prefix placed in front of blob names to form paths in testConnectToRoot(). */
+  private static final String PATH_DELIMITER = "/";
+
+  // NOTE(review): accountName, accountKey and conf are not referenced in the
+  // part of the class reviewed here -- confirm they are still needed.
+  protected String accountName;
+  protected String accountKey;
+  protected static Configuration conf = null;
+  /** Set by setMode() from KEY_USE_SECURE_MODE; the SAS tests are skipped when true. */
+  private boolean runningInSASMode = false;
+  @Rule
+  public final TemporaryFolder tempDir = new TemporaryFolder();
+
+  /** Account used by the current test; handed to the cleanup helper in tearDown(). */
+  private AzureBlobStorageTestAccount testAccount;
+
+  /**
+   * Hands the current test account to AzureTestUtils.cleanupTestAccount()
+   * and keeps whatever that helper returns as the new account reference.
+   */
+  @After
+  public void tearDown() throws Exception {
+    final AzureBlobStorageTestAccount afterCleanup =
+        AzureTestUtils.cleanupTestAccount(testAccount);
+    testAccount = afterCleanup;
+  }
+
+  /**
+   * Records, before each test, whether the test configuration has
+   * KEY_USE_SECURE_MODE switched on (default false).
+   */
+  @Before
+  public void setMode() {
+    final Configuration testConf =
+        AzureBlobStorageTestAccount.createTestConfiguration();
+    runningInSASMode = testConf.getBoolean(
+        AzureNativeFileSystemStore.KEY_USE_SECURE_MODE, false);
+  }
+
+  /**
+   * Convenience overload: runs the write/read round trip on the current
+   * test account's file system.
+   *
+   * @param filePath file to create and read back
+   * @return result of the two-argument overload
+   * @throws IOException if the file system calls fail
+   */
+  private boolean validateIOStreams(Path filePath) throws IOException {
+    return validateIOStreams(testAccount.getFileSystem(), filePath);
+  }
+
+  /**
+   * Writes FILE_SIZE zero bytes to the given path, then reads the file back
+   * and compares the byte count.
+   *
+   * @param fs file system to write to and read from
+   * @param filePath file to create
+   * @return true if exactly FILE_SIZE bytes were read back
+   * @throws IOException if creating, writing or reading the file fails
+   */
+  private boolean validateIOStreams(FileSystem fs, Path filePath)
+      throws IOException {
+
+    // Create and write a file; try-with-resources closes the stream even
+    // when the write itself throws.
+    try (OutputStream outputStream = fs.create(filePath)) {
+      outputStream.write(new byte[FILE_SIZE]);
+    }
+
+    // Return true if the count is equivalent to the file size.
+    return FILE_SIZE == readInputStream(fs, filePath);
+  }
+
+  /**
+   * Convenience overload: counts the bytes of a file on the current test
+   * account's file system.
+   *
+   * @param filePath file to read
+   * @return result of the two-argument overload
+   * @throws IOException if the file system calls fail
+   */
+  private int readInputStream(Path filePath) throws IOException {
+    return readInputStream(testAccount.getFileSystem(), filePath);
+  }
+
+  /**
+   * Reads the given file one byte at a time until end of stream.
+   *
+   * @param fs file system to read from
+   * @param filePath file to open
+   * @return number of bytes read
+   * @throws IOException if opening or reading the file fails
+   */
+  private int readInputStream(FileSystem fs, Path filePath) throws IOException {
+    int count = 0;
+    // try-with-resources closes the stream even when a read throws.
+    try (InputStream inputStream = fs.open(filePath)) {
+      while (inputStream.read() >= 0) {
+        count++;
+      }
+    }
+
+    // Return the number of bytes read.
+    return count;
+  }
+
+  /**
+   * Positive test: connects to an Azure account using the account key and
+   * checks a write/read round trip. Skipped when no test account is
+   * available.
+   */
+  @Test
+  public void testConnectUsingKey() throws Exception {
+    testAccount = AzureBlobStorageTestAccount.create();
+    assumeNotNull(testAccount);
+
+    // Validate input and output on the connection.
+    final Path probe = new Path("/wasb_scheme");
+    final boolean roundTripOk = validateIOStreams(probe);
+    assertTrue(roundTripOk);
+  }
+
+  /**
+   * Connects with SAS credentials and performs a minimal existence check.
+   * Skipped when running in secure (SAS) mode or when no test account is
+   * available.
+   */
+  @Test
+  public void testConnectUsingSAS() throws Exception {
+    Assume.assumeFalse(runningInSASMode);
+
+    // Account backed by SAS credentials, with its container created.
+    final EnumSet<CreateOptions> sasOptions =
+        EnumSet.of(CreateOptions.UseSas, CreateOptions.CreateContainer);
+    testAccount = AzureBlobStorageTestAccount.create("", sasOptions);
+    assumeNotNull(testAccount);
+
+    // NOTE: As of 4/15/2013, Azure Storage has a deficiency that prevents the
+    // full write/read scenario from working (CopyFromBlob doesn't work with
+    // SAS), so only a minor check is done until that is corrected; the full
+    // check would be validateIOStreams(new Path("/sastest.txt")).
+    final Path absent = new Path("/IDontExist");
+    assertFalse(testAccount.getFileSystem().exists(absent));
+  }
+
+  /**
+   * Connects with read-only SAS credentials, uploads a three-byte blob
+   * straight to the container and checks that the file system can see and
+   * read it. Skipped when running in secure (SAS) mode or when no test
+   * account is available.
+   */
+  @Test
+  public void testConnectUsingSASReadonly() throws Exception {
+
+    Assume.assumeFalse(runningInSASMode);
+    // Create the test account with SAS credentials.
+    testAccount = AzureBlobStorageTestAccount.create("", EnumSet.of(
+        CreateOptions.UseSas, CreateOptions.CreateContainer,
+        CreateOptions.Readonly));
+    assumeNotNull(testAccount);
+
+    // Create a blob in there
+    final String blobKey = "blobForReadonly";
+    final byte[] expected = new byte[] {1, 2, 3};
+    CloudBlobContainer container = testAccount.getRealContainer();
+    CloudBlockBlob blob = container.getBlockBlobReference(blobKey);
+    // try-with-resources closes the source stream even if the upload fails.
+    try (ByteArrayInputStream inputStream = new ByteArrayInputStream(expected)) {
+      blob.upload(inputStream, expected.length);
+    }
+
+    // Make sure we can read it from the file system
+    Path filePath = new Path("/" + blobKey);
+    FileSystem fs = testAccount.getFileSystem();
+    assertTrue(fs.exists(filePath));
+    byte[] obtained = new byte[expected.length];
+    try (DataInputStream obtainedInputStream = fs.open(filePath)) {
+      obtainedInputStream.readFully(obtained);
+    }
+    // Compare every byte, not just the last one.
+    Assert.assertArrayEquals(expected, obtained);
+  }
+
+  /**
+   * Reads "testWasb.txt" through an account created with anonymous
+   * credentials and checks that FILE_SIZE bytes come back. Skipped when no
+   * test account is available.
+   */
+  @Test
+  public void testConnectUsingAnonymous() throws Exception {
+    final String blobName = "testWasb.txt";
+
+    // Create test account with anonymous credentials.
+    testAccount = AzureBlobStorageTestAccount.createAnonymous(blobName, FILE_SIZE);
+    assumeNotNull(testAccount);
+
+    // Read the file using anonymous credentials and count its bytes.
+    final int bytesRead = readInputStream(new Path("/testWasb.txt"));
+    assertEquals(FILE_SIZE, bytesRead);
+  }
+
+  /**
+   * Checks a write/read round trip against an emulator-backed account.
+   * Skipped when no such account is available.
+   */
+  @Test
+  public void testConnectToEmulator() throws Exception {
+    testAccount = AzureBlobStorageTestAccount.createForEmulator();
+    assumeNotNull(testAccount);
+
+    final Path probe = new Path("/testFile");
+    final boolean roundTripOk = validateIOStreams(probe);
+    assertTrue(roundTripOk);
+  }
+
+  /**
+   * Tests that we can connect to fully qualified accounts outside of
+   * blob.core.windows.net. Uses a mock storage layer: a file created
+   * through the file system must show up in the mock's backing store under
+   * the fully qualified account URL.
+   */
+  @Test
+  public void testConnectToFullyQualifiedAccountMock() throws Exception {
+    // Named mockConf so it does not shadow the static conf field.
+    Configuration mockConf = new Configuration();
+    AzureBlobStorageTestAccount.setMockAccountKey(mockConf,
+        "mockAccount.mock.authority.net");
+    AzureNativeFileSystemStore store = new AzureNativeFileSystemStore();
+    MockStorageInterface mockStorage = new MockStorageInterface();
+    store.setAzureStorageInteractionLayer(mockStorage);
+    NativeAzureFileSystem fs = new NativeAzureFileSystem(store);
+    try {
+      fs.initialize(
+          new URI("wasb://mockContainer@mockAccount.mock.authority.net"),
+          mockConf);
+      fs.createNewFile(new Path("/x"));
+      assertTrue(mockStorage.getBackingStore().exists(
+          "http://mockAccount.mock.authority.net/mockContainer/x"));
+    } finally {
+      // Close the file system even when a step above fails.
+      fs.close();
+    }
+  }
+
+  /**
+   * Reads a blob from an account created with default root access, then
+   * checks that creating a file in the root folder is rejected with an
+   * AzureException.
+   * NOTE(review): no @Test annotation is visible on this method -- confirm
+   * whether JUnit is meant to run it.
+   */
+  public void testConnectToRoot() throws Exception {
+
+    // Set up blob names.
+    final String blobPrefix = String.format("wasbtests-%s-%tQ-blob",
+        System.getProperty("user.name"), new Date());
+    final String inblobName = blobPrefix + "_In" + ".txt";
+    final String outblobName = blobPrefix + "_Out" + ".txt";
+
+    // Create test account with default root access.
+    testAccount = AzureBlobStorageTestAccount.createRoot(inblobName, FILE_SIZE);
+    assumeNotNull(testAccount);
+
+    // Read the file from the default container.
+    assertEquals(FILE_SIZE, readInputStream(new Path(PATH_DELIMITER
+        + inblobName)));
+
+    try {
+      // Capture file system.
+      FileSystem fs = testAccount.getFileSystem();
+
+      // Create output path and open an output stream to the root folder.
+      // If the create unexpectedly succeeds, the stream is still closed
+      // before the failure propagates.
+      Path outputPath = new Path(PATH_DELIMITER + outblobName);
+      try (OutputStream outputStream = fs.create(outputPath)) {
+        fail("Expected an AzureException when writing to root folder.");
+      }
+    } catch (AzureException expected) {
+      // Expected outcome: the write to the root folder was rejected.
+    } catch (Exception e) {
+      fail(String.format("Expected AzureException but got %s instead.", e));
+    }
+  }
+
+  /**
+   * Positive test to exercise the throttling I/O path. Connections are made
+   * to an Azure account using the account key.
+   *
+   * NOTE(review): this method carries no @Test annotation, so an
+   * annotation-driven JUnit runner will not pick it up. Confirm whether that
+   * is intentional before enabling it.
+   */
+  public void testConnectWithThrottling() throws Exception {
+
+    testAccount = AzureBlobStorageTestAccount.createThrottled();
+    // Same guard as the other account-backed tests in this class: skip
+    // instead of failing on a null account.
+    assumeNotNull(testAccount);
+
+    // Validate input and output on the connection.
+    assertTrue(validateIOStreams(new Path("/wasb_scheme")));
+  }
+
+  /**
+   * Creates a file and writes a single byte with the given value in it.
+   *
+   * @param fs file system to create the file in
+   * @param testFile path of the file to create
+   * @param toWrite value handed to {@link OutputStream#write(int)}
+   * @throws Exception if the file cannot be created, written or closed
+   */
+  private static void writeSingleByte(FileSystem fs, Path testFile, int toWrite)
+      throws Exception {
+    // try-with-resources closes the stream even when write() throws.
+    try (OutputStream outputStream = fs.create(testFile)) {
+      outputStream.write(toWrite);
+    }
+  }
+
+  /**
+   * Reads the file given and makes sure that it's a single-byte file with the
+   * given value in it.
+   *
+   * @param fs file system to read from
+   * @param testFile path of the file to check
+   * @param expectedValue value the single byte must have
+   * @throws Exception if the file cannot be opened, read or closed
+   */
+  private static void assertSingleByteValue(FileSystem fs, Path testFile,
+      int expectedValue) throws Exception {
+    // try-with-resources closes the stream even when an assertion fails.
+    try (InputStream inputStream = fs.open(testFile)) {
+      int byteRead = inputStream.read();
+      assertTrue("File unexpectedly empty: " + testFile, byteRead >= 0);
+      assertTrue("File has more than a single byte: " + testFile,
+          inputStream.read() < 0);
+      assertEquals("Unxpected content in: " + testFile, expectedValue,
+          byteRead);
+    }
+  }
+
+  /**
+   * Verifies that two separately created test accounts are backed by
+   * independent file systems: the same path holds a different byte in each.
+   *
+   * Each account is released in its own finally block, so the first account
+   * is cleaned up even when the second one is unavailable or its cleanup
+   * throws.
+   */
+  @Test
+  public void testMultipleContainers() throws Exception {
+    AzureBlobStorageTestAccount firstAccount =
+        AzureBlobStorageTestAccount.create("first");
+    assumeNotNull(firstAccount);
+    try {
+      AzureBlobStorageTestAccount secondAccount =
+          AzureBlobStorageTestAccount.create("second");
+      assumeNotNull(secondAccount);
+      try {
+        FileSystem firstFs = firstAccount.getFileSystem();
+        FileSystem secondFs = secondAccount.getFileSystem();
+        Path testFile = new Path("/testWasb");
+        assertTrue(validateIOStreams(firstFs, testFile));
+        assertTrue(validateIOStreams(secondFs, testFile));
+        // Make sure that we're really dealing with two file systems here.
+        writeSingleByte(firstFs, testFile, 5);
+        writeSingleByte(secondFs, testFile, 7);
+        assertSingleByteValue(firstFs, testFile, 5);
+        assertSingleByteValue(secondFs, testFile, 7);
+      } finally {
+        secondAccount.cleanup();
+      }
+    } finally {
+      firstAccount.cleanup();
+    }
+  }
+
+  /**
+   * Checks that a key stored under the SimpleKeyProvider prefix is returned
+   * as the account key when this test sets no key provider explicitly.
+   */
+  @Test
+  public void testDefaultKeyProvider() throws Exception {
+    final String accountName = "testacct";
+    final String expectedKey = "testkey";
+
+    final Configuration conf = new Configuration();
+    conf.set(SimpleKeyProvider.KEY_ACCOUNT_KEY_PREFIX + accountName,
+        expectedKey);
+
+    final String actualKey =
+        AzureNativeFileSystemStore.getAccountKeyFromConfiguration(
+            accountName, conf);
+    assertEquals(expectedKey, actualKey);
+  }
+
+  /**
+   * Checks that an account key provisioned in a credential provider is
+   * returned in preference to a clear-text key set in the configuration.
+   */
+  @Test
+  public void testCredsFromCredentialProvider() throws Exception {
+    Assume.assumeFalse(runningInSASMode);
+
+    final String accountName = "testacct";
+    final String providerKey = "testkey";
+
+    // Point the configuration at a keystore-backed credential provider.
+    final Configuration conf = new Configuration();
+    final File keystore = tempDir.newFile("test.jks");
+    final URI keystoreUri =
+        ProviderUtils.nestURIForLocalJavaKeyStoreProvider(keystore.toURI());
+    conf.set(CredentialProviderFactory.CREDENTIAL_PROVIDER_PATH,
+        keystoreUri.toString());
+
+    provisionAccountKey(conf, accountName, providerKey);
+
+    // Also add a different clear-text value to the configuration; it should
+    // be overridden by the credential provider entry.
+    final String clearTextKey = providerKey + "cleartext";
+    conf.set(SimpleKeyProvider.KEY_ACCOUNT_KEY_PREFIX + accountName,
+        clearTextKey);
+
+    final String resolvedKey =
+        AzureNativeFileSystemStore.getAccountKeyFromConfiguration(
+            accountName, conf);
+    // The result should be the credential provider key, not the config key.
+    assertEquals("AccountKey incorrect.", providerKey, resolvedKey);
+  }
+
+  /**
+   * Stores the given key in the first credential provider resolved from the
+   * configuration, under the SimpleKeyProvider prefix plus the account name.
+   *
+   * @param conf configuration the credential providers are resolved from
+   * @param account account name appended to the key prefix
+   * @param key key value to store
+   * @throws Exception if the entry cannot be created or flushed
+   */
+  void provisionAccountKey(
+      final Configuration conf, String account, String key) throws Exception {
+    final String alias = SimpleKeyProvider.KEY_ACCOUNT_KEY_PREFIX + account;
+    final CredentialProvider firstProvider =
+        CredentialProviderFactory.getProviders(conf).get(0);
+    firstProvider.createCredentialEntry(alias, key.toCharArray());
+    // Flush the provider after adding the entry.
+    firstProvider.flush();
+  }
+
+  /**
+   * Checks that the account key is returned when SimpleKeyProvider is
+   * configured explicitly as the key provider class for the account.
+   */
+  @Test
+  public void testValidKeyProvider() throws Exception {
+    final String accountName = "testacct";
+    final String expectedKey = "testkey";
+
+    final Configuration conf = new Configuration();
+    conf.set(SimpleKeyProvider.KEY_ACCOUNT_KEY_PREFIX + accountName,
+        expectedKey);
+    conf.setClass("fs.azure.account.keyprovider." + accountName,
+        SimpleKeyProvider.class, KeyProvider.class);
+
+    final String actualKey =
+        AzureNativeFileSystemStore.getAccountKeyFromConfiguration(
+            accountName, conf);
+    assertEquals(expectedKey, actualKey);
+  }
+
+  /**
+   * Checks that naming a key provider class that does not exist makes key
+   * lookup fail with a KeyProviderException.
+   */
+  @Test
+  public void testInvalidKeyProviderNonexistantClass() throws Exception {
+    final String accountName = "testacct";
+    final Configuration conf = new Configuration();
+    conf.set("fs.azure.account.keyprovider." + accountName,
+        "org.apache.Nonexistant.Class");
+
+    try {
+      AzureNativeFileSystemStore.getAccountKeyFromConfiguration(
+          accountName, conf);
+      Assert.fail("Nonexistant key provider class should have thrown a "
+          + "KeyProviderException");
+    } catch (KeyProviderException expected) {
+      // Expected: the lookup was rejected.
+    }
+  }
+
+  /**
+   * Checks that naming a class that is not a KeyProvider (java.lang.String)
+   * makes key lookup fail with a KeyProviderException.
+   */
+  @Test
+  public void testInvalidKeyProviderWrongClass() throws Exception {
+    final String accountName = "testacct";
+    final Configuration conf = new Configuration();
+    conf.set("fs.azure.account.keyprovider." + accountName,
+        "java.lang.String");
+
+    try {
+      AzureNativeFileSystemStore.getAccountKeyFromConfiguration(
+          accountName, conf);
+      Assert.fail("Key provider class that doesn't implement KeyProvider "
+          + "should have thrown a KeyProviderException");
+    } catch (KeyProviderException expected) {
+      // Expected: the lookup was rejected.
+    }
+  }
+
+  /**
+   * Tests the cases when the URI is specified with no authority, i.e.
+   * wasb:///path/to/file.
+   *
+   * Per-iteration cleanup runs in a finally block so that a failed assertion
+   * does not leave cached file systems behind.
+   */
+  @Test
+  public void testNoUriAuthority() throws Exception {
+    // For any combination of default FS being wasb(s)://c@a/ and
+    // the actual URI being wasb(s):///, it should work.
+
+    String[] wasbAliases = new String[] { "wasb", "wasbs" };
+    for (String defaultScheme : wasbAliases) {
+      for (String wantedScheme : wasbAliases) {
+        testAccount = AzureBlobStorageTestAccount.createMock();
+        try {
+          Configuration conf = testAccount.getFileSystem().getConf();
+          String authority =
+              testAccount.getFileSystem().getUri().getAuthority();
+          URI defaultUri = new URI(defaultScheme, authority, null, null, null);
+          conf.set(FS_DEFAULT_NAME_KEY, defaultUri.toString());
+          // Add references to file system implementations for wasb and wasbs.
+          conf.addResource("azure-test.xml");
+          URI wantedUri = new URI(wantedScheme + ":///random/path");
+          NativeAzureFileSystem obtained = (NativeAzureFileSystem) FileSystem
+              .get(wantedUri, conf);
+          assertNotNull(obtained);
+          assertEquals(new URI(wantedScheme, authority, null, null, null),
+              obtained.getUri());
+          // Make sure makeQualified works as expected
+          Path qualified = obtained.makeQualified(new Path(wantedUri));
+          assertEquals(new URI(wantedScheme, authority, wantedUri.getPath(),
+              null, null), qualified.toUri());
+        } finally {
+          // Cleanup for the next iteration to not cache anything in FS
+          testAccount.cleanup();
+          FileSystem.closeAll();
+        }
+      }
+    }
+    // If the default FS is not a WASB FS, then specifying a URI without
+    // authority for the Azure file system should throw.
+    testAccount = AzureBlobStorageTestAccount.createMock();
+    Configuration conf = testAccount.getFileSystem().getConf();
+    conf.set(FS_DEFAULT_NAME_KEY, "file:///");
+    try {
+      FileSystem.get(new URI("wasb:///random/path"), conf);
+      fail("Should've thrown.");
+    } catch (IllegalArgumentException expected) {
+      // Expected: no authority can be derived from a non-WASB default FS.
+    }
+  }
+
+  /**
+   * Checks that with wasb as the default file system, neither the FileSystem
+   * URI nor the AbstractFileSystem URI carries a port (getPort() is -1).
+   */
+  @Test
+  public void testWasbAsDefaultFileSystemHasNoPort() throws Exception {
+    // Create the account before entering the try block: if creation throws,
+    // the finally block must not call cleanup() on a null or stale account
+    // and mask the real failure.
+    testAccount = AzureBlobStorageTestAccount.createMock();
+    try {
+      Configuration conf = testAccount.getFileSystem().getConf();
+      String authority = testAccount.getFileSystem().getUri().getAuthority();
+      URI defaultUri = new URI("wasb", authority, null, null, null);
+      conf.set(FS_DEFAULT_NAME_KEY, defaultUri.toString());
+      conf.addResource("azure-test.xml");
+
+      FileSystem fs = FileSystem.get(conf);
+      assertTrue(fs instanceof NativeAzureFileSystem);
+      assertEquals(-1, fs.getUri().getPort());
+
+      AbstractFileSystem afs = FileContext.getFileContext(conf)
+          .getDefaultFileSystem();
+      assertTrue(afs instanceof Wasb);
+      assertEquals(-1, afs.getUri().getPort());
+    } finally {
+      testAccount.cleanup();
+      FileSystem.closeAll();
+    }
+  }
+
+  /**
+   * Tests the cases when the scheme specified is 'wasbs'.
+   *
+   * With 'fs.AbstractFileSystem.wasbs.impl' configured, the default
+   * AbstractFileSystem must be a Wasbs instance whose URI has the wasbs
+   * scheme and no port.
+   */
+  @Test
+  public void testAbstractFileSystemImplementationForWasbsScheme()
+      throws Exception {
+    // Create the account before entering the try block: if creation throws,
+    // the finally block must not call cleanup() on a null or stale account
+    // and mask the real failure.
+    testAccount = AzureBlobStorageTestAccount.createMock();
+    try {
+      Configuration conf = testAccount.getFileSystem().getConf();
+      String authority = testAccount.getFileSystem().getUri().getAuthority();
+      URI defaultUri = new URI("wasbs", authority, null, null, null);
+      conf.set(FS_DEFAULT_NAME_KEY, defaultUri.toString());
+      conf.set("fs.AbstractFileSystem.wasbs.impl",
+          "org.apache.hadoop.fs.azure.Wasbs");
+      conf.addResource("azure-test.xml");
+
+      FileSystem fs = FileSystem.get(conf);
+      assertTrue(fs instanceof NativeAzureFileSystem);
+      assertEquals("wasbs", fs.getScheme());
+
+      AbstractFileSystem afs = FileContext.getFileContext(conf)
+          .getDefaultFileSystem();
+      assertTrue(afs instanceof Wasbs);
+      assertEquals(-1, afs.getUri().getPort());
+      assertEquals("wasbs", afs.getUri().getScheme());
+    } finally {
+      testAccount.cleanup();
+      FileSystem.closeAll();
+    }
+  }
+
+  /**
+   * Checks that with wasbs as the default file system but no
+   * 'fs.AbstractFileSystem.wasbs.impl' set by this test, the FileSystem still
+   * resolves while obtaining the default AbstractFileSystem throws
+   * UnsupportedFileSystemException.
+   */
+  @Test
+  public void testNoAbstractFileSystemImplementationSpecifiedForWasbsScheme()
+      throws Exception {
+    // Create the account before entering the try block: if creation throws,
+    // the finally block must not call cleanup() on a null or stale account
+    // and mask the real failure.
+    testAccount = AzureBlobStorageTestAccount.createMock();
+    try {
+      Configuration conf = testAccount.getFileSystem().getConf();
+      String authority = testAccount.getFileSystem().getUri().getAuthority();
+      URI defaultUri = new URI("wasbs", authority, null, null, null);
+      conf.set(FS_DEFAULT_NAME_KEY, defaultUri.toString());
+
+      FileSystem fs = FileSystem.get(conf);
+      assertTrue(fs instanceof NativeAzureFileSystem);
+      assertEquals("wasbs", fs.getScheme());
+
+      // should throw if 'fs.AbstractFileSystem.wasbs.impl' is not specified
+      try {
+        FileContext.getFileContext(conf).getDefaultFileSystem();
+        fail("Should've thrown.");
+      } catch (UnsupportedFileSystemException expected) {
+        // Expected: no AbstractFileSystem implementation is bound to wasbs.
+      }
+
+    } finally {
+      testAccount.cleanup();
+      FileSystem.closeAll();
+    }
+  }
+
+  /**
+   * Checks that only the jceks://wasb/ entry is dropped from a mixed
+   * credential provider path; the user:/// and jceks://hdfs@ entries remain.
+   */
+  @Test
+  public void testCredentialProviderPathExclusions() throws Exception {
+    final String configuredPath =
+        "user:///,jceks://wasb/user/hrt_qa/sqoopdbpasswd.jceks," +
+        "jceks://hdfs@nn1.example.com/my/path/test.jceks";
+    final String expectedPath =
+        "user:///,jceks://hdfs@nn1.example.com/my/path/test.jceks";
+
+    final Configuration config = new Configuration();
+    config.set(CredentialProviderFactory.CREDENTIAL_PROVIDER_PATH,
+        configuredPath);
+
+    excludeAndTestExpectations(config, expectedPath);
+  }
+
+  /**
+   * Checks that when every configured provider is a jceks://wasb entry, no
+   * credential provider path remains after exclusion (the value is null).
+   */
+  @Test
+  public void testExcludeAllProviderTypesFromConfig() throws Exception {
+    final String configuredPath =
+        "jceks://wasb/tmp/test.jceks," +
+        "jceks://wasb@/my/path/test.jceks";
+
+    final Configuration config = new Configuration();
+    config.set(CredentialProviderFactory.CREDENTIAL_PROVIDER_PATH,
+        configuredPath);
+
+    // Nothing is expected to survive the exclusion.
+    final String expectedPath = null;
+    excludeAndTestExpectations(config, expectedPath);
+  }
+
+  void excludeAndTestExpectations(Configuration config, String newPath)
+    throws Exception {
+    Configuration conf = ProviderUtils.excludeIncompatibleCredentialProviders(
+        config, NativeAzureFileSystem.class);
+    String effectivePath = conf.get(
+        CredentialProviderFactory.CREDENTIAL_PROVIDER_PATH, null);
+    assertEquals(newPath, effectivePath);
+  }
+
+  /**
+   * Checks that the wasbs AbstractFileSystem resolves correctly both when a
+   * user agent ID is configured and when the key is explicitly unset.
+   */
+  @Test
+  public void testUserAgentConfig() throws Exception {
+    // Set the user agent
+    verifyWasbsResolvesWithUserAgent("TestClient");
+
+    // Unset the user agent
+    verifyWasbsResolvesWithUserAgent(null);
+  }
+
+  /**
+   * Creates a mock account with wasbs as the default file system, applies the
+   * given user agent setting, and verifies that the default
+   * AbstractFileSystem is a Wasbs instance with the wasbs scheme and no port.
+   *
+   * @param userAgentId value to set for the user agent ID key, or null to
+   *     unset the key
+   * @throws Exception if the account or file systems cannot be created
+   */
+  private void verifyWasbsResolvesWithUserAgent(String userAgentId)
+      throws Exception {
+    // Create the account before entering the try block: if creation throws,
+    // the finally block must not call cleanup() on a null or stale account
+    // and mask the real failure.
+    testAccount = AzureBlobStorageTestAccount.createMock();
+    try {
+      Configuration conf = testAccount.getFileSystem().getConf();
+      String authority = testAccount.getFileSystem().getUri().getAuthority();
+      URI defaultUri = new URI("wasbs", authority, null, null, null);
+      conf.set(FS_DEFAULT_NAME_KEY, defaultUri.toString());
+      conf.set("fs.AbstractFileSystem.wasbs.impl",
+          "org.apache.hadoop.fs.azure.Wasbs");
+
+      if (userAgentId != null) {
+        conf.set(AzureNativeFileSystemStore.USER_AGENT_ID_KEY, userAgentId);
+      } else {
+        conf.unset(AzureNativeFileSystemStore.USER_AGENT_ID_KEY);
+      }
+
+      // Obtain the default FileSystem exactly as before; the call is kept
+      // even though the returned instance is not inspected.
+      FileSystem.get(conf);
+      AbstractFileSystem afs =
+          FileContext.getFileContext(conf).getDefaultFileSystem();
+
+      assertTrue(afs instanceof Wasbs);
+      assertEquals(-1, afs.getUri().getPort());
+      assertEquals("wasbs", afs.getUri().getScheme());
+    } finally {
+      testAccount.cleanup();
+      FileSystem.closeAll();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d2d97fa/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockWasbAuthorizerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockWasbAuthorizerImpl.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockWasbAuthorizerImpl.java
index 9fbab49..7354499 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockWasbAuthorizerImpl.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockWasbAuthorizerImpl.java
@@ -38,11 +38,12 @@ public class MockWasbAuthorizerImpl implements WasbAuthorizerInterface {
   private boolean performOwnerMatch;
   private CachingAuthorizer<CachedAuthorizerEntry, Boolean> cache;
 
- // The full qualified URL to the root directory
+  // The full qualified URL to the root directory
   private String qualifiedPrefixUrl;
 
   public MockWasbAuthorizerImpl(NativeAzureFileSystem fs) {
-    qualifiedPrefixUrl = new Path("/").makeQualified(fs.getUri(), fs.getWorkingDirectory())
+    qualifiedPrefixUrl = new Path("/").makeQualified(fs.getUri(),
+        fs.getWorkingDirectory())
         .toString().replaceAll("/$", "");
     cache = new CachingAuthorizer<>(TimeUnit.MINUTES.convert(5L, TimeUnit.MINUTES), "AUTHORIZATION");
   }
@@ -64,19 +65,23 @@ public class MockWasbAuthorizerImpl implements WasbAuthorizerInterface {
 
   public void addAuthRule(String wasbAbsolutePath,
       String accessType, boolean access) {
-        wasbAbsolutePath = qualifiedPrefixUrl + wasbAbsolutePath;
-        AuthorizationComponent component = wasbAbsolutePath.endsWith("*")
-        ? new AuthorizationComponent("^" + wasbAbsolutePath.replace("*", ".*"), accessType)
+    wasbAbsolutePath = qualifiedPrefixUrl + wasbAbsolutePath;
+    AuthorizationComponent component = wasbAbsolutePath.endsWith("*")
+        ? new AuthorizationComponent("^" + wasbAbsolutePath.replace("*", ".*"),
+        accessType)
         : new AuthorizationComponent(wasbAbsolutePath, accessType);
 
     this.authRules.put(component, access);
   }
 
   @Override
-  public boolean authorize(String wasbAbsolutePath, String accessType, String owner)
+  public boolean authorize(String wasbAbsolutePath,
+      String accessType,
+      String owner)
       throws WasbAuthorizationException {
 
-    if (wasbAbsolutePath.endsWith(NativeAzureFileSystem.FolderRenamePending.SUFFIX)) {
+    if (wasbAbsolutePath.endsWith(
+        NativeAzureFileSystem.FolderRenamePending.SUFFIX)) {
       return true;
     }
 
@@ -108,20 +113,23 @@ public class MockWasbAuthorizerImpl implements WasbAuthorizerInterface {
     // In case of root("/"), owner match does not happen because owner is returned as empty string.
     // we try to force owner match just for purpose of tests to make sure all operations work seemlessly with owner.
     if (this.performOwnerMatch
-      && StringUtils.equalsIgnoreCase(wasbAbsolutePath, qualifiedPrefixUrl + "/")) {
+        && StringUtils.equalsIgnoreCase(wasbAbsolutePath,
+        qualifiedPrefixUrl + "/")) {
       owner = currentUserShortName;
     }
 
     boolean shouldEvaluateOwnerAccess = owner != null && !owner.isEmpty()
-      && this.performOwnerMatch;
+        && this.performOwnerMatch;
 
-    boolean isOwnerMatch =  StringUtils.equalsIgnoreCase(currentUserShortName, owner);
+    boolean isOwnerMatch = StringUtils.equalsIgnoreCase(currentUserShortName,
+        owner);
 
     AuthorizationComponent component =
         new AuthorizationComponent(wasbAbsolutePath, accessType);
 
     if (authRules.containsKey(component)) {
-      return shouldEvaluateOwnerAccess ? isOwnerMatch && authRules.get(component) : authRules.get(component);
+      return shouldEvaluateOwnerAccess ? isOwnerMatch && authRules.get(
+          component) : authRules.get(component);
     } else {
       // Regex-pattern match if we don't have a straight match
       for (Map.Entry<AuthorizationComponent, Boolean> entry : authRules.entrySet()) {
@@ -129,8 +137,11 @@ public class MockWasbAuthorizerImpl implements WasbAuthorizerInterface {
         String keyPath = key.getWasbAbsolutePath();
         String keyAccess = key.getAccessType();
 
-        if (keyPath.endsWith("*") && Pattern.matches(keyPath, wasbAbsolutePath) && keyAccess.equals(accessType)) {
-          return shouldEvaluateOwnerAccess ? isOwnerMatch && entry.getValue() : entry.getValue();
+        if (keyPath.endsWith("*") && Pattern.matches(keyPath, wasbAbsolutePath)
+            && keyAccess.equals(accessType)) {
+          return shouldEvaluateOwnerAccess
+              ? isOwnerMatch && entry.getValue()
+              : entry.getValue();
         }
       }
       return false;
@@ -141,47 +152,47 @@ public class MockWasbAuthorizerImpl implements WasbAuthorizerInterface {
     authRules.clear();
     cache.clear();
   }
-}
 
-class AuthorizationComponent {
+  private static class AuthorizationComponent {
 
-  private String wasbAbsolutePath;
-  private String accessType;
+    private final String wasbAbsolutePath;
+    private final String accessType;
 
-  public AuthorizationComponent(String wasbAbsolutePath,
-      String accessType) {
-    this.wasbAbsolutePath = wasbAbsolutePath;
-    this.accessType = accessType;
-  }
+    AuthorizationComponent(String wasbAbsolutePath,
+        String accessType) {
+      this.wasbAbsolutePath = wasbAbsolutePath;
+      this.accessType = accessType;
+    }
 
-  @Override
-  public int hashCode() {
-    return this.wasbAbsolutePath.hashCode() ^ this.accessType.hashCode();
-  }
+    @Override
+    public int hashCode() {
+      return this.wasbAbsolutePath.hashCode() ^ this.accessType.hashCode();
+    }
 
-  @Override
-  public boolean equals(Object obj) {
+    @Override
+    public boolean equals(Object obj) {
 
-    if (obj == this) {
-      return true;
-    }
+      if (obj == this) {
+        return true;
+      }
 
-    if (obj == null
-        || !(obj instanceof AuthorizationComponent)) {
-      return false;
-    }
+      if (obj == null
+          || !(obj instanceof AuthorizationComponent)) {
+        return false;
+      }
 
-    return ((AuthorizationComponent)obj).
-              getWasbAbsolutePath().equals(this.wasbAbsolutePath)
-            && ((AuthorizationComponent)obj).
-              getAccessType().equals(this.accessType);
-  }
+      return ((AuthorizationComponent) obj).
+          getWasbAbsolutePath().equals(this.wasbAbsolutePath)
+          && ((AuthorizationComponent) obj).
+          getAccessType().equals(this.accessType);
+    }
 
-  public String getWasbAbsolutePath() {
-    return this.wasbAbsolutePath;
-  }
+    public String getWasbAbsolutePath() {
+      return this.wasbAbsolutePath;
+    }
 
-  public String getAccessType() {
-    return accessType;
+    public String getAccessType() {
+      return accessType;
+    }
   }
-}
\ No newline at end of file
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org