You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cn...@apache.org on 2014/10/08 23:24:39 UTC

[2/5] HADOOP-10809. hadoop-azure: page blob support. Contributed by Dexter Bradshaw, Mostafa Elhemali, Eric Hanson, and Mike Liddell.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2217e2f8/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockStorageInterface.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockStorageInterface.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockStorageInterface.java
index b03997c..047ea1b 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockStorageInterface.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockStorageInterface.java
@@ -33,7 +33,7 @@ import java.util.TimeZone;
 
 import org.apache.commons.httpclient.URIException;
 import org.apache.commons.httpclient.util.URIUtil;
-import org.apache.commons.io.output.ByteArrayOutputStream;
+import org.apache.commons.lang.NotImplementedException;
 
 import com.microsoft.windowsazure.storage.CloudStorageAccount;
 import com.microsoft.windowsazure.storage.OperationContext;
@@ -44,10 +44,15 @@ import com.microsoft.windowsazure.storage.StorageUri;
 import com.microsoft.windowsazure.storage.blob.BlobListingDetails;
 import com.microsoft.windowsazure.storage.blob.BlobProperties;
 import com.microsoft.windowsazure.storage.blob.BlobRequestOptions;
+import com.microsoft.windowsazure.storage.blob.CloudBlob;
 import com.microsoft.windowsazure.storage.blob.CloudBlobContainer;
 import com.microsoft.windowsazure.storage.blob.CloudBlobDirectory;
 import com.microsoft.windowsazure.storage.blob.CopyState;
 import com.microsoft.windowsazure.storage.blob.ListBlobItem;
+import com.microsoft.windowsazure.storage.blob.PageRange;
+
+import javax.ws.rs.core.UriBuilder;
+import javax.ws.rs.core.UriBuilderException;
 
 /**
  * A mock implementation of the Azure Storage interaction layer for unit tests.
@@ -55,7 +60,8 @@ import com.microsoft.windowsazure.storage.blob.ListBlobItem;
  */
 public class MockStorageInterface extends StorageInterface {
   private InMemoryBlockBlobStore backingStore;
-  private final ArrayList<PreExistingContainer> preExistingContainers = new ArrayList<MockStorageInterface.PreExistingContainer>();
+  private final ArrayList<PreExistingContainer> preExistingContainers =
+      new ArrayList<MockStorageInterface.PreExistingContainer>();
   private String baseUriString;
 
   public InMemoryBlockBlobStore getBackingStore() {
@@ -107,6 +113,33 @@ public class MockStorageInterface extends StorageInterface {
     return null;
   }
 
+  /**
+   * Utility function used to convert a given URI to a decoded string
+   * representation sent to the backing store. URIs coming as input
+   * to this class will be encoded by the URI class, and we want
+   * the underlying storage to store keys in their original UTF-8 form.
+   */
+  private static String convertUriToDecodedString(URI uri) {
+    try {
+      String result = URIUtil.decode(uri.toString());
+      return result;
+    } catch (URIException e) {
+      throw new AssertionError("Failed to decode URI: " + uri.toString());
+    }
+  }
+
+  private static URI convertKeyToEncodedUri(String key) {
+    try {
+      String encodedKey = URIUtil.encodePath(key);
+      URI uri = new URI(encodedKey);
+      return uri;
+    } catch (URISyntaxException e) {
+      throw new AssertionError("Failed to encode key: " + key);
+    } catch (URIException e) {
+      throw new AssertionError("Failed to encode key: " + key);
+    }
+  }
+
   @Override
   public CloudBlobContainerWrapper getContainerReference(String name)
       throws URISyntaxException, StorageException {
@@ -196,6 +229,12 @@ public class MockStorageInterface extends StorageInterface {
           false)), null, 0);
     }
 
+    @Override
+    public CloudPageBlobWrapper getPageBlobReference(String blobAddressUri)
+        throws URISyntaxException, StorageException {
+      return new MockCloudPageBlobWrapper(new URI(blobAddressUri), null, 0);
+    }
+
     // helper to create full URIs for directory and blob.
     // use withTrailingSlash=true to get a good path for a directory.
     private String fullUriString(String relativePath, boolean withTrailingSlash) {
@@ -260,24 +299,41 @@ public class MockStorageInterface extends StorageInterface {
         BlobRequestOptions options, OperationContext opContext)
         throws URISyntaxException, StorageException {
       ArrayList<ListBlobItem> ret = new ArrayList<ListBlobItem>();
-      String fullPrefix = prefix == null ? uri.toString() : new URI(
-          uri.getScheme(), uri.getAuthority(), uri.getPath() + prefix,
-          uri.getQuery(), uri.getFragment()).toString();
-      boolean includeMetadata = listingDetails
-          .contains(BlobListingDetails.METADATA);
+      URI searchUri = null;
+      if (prefix == null) {
+        searchUri = uri;
+      } else {
+        try {
+          searchUri = UriBuilder.fromUri(uri).path(prefix).build();
+        } catch (UriBuilderException e) {
+          throw new AssertionError("Failed to encode path: " + prefix);
+        }
+      }
+
+      String fullPrefix = convertUriToDecodedString(searchUri);
+      boolean includeMetadata = listingDetails.contains(BlobListingDetails.METADATA);
       HashSet<String> addedDirectories = new HashSet<String>();
-      for (InMemoryBlockBlobStore.ListBlobEntry current : backingStore
-          .listBlobs(fullPrefix, includeMetadata)) {
+      for (InMemoryBlockBlobStore.ListBlobEntry current : backingStore.listBlobs(
+          fullPrefix, includeMetadata)) {
         int indexOfSlash = current.getKey().indexOf('/', fullPrefix.length());
         if (useFlatBlobListing || indexOfSlash < 0) {
-          ret.add(new MockCloudBlockBlobWrapper(new URI(current.getKey()),
-              current.getMetadata(), current.getContentLength()));
+          if (current.isPageBlob()) {
+            ret.add(new MockCloudPageBlobWrapper(
+                convertKeyToEncodedUri(current.getKey()),
+                current.getMetadata(),
+                current.getContentLength()));
+          } else {
+          ret.add(new MockCloudBlockBlobWrapper(
+              convertKeyToEncodedUri(current.getKey()),
+              current.getMetadata(),
+              current.getContentLength()));
+          }
         } else {
           String directoryName = current.getKey().substring(0, indexOfSlash);
           if (!addedDirectories.contains(directoryName)) {
             addedDirectories.add(current.getKey());
-            ret.add(new MockCloudBlobDirectoryWrapper(new URI(directoryName
-                + "/")));
+            ret.add(new MockCloudBlobDirectoryWrapper(new URI(
+                directoryName + "/")));
           }
         }
       }
@@ -286,35 +342,35 @@ public class MockStorageInterface extends StorageInterface {
 
     @Override
     public StorageUri getStorageUri() {
-      throw new UnsupportedOperationException();
+      throw new NotImplementedException();
     }
-
   }
 
-  class MockCloudBlockBlobWrapper extends CloudBlockBlobWrapper {
-    private URI uri;
-    private HashMap<String, String> metadata = new HashMap<String, String>();
-    private BlobProperties properties;
+  abstract class MockCloudBlobWrapper implements CloudBlobWrapper {
+    protected final URI uri;
+    protected HashMap<String, String> metadata =
+        new HashMap<String, String>();
+    protected BlobProperties properties;
 
-    public MockCloudBlockBlobWrapper(URI uri, HashMap<String, String> metadata,
+    protected MockCloudBlobWrapper(URI uri, HashMap<String, String> metadata,
         int length) {
       this.uri = uri;
       this.metadata = metadata;
       this.properties = new BlobProperties();
       this.properties.setLength(length);
-      this.properties.setLastModified(Calendar.getInstance(
-          TimeZone.getTimeZone("UTC")).getTime());
+      this.properties.setLastModified(
+          Calendar.getInstance(TimeZone.getTimeZone("UTC")).getTime());
     }
 
-    private void refreshProperties(boolean getMetadata) {
-      if (backingStore.exists(uri.toString())) {
-        byte[] content = backingStore.getContent(uri.toString());
+    protected void refreshProperties(boolean getMetadata) {
+      if (backingStore.exists(convertUriToDecodedString(uri))) {
+        byte[] content = backingStore.getContent(convertUriToDecodedString(uri));
         properties = new BlobProperties();
         properties.setLength(content.length);
-        properties.setLastModified(Calendar.getInstance(
-            TimeZone.getTimeZone("UTC")).getTime());
+        properties.setLastModified(
+            Calendar.getInstance(TimeZone.getTimeZone("UTC")).getTime());
         if (getMetadata) {
-          metadata = backingStore.getMetadata(uri.toString());
+          metadata = backingStore.getMetadata(convertUriToDecodedString(uri));
         }
       }
     }
@@ -347,26 +403,27 @@ public class MockStorageInterface extends StorageInterface {
     }
 
     @Override
-    public void startCopyFromBlob(CloudBlockBlobWrapper sourceBlob,
+    public void startCopyFromBlob(URI source,
         OperationContext opContext) throws StorageException, URISyntaxException {
-      backingStore.copy(sourceBlob.getUri().toString(), uri.toString());
-      // it would be best if backingStore.properties.CopyState were tracked
-      // If implemented, update azureNativeFileSystemStore.waitForCopyToComplete
+      backingStore.copy(convertUriToDecodedString(source), convertUriToDecodedString(uri));
+      //TODO: set the backingStore.properties.CopyState and
+      //      update azureNativeFileSystemStore.waitForCopyToComplete
     }
 
     @Override
     public CopyState getCopyState() {
-      return this.properties.getCopyState();
+       return this.properties.getCopyState();
     }
 
     @Override
-    public void delete(OperationContext opContext) throws StorageException {
-      backingStore.delete(uri.toString());
+    public void delete(OperationContext opContext, SelfRenewingLease lease)
+        throws StorageException {
+      backingStore.delete(convertUriToDecodedString(uri));
     }
 
     @Override
     public boolean exists(OperationContext opContext) throws StorageException {
-      return backingStore.exists(uri.toString());
+      return backingStore.exists(convertUriToDecodedString(uri));
     }
 
     @Override
@@ -383,37 +440,90 @@ public class MockStorageInterface extends StorageInterface {
     @Override
     public InputStream openInputStream(BlobRequestOptions options,
         OperationContext opContext) throws StorageException {
-      return new ByteArrayInputStream(backingStore.getContent(uri.toString()));
+      return new ByteArrayInputStream(
+          backingStore.getContent(convertUriToDecodedString(uri)));
+    }
+
+    @Override
+    public void uploadMetadata(OperationContext opContext)
+        throws StorageException {
+      backingStore.setMetadata(convertUriToDecodedString(uri), metadata);
+    }
+
+    @Override
+    public void downloadRange(long offset, long length, OutputStream os,
+        BlobRequestOptions options, OperationContext opContext)
+        throws StorageException {
+      throw new NotImplementedException();
+    }
+  }
+
+  class MockCloudBlockBlobWrapper extends MockCloudBlobWrapper
+    implements CloudBlockBlobWrapper {
+    public MockCloudBlockBlobWrapper(URI uri, HashMap<String, String> metadata,
+        int length) {
+      super(uri, metadata, length);
     }
 
     @Override
     public OutputStream openOutputStream(BlobRequestOptions options,
         OperationContext opContext) throws StorageException {
-      return backingStore.upload(uri.toString(), metadata);
+      return backingStore.uploadBlockBlob(convertUriToDecodedString(uri),
+          metadata);
     }
 
     @Override
-    public void upload(InputStream sourceStream, OperationContext opContext)
-        throws StorageException, IOException {
-      ByteArrayOutputStream allContent = new ByteArrayOutputStream();
-      allContent.write(sourceStream);
-      backingStore.setContent(uri.toString(), allContent.toByteArray(),
-          metadata);
-      refreshProperties(false);
-      allContent.close();
+    public void setStreamMinimumReadSizeInBytes(int minimumReadSizeBytes) {
     }
 
     @Override
-    public void uploadMetadata(OperationContext opContext)
-        throws StorageException {
-      backingStore.setContent(uri.toString(),
-          backingStore.getContent(uri.toString()), metadata);
+    public void setWriteBlockSizeInBytes(int writeBlockSizeBytes) {
     }
 
     @Override
-    public void uploadProperties(OperationContext opContext)
-        throws StorageException {
-      refreshProperties(false);
+    public StorageUri getStorageUri() {
+      return null;
+    }
+
+    @Override
+    public void uploadProperties(OperationContext context, SelfRenewingLease lease) {
+    }
+
+    @Override
+    public SelfRenewingLease acquireLease() {
+      return null;
+    }
+
+    @Override
+    public CloudBlob getBlob() {
+      return null;
+    }
+  }
+
+  class MockCloudPageBlobWrapper extends MockCloudBlobWrapper
+    implements CloudPageBlobWrapper {
+    public MockCloudPageBlobWrapper(URI uri, HashMap<String, String> metadata,
+        int length) {
+      super(uri, metadata, length);
+    }
+
+    @Override
+    public void create(long length, BlobRequestOptions options,
+        OperationContext opContext) throws StorageException {
+      throw new NotImplementedException();
+    }
+
+    @Override
+    public void uploadPages(InputStream sourceStream, long offset, long length,
+        BlobRequestOptions options, OperationContext opContext)
+        throws StorageException, IOException {
+      throw new NotImplementedException();
+    }
+
+    @Override
+    public ArrayList<PageRange> downloadPageRanges(BlobRequestOptions options,
+        OperationContext opContext) throws StorageException {
+      throw new NotImplementedException();
     }
 
     @Override
@@ -426,8 +536,23 @@ public class MockStorageInterface extends StorageInterface {
 
     @Override
     public StorageUri getStorageUri() {
-      throw new UnsupportedOperationException();
+        throw new NotImplementedException();
     }
 
+    @Override
+    public void uploadProperties(OperationContext opContext,
+        SelfRenewingLease lease)
+        throws StorageException {
+    }
+
+    @Override
+    public SelfRenewingLease acquireLease() {
+      return null;
+    }
+
+    @Override
+    public CloudBlob getBlob() {
+      return null;
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2217e2f8/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemBaseTest.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemBaseTest.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemBaseTest.java
index e731b21..01cf713 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemBaseTest.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemBaseTest.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 import static org.junit.Assume.assumeNotNull;
 
 import java.io.BufferedReader;
@@ -32,10 +33,15 @@ import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.OutputStream;
 import java.io.OutputStreamWriter;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Calendar;
 import java.util.Date;
 import java.util.TimeZone;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -49,6 +55,13 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+import org.apache.hadoop.fs.azure.AzureException;
+import org.apache.hadoop.fs.azure.NativeAzureFileSystem.FolderRenamePending;
+
+import com.microsoft.windowsazure.storage.AccessCondition;
+import com.microsoft.windowsazure.storage.StorageException;
+import com.microsoft.windowsazure.storage.blob.CloudBlob;
+
 /*
  * Tests the Native Azure file system (WASB) against an actual blob store if
  * provided in the environment.
@@ -59,12 +72,13 @@ import org.junit.Test;
  */
 public abstract class NativeAzureFileSystemBaseTest {
 
-  private FileSystem fs;
+  protected FileSystem fs;
   private AzureBlobStorageTestAccount testAccount;
   private final long modifiedTimeErrorMargin = 5 * 1000; // Give it +/-5 seconds
 
-  protected abstract AzureBlobStorageTestAccount createTestAccount()
-      throws Exception;
+  protected abstract AzureBlobStorageTestAccount createTestAccount() throws Exception;
+
+  public static final Log LOG = LogFactory.getLog(NativeAzureFileSystemBaseTest.class);
 
   @Before
   public void setUp() throws Exception {
@@ -140,7 +154,7 @@ public abstract class NativeAzureFileSystemBaseTest {
   private void testOwnership(Path pathUnderTest) throws IOException {
     FileStatus ret = fs.getFileStatus(pathUnderTest);
     UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
-    assertEquals(ret.getOwner(), currentUser.getShortUserName());
+    assertTrue(ret.getOwner().equals(currentUser.getShortUserName()));
     fs.delete(pathUnderTest, true);
   }
 
@@ -177,18 +191,29 @@ public abstract class NativeAzureFileSystemBaseTest {
     fs.delete(testFolder, true);
   }
 
-  @Test
-  public void testDeepFileCreation() throws Exception {
-    Path testFile = new Path("deep/file/creation/test");
-    FsPermission permission = FsPermission.createImmutable((short) 644);
+  void testDeepFileCreationBase(String testFilePath, String firstDirPath, String middleDirPath,
+          short permissionShort, short umaskedPermissionShort) throws Exception  {
+    Path testFile = new Path(testFilePath);
+    Path firstDir = new Path(firstDirPath);
+    Path middleDir = new Path(middleDirPath);
+    FsPermission permission = FsPermission.createImmutable(permissionShort);
+    FsPermission umaskedPermission = FsPermission.createImmutable(umaskedPermissionShort);
+
     createEmptyFile(testFile, permission);
+    FsPermission rootPerm = fs.getFileStatus(firstDir.getParent()).getPermission();
+    FsPermission inheritPerm = FsPermission.createImmutable((short)(rootPerm.toShort() | 0300));
     assertTrue(fs.exists(testFile));
-    assertTrue(fs.exists(new Path("deep")));
-    assertTrue(fs.exists(new Path("deep/file/creation")));
-    FileStatus ret = fs.getFileStatus(new Path("deep/file"));
-    assertTrue(ret.isDirectory());
-    assertEqualsIgnoreStickyBit(permission, ret.getPermission());
-    assertTrue(fs.delete(new Path("deep"), true));
+    assertTrue(fs.exists(firstDir));
+    assertTrue(fs.exists(middleDir));
+    // verify that the indirectly created directory inherited its permissions from the root directory
+    FileStatus directoryStatus = fs.getFileStatus(middleDir);
+    assertTrue(directoryStatus.isDirectory());
+    assertEqualsIgnoreStickyBit(inheritPerm, directoryStatus.getPermission());
+    // verify that the file itself has the permissions as specified
+    FileStatus fileStatus = fs.getFileStatus(testFile);
+    assertFalse(fileStatus.isDirectory());
+    assertEqualsIgnoreStickyBit(umaskedPermission, fileStatus.getPermission());
+    assertTrue(fs.delete(firstDir, true));
     assertFalse(fs.exists(testFile));
 
     // An alternative test scenario would've been to delete the file first,
@@ -196,6 +221,22 @@ public abstract class NativeAzureFileSystemBaseTest {
     // doesn't actually work as expected right now.
   }
 
+  @Test
+  public void testDeepFileCreation() throws Exception {
+    // normal permissions in user home
+    testDeepFileCreationBase("deep/file/creation/test", "deep", "deep/file/creation", (short)0644, (short)0644);
+    // extra permissions in user home. umask will change the actual permissions.
+    testDeepFileCreationBase("deep/file/creation/test", "deep", "deep/file/creation", (short)0777, (short)0755);
+    // normal permissions in root
+    testDeepFileCreationBase("/deep/file/creation/test", "/deep", "/deep/file/creation", (short)0644, (short)0644);
+    // less permissions in root
+    testDeepFileCreationBase("/deep/file/creation/test", "/deep", "/deep/file/creation", (short)0700, (short)0700);
+    // one indirectly created directory in root
+    testDeepFileCreationBase("/deep/file", "/deep", "/deep", (short)0644, (short)0644);
+    // one indirectly created directory in user home
+    testDeepFileCreationBase("deep/file", "deep", "deep", (short)0644, (short)0644);
+  }
+
   private static enum RenameVariation {
     NormalFileName, SourceInAFolder, SourceWithSpace, SourceWithPlusAndPercent
   }
@@ -206,20 +247,20 @@ public abstract class NativeAzureFileSystemBaseTest {
       System.out.printf("Rename variation: %s\n", variation);
       Path originalFile;
       switch (variation) {
-      case NormalFileName:
-        originalFile = new Path("fileToRename");
-        break;
-      case SourceInAFolder:
-        originalFile = new Path("file/to/rename");
-        break;
-      case SourceWithSpace:
-        originalFile = new Path("file to rename");
-        break;
-      case SourceWithPlusAndPercent:
-        originalFile = new Path("file+to%rename");
-        break;
-      default:
-        throw new Exception("Unknown variation");
+        case NormalFileName:
+          originalFile = new Path("fileToRename");
+          break;
+        case SourceInAFolder:
+          originalFile = new Path("file/to/rename");
+          break;
+        case SourceWithSpace:
+          originalFile = new Path("file to rename");
+          break;
+        case SourceWithPlusAndPercent:
+          originalFile = new Path("file+to%rename");
+          break;
+        default:
+          throw new Exception("Unknown variation");
       }
       Path destinationFile = new Path("file/resting/destination");
       assertTrue(fs.createNewFile(originalFile));
@@ -227,7 +268,8 @@ public abstract class NativeAzureFileSystemBaseTest {
       assertFalse(fs.rename(originalFile, destinationFile)); // Parent directory
       // doesn't exist
       assertTrue(fs.mkdirs(destinationFile.getParent()));
-      assertTrue(fs.rename(originalFile, destinationFile));
+      boolean result = fs.rename(originalFile, destinationFile);
+      assertTrue(result);
       assertTrue(fs.exists(destinationFile));
       assertFalse(fs.exists(originalFile));
       fs.delete(destinationFile.getParent(), true);
@@ -239,10 +281,10 @@ public abstract class NativeAzureFileSystemBaseTest {
     Path testFile = new Path("deep/file/rename/test");
     FsPermission permission = FsPermission.createImmutable((short) 644);
     createEmptyFile(testFile, permission);
-    assertTrue(fs.rename(new Path("deep/file"), new Path("deep/renamed")));
+    boolean renameResult = fs.rename(new Path("deep/file"), new Path("deep/renamed"));
+    assertTrue(renameResult);
     assertFalse(fs.exists(testFile));
-    FileStatus newStatus = fs
-        .getFileStatus(new Path("deep/renamed/rename/test"));
+    FileStatus newStatus = fs.getFileStatus(new Path("deep/renamed/rename/test"));
     assertNotNull(newStatus);
     assertEqualsIgnoreStickyBit(permission, newStatus.getPermission());
     assertTrue(fs.delete(new Path("deep"), true));
@@ -256,21 +298,25 @@ public abstract class NativeAzureFileSystemBaseTest {
   public void testRenameFolder() throws Exception {
     for (RenameFolderVariation variation : RenameFolderVariation.values()) {
       Path originalFolder = new Path("folderToRename");
-      if (variation != RenameFolderVariation.CreateJustInnerFile){
+      if (variation != RenameFolderVariation.CreateJustInnerFile) {
         assertTrue(fs.mkdirs(originalFolder));
       }
       Path innerFile = new Path(originalFolder, "innerFile");
-      if (variation != RenameFolderVariation.CreateJustFolder){
+      Path innerFile2 = new Path(originalFolder, "innerFile2");
+      if (variation != RenameFolderVariation.CreateJustFolder) {
         assertTrue(fs.createNewFile(innerFile));
+        assertTrue(fs.createNewFile(innerFile2));
       }
       Path destination = new Path("renamedFolder");
       assertTrue(fs.rename(originalFolder, destination));
       assertTrue(fs.exists(destination));
-      if (variation != RenameFolderVariation.CreateJustFolder){
+      if (variation != RenameFolderVariation.CreateJustFolder) {
         assertTrue(fs.exists(new Path(destination, innerFile.getName())));
+        assertTrue(fs.exists(new Path(destination, innerFile2.getName())));
       }
       assertFalse(fs.exists(originalFolder));
       assertFalse(fs.exists(innerFile));
+      assertFalse(fs.exists(innerFile2));
       fs.delete(destination, true);
     }
   }
@@ -365,6 +411,43 @@ public abstract class NativeAzureFileSystemBaseTest {
   }
 
   @Test
+  public void testChineseCharacters() throws Exception {
+    // Create a file and a folder with Chinese (non-ASCI) characters
+    String chinese = "" + '\u963f' + '\u4db5';
+    String fileName = "filename" + chinese;
+    String directoryName = chinese;
+    fs.create(new Path(directoryName, fileName)).close();
+    FileStatus[] listing = fs.listStatus(new Path(directoryName));
+    assertEquals(1, listing.length);
+    assertEquals(fileName, listing[0].getPath().getName());
+    FileStatus status = fs.getFileStatus(new Path(directoryName, fileName));
+    assertEquals(fileName, status.getPath().getName());
+    InputStream stream = fs.open(new Path(directoryName, fileName));
+    assertNotNull(stream);
+    stream.close();
+    assertTrue(fs.delete(new Path(directoryName, fileName), true));
+    assertTrue(fs.delete(new Path(directoryName), true));
+  }
+
+  @Test
+  public void testChineseCharactersFolderRename() throws Exception {
+    // Create a file and a folder with Chinese (non-ASCI) characters
+    String chinese = "" + '\u963f' + '\u4db5';
+    String fileName = "filename" + chinese;
+    String srcDirectoryName = chinese;
+    String targetDirectoryName = "target" + chinese;
+    fs.create(new Path(srcDirectoryName, fileName)).close();
+    fs.rename(new Path(srcDirectoryName), new Path(targetDirectoryName));
+    FileStatus[] listing = fs.listStatus(new Path(targetDirectoryName));
+    assertEquals(1, listing.length);
+    assertEquals(fileName, listing[0].getPath().getName());
+    FileStatus status = fs.getFileStatus(new Path(targetDirectoryName, fileName));
+    assertEquals(fileName, status.getPath().getName());
+    assertTrue(fs.delete(new Path(targetDirectoryName, fileName), true));
+    assertTrue(fs.delete(new Path(targetDirectoryName), true));
+  }
+
+  @Test
   public void testReadingDirectoryAsFile() throws Exception {
     Path dir = new Path("/x");
     assertTrue(fs.mkdirs(dir));
@@ -403,7 +486,12 @@ public abstract class NativeAzureFileSystemBaseTest {
     assertEquals("supergroup", newStatus.getGroup());
     assertEquals(UserGroupInformation.getCurrentUser().getShortUserName(),
         newStatus.getOwner());
-    assertEquals(1, newStatus.getLen());
+
+    // Don't check the file length for page blobs. Only block blobs
+    // provide the actual length of bytes written.
+    if (!(this instanceof TestNativeAzureFSPageBlobLive)) {
+      assertEquals(1, newStatus.getLen());
+    }
   }
 
   @Test
@@ -429,7 +517,12 @@ public abstract class NativeAzureFileSystemBaseTest {
     assertNotNull(newStatus);
     assertEquals("newUser", newStatus.getOwner());
     assertEquals("supergroup", newStatus.getGroup());
-    assertEquals(1, newStatus.getLen());
+
+    // File length is only reported to be the size of bytes written to the file for block blobs.
+    // So only check it for block blobs, not page blobs.
+    if (!(this instanceof TestNativeAzureFSPageBlobLive)) {
+      assertEquals(1, newStatus.getLen());
+    }
     fs.setOwner(newFile, null, "newGroup");
     newStatus = fs.getFileStatus(newFile);
     assertNotNull(newStatus);
@@ -506,14 +599,570 @@ public abstract class NativeAzureFileSystemBaseTest {
     testModifiedTime(destFolder);
   }
 
+  /**
+   * Verify we can get file status of a directory with various forms of
+   * the directory file name, including the nonstandard but legal form
+   * ending in "/.". Check that we're getting status for a directory.
+   */
   @Test
   public void testListSlash() throws Exception {
     Path testFolder = new Path("/testFolder");
     Path testFile = new Path(testFolder, "testFile");
     assertTrue(fs.mkdirs(testFolder));
     assertTrue(fs.createNewFile(testFile));
-    FileStatus status = fs.getFileStatus(new Path("/testFolder/."));
-    assertNotNull(status);
+    FileStatus status;
+    status = fs.getFileStatus(new Path("/testFolder"));
+    assertTrue(status.isDirectory());
+    status = fs.getFileStatus(new Path("/testFolder/"));
+    assertTrue(status.isDirectory());
+    status = fs.getFileStatus(new Path("/testFolder/."));
+    assertTrue(status.isDirectory());
+  }
+
+  @Test
+  public void testCannotCreatePageBlobByDefault() throws Exception {
+
+    // Verify that the page blob directory list configuration setting
+    // is not set in the default configuration.
+    Configuration conf = new Configuration();
+    String[] rawPageBlobDirs =
+        conf.getStrings(AzureNativeFileSystemStore.KEY_PAGE_BLOB_DIRECTORIES);
+    assertTrue(rawPageBlobDirs == null);
+  }
+
+  /*
+   * Set up a situation where a folder rename is partway finished.
+   * Then apply redo to finish the rename.
+   *
+   * The original source folder *would* have had contents
+   * folderToRename  (0 byte dummy file for directory)
+   * folderToRename/innerFile
+   * folderToRename/innerFile2
+   *
+   * The actual source folder (after partial rename and failure)
+   *
+   * folderToRename
+   * folderToRename/innerFile2
+   *
+   * The actual target folder (after partial rename and failure)
+   *
+   * renamedFolder
+   * renamedFolder/innerFile
+   */
+  @Test
+  public void testRedoRenameFolder() throws IOException {
+    // create original folder
+    String srcKey = "folderToRename";
+    Path originalFolder = new Path(srcKey);
+    assertTrue(fs.mkdirs(originalFolder));
+    Path innerFile = new Path(originalFolder, "innerFile");
+    assertTrue(fs.createNewFile(innerFile));
+    Path innerFile2 = new Path(originalFolder, "innerFile2");
+    assertTrue(fs.createNewFile(innerFile2));
+
+    String dstKey = "renamedFolder";
+
+    // propose (but don't do) the rename
+    Path home = fs.getHomeDirectory();
+    String relativeHomeDir = getRelativePath(home.toString());
+    NativeAzureFileSystem.FolderRenamePending pending =
+        new NativeAzureFileSystem.FolderRenamePending(
+            relativeHomeDir + "/" + srcKey,
+            relativeHomeDir + "/" + dstKey, null,
+            (NativeAzureFileSystem) fs);
+
+    // get the rename pending file contents
+    String renameDescription = pending.makeRenamePendingFileContents();
+
+    // Remove one file from source folder to simulate a partially done
+    // rename operation.
+    assertTrue(fs.delete(innerFile, false));
+
+    // Create the destination folder with just one file in it, again
+    // to simulate a partially done rename.
+    Path destination = new Path(dstKey);
+    Path innerDest = new Path(destination, "innerFile");
+    assertTrue(fs.createNewFile(innerDest));
+
+    // Create a rename-pending file and write rename information to it.
+    final String renamePendingStr = "folderToRename-RenamePending.json";
+    Path renamePendingFile = new Path(renamePendingStr);
+    FSDataOutputStream out = fs.create(renamePendingFile, true);
+    assertTrue(out != null);
+    writeString(out, renameDescription);
+
+    // Redo the rename operation based on the contents of the -RenamePending.json file.
+    // Trigger the redo by checking for existence of the original folder. It must appear
+    // to not exist.
+    assertFalse(fs.exists(originalFolder));
+
+    // Verify that the target is there, and the source is gone.
+    assertTrue(fs.exists(destination));
+    assertTrue(fs.exists(new Path(destination, innerFile.getName())));
+    assertTrue(fs.exists(new Path(destination, innerFile2.getName())));
+    assertFalse(fs.exists(originalFolder));
+    assertFalse(fs.exists(innerFile));
+    assertFalse(fs.exists(innerFile2));
+
+    // Verify that there's no RenamePending file left.
+    assertFalse(fs.exists(renamePendingFile));
+
+    // Verify that we can list the target directory.
+    FileStatus[] listed = fs.listStatus(destination);
+    assertEquals(2, listed.length);
+
+    // List the home directory and show the contents is a directory.
+    Path root = fs.getHomeDirectory();
+    listed = fs.listStatus(root);
+    assertEquals(1, listed.length);
+    assertTrue(listed[0].isDirectory());
+  }
+
+  /**
+   * If there is a folder to be renamed inside a parent folder,
+   * then when you list the parent folder, you should only see
+   * the final result, after the rename.
+   */
+  @Test
+  public void testRedoRenameFolderInFolderListing() throws IOException {
+
+    // create original folder
+    String parent = "parent";
+    Path parentFolder = new Path(parent);
+    assertTrue(fs.mkdirs(parentFolder));
+    Path inner = new Path(parentFolder, "innerFolder");
+    assertTrue(fs.mkdirs(inner));
+    Path inner2 = new Path(parentFolder, "innerFolder2");
+    assertTrue(fs.mkdirs(inner2));
+    Path innerFile = new Path(inner2, "file");
+    assertTrue(fs.createNewFile(innerFile));
+
+    Path inner2renamed = new Path(parentFolder, "innerFolder2Renamed");
+
+    // propose (but don't do) the rename of innerFolder2
+    Path home = fs.getHomeDirectory();
+    String relativeHomeDir = getRelativePath(home.toString());
+    NativeAzureFileSystem.FolderRenamePending pending =
+        new NativeAzureFileSystem.FolderRenamePending(
+            relativeHomeDir + "/" + inner2,
+            relativeHomeDir + "/" + inner2renamed, null,
+            (NativeAzureFileSystem) fs);
+
+    // Create a rename-pending file and write rename information to it.
+    final String renamePendingStr = inner2 + FolderRenamePending.SUFFIX;
+    Path renamePendingFile = new Path(renamePendingStr);
+    FSDataOutputStream out = fs.create(renamePendingFile, true);
+    assertTrue(out != null);
+    writeString(out, pending.makeRenamePendingFileContents());
+
+    // Redo the rename operation based on the contents of the
+    // -RenamePending.json file. Trigger the redo by checking for existence of
+    // the original folder. It must appear to not exist.
+    FileStatus[] listed = fs.listStatus(parentFolder);
+    assertEquals(2, listed.length);
+    assertTrue(listed[0].isDirectory());
+    assertTrue(listed[1].isDirectory());
+
+    // The rename pending file is not a directory, so at this point we know the
+    // redo has been done.
+    assertFalse(fs.exists(inner2)); // verify original folder is gone
+    assertTrue(fs.exists(inner2renamed)); // verify the target is there
+    assertTrue(fs.exists(new Path(inner2renamed, "file")));
+  }
+
+  /**
+   * Test the situation where a rename pending file exists but the rename
+   * is really done. This could happen if the rename process died just
+   * before deleting the rename pending file. It exercises a non-standard
+   * code path in redo().
+   */
+  @Test
+  public void testRenameRedoFolderAlreadyDone() throws IOException {
+    // create only destination folder
+    String orig = "originalFolder";
+    String dest = "renamedFolder";
+    Path destPath = new Path(dest);
+    assertTrue(fs.mkdirs(destPath));
+
+    // propose (but don't do) the rename of innerFolder2
+    Path home = fs.getHomeDirectory();
+    String relativeHomeDir = getRelativePath(home.toString());
+    NativeAzureFileSystem.FolderRenamePending pending =
+        new NativeAzureFileSystem.FolderRenamePending(
+            relativeHomeDir + "/" + orig,
+            relativeHomeDir + "/" + dest, null,
+            (NativeAzureFileSystem) fs);
+
+    // Create a rename-pending file and write rename information to it.
+    final String renamePendingStr = orig + FolderRenamePending.SUFFIX;
+    Path renamePendingFile = new Path(renamePendingStr);
+    FSDataOutputStream out = fs.create(renamePendingFile, true);
+    assertTrue(out != null);
+    writeString(out, pending.makeRenamePendingFileContents());
+
+    try {
+      pending.redo();
+    } catch (Exception e) {
+      fail();
+    }
+
+    // Make sure rename pending file is gone.
+    FileStatus[] listed = fs.listStatus(new Path("/"));
+    assertEquals(1, listed.length);
+    assertTrue(listed[0].isDirectory());
+  }
+
+  @Test
+  public void testRedoFolderRenameAll() throws IllegalArgumentException, IOException {
+    {
+      FileFolder original = new FileFolder("folderToRename");
+      original.add("innerFile").add("innerFile2");
+      FileFolder partialSrc = original.copy();
+      FileFolder partialDst = original.copy();
+      partialDst.setName("renamedFolder");
+      partialSrc.setPresent(0, false);
+      partialDst.setPresent(1, false);
+
+      testRenameRedoFolderSituation(original, partialSrc, partialDst);
+    }
+    {
+      FileFolder original = new FileFolder("folderToRename");
+      original.add("file1").add("file2").add("file3");
+      FileFolder partialSrc = original.copy();
+      FileFolder partialDst = original.copy();
+      partialDst.setName("renamedFolder");
+
+      // Set up this state before the redo:
+      // folderToRename: file1       file3
+      // renamedFolder:  file1 file2
+      // This gives code coverage for all 3 expected cases for individual file
+      // redo.
+      partialSrc.setPresent(1, false);
+      partialDst.setPresent(2, false);
+
+      testRenameRedoFolderSituation(original, partialSrc, partialDst);
+    }
+    {
+      // Simulate a situation with folder with a large number of files in it.
+      // For the first half of the files, they will be in the destination
+      // but not the source. For the second half, they will be in the source
+      // but not the destination. There will be one file in the middle that is
+      // in both source and destination. Then trigger redo and verify.
+      // For testing larger folder sizes, manually change this, temporarily, and
+      // edit the SIZE value.
+      final int SIZE = 5;
+      assertTrue(SIZE >= 3);
+      // Try a lot of files in the folder.
+      FileFolder original = new FileFolder("folderToRename");
+      for (int i = 0; i < SIZE; i++) {
+        original.add("file" + Integer.toString(i));
+      }
+      FileFolder partialSrc = original.copy();
+      FileFolder partialDst = original.copy();
+      partialDst.setName("renamedFolder");
+      for (int i = 0; i < SIZE; i++) {
+        partialSrc.setPresent(i, i >= SIZE / 2);
+        partialDst.setPresent(i, i <= SIZE / 2);
+      }
+
+      testRenameRedoFolderSituation(original, partialSrc, partialDst);
+    }
+    {
+      // Do a nested folder, like so:
+      // folderToRename:
+      //   nestedFolder: a, b, c
+      //   p
+      //   q
+      //
+      // Then delete file 'a' from the source and add it to destination.
+      // Then trigger redo.
+
+      FileFolder original = new FileFolder("folderToRename");
+      FileFolder nested = new FileFolder("nestedFolder");
+      nested.add("a").add("b").add("c");
+      original.add(nested).add("p").add("q");
+
+      FileFolder partialSrc = original.copy();
+      FileFolder partialDst = original.copy();
+      partialDst.setName("renamedFolder");
+
+      // logically remove 'a' from source
+      partialSrc.getMember(0).setPresent(0, false);
+
+      // logically eliminate b, c from destination
+      partialDst.getMember(0).setPresent(1, false);
+      partialDst.getMember(0).setPresent(2, false);
+
+      testRenameRedoFolderSituation(original, partialSrc, partialDst);
+    }
+  }
+
+  private void testRenameRedoFolderSituation(
+      FileFolder fullSrc,
+      FileFolder partialSrc,
+      FileFolder partialDst) throws IllegalArgumentException, IOException {
+
+    // make file folder tree for source
+    fullSrc.create();
+
+    // set up rename pending file
+    fullSrc.makeRenamePending(partialDst);
+
+    // prune away some files (as marked) from source to simulate partial rename
+    partialSrc.prune();
+
+    // Create only the files indicated for the destination to indicate a partial rename.
+    partialDst.create();
+
+    // trigger redo
+    assertFalse(fullSrc.exists());
+
+    // verify correct results
+    partialDst.verifyExists();
+    fullSrc.verifyGone();
+
+    // delete the new folder to leave no garbage behind
+    fs.delete(new Path(partialDst.getName()), true);
+  }
+
+  // Mock up of a generalized folder (which can also be a leaf-level file)
+  // for rename redo testing.
+  private class FileFolder {
+    private String name;
+
+    // For rename testing, indicates whether an expected
+    // file is present in the source or target folder.
+    private boolean present;
+    ArrayList<FileFolder> members; // Null if a leaf file, otherwise not null.
+
+    // Make a new, empty folder (not a regular leaf file).
+    public FileFolder(String name) {
+      this.name = name;
+      this.present = true;
+      members = new ArrayList<FileFolder>();
+    }
+
+    public FileFolder getMember(int i) {
+      return members.get(i);
+    }
+
+    // Verify a folder and all its contents are gone. This is only to
+    // be called on the root of a FileFolder.
+    public void verifyGone() throws IllegalArgumentException, IOException {
+      assertFalse(fs.exists(new Path(name)));
+      assertTrue(isFolder());
+      verifyGone(new Path(name), members);
+    }
+
+    private void verifyGone(Path prefix, ArrayList<FileFolder> members2) throws IOException {
+      for (FileFolder f : members2) {
+        f.verifyGone(prefix);
+      }
+    }
+
+    private void verifyGone(Path prefix) throws IOException {
+      assertFalse(fs.exists(new Path(prefix, name)));
+      if (isLeaf()) {
+        return;
+      }
+      for (FileFolder f : members) {
+        f.verifyGone(new Path(prefix, name));
+      }
+    }
+
+    public void verifyExists() throws IllegalArgumentException, IOException {
+
+      // verify the root is present
+      assertTrue(fs.exists(new Path(name)));
+      assertTrue(isFolder());
+
+      // check the members
+      verifyExists(new Path(name), members);
+    }
+
+    private void verifyExists(Path prefix, ArrayList<FileFolder> members2) throws IOException {
+      for (FileFolder f : members2) {
+        f.verifyExists(prefix);
+      }
+    }
+
+    private void verifyExists(Path prefix) throws IOException {
+
+      // verify this file/folder is present
+      assertTrue(fs.exists(new Path(prefix, name)));
+
+      // verify members are present
+      if (isLeaf()) {
+        return;
+      }
+
+      for (FileFolder f : members) {
+        f.verifyExists(new Path(prefix, name));
+      }
+    }
+
+    public boolean exists() throws IOException {
+      return fs.exists(new Path(name));
+    }
+
+    // Make a rename pending file for the situation where we rename
+    // this object (the source) to the specified destination.
+    public void makeRenamePending(FileFolder dst) throws IOException {
+
+      // Propose (but don't do) the rename.
+      Path home = fs.getHomeDirectory();
+      String relativeHomeDir = getRelativePath(home.toString());
+      NativeAzureFileSystem.FolderRenamePending pending =
+          new NativeAzureFileSystem.FolderRenamePending(
+              relativeHomeDir + "/" + this.getName(),
+              relativeHomeDir + "/" + dst.getName(), null,
+              (NativeAzureFileSystem) fs);
+
+      // Get the rename pending file contents.
+      String renameDescription = pending.makeRenamePendingFileContents();
+
+      // Create a rename-pending file and write rename information to it.
+      final String renamePendingStr = this.getName() + "-RenamePending.json";
+      Path renamePendingFile = new Path(renamePendingStr);
+      FSDataOutputStream out = fs.create(renamePendingFile, true);
+      assertTrue(out != null);
+      writeString(out, renameDescription);
+    }
+
+    // set whether a child is present or not
+    public void setPresent(int i, boolean b) {
+      members.get(i).setPresent(b);
+    }
+
+    // Make an uninitialized folder
+    private FileFolder() {
+      this.present = true;
+    }
+
+    public void setPresent(boolean value) {
+      present = value;
+    }
+
+    public FileFolder makeLeaf(String name) {
+      FileFolder f = new FileFolder();
+      f.setName(name);
+      return f;
+    }
+
+    void setName(String name) {
+      this.name = name;
+    }
+
+    public String getName() {
+      return name;
+    }
+
+    public boolean isLeaf() {
+      return members == null;
+    }
+
+    public boolean isFolder() {
+      return members != null;
+    }
+
+    FileFolder add(FileFolder folder) {
+      members.add(folder);
+      return this;
+    }
+
+    // Add a leaf file (by convention, if you pass a string argument, you get a leaf).
+    FileFolder add(String file) {
+      FileFolder leaf = makeLeaf(file);
+      members.add(leaf);
+      return this;
+    }
+
+    public FileFolder copy() {
+      if (isLeaf()) {
+        return makeLeaf(name);
+      } else {
+        FileFolder f = new FileFolder(name);
+        for (FileFolder member : members) {
+          f.add(member.copy());
+        }
+        return f;
+      }
+    }
+
+    // Create the folder structure. Return true on success, or else false.
+    public void create() throws IllegalArgumentException, IOException {
+      create(null);
+    }
+
+    private void create(Path prefix) throws IllegalArgumentException, IOException {
+      if (isFolder()) {
+        if (present) {
+          assertTrue(fs.mkdirs(makePath(prefix, name)));
+        }
+        create(makePath(prefix, name), members);
+      } else if (isLeaf()) {
+        if (present) {
+          assertTrue(fs.createNewFile(makePath(prefix, name)));
+        }
+      } else {
+        assertTrue("The object must be a (leaf) file or a folder.", false);
+      }
+    }
+
+    private void create(Path prefix, ArrayList<FileFolder> members2) throws IllegalArgumentException, IOException {
+      for (FileFolder f : members2) {
+        f.create(prefix);
+      }
+    }
+
+    private Path makePath(Path prefix, String name) {
+      if (prefix == null) {
+        return new Path(name);
+      } else {
+        return new Path(prefix, name);
+      }
+    }
+
+    // Remove the files marked as not present.
+    public void prune() throws IOException {
+      prune(null);
+    }
+
+    private void prune(Path prefix) throws IOException {
+      Path path = null;
+      if (prefix == null) {
+        path = new Path(name);
+      } else {
+        path = new Path(prefix, name);
+      }
+      if (isLeaf() && !present) {
+        assertTrue(fs.delete(path, false));
+      } else if (isFolder() && !present) {
+        assertTrue(fs.delete(path, true));
+      } else if (isFolder()) {
+        for (FileFolder f : members) {
+          f.prune(path);
+        }
+      }
+    }
+  }
+
+  private String getRelativePath(String path) {
+    // example input: wasb://wasbtests-ehans-1404322046279@ehans9.blob.core.windows.net/user/ehans/folderToRename
+    // example result: user/ehans/folderToRename
+
+    // Find the third / position and return input substring after that.
+    int slashCount = 0; // number of slashes so far
+    int i;
+    for (i = 0; i < path.length(); i++) {
+      if (path.charAt(i) == '/') {
+        slashCount++;
+        if (slashCount == 3) {
+          return path.substring(i + 1, path.length());
+        }
+      }
+    }
+    throw new RuntimeException("Incorrect path prefix -- expected wasb://.../...");
   }
 
   @Test
@@ -523,6 +1172,84 @@ public abstract class NativeAzureFileSystemBaseTest {
     fs.close();
   }
 
+  // Test the available() method for the input stream returned by fs.open().
+  // This works for both page and block blobs.
+  int FILE_SIZE = 4 * 1024 * 1024 + 1; // Make this 1 bigger than internal
+                                       // buffer used in BlobInputStream
+                                       // to exercise that case.
+  int MAX_STRIDE = FILE_SIZE + 1;
+  Path PATH = new Path("/available.dat");
+  @Test
+  public void testAvailable() throws IOException {
+
+    // write FILE_SIZE bytes to page blob
+    FSDataOutputStream out = fs.create(PATH);
+    byte[] data = new byte[FILE_SIZE];
+    Arrays.fill(data, (byte) 5);
+    out.write(data, 0, FILE_SIZE);
+    out.close();
+
+    // Test available() for different read sizes
+    verifyAvailable(1);
+    verifyAvailable(100);
+    verifyAvailable(5000);
+    verifyAvailable(FILE_SIZE);
+    verifyAvailable(MAX_STRIDE);
+
+    fs.delete(PATH, false);
+  }
+
+  // Verify that available() for the input stream is always >= 1 unless we've
+  // consumed all the input, and then it is 0. This is to match expectations by
+  // HBase which were set based on behavior of DFSInputStream.available().
+  private void verifyAvailable(int readStride) throws IOException {
+    FSDataInputStream in = fs.open(PATH);
+    try {
+      byte[] inputBuffer = new byte[MAX_STRIDE];
+      int position = 0;
+      int bytesRead = 0;
+      while(bytesRead != FILE_SIZE) {
+        bytesRead += in.read(inputBuffer, position, readStride);
+        int available = in.available();
+        if (bytesRead < FILE_SIZE) {
+          if (available < 1) {
+            fail(String.format(
+                  "expected available > 0 but got: "
+                      + "position = %d, bytesRead = %d, in.available() = %d",
+                  position, bytesRead, available));
+          }
+        }
+      }
+      int available = in.available();
+      assertTrue(available == 0);
+    } finally {
+      in.close();
+    }
+  }
+
+  @Test
+  public void testGetFileSizeFromListing() throws IOException {
+    Path path = new Path("file.dat");
+    final int PAGE_SIZE = 512;
+    final int FILE_SIZE = PAGE_SIZE + 1;
+
+    // write FILE_SIZE bytes to page blob
+    FSDataOutputStream out = fs.create(path);
+    byte[] data = new byte[FILE_SIZE];
+    Arrays.fill(data, (byte) 5);
+    out.write(data, 0, FILE_SIZE);
+    out.close();
+
+    // list the file to get its properties
+    FileStatus[] status = fs.listStatus(path);
+    assertEquals(1, status.length);
+
+    // The file length should report the number of bytes
+    // written for either page or block blobs (subclasses
+    // of this test class will exercise both).
+    assertEquals(FILE_SIZE, status[0].getLen());
+  }
+
   private boolean testModifiedTime(Path testPath, long time) throws Exception {
     FileStatus fileStatus = fs.getFileStatus(testPath);
     final long errorMargin = modifiedTimeErrorMargin;
@@ -530,16 +1257,45 @@ public abstract class NativeAzureFileSystemBaseTest {
     return (lastModified > (time - errorMargin) && lastModified < (time + errorMargin));
   }
 
+  @SuppressWarnings("deprecation")
+  @Test
+  public void testCreateNonRecursive() throws Exception {
+    Path testFolder = new Path("/testFolder");
+    Path testFile = new Path(testFolder, "testFile");
+    try {
+      fs.createNonRecursive(testFile, true, 1024, (short)1, 1024, null);
+      assertTrue("Should've thrown", false);
+    } catch (FileNotFoundException e) {
+    }
+    fs.mkdirs(testFolder);
+    fs.createNonRecursive(testFile, true, 1024, (short)1, 1024, null)
+      .close();
+    assertTrue(fs.exists(testFile));
+  }
+
+  public void testFileEndingInDot() throws Exception {
+    Path testFolder = new Path("/testFolder.");
+    Path testFile = new Path(testFolder, "testFile.");
+    assertTrue(fs.mkdirs(testFolder));
+    assertTrue(fs.createNewFile(testFile));
+    assertTrue(fs.exists(testFile));
+    FileStatus[] listed = fs.listStatus(testFolder);
+    assertEquals(1, listed.length);
+    assertEquals("testFile.", listed[0].getPath().getName());
+  }
   private void testModifiedTime(Path testPath) throws Exception {
     Calendar utc = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
     long currentUtcTime = utc.getTime().getTime();
     FileStatus fileStatus = fs.getFileStatus(testPath);
-    assertTrue("Modification time "
-        + new Date(fileStatus.getModificationTime()) + " is not close to now: "
-        + utc.getTime(), testModifiedTime(testPath, currentUtcTime));
+    final long errorMargin = 10 * 1000; // Give it +/-10 seconds
+    assertTrue("Modification time " +
+        new Date(fileStatus.getModificationTime()) + " is not close to now: " +
+        utc.getTime(),
+        fileStatus.getModificationTime() > (currentUtcTime - errorMargin) &&
+        fileStatus.getModificationTime() < (currentUtcTime + errorMargin));
   }
 
-   private void createEmptyFile(Path testFile, FsPermission permission)
+  private void createEmptyFile(Path testFile, FsPermission permission)
       throws IOException {
     FSDataOutputStream outputStream = fs.create(testFile, permission, true,
         4096, (short) 1, 1024, null);
@@ -563,7 +1319,7 @@ public abstract class NativeAzureFileSystemBaseTest {
     final int BUFFER_SIZE = 1024;
     char[] buffer = new char[BUFFER_SIZE];
     int count = reader.read(buffer, 0, BUFFER_SIZE);
-    if (count >= BUFFER_SIZE) {
+    if (count > BUFFER_SIZE) {
       throw new IOException("Exceeded buffer size");
     }
     inputStream.close();
@@ -578,7 +1334,6 @@ public abstract class NativeAzureFileSystemBaseTest {
       throws IOException {
     FSDataOutputStream outputStream = fs.create(path, true);
     writeString(outputStream, value);
-    outputStream.close();
   }
 
   private void writeString(FSDataOutputStream outputStream, String value)
@@ -588,4 +1343,175 @@ public abstract class NativeAzureFileSystemBaseTest {
     writer.write(value);
     writer.close();
   }
+
+  @Test
+  // Acquire and free a Lease object. Wait for more than the lease
+  // timeout, to make sure the lease renews itself.
+  public void testSelfRenewingLease() throws IllegalArgumentException, IOException,
+    InterruptedException, StorageException {
+
+    SelfRenewingLease lease;
+    final String FILE_KEY = "file";
+    fs.create(new Path(FILE_KEY));
+    NativeAzureFileSystem nfs = (NativeAzureFileSystem) fs;
+    String fullKey = nfs.pathToKey(nfs.makeAbsolute(new Path(FILE_KEY)));
+    AzureNativeFileSystemStore store = nfs.getStore();
+    lease = store.acquireLease(fullKey);
+    assertTrue(lease.getLeaseID() != null);
+
+    // The sleep time for the keep-alive thread is 40 seconds, so sleep just
+    // a little beyond that, to make sure the keep-alive thread wakes up
+    // and renews the lease.
+    Thread.sleep(42000);
+    lease.free();
+
+    // Check that the lease is really freed.
+    CloudBlob blob = lease.getCloudBlob();
+
+    // Try to acquire it again, using direct Azure blob access.
+    // If that succeeds, then the lease was already freed.
+    String differentLeaseID = null;
+    try {
+      differentLeaseID = blob.acquireLease(15, null);
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail("Caught exception trying to directly re-acquire lease from Azure");
+    } finally {
+      assertTrue(differentLeaseID != null);
+      AccessCondition accessCondition = AccessCondition.generateEmptyCondition();
+      accessCondition.setLeaseID(differentLeaseID);
+      blob.releaseLease(accessCondition);
+    }
+  }
+
+  @Test
+  // Acquire a SelfRenewingLease object. Wait for more than the lease
+  // timeout, to make sure the lease renews itself. Delete the file.
+  // That will automatically free the lease.
+  // (that should work without any failures).
+  public void testSelfRenewingLeaseFileDelete()
+      throws IllegalArgumentException, IOException,
+        InterruptedException, StorageException {
+
+    SelfRenewingLease lease;
+    final String FILE_KEY = "file";
+    final Path path = new Path(FILE_KEY);
+    fs.create(path);
+    NativeAzureFileSystem nfs = (NativeAzureFileSystem) fs;
+    String fullKey = nfs.pathToKey(nfs.makeAbsolute(path));
+    lease = nfs.getStore().acquireLease(fullKey);
+    assertTrue(lease.getLeaseID() != null);
+
+    // The sleep time for the keep-alive thread is 40 seconds, so sleep just
+    // a little beyond that, to make sure the keep-alive thread wakes up
+    // and renews the lease.
+    Thread.sleep(42000);
+
+    nfs.getStore().delete(fullKey, lease);
+
+    // Check that the file is really gone and the lease is freed.
+    assertTrue(!fs.exists(path));
+    assertTrue(lease.isFreed());
+  }
+
+  // Variables to check assertions in next test.
+  private long firstEndTime;
+  private long secondStartTime;
+
+  // Create two threads. One will get a lease on a file.
+  // The second one will try to get the lease and thus block.
+  // Then the first one will free the lease and the second
+  // one will get it and proceed.
+  @Test
+  public void testLeaseAsDistributedLock() throws IllegalArgumentException,
+      IOException {
+    final String LEASE_LOCK_FILE_KEY = "file";
+    fs.create(new Path(LEASE_LOCK_FILE_KEY));
+    NativeAzureFileSystem nfs = (NativeAzureFileSystem) fs;
+    String fullKey = nfs.pathToKey(nfs.makeAbsolute(new Path(LEASE_LOCK_FILE_KEY)));
+
+    Thread first = new Thread(new LeaseLockAction("first-thread", fullKey));
+    first.start();
+    Thread second = new Thread(new LeaseLockAction("second-thread", fullKey));
+    second.start();
+    try {
+
+      // Wait for the two  threads to finish.
+      first.join();
+      second.join();
+      assertTrue(firstEndTime < secondStartTime);
+    } catch (InterruptedException e) {
+      fail("Unable to wait for threads to finish");
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  private class LeaseLockAction implements Runnable {
+    private String name;
+    private String key;
+
+    LeaseLockAction(String name, String key) {
+      this.name = name;
+      this.key = key;
+    }
+
+    @Override
+    public void run() {
+      LOG.info("starting thread " + name);
+      SelfRenewingLease lease = null;
+      NativeAzureFileSystem nfs = (NativeAzureFileSystem) fs;
+
+      if (name.equals("first-thread")) {
+        try {
+          lease = nfs.getStore().acquireLease(key);
+          LOG.info(name + " acquired lease " + lease.getLeaseID());
+        } catch (AzureException e) {
+          assertTrue("Unanticipated exception", false);
+        }
+        assertTrue(lease != null);
+        try {
+
+          // Sleep long enough for the lease to renew once.
+          Thread.sleep(SelfRenewingLease.LEASE_RENEWAL_PERIOD + 2000);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+        }
+        try {
+          firstEndTime = System.currentTimeMillis();
+          lease.free();
+          LOG.info(name + " freed lease " + lease.getLeaseID());
+        } catch (StorageException e) {
+          fail("Unanticipated exception");
+        }
+      } else if (name.equals("second-thread")) {
+        try {
+
+          // sleep 2 sec to let first thread get ahead of this one
+          Thread.sleep(2000);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+        }
+        try {
+          LOG.info(name + " before getting lease");
+          lease = nfs.getStore().acquireLease(key);
+          secondStartTime = System.currentTimeMillis();
+          LOG.info(name + " acquired lease " + lease.getLeaseID());
+        } catch (AzureException e) {
+          assertTrue("Unanticipated exception", false);
+        }
+        assertTrue(lease != null);
+        try {
+          lease.free();
+          LOG.info(name + " freed lease " + lease.getLeaseID());
+        } catch (StorageException e) {
+          assertTrue("Unanticipated exception", false);
+        }
+      } else {
+        assertTrue("Unknown thread name", false);
+      }
+
+      LOG.info(name + " is exiting.");
+    }
+
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2217e2f8/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/RunningLiveWasbTests.txt
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/RunningLiveWasbTests.txt b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/RunningLiveWasbTests.txt
new file mode 100644
index 0000000..54ba4d8
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/RunningLiveWasbTests.txt
@@ -0,0 +1,22 @@
+========================================================================
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+=========================================================================
+
+In order to run Windows Azure Storage Blob (WASB) unit tests against a live 
+Azure Storage account, you need to provide test account details in a configuration
+file called azure-test.xml. See hadoop-tools/hadoop-azure/README.txt for details
+on configuration, and how to run the tests.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2217e2f8/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestAzureConcurrentOutOfBandIo.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestAzureConcurrentOutOfBandIo.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestAzureConcurrentOutOfBandIo.java
index c10ac0f..0894cf5 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestAzureConcurrentOutOfBandIo.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestAzureConcurrentOutOfBandIo.java
@@ -22,11 +22,10 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 import static org.junit.Assume.assumeNotNull;
 
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
+import java.io.*;
 import java.util.Arrays;
 
+import org.apache.hadoop.fs.azure.AzureException;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.junit.After;
@@ -100,13 +99,14 @@ public class TestAzureConcurrentOutOfBandIo {
     public void run() {
       byte[] dataBlockWrite = new byte[UPLOAD_BLOCK_SIZE];
 
-      DataOutputStream outputStream = null;
+      OutputStream outputStream = null;
 
       try {
         for (int i = 0; !done; i++) {
           // Write two 4 MB blocks to the blob.
           //
-          outputStream = writerStorageAccount.getStore().storefile(key,
+          outputStream = writerStorageAccount.getStore().storefile(
+              key,
               new PermissionStatus("", "", FsPermission.getDefault()));
 
           Arrays.fill(dataBlockWrite, (byte) (i % 256));
@@ -124,7 +124,7 @@ public class TestAzureConcurrentOutOfBandIo {
       } catch (IOException e) {
         System.out
             .println("DatablockWriter thread encountered an I/O exception."
-                + e.getMessage());
+            + e.getMessage());
       }
     }
   }
@@ -137,30 +137,29 @@ public class TestAzureConcurrentOutOfBandIo {
 
     // Write to blob to make sure it exists.
     //
-    // Write five 4 MB blocks to the blob. To ensure there is data in the blob
-    // before reading. This eliminates the race between the reader and writer
-    // threads.
-    DataOutputStream outputStream = testAccount.getStore().storefile(
-        "WASB_String.txt",
-        new PermissionStatus("", "", FsPermission.getDefault()));
-    Arrays.fill(dataBlockWrite, (byte) 255);
-    for (int i = 0; i < NUMBER_OF_BLOCKS; i++) {
-      outputStream.write(dataBlockWrite);
-    }
-
-    outputStream.flush();
-    outputStream.close();
-
-    // Start writing blocks to Azure store using the DataBlockWriter thread.
+   // Write five 4 MB blocks to the blob. To ensure there is data in the blob before
+   // reading.  This eliminates the race between the reader and writer threads.
+   OutputStream outputStream = testAccount.getStore().storefile(
+       "WASB_String.txt",
+       new PermissionStatus("", "", FsPermission.getDefault()));
+   Arrays.fill(dataBlockWrite, (byte) 255);
+   for (int i = 0; i < NUMBER_OF_BLOCKS; i++) {
+     outputStream.write(dataBlockWrite);
+   }
+
+   outputStream.flush();
+   outputStream.close();
+
+   // Start writing blocks to Azure store using the DataBlockWriter thread.
     DataBlockWriter writeBlockTask = new DataBlockWriter(testAccount,
         "WASB_String.txt");
-    writeBlockTask.startWriting();
-    int count = 0;
-    DataInputStream inputStream = null;
+   writeBlockTask.startWriting();
+   int count = 0;
+   DataInputStream inputStream = null;
 
-    for (int i = 0; i < 5; i++) {
-      try {
-        inputStream = testAccount.getStore().retrieve("WASB_String.txt", 0);
+   for (int i = 0; i < 5; i++) {
+     try {
+        inputStream = testAccount.getStore().retrieve("WASB_String.txt");
         count = 0;
         int c = 0;
 
@@ -173,17 +172,17 @@ public class TestAzureConcurrentOutOfBandIo {
           // Counting the number of bytes.
           count += c;
         }
-      } catch (IOException e) {
-        System.out.println(e.getCause().toString());
-        e.printStackTrace();
-        fail();
-      }
-
-      // Close the stream.
-      if (null != inputStream) {
-        inputStream.close();
-      }
-    }
+     } catch (IOException e) {
+       System.out.println(e.getCause().toString());
+       e.printStackTrace();
+       fail();
+     }
+
+     // Close the stream.
+     if (null != inputStream){
+       inputStream.close();
+     }
+   }
 
     // Stop writing blocks.
     writeBlockTask.stopWriting();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2217e2f8/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestAzureFileSystemErrorConditions.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestAzureFileSystemErrorConditions.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestAzureFileSystemErrorConditions.java
index 6e89822..febb605 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestAzureFileSystemErrorConditions.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestAzureFileSystemErrorConditions.java
@@ -32,7 +32,6 @@ import java.util.Arrays;
 import java.util.HashMap;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.TestHookOperationContext;
@@ -65,19 +64,18 @@ public class TestAzureFileSystemErrorConditions {
    */
   @Test
   public void testAccessUnauthorizedPublicContainer() throws Exception {
-    Configuration conf = new Configuration();
-    AzureBlobStorageTestAccount.addWasbToConfiguration(conf);
     Path noAccessPath = new Path(
         "wasb://nonExistentContainer@hopefullyNonExistentAccount/someFile");
     NativeAzureFileSystem.suppressRetryPolicy();
     try {
-      FileSystem.get(noAccessPath.toUri(), conf).open(noAccessPath);
+      FileSystem.get(noAccessPath.toUri(), new Configuration())
+        .open(noAccessPath);
       assertTrue("Should've thrown.", false);
     } catch (AzureException ex) {
-      assertTrue("Unexpected message in exception " + ex, ex.getMessage()
-          .contains(
-              "Unable to access container nonExistentContainer in account"
-                  + " hopefullyNonExistentAccount"));
+      assertTrue("Unexpected message in exception " + ex,
+          ex.getMessage().contains(
+          "Unable to access container nonExistentContainer in account" +
+          " hopefullyNonExistentAccount"));
     } finally {
       NativeAzureFileSystem.resumeRetryPolicy();
     }
@@ -104,11 +102,11 @@ public class TestAzureFileSystemErrorConditions {
         fs.listStatus(new Path("/"));
         passed = true;
       } catch (AzureException ex) {
-        assertTrue("Unexpected exception message: " + ex, ex.getMessage()
-            .contains("unsupported version: 2090-04-05."));
+        assertTrue("Unexpected exception message: " + ex,
+            ex.getMessage().contains("unsupported version: 2090-04-05."));
       }
-      assertFalse(
-          "Should've thrown an exception because of the wrong version.", passed);
+      assertFalse("Should've thrown an exception because of the wrong version.",
+          passed);
     } finally {
       fs.close();
     }
@@ -118,8 +116,7 @@ public class TestAzureFileSystemErrorConditions {
     boolean isTargetConnection(HttpURLConnection connection);
   }
 
-  private class TransientErrorInjector extends
-      StorageEvent<SendingRequestEvent> {
+  private class TransientErrorInjector extends StorageEvent<SendingRequestEvent> {
     final ConnectionRecognizer connectionRecognizer;
     private boolean injectedErrorOnce = false;
 
@@ -129,8 +126,7 @@ public class TestAzureFileSystemErrorConditions {
 
     @Override
     public void eventOccurred(SendingRequestEvent eventArg) {
-      HttpURLConnection connection = (HttpURLConnection) eventArg
-          .getConnectionObject();
+      HttpURLConnection connection = (HttpURLConnection)eventArg.getConnectionObject();
       if (!connectionRecognizer.isTargetConnection(connection)) {
         return;
       }
@@ -157,8 +153,8 @@ public class TestAzureFileSystemErrorConditions {
   @Test
   public void testTransientErrorOnDelete() throws Exception {
     // Need to do this test against a live storage account
-    AzureBlobStorageTestAccount testAccount = AzureBlobStorageTestAccount
-        .create();
+    AzureBlobStorageTestAccount testAccount =
+        AzureBlobStorageTestAccount.create();
     assumeNotNull(testAccount);
     try {
       NativeAzureFileSystem fs = testAccount.getFileSystem();
@@ -179,7 +175,7 @@ public class TestAzureFileSystemErrorConditions {
   private void writeAllThreeFile(NativeAzureFileSystem fs, Path testFile)
       throws IOException {
     byte[] buffer = new byte[ALL_THREE_FILE_SIZE];
-    Arrays.fill(buffer, (byte) 3);
+    Arrays.fill(buffer, (byte)3);
     OutputStream stream = fs.create(testFile);
     stream.write(buffer);
     stream.close();
@@ -189,7 +185,8 @@ public class TestAzureFileSystemErrorConditions {
       throws IOException {
     byte[] buffer = new byte[ALL_THREE_FILE_SIZE];
     InputStream inStream = fs.open(testFile);
-    assertEquals(buffer.length, inStream.read(buffer, 0, buffer.length));
+    assertEquals(buffer.length,
+        inStream.read(buffer, 0, buffer.length));
     inStream.close();
     for (int i = 0; i < buffer.length; i++) {
       assertEquals(3, buffer[i]);
@@ -199,8 +196,8 @@ public class TestAzureFileSystemErrorConditions {
   @Test
   public void testTransientErrorOnCommitBlockList() throws Exception {
     // Need to do this test against a live storage account
-    AzureBlobStorageTestAccount testAccount = AzureBlobStorageTestAccount
-        .create();
+    AzureBlobStorageTestAccount testAccount =
+        AzureBlobStorageTestAccount.create();
     assumeNotNull(testAccount);
     try {
       NativeAzureFileSystem fs = testAccount.getFileSystem();
@@ -222,8 +219,8 @@ public class TestAzureFileSystemErrorConditions {
   @Test
   public void testTransientErrorOnRead() throws Exception {
     // Need to do this test against a live storage account
-    AzureBlobStorageTestAccount testAccount = AzureBlobStorageTestAccount
-        .create();
+    AzureBlobStorageTestAccount testAccount =
+        AzureBlobStorageTestAccount.create();
     assumeNotNull(testAccount);
     try {
       NativeAzureFileSystem fs = testAccount.getFileSystem();
@@ -240,16 +237,4 @@ public class TestAzureFileSystemErrorConditions {
       testAccount.cleanup();
     }
   }
-  
-  // Tests an error during stream creation (in this case in the seek() implementation
-  // to verify the close-stream-on-error logic.
-  @Test (expected=AzureException.class)
-  public void testErrorDuringRetrieve() throws Exception {
-    NativeAzureFileSystem fs = AzureBlobStorageTestAccount.createMock().getFileSystem();
-    Path testFile = new Path("/testErrorDuringRetrieve");
-    writeAllThreeFile(fs, testFile);
-
-    FSDataInputStream stream = fs.open(testFile);
-    stream.seek(Integer.MAX_VALUE);
-  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2217e2f8/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlobDataValidation.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlobDataValidation.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlobDataValidation.java
index b585c56..25bd338 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlobDataValidation.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlobDataValidation.java
@@ -128,7 +128,7 @@ public class TestBlobDataValidation {
       if (!expectMd5Stored) {
         throw ex;
       }
-      StorageException cause = (StorageException) ex.getCause();
+      StorageException cause = (StorageException)ex.getCause();
       assertNotNull(cause);
       assertTrue("Unexpected cause: " + cause,
           cause.getErrorCode().equals(StorageErrorCodeStrings.INVALID_MD5));
@@ -212,13 +212,13 @@ public class TestBlobDataValidation {
     // validate the data as expected, but the HttpURLConnection wasn't
     // pluggable enough for me to do that.
     testAccount.getFileSystem().getStore()
-        .addTestHookToOperationContext(new TestHookOperationContext() {
-          @Override
+    .addTestHookToOperationContext(new TestHookOperationContext() {
+    @Override
           public OperationContext modifyOperationContext(
               OperationContext original) {
-            original.getResponseReceivedEventHandler().addListener(
-                new ContentMD5Checker(expectMd5Checked));
-            return original;
+      original.getResponseReceivedEventHandler().addListener(
+          new ContentMD5Checker(expectMd5Checked));
+      return original;
           }
         });
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2217e2f8/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlobMetadata.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlobMetadata.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlobMetadata.java
index b75fc38..6c49926 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlobMetadata.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlobMetadata.java
@@ -69,7 +69,8 @@ public class TestBlobMetadata {
       throws Exception {
     return String.format(
         "{\"owner\":\"%s\",\"group\":\"%s\",\"permissions\":\"%s\"}",
-        getExpectedOwner(), NativeAzureFileSystem.AZURE_DEFAULT_GROUP_DEFAULT,
+        getExpectedOwner(),
+        NativeAzureFileSystem.AZURE_DEFAULT_GROUP_DEFAULT,
         permissionString);
   }
 
@@ -80,8 +81,8 @@ public class TestBlobMetadata {
   public void testContainerVersionMetadata() throws Exception {
     // Do a write operation to trigger version stamp
     fs.createNewFile(new Path("/foo"));
-    HashMap<String, String> containerMetadata = backingStore
-        .getContainerMetadata();
+    HashMap<String, String> containerMetadata =
+        backingStore.getContainerMetadata();
     assertNotNull(containerMetadata);
     assertEquals(AzureNativeFileSystemStore.CURRENT_WASB_VERSION,
         containerMetadata.get(AzureNativeFileSystemStore.VERSION_METADATA_KEY));
@@ -226,26 +227,32 @@ public class TestBlobMetadata {
   @Test
   public void testOldPermissionMetadata() throws Exception {
     Path selfishFile = new Path("/noOneElse");
-    HashMap<String, String> metadata = new HashMap<String, String>();
-    metadata.put("asv_permission", getExpectedPermissionString("rw-------"));
-    backingStore.setContent(AzureBlobStorageTestAccount.toMockUri(selfishFile),
-        new byte[] {}, metadata);
-    FsPermission justMe = new FsPermission(FsAction.READ_WRITE, FsAction.NONE,
-        FsAction.NONE);
+    HashMap<String, String> metadata =
+        new HashMap<String, String>();
+    metadata.put("asv_permission",
+        getExpectedPermissionString("rw-------"));
+    backingStore.setContent(
+        AzureBlobStorageTestAccount.toMockUri(selfishFile),
+        new byte[] { },
+        metadata, false, 0);
+    FsPermission justMe = new FsPermission(
+        FsAction.READ_WRITE, FsAction.NONE, FsAction.NONE);
     FileStatus retrievedStatus = fs.getFileStatus(selfishFile);
     assertNotNull(retrievedStatus);
     assertEquals(justMe, retrievedStatus.getPermission());
     assertEquals(getExpectedOwner(), retrievedStatus.getOwner());
     assertEquals(NativeAzureFileSystem.AZURE_DEFAULT_GROUP_DEFAULT,
         retrievedStatus.getGroup());
-    FsPermission meAndYou = new FsPermission(FsAction.READ_WRITE,
-        FsAction.READ_WRITE, FsAction.NONE);
+    FsPermission meAndYou = new FsPermission(
+        FsAction.READ_WRITE, FsAction.READ_WRITE, FsAction.NONE);
     fs.setPermission(selfishFile, meAndYou);
-    metadata = backingStore.getMetadata(AzureBlobStorageTestAccount
-        .toMockUri(selfishFile));
+    metadata =
+        backingStore.getMetadata(
+            AzureBlobStorageTestAccount.toMockUri(selfishFile));
     assertNotNull(metadata);
     String storedPermission = metadata.get("hdi_permission");
-    assertEquals(getExpectedPermissionString("rw-rw----"), storedPermission);
+    assertEquals(getExpectedPermissionString("rw-rw----"),
+        storedPermission);
     assertNull(metadata.get("asv_permission"));
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2217e2f8/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlobTypeSpeedDifference.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlobTypeSpeedDifference.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlobTypeSpeedDifference.java
new file mode 100644
index 0000000..afb16ef
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlobTypeSpeedDifference.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import java.io.*;
+import java.util.*;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation;
+
+import junit.framework.*;
+
+import org.junit.Test;
+
+
+/**
+ * A simple benchmark to find out the difference in speed between block
+ * and page blobs.
+ */
+public class TestBlobTypeSpeedDifference extends TestCase {
+  /**
+   * Writes data to the given stream of the given size, flushing every
+   * x bytes.
+   */
+  private static void writeTestFile(OutputStream writeStream,
+      long size, long flushInterval) throws IOException {
+    int bufferSize = (int) Math.min(1000, flushInterval);
+    byte[] buffer = new byte[bufferSize];
+    Arrays.fill(buffer, (byte) 7);
+    int bytesWritten = 0;
+    int bytesUnflushed = 0;
+    while (bytesWritten < size) {
+      int numberToWrite = (int) Math.min(bufferSize, size - bytesWritten);
+      writeStream.write(buffer, 0, numberToWrite);
+      bytesWritten += numberToWrite;
+      bytesUnflushed += numberToWrite;
+      if (bytesUnflushed >= flushInterval) {
+        writeStream.flush();
+        bytesUnflushed = 0;
+      }
+    }
+  }
+
+  private static class TestResult {
+    final long timeTakenInMs;
+    final long totalNumberOfRequests;
+
+    TestResult(long timeTakenInMs, long totalNumberOfRequests) {
+      this.timeTakenInMs = timeTakenInMs;
+      this.totalNumberOfRequests = totalNumberOfRequests;
+    }
+  }
+
+  /**
+   * Writes data to the given file of the given size, flushing every
+   * x bytes. Measure performance of that and return it.
+   */
+  private static TestResult writeTestFile(NativeAzureFileSystem fs, Path path,
+      long size, long flushInterval) throws IOException {
+    AzureFileSystemInstrumentation instrumentation =
+        fs.getInstrumentation();
+    long initialRequests = instrumentation.getCurrentWebResponses();
+    Date start = new Date();
+    OutputStream output = fs.create(path);
+    writeTestFile(output, size, flushInterval);
+    output.close();
+    long finalRequests = instrumentation.getCurrentWebResponses();
+    return new TestResult(new Date().getTime() - start.getTime(),
+        finalRequests - initialRequests);
+  }
+
+  /**
+   * Writes data to a block blob of the given size, flushing every
+   * x bytes. Measure performance of that and return it.
+   */
+  private static TestResult writeBlockBlobTestFile(NativeAzureFileSystem fs,
+      long size, long flushInterval) throws IOException {
+    return writeTestFile(fs, new Path("/blockBlob"), size, flushInterval);
+  }
+
+  /**
+   * Writes data to a page blob of the given size, flushing every
+   * x bytes. Measure performance of that and return it.
+   */
+  private static TestResult writePageBlobTestFile(NativeAzureFileSystem fs,
+      long size, long flushInterval) throws IOException {
+    return writeTestFile(fs,
+        AzureBlobStorageTestAccount.pageBlobPath("pageBlob"),
+        size, flushInterval);
+  }
+
+  /**
+   * Runs the benchmark over a small 10 KB file, flushing every 500 bytes.
+   */
+  @Test
+  public void testTenKbFileFrequentFlush() throws Exception {
+    AzureBlobStorageTestAccount testAccount =
+        AzureBlobStorageTestAccount.create();
+    if (testAccount == null) {
+      return;
+    }
+    try {
+      testForSizeAndFlushInterval(testAccount.getFileSystem(), 10 * 1000, 500);
+    } finally {
+      testAccount.cleanup();
+    }
+  }
+
+  /**
+   * Runs the benchmark for the given file size and flush frequency.
+   */
+  private static void testForSizeAndFlushInterval(NativeAzureFileSystem fs,
+      final long size, final long flushInterval) throws IOException {
+    for (int i = 0; i < 5; i++) {
+      TestResult pageBlobResults = writePageBlobTestFile(fs, size, flushInterval);
+      System.out.printf(
+          "Page blob upload took %d ms. Total number of requests: %d.\n",
+          pageBlobResults.timeTakenInMs, pageBlobResults.totalNumberOfRequests);
+      TestResult blockBlobResults = writeBlockBlobTestFile(fs, size, flushInterval);
+      System.out.printf(
+          "Block blob upload took %d ms. Total number of requests: %d.\n",
+          blockBlobResults.timeTakenInMs, blockBlobResults.totalNumberOfRequests);
+    }
+  }
+
+  /**
+   * Runs the benchmark for the given file size and flush frequency from the
+   * command line.
+   */
+  public static void main(String argv[]) throws Exception {
+    Configuration conf = new Configuration();
+    long size = 10 * 1000 * 1000;
+    long flushInterval = 2000;
+    if (argv.length > 0) {
+      size = Long.parseLong(argv[0]);
+    }
+    if (argv.length > 1) {
+      flushInterval = Long.parseLong(argv[1]);
+    }
+    testForSizeAndFlushInterval((NativeAzureFileSystem)FileSystem.get(conf),
+        size, flushInterval);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2217e2f8/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestNativeAzureFSPageBlobLive.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestNativeAzureFSPageBlobLive.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestNativeAzureFSPageBlobLive.java
new file mode 100644
index 0000000..208cff3
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestNativeAzureFSPageBlobLive.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.azure;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Run the base Azure file system tests strictly on page blobs to make sure fundamental
+ * operations on page blob files and folders work as expected.
+ * These operations include create, delete, rename, list, and so on.
+ */
+public class TestNativeAzureFSPageBlobLive extends
+    NativeAzureFileSystemBaseTest {
+
+  @Override
+  protected AzureBlobStorageTestAccount createTestAccount()
+      throws Exception {
+    Configuration conf = new Configuration();
+
+    // Configure the page blob directories key so every file created is a page blob.
+    conf.set(AzureNativeFileSystemStore.KEY_PAGE_BLOB_DIRECTORIES, "/");
+
+    // Configure the atomic rename directories key so every folder will have
+    // atomic rename applied.
+    conf.set(AzureNativeFileSystemStore.KEY_ATOMIC_RENAME_DIRECTORIES, "/");
+    return AzureBlobStorageTestAccount.create(conf);
+  }
+}