Posted to common-commits@hadoop.apache.org by cn...@apache.org on 2014/10/08 23:24:41 UTC

[4/5] HADOOP-10809. hadoop-azure: page blob support. Contributed by Dexter Bradshaw, Mostafa Elhemali, Eric Hanson, and Mike Liddell.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2217e2f8/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
index dae957e..076c48a 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
@@ -25,14 +25,21 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.text.SimpleDateFormat;
 import java.util.ArrayList;
-import java.util.Calendar;
 import java.util.Date;
+import java.util.EnumSet;
+import java.util.Iterator;
 import java.util.Set;
+import java.util.TimeZone;
 import java.util.TreeSet;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -40,6 +47,7 @@ import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.BufferedFSInputStream;
+import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FSInputStream;
@@ -50,12 +58,26 @@ import org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation;
 import org.apache.hadoop.fs.azure.metrics.AzureFileSystemMetricsSystem;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
+import org.apache.hadoop.fs.azure.AzureException;
+import org.apache.hadoop.fs.azure.StorageInterface.CloudBlobWrapper;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Progressable;
 
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.JsonParseException;
+import org.codehaus.jackson.JsonParser;
+import org.codehaus.jackson.map.JsonMappingException;
+import org.codehaus.jackson.map.ObjectMapper;
+
 import com.google.common.annotations.VisibleForTesting;
-import com.microsoft.windowsazure.storage.core.Utility;
+import com.microsoft.windowsazure.storage.AccessCondition;
+import com.microsoft.windowsazure.storage.OperationContext;
+import com.microsoft.windowsazure.storage.StorageException;
+import com.microsoft.windowsazure.storage.blob.CloudBlob;
+import com.microsoft.windowsazure.storage.core.*;
 
 /**
  * <p>
@@ -68,6 +90,495 @@ import com.microsoft.windowsazure.storage.core.Utility;
 @InterfaceAudience.Public
 @InterfaceStability.Stable
 public class NativeAzureFileSystem extends FileSystem {
+  private static final int USER_WX_PERMISSION = 0300;
+
+  /**
+   * A description of a folder rename operation, including the source and
+   * destination keys, and descriptions of the files in the source folder.
+   */
+  public static class FolderRenamePending {
+    private SelfRenewingLease folderLease;
+    private String srcKey;
+    private String dstKey;
+    private FileMetadata[] fileMetadata = null;    // descriptions of source files
+    private ArrayList<String> fileStrings = null;
+    private NativeAzureFileSystem fs;
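+    // Cap on the in-memory size of a -RenamePending.json file when reading
+    // it back, and the slack reserved for JSON formatting overhead when
+    // generating one.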
+    private static final int MAX_RENAME_PENDING_FILE_SIZE = 10000000;
+    private static final int FORMATTING_BUFFER = 10000;
+    private boolean committed;
+    public static final String SUFFIX = "-RenamePending.json";
+
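+    // Typical use (for illustration): construct with the source and
+    // destination keys, call writeFile() to persist the -RenamePending.json
+    // file, execute() to perform the rename, then cleanup() to remove the
+    // pending file. For crash recovery, construct from the redo file and
+    // call redo().
+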
+    // Prepare in-memory information needed to do or redo a folder rename.
+    public FolderRenamePending(String srcKey, String dstKey, SelfRenewingLease lease,
+        NativeAzureFileSystem fs) throws IOException {
+      this.srcKey = srcKey;
+      this.dstKey = dstKey;
+      this.folderLease = lease;
+      this.fs = fs;
+      ArrayList<FileMetadata> fileMetadataList = new ArrayList<FileMetadata>();
+
+      // List all the files in the folder.
+      String priorLastKey = null;
+      do {
+        PartialListing listing = fs.getStoreInterface().listAll(srcKey, AZURE_LIST_ALL,
+          AZURE_UNBOUNDED_DEPTH, priorLastKey);
+        for (FileMetadata file : listing.getFiles()) {
+          fileMetadataList.add(file);
+        }
+        priorLastKey = listing.getPriorLastKey();
+      } while (priorLastKey != null);
+      fileMetadata = fileMetadataList.toArray(new FileMetadata[fileMetadataList.size()]);
+      this.committed = true;
+    }
+
+    // Prepare in-memory information needed to do or redo a folder rename
+    // from a -RenamePending.json file read from storage. This constructor
+    // is used during redo processing.
+    public FolderRenamePending(Path redoFile, NativeAzureFileSystem fs)
+        throws IllegalArgumentException, IOException {
+
+      this.fs = fs;
+
+      // open redo file
+      Path f = redoFile;
+      FSDataInputStream input = fs.open(f);
+      byte[] bytes = new byte[MAX_RENAME_PENDING_FILE_SIZE];
+      int l = input.read(bytes);
+      if (l < 0) {
+        throw new IOException(
+            "Error reading pending rename file contents -- no data available");
+      }
+      if (l == MAX_RENAME_PENDING_FILE_SIZE) {
+        throw new IOException(
+            "Error reading pending rename file contents -- "
+                + "maximum file size exceeded");
+      }
+      String contents = new String(bytes, 0, l);
+
+      // parse the JSON
+      ObjectMapper objMapper = new ObjectMapper();
+      objMapper.configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, true);
+      JsonNode json = null;
+      try {
+        json = objMapper.readValue(contents, JsonNode.class);
+        this.committed = true;
+      } catch (JsonMappingException e) {
+
+        // The -RenamePending.json file is corrupted, so we assume it was
+        // not completely written and the redo operation did not commit.
+        this.committed = false;
+      } catch (JsonParseException e) {
+        this.committed = false;
+      } catch (IOException e) {
+        this.committed = false;
+      }
+
+      if (!this.committed) {
+        LOG.error("Deleting corrupted rename pending file "
+            + redoFile + "\n" + contents);
+
+        // delete the -RenamePending.json file
+        fs.delete(redoFile, false);
+        return;
+      }
+
+      // initialize this object's fields
+      ArrayList<String> fileStrList = new ArrayList<String>();
+      JsonNode oldFolderName = json.get("OldFolderName");
+      JsonNode newFolderName = json.get("NewFolderName");
+      if (oldFolderName == null || newFolderName == null) {
+        this.committed = false;
+      } else {
+        this.srcKey = oldFolderName.getTextValue();
+        this.dstKey = newFolderName.getTextValue();
+        if (this.srcKey == null || this.dstKey == null) {
+          this.committed = false;
+        } else {
+          JsonNode fileList = json.get("FileList");
+          if (fileList == null) {
+            this.committed = false;
+          } else {
+            for (int i = 0; i < fileList.size(); i++) {
+              fileStrList.add(fileList.get(i).getTextValue());
+            }
+          }
+        }
+      }
+      this.fileStrings = fileStrList;
+    }
+
+    public FileMetadata[] getFiles() {
+      return fileMetadata;
+    }
+
+    public SelfRenewingLease getFolderLease() {
+      return folderLease;
+    }
+
+    /**
+     * Write to disk the information needed to redo folder rename, in JSON format.
+     * The file name will be wasb://<sourceFolderPrefix>/folderName-RenamePending.json
+     * The file format will be:
+     * {
+     *   FormatVersion: "1.0",
+     *   OperationUTCTime: "<YYYY-MM-DD HH:MM:SS.MMM>",
+     *   OldFolderName: "<key>",
+     *   NewFolderName: "<key>",
+     *   FileList: [ <string> , <string> , ... ]
+     * }
+     *
+     * Here's a sample:
+     * {
+     *  FormatVersion: "1.0",
+     *  OperationUTCTime: "2014-07-01 23:50:35.572",
+     *  OldFolderName: "user/ehans/folderToRename",
+     *  NewFolderName: "user/ehans/renamedFolder",
+     *  FileList: [
+     *    "innerFile",
+     *    "innerFile2"
+     *  ]
+     * }
+     * @throws IOException
+     */
+    public void writeFile(FileSystem fs) throws IOException {
+      Path path = getRenamePendingFilePath();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Preparing to write atomic rename state to " + path.toString());
+      }
+      OutputStream output = null;
+
+      String contents = makeRenamePendingFileContents();
+
+      // Write file.
+      try {
+        output = fs.create(path);
+        output.write(contents.getBytes());
+      } catch (IOException e) {
+        throw new IOException("Unable to write RenamePending file for folder rename from "
+            + srcKey + " to " + dstKey, e);
+      } finally {
+        IOUtils.cleanup(LOG, output);
+      }
+    }
+
+    /**
+     * Return the contents of the JSON file to represent the operations
+     * to be performed for a folder rename.
+     */
+    public String makeRenamePendingFileContents() {
+      SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
+      sdf.setTimeZone(TimeZone.getTimeZone("UTC"));
+      String time = sdf.format(new Date());
+
+      // Make file list string
+      StringBuilder builder = new StringBuilder();
+      builder.append("[\n");
+      for (int i = 0; i != fileMetadata.length; i++) {
+        if (i > 0) {
+          builder.append(",\n");
+        }
+        builder.append("    ");
+        String noPrefix = StringUtils.removeStart(fileMetadata[i].getKey(), srcKey + "/");
+
+        // Quote string file names, escaping any possible " characters or other
+        // necessary characters in the name.
+        builder.append(quote(noPrefix));
+        if (builder.length() >=
+            MAX_RENAME_PENDING_FILE_SIZE - FORMATTING_BUFFER) {
+
+          // Give up now to avoid using too much memory.
+          LOG.error("Internal error: Exceeded maximum rename pending file size of "
+              + MAX_RENAME_PENDING_FILE_SIZE + " bytes.");
+
+          // return some bad JSON with an error message to make it human readable
+          return "exceeded maximum rename pending file size";
+        }
+      }
+      builder.append("\n  ]");
+      String fileList = builder.toString();
+
+      // Make file contents as a string. Again, quote file names, escaping
+      // characters as appropriate.
+      String contents = "{\n"
+          + "  FormatVersion: \"1.0\",\n"
+          + "  OperationUTCTime: \"" + time + "\",\n"
+          + "  OldFolderName: " + quote(srcKey) + ",\n"
+          + "  NewFolderName: " + quote(dstKey) + ",\n"
+          + "  FileList: " + fileList + "\n"
+          + "}\n";
+
+      return contents;
+    }
+    
+    /**
+     * This is an exact copy of org.codehaus.jettison.json.JSONObject.quote 
+     * method.
+     * 
+     * Produce a string in double quotes with backslash sequences in all the
+     * right places. A backslash will be inserted within </, allowing JSON
+     * text to be delivered in HTML. In JSON text, a string cannot contain a
+     * control character or an unescaped quote or backslash.
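+     *
+     * For example, a name containing a double quote, such as a"b, is
+     * produced as the quoted JSON string "a\"b".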
+     * @param string A String
+     * @return  A String correctly formatted for insertion in a JSON text.
+     */
+    private String quote(String string) {
+        if (string == null || string.length() == 0) {
+            return "\"\"";
+        }
+
+        char c = 0;
+        int  i;
+        int  len = string.length();
+        StringBuilder sb = new StringBuilder(len + 4);
+        String t;
+
+        sb.append('"');
+        for (i = 0; i < len; i += 1) {
+            c = string.charAt(i);
+            switch (c) {
+            case '\\':
+            case '"':
+                sb.append('\\');
+                sb.append(c);
+                break;
+            case '/':
+                sb.append('\\');
+                sb.append(c);
+                break;
+            case '\b':
+                sb.append("\\b");
+                break;
+            case '\t':
+                sb.append("\\t");
+                break;
+            case '\n':
+                sb.append("\\n");
+                break;
+            case '\f':
+                sb.append("\\f");
+                break;
+            case '\r':
+                sb.append("\\r");
+                break;
+            default:
+                if (c < ' ') {
+                    t = "000" + Integer.toHexString(c);
+                    sb.append("\\u" + t.substring(t.length() - 4));
+                } else {
+                    sb.append(c);
+                }
+            }
+        }
+        sb.append('"');
+        return sb.toString();
+    }
+
+    public String getSrcKey() {
+      return srcKey;
+    }
+
+    public String getDstKey() {
+      return dstKey;
+    }
+
+    public FileMetadata getSourceMetadata() throws IOException {
+      return fs.getStoreInterface().retrieveMetadata(srcKey);
+    }
+
+    /**
+     * Execute a folder rename. This is the execution path followed
+     * when everything is working normally. See redo() for the alternate
+     * execution path for the case where we're recovering from a folder rename
+     * failure.
+     * @throws IOException
+     */
+    public void execute() throws IOException {
+
+      for (FileMetadata file : this.getFiles()) {
+
+        // Rename all materialized entries under the folder to point to the
+        // final destination.
+        if (file.getBlobMaterialization() == BlobMaterialization.Explicit) {
+          String srcName = file.getKey();
+          String suffix  = srcName.substring((this.getSrcKey()).length());
+          String dstName = this.getDstKey() + suffix;
+
+          // Rename gets exclusive access (via a lease) for files
+          // designated for atomic rename.
+          // The main use case is for HBase write-ahead log (WAL) and data
+          // folder processing correctness.  See the rename code for details.
+          boolean acquireLease = fs.getStoreInterface().isAtomicRenameKey(srcName);
+          fs.getStoreInterface().rename(srcName, dstName, acquireLease, null);
+        }
+      }
+
+      // Rename the source folder 0-byte root file itself.
+      FileMetadata srcMetadata2 = this.getSourceMetadata();
+      if (srcMetadata2.getBlobMaterialization() ==
+          BlobMaterialization.Explicit) {
+
+        // It already has a lease on it from the "prepare" phase so there's no
+        // need to get one now. Pass in existing lease to allow file delete.
+        fs.getStoreInterface().rename(this.getSrcKey(), this.getDstKey(),
+            false, folderLease);
+      }
+
+      // Update the last-modified time of the parent folders of both source and
+      // destination.
+      fs.updateParentFolderLastModifiedTime(srcKey);
+      fs.updateParentFolderLastModifiedTime(dstKey);
+    }
+
+    /** Clean up after execution of rename.
+     * @throws IOException */
+    public void cleanup() throws IOException {
+
+      if (fs.getStoreInterface().isAtomicRenameKey(srcKey)) {
+
+        // Remove RenamePending file
+        fs.delete(getRenamePendingFilePath(), false);
+
+        // Freeing source folder lease is not necessary since the source
+        // folder file was deleted.
+      }
+    }
+
+    private Path getRenamePendingFilePath() {
+      String fileName = srcKey + SUFFIX;
+      Path fileNamePath = keyToPath(fileName);
+      Path path = fs.makeAbsolute(fileNamePath);
+      return path;
+    }
+
+    /**
+     * Recover from a folder rename failure by redoing the intended work,
+     * as recorded in the -RenamePending.json file.
+     * 
+     * @throws IOException
+     */
+    public void redo() throws IOException {
+
+      if (!committed) {
+
+        // Nothing to do. The -RenamePending.json file should have already
+        // been deleted.
+        return;
+      }
+
+      // Try to get a lease on source folder to block concurrent access to it.
+      // It may fail if the folder is already gone. We don't check if the
+      // source exists explicitly because that could recursively trigger redo
+      // and give an infinite recursion.
+      SelfRenewingLease lease = null;
+      boolean sourceFolderGone = false;
+      try {
+        lease = fs.leaseSourceFolder(srcKey);
+      } catch (AzureException e) {
+
+        // If the source folder was not found then somebody probably
+        // raced with us and finished the rename first, or the
+        // first rename failed right before deleting the rename pending
+        // file.
+        String errorCode = "";
+        try {
+          StorageException se = (StorageException) e.getCause();
+          errorCode = se.getErrorCode();
+        } catch (Exception e2) {
+          ; // do nothing -- could not get errorCode
+        }
+        if (errorCode.equals("BlobNotFound")) {
+          sourceFolderGone = true;
+        } else {
+          throw new IOException(
+              "Unexpected error when trying to lease source folder name during "
+              + "folder rename redo",
+              e);
+        }
+      }
+
+      if (!sourceFolderGone) {
+        // Make sure the target folder exists.
+        Path dst = fullPath(dstKey);
+        if (!fs.exists(dst)) {
+          fs.mkdirs(dst);
+        }
+
+        // For each file inside the folder to be renamed,
+        // make sure it has been renamed.
+        for (String fileName : fileStrings) {
+          finishSingleFileRename(fileName);
+        }
+
+        // Remove the source folder. Don't check explicitly if it exists,
+        // to avoid triggering redo recursively.
+        try {
+          fs.getStoreInterface().delete(srcKey, lease);
+        } catch (Exception e) {
+          LOG.info("Unable to delete source folder during folder rename redo. "
+              + "If the source folder is already gone, this is not an error "
+              + "condition. Continuing with redo.", e);
+        }
+
+        // Update the last-modified time of the parent folders of both source
+        // and destination.
+        fs.updateParentFolderLastModifiedTime(srcKey);
+        fs.updateParentFolderLastModifiedTime(dstKey);
+      }
+
+      // Remove the -RenamePending.json file.
+      fs.delete(getRenamePendingFilePath(), false);
+    }
+
+    // See if the source file is still there, and if it is, rename it.
+    private void finishSingleFileRename(String fileName)
+        throws IOException {
+      Path srcFile = fullPath(srcKey, fileName);
+      Path dstFile = fullPath(dstKey, fileName);
+      boolean srcExists = fs.exists(srcFile);
+      boolean dstExists = fs.exists(dstFile);
+      if (srcExists && !dstExists) {
+
+        // Rename gets exclusive access (via a lease) for HBase write-ahead log
+        // (WAL) file processing correctness.  See the rename code for details.
+        String srcName = fs.pathToKey(srcFile);
+        String dstName = fs.pathToKey(dstFile);
+        fs.getStoreInterface().rename(srcName, dstName, true, null);
+      } else if (srcExists && dstExists) {
+
+        // Get a lease on source to block write access.
+        String srcName = fs.pathToKey(srcFile);
+        SelfRenewingLease lease = fs.acquireLease(srcFile);
+
+        // Delete the file. This will free the lease too.
+        fs.getStoreInterface().delete(srcName, lease);
+      } else if (!srcExists && dstExists) {
+
+        // The rename already finished, so do nothing.
+        ;
+      } else {
+        throw new IOException(
+            "Attempting to complete rename of file " + srcKey + "/" + fileName
+            + " during folder rename redo, and file was not found in source "
+            + "or destination.");
+      }
+    }
+
+    // Return an absolute path for the specific fileName within the folder
+    // specified by folderKey.
+    private Path fullPath(String folderKey, String fileName) {
+      return new Path(new Path(fs.getUri()), "/" + folderKey + "/" + fileName);
+    }
+
+    private Path fullPath(String fileKey) {
+      return new Path(new Path(fs.getUri()), "/" + fileKey);
+    }
+  }
+
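+  // Placeholder for a trailing period in a blob name, used because Azure
+  // Storage does not allow blob names to end in a period. See
+  // encodeTrailingPeriod() and decodeTrailingPeriod().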
+  private static final String TRAILING_PERIOD_PLACEHOLDER = "[[.]]";
+  private static final Pattern TRAILING_PERIOD_PLACEHOLDER_PATTERN =
+      Pattern.compile("\\[\\[\\.\\]\\](?=$|/)");
+  private static final Pattern TRAILING_PERIOD_PATTERN = Pattern.compile("\\.(?=$|/)");
 
   @Override
   public String getScheme() {
@@ -121,17 +632,53 @@ public class NativeAzureFileSystem extends FileSystem {
    */
   static final String AZURE_DEFAULT_GROUP_DEFAULT = "supergroup";
 
-  static final String AZURE_BLOCK_LOCATION_HOST_PROPERTY_NAME = "fs.azure.block.location.impersonatedhost";
-  private static final String AZURE_BLOCK_LOCATION_HOST_DEFAULT = "localhost";
+  static final String AZURE_BLOCK_LOCATION_HOST_PROPERTY_NAME =
+      "fs.azure.block.location.impersonatedhost";
+  private static final String AZURE_BLOCK_LOCATION_HOST_DEFAULT =
+      "localhost";
+  static final String AZURE_RINGBUFFER_CAPACITY_PROPERTY_NAME =
+      "fs.azure.ring.buffer.capacity";
+  static final String AZURE_OUTPUT_STREAM_BUFFER_SIZE_PROPERTY_NAME =
+      "fs.azure.output.stream.buffer.size";
 
   private class NativeAzureFsInputStream extends FSInputStream {
     private InputStream in;
     private final String key;
     private long pos = 0;
+    private boolean closed = false;
+    private boolean isPageBlob;
 
-    public NativeAzureFsInputStream(DataInputStream in, String key) {
+    // File length, valid only for streams over block blobs.
+    private long fileLength;
+
+    public NativeAzureFsInputStream(DataInputStream in, String key, long fileLength) {
       this.in = in;
       this.key = key;
+      this.isPageBlob = store.isPageBlobKey(key);
+      this.fileLength = fileLength;
+    }
+
+    /**
+     * Return the size of the remaining available bytes
+     * if the size is less than or equal to {@link Integer#MAX_VALUE},
+     * otherwise, return {@link Integer#MAX_VALUE}.
+     *
+     * This is to match the behavior of DFSInputStream.available(),
+     * which some clients may rely on (HBase write-ahead log reading in
+     * particular).
+     */
+    @Override
+    public synchronized int available() throws IOException {
+      if (isPageBlob) {
+        return in.available();
+      } else {
+        if (closed) {
+          throw new IOException("Stream closed");
+        }
+        final long remaining = this.fileLength - pos;
+        return remaining <= Integer.MAX_VALUE ?
+            (int) remaining : Integer.MAX_VALUE;
+      }
     }
 
     /*
@@ -140,7 +687,7 @@ public class NativeAzureFileSystem extends FileSystem {
      * because the end of the stream has been reached, the value -1 is returned.
      * This method blocks until input data is available, the end of the stream
      * is detected, or an exception is thrown.
-     * 
+     *
      * @returns int An integer corresponding to the byte read.
      */
     @Override
@@ -169,13 +716,13 @@ public class NativeAzureFileSystem extends FileSystem {
      * one byte. If no byte is available because the stream is at end of file,
      * the value -1 is returned; otherwise, at least one byte is read and stored
      * into b.
-     * 
+     *
      * @param b -- the buffer into which data is read
-     * 
+     *
      * @param off -- the start offset in the array b at which data is written
-     * 
+     *
      * @param len -- the maximum number of bytes read
-     * 
+     *
     * @returns int The total number of bytes read into the buffer, or -1 if
      * there is no more data because the end of stream is reached.
      */
@@ -196,15 +743,20 @@ public class NativeAzureFileSystem extends FileSystem {
     }
 
     @Override
-    public synchronized void close() throws IOException {
+    public void close() throws IOException {
       in.close();
+      closed = true;
     }
 
     @Override
     public synchronized void seek(long pos) throws IOException {
-      in.close();
-      in = store.retrieve(key, pos);
-      this.pos = pos;
+      in.close();
+      in = store.retrieve(key);
+      this.pos = in.skip(pos);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(String.format("Seek to position %d. Bytes skipped %d", pos,
+          this.pos));
+      }
     }
 
     @Override
@@ -468,7 +1020,8 @@ public class NativeAzureFileSystem extends FileSystem {
   }
 
   @Override
-  public void initialize(URI uri, Configuration conf) throws IOException {
+  public void initialize(URI uri, Configuration conf)
+      throws IOException, IllegalArgumentException {
     // Check authority for the URI to guarantee that it is non-null.
     uri = reconstructAuthorityIfNeeded(uri, conf);
     if (null == uri.getAuthority()) {
@@ -514,10 +1067,29 @@ public class NativeAzureFileSystem extends FileSystem {
     return actualStore;
   }
 
-  // Note: The logic for this method is confusing as to whether it strips the
-  // last slash or not (it adds it in the beginning, then strips it at the end).
-  // We should revisit that.
-  private String pathToKey(Path path) {
+  /**
+   * Azure Storage doesn't allow the blob names to end in a period,
+   * so encode this here to work around that limitation.
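+   *
+   * For example, "foo/bar." becomes "foo/bar[[.]]" and "a./b" becomes
+   * "a[[.]]/b"; decodeTrailingPeriod() reverses the substitution.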
+   */
+  private static String encodeTrailingPeriod(String toEncode) {
+    Matcher matcher = TRAILING_PERIOD_PATTERN.matcher(toEncode);
+    return matcher.replaceAll(TRAILING_PERIOD_PLACEHOLDER);
+  }
+
+  /**
+   * Reverse the encoding done by encodeTrailingPeriod().
+   */
+  private static String decodeTrailingPeriod(String toDecode) {
+    Matcher matcher = TRAILING_PERIOD_PLACEHOLDER_PATTERN.matcher(toDecode);
+    return matcher.replaceAll(".");
+  }
+
+  /**
+   * Convert the path to a key. By convention, any leading or trailing slash is
+   * removed, except for the special case of a single slash.
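+   *
+   * For example, the path /user/data/ maps to the key "user/data", while
+   * the root path / maps to the key "/".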
+   */
+  @VisibleForTesting
+  public String pathToKey(Path path) {
     // Convert the path to a URI to parse the scheme, the authority, and the
     // path from the path object.
     URI tmpUri = path.toUri();
@@ -537,6 +1109,8 @@ public class NativeAzureFileSystem extends FileSystem {
 
     String key = null;
     key = newPath.toUri().getPath();
+    key = removeTrailingSlash(key);
+    key = encodeTrailingPeriod(key);
     if (key.length() == 1) {
       return key;
     } else {
@@ -544,14 +1118,34 @@ public class NativeAzureFileSystem extends FileSystem {
     }
   }
 
+  // Remove any trailing slash except for the case of a single slash.
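+  // For example, "foo/bar/" becomes "foo/bar", while "/" is returned as-is.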
+  private static String removeTrailingSlash(String key) {
+    if (key.length() == 0 || key.length() == 1) {
+      return key;
+    }
+    if (key.charAt(key.length() - 1) == '/') {
+      return key.substring(0, key.length() - 1);
+    } else {
+      return key;
+    }
+  }
+
   private static Path keyToPath(String key) {
     if (key.equals("/")) {
       return new Path("/"); // container
     }
-    return new Path("/" + key);
+    return new Path("/" + decodeTrailingPeriod(key));
   }
 
-  private Path makeAbsolute(Path path) {
+  /**
+   * Get the absolute version of the path (fully qualified).
+   * This is public for testing purposes.
+   *
+   * @param path
+   * @return fully qualified path
+   */
+  @VisibleForTesting
+  public Path makeAbsolute(Path path) {
     if (path.isAbsolute()) {
       return path;
     }
@@ -569,6 +1163,10 @@ public class NativeAzureFileSystem extends FileSystem {
     return actualStore;
   }
   
+  NativeFileSystemStore getStoreInterface() {
+    return store;
+  }
+
   /**
    * Gets the metrics source for this file system.
    * This is mainly here for unit testing purposes.
@@ -590,6 +1188,145 @@ public class NativeAzureFileSystem extends FileSystem {
   public FSDataOutputStream create(Path f, FsPermission permission,
       boolean overwrite, int bufferSize, short replication, long blockSize,
       Progressable progress) throws IOException {
+    return create(f, permission, overwrite, true,
+        bufferSize, replication, blockSize, progress,
+        (SelfRenewingLease) null);
+  }
+
+  /**
+   * Get a self-renewing lease on the specified file.
+   */
+  public SelfRenewingLease acquireLease(Path path) throws AzureException {
+    String fullKey = pathToKey(makeAbsolute(path));
+    return getStore().acquireLease(fullKey);
+  }
+
+  @Override
+  @SuppressWarnings("deprecation")
+  public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
+      boolean overwrite, int bufferSize, short replication, long blockSize,
+      Progressable progress) throws IOException {
+
+    Path parent = f.getParent();
+
+    // Get exclusive access to the folder if this is a directory designated
+    // for atomic rename. The primary use case is HBase write-ahead
+    // log file management.
+    SelfRenewingLease lease = null;
+    if (store.isAtomicRenameKey(pathToKey(f))) {
+      try {
+        lease = acquireLease(parent);
+      } catch (AzureException e) {
+
+        String errorCode = "";
+        try {
+          StorageException e2 = (StorageException) e.getCause();
+          errorCode = e2.getErrorCode();
+        } catch (Exception e3) {
+          // do nothing if cast fails
+        }
+        if (errorCode.equals("BlobNotFound")) {
+          throw new FileNotFoundException("Cannot create file " +
+              f.getName() + " because parent folder does not exist.");
+        }
+
+        LOG.warn("Got unexpected exception trying to get lease on "
+          + pathToKey(parent) + ". " + e.getMessage());
+        throw e;
+      }
+    }
+
+    // See if the parent folder exists. If not, throw error.
+    // The exists() check will push any pending rename operation forward,
+    // if there is one, and return false.
+    //
+    // At this point, we have exclusive access to the source folder
+    // via the lease, so we will not conflict with an active folder
+    // rename operation.
+    if (!exists(parent)) {
+      if (lease != null) {
+        try {
+
+          // This'll let the keep-alive thread exit as soon as it wakes up.
+          lease.free();
+        } catch (Exception e) {
+          LOG.warn("Unable to free lease because: " + e.getMessage());
+        }
+      }
+      throw new FileNotFoundException("Cannot create file " +
+          f.getName() + " because parent folder does not exist.");
+    }
+
+    // Create file inside folder.
+    FSDataOutputStream out = null;
+    try {
+      out = create(f, permission, overwrite, false,
+          bufferSize, replication, blockSize, progress, lease);
+    } finally {
+      // Release exclusive access to folder.
+      try {
+        if (lease != null) {
+          lease.free();
+        }
+      } catch (Exception e) {
+        IOUtils.cleanup(LOG, out);
+        String msg = "Unable to free lease on " + parent.toUri();
+        LOG.error(msg);
+        throw new IOException(msg, e);
+      }
+    }
+    return out;
+  }
+
+  @Override
+  @SuppressWarnings("deprecation")
+  public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
+      EnumSet<CreateFlag> flags, int bufferSize, short replication, long blockSize,
+      Progressable progress) throws IOException {
+
+    // Check if the file should be appended or overwritten. Assume that the
+    // file is overwritten if both the CREATE and OVERWRITE create flags are
+    // set. Any other combination of create flags will result in creating a
+    // new file or opening an existing file for append.
+    final EnumSet<CreateFlag> createflags =
+        EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE);
+    boolean overwrite = flags.containsAll(createflags);
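+
+    // For example, EnumSet.of(CREATE, OVERWRITE) yields overwrite == true,
+    // while EnumSet.of(CREATE) alone yields overwrite == false.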
+
+    // Delegate the create non-recursive call.
+    return this.createNonRecursive(f, permission, overwrite,
+        bufferSize, replication, blockSize, progress);
+  }
+
+  @Override
+  @SuppressWarnings("deprecation")
+  public FSDataOutputStream createNonRecursive(Path f,
+      boolean overwrite, int bufferSize, short replication, long blockSize,
+      Progressable progress) throws IOException {
+    return this.createNonRecursive(f, FsPermission.getFileDefault(),
+        overwrite, bufferSize, replication, blockSize, progress);
+  }
+
+
+  /**
+   * Create an Azure blob and return an output stream to use
+   * to write data to it.
+   *
+   * @param f
+   * @param permission
+   * @param overwrite
+   * @param createParent
+   * @param bufferSize
+   * @param replication
+   * @param blockSize
+   * @param progress
+   * @param parentFolderLease Lease on parent folder (or null if
+   * no lease).
+   * @return an output stream for writing data to the new blob
+   * @throws IOException
+   */
+  private FSDataOutputStream create(Path f, FsPermission permission,
+      boolean overwrite, boolean createParent, int bufferSize,
+      short replication, long blockSize, Progressable progress,
+      SelfRenewingLease parentFolderLease)
+          throws IOException {
 
     if (LOG.isDebugEnabled()) {
       LOG.debug("Creating file: " + f.toString());
@@ -620,45 +1357,60 @@ public class NativeAzureFileSystem extends FileSystem {
       // already exists.
       String parentKey = pathToKey(parentFolder);
       FileMetadata parentMetadata = store.retrieveMetadata(parentKey);
-      if (parentMetadata != null
-          && parentMetadata.isDir()
-          && parentMetadata.getBlobMaterialization() == BlobMaterialization.Explicit) {
-        store.updateFolderLastModifiedTime(parentKey);
+      if (parentMetadata != null && parentMetadata.isDir() &&
+          parentMetadata.getBlobMaterialization() == BlobMaterialization.Explicit) {
+        store.updateFolderLastModifiedTime(parentKey, parentFolderLease);
       } else {
         // Make sure that the parent folder exists.
-        mkdirs(parentFolder, permission);
+        // Create it using inherited permissions from the first existing
+        // directory going up the path.
+        Path firstExisting = parentFolder.getParent();
+        FileMetadata metadata = store.retrieveMetadata(pathToKey(firstExisting));
+        while (metadata == null) {
+          // Guaranteed to terminate because we will eventually hit the
+          // root, which returns non-null metadata.
+          firstExisting = firstExisting.getParent();
+          metadata = store.retrieveMetadata(pathToKey(firstExisting));
+        }
+        mkdirs(parentFolder, metadata.getPermissionStatus().getPermission(), true);
       }
     }
 
-    // Open the output blob stream based on the encoded key.
-    String keyEncoded = encodeKey(key);
-
     // Mask the permission first (with the default permission mask as well).
     FsPermission masked = applyUMask(permission, UMaskApplyMode.NewFile);
     PermissionStatus permissionStatus = createPermissionStatus(masked);
 
-    // First create a blob at the real key, pointing back to the temporary file
-    // This accomplishes a few things:
-    // 1. Makes sure we can create a file there.
-    // 2. Makes it visible to other concurrent threads/processes/nodes what
-    // we're
-    // doing.
-    // 3. Makes it easier to restore/cleanup data in the event of us crashing.
-    store.storeEmptyLinkFile(key, keyEncoded, permissionStatus);
-
-    // The key is encoded to point to a common container at the storage server.
-    // This reduces the number of splits on the server side when load balancing.
-    // Ingress to Azure storage can take advantage of earlier splits. We remove
-    // the root path to the key and prefix a random GUID to the tail (or leaf
-    // filename) of the key. Keys are thus broadly and randomly distributed over
-    // a single container to ease load balancing on the storage server. When the
-    // blob is committed it is renamed to its earlier key. Uncommitted blocks
-    // are not cleaned up and we leave it to Azure storage to garbage collect
-    // these
-    // blocks.
-    OutputStream bufOutStream = new NativeAzureFsOutputStream(store.storefile(
-        keyEncoded, permissionStatus), key, keyEncoded);
-
+    OutputStream bufOutStream;
+    if (store.isPageBlobKey(key)) {
+      // Store page blobs directly in-place without renames.
+      bufOutStream = store.storefile(key, permissionStatus);
+    } else {
+      // This is a block blob, so open the output blob stream based on the
+      // encoded key.
+      //
+      String keyEncoded = encodeKey(key);
+
+      // First create a blob at the real key, pointing back to the temporary file
+      // This accomplishes a few things:
+      // 1. Makes sure we can create a file there.
+      // 2. Makes it visible to other concurrent threads/processes/nodes
+      // what we're doing.
+      // 3. Makes it easier to restore/cleanup data in the event of us crashing.
+      store.storeEmptyLinkFile(key, keyEncoded, permissionStatus);
+
+      // The key is encoded to point to a common container at the storage server.
+      // This reduces the number of splits on the server side when load balancing.
+      // Ingress to Azure storage can take advantage of earlier splits. We remove
+      // the root path to the key and prefix a random GUID to the tail (or leaf
+      // filename) of the key. Keys are thus broadly and randomly distributed over
+      // a single container to ease load balancing on the storage server. When the
+      // blob is committed it is renamed to its earlier key. Uncommitted blocks
+      // are not cleaned up and we leave it to Azure storage to garbage
+      // collect these blocks.
+      bufOutStream = new NativeAzureFsOutputStream(store.storefile(
+          keyEncoded, permissionStatus), key, keyEncoded);
+    }
     // Construct the data output stream from the buffered output stream.
     FSDataOutputStream fsOut = new FSDataOutputStream(bufOutStream, statistics);
 
@@ -678,6 +1430,28 @@ public class NativeAzureFileSystem extends FileSystem {
 
   @Override
   public boolean delete(Path f, boolean recursive) throws IOException {
+    return delete(f, recursive, false);
+  }
+
+  /**
+   * Delete the specified file or folder. The parameter
+   * skipParentFolderLastModifiedTimeUpdate
+   * is used in the case of atomic folder rename redo. In that case, there is
+   * a lease on the parent folder, so (without reworking the code) modifying
+   * the parent folder update time will fail because of a conflict with the
+   * lease. Since we are going to delete the folder soon anyway, an accurate
+   * modified time is not necessary, so it is easier to just skip
+   * the modified time update.
+   *
+   * @param f
+   * @param recursive
+   * @param skipParentFolderLastModifiedTimeUpdate If true, don't update the
+   * parent folder's last modified time.
+   * @return true if and only if the file is deleted
+   * @throws IOException
+   */
+  public boolean delete(Path f, boolean recursive,
+      boolean skipParentFolderLastModifiedTimeUpdate) throws IOException {
 
     if (LOG.isDebugEnabled()) {
       LOG.debug("Deleting file: " + f.toString());
@@ -723,11 +1497,13 @@ public class NativeAzureFileSystem extends FileSystem {
           store.storeEmptyFolder(parentKey,
               createPermissionStatus(FsPermission.getDefault()));
         } else {
-          store.updateFolderLastModifiedTime(parentKey);
+          if (!skipParentFolderLastModifiedTimeUpdate) {
+            store.updateFolderLastModifiedTime(parentKey, null);
+          }
         }
       }
-      instrumentation.fileDeleted();
       store.delete(key);
+      instrumentation.fileDeleted();
     } else {
       // The path specifies a folder. Recursively delete all entries under the
       // folder.
@@ -784,7 +1560,9 @@ public class NativeAzureFileSystem extends FileSystem {
       Path parent = absolutePath.getParent();
       if (parent != null && parent.getParent() != null) { // not root
         String parentKey = pathToKey(parent);
-        store.updateFolderLastModifiedTime(parentKey);
+        if (!skipParentFolderLastModifiedTimeUpdate) {
+          store.updateFolderLastModifiedTime(parentKey, null);
+        }
       }
       instrumentation.directoryDeleted();
     }
@@ -818,6 +1596,13 @@ public class NativeAzureFileSystem extends FileSystem {
           LOG.debug("Path " + f.toString() + "is a folder.");
         }
 
+        // If a rename operation for the folder was pending, redo it.
+        // After the redo completes, the folder no longer exists at this
+        // path, so report that the file is not found.
+        if (conditionalRedoFolderRename(f)) {
+          throw new FileNotFoundException(
+              absolutePath + ": No such file or directory.");
+        }
+
         // Return reference to the directory object.
         return newDirectory(meta, absolutePath);
       }
@@ -832,9 +1617,38 @@ public class NativeAzureFileSystem extends FileSystem {
     }
 
     // File not found. Throw exception no such file or directory.
-    // Note: Should never get to this point since the root always exists.
-    throw new FileNotFoundException(absolutePath
-        + ": No such file or directory.");
+    throw new FileNotFoundException(
+        absolutePath + ": No such file or directory.");
+  }
+
+  // Return true if there is a rename pending and we redo it, otherwise false.
+  private boolean conditionalRedoFolderRename(Path f) throws IOException {
+
+    // Can't rename /, so return immediately in that case.
+    if (f.getName().equals("")) {
+      return false;
+    }
+
+    // Check if there is a -RenamePending.json file for this folder, and if so,
+    // redo the rename.
+    Path absoluteRenamePendingFile = renamePendingFilePath(f);
+    if (exists(absoluteRenamePendingFile)) {
+      FolderRenamePending pending =
+          new FolderRenamePending(absoluteRenamePendingFile, this);
+      pending.redo();
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  // Return the path of the rename pending file for the folder with path f.
+  private Path renamePendingFilePath(Path f) {
+    Path absPath = makeAbsolute(f);
+    String key = pathToKey(absPath);
+    key += "-RenamePending.json";
+    return keyToPath(key);
   }
 
   @Override
@@ -867,6 +1681,17 @@ public class NativeAzureFileSystem extends FileSystem {
       }
       String partialKey = null;
       PartialListing listing = store.list(key, AZURE_LIST_ALL, 1, partialKey);
+
+      // For any -RenamePending.json files in the listing,
+      // push the rename forward.
+      boolean renamed = conditionalRedoFolderRenames(listing);
+
+      // If any renames were redone, get another listing,
+      // since the current one may have changed due to the redo.
+      if (renamed) {
+        listing = store.list(key, AZURE_LIST_ALL, 1, partialKey);
+      }
+
       for (FileMetadata fileMetadata : listing.getFiles()) {
         Path subpath = keyToPath(fileMetadata.getKey());
 
@@ -903,25 +1728,62 @@ public class NativeAzureFileSystem extends FileSystem {
     return status.toArray(new FileStatus[0]);
   }
 
+  // Redo any folder renames needed if there are rename pending files in the
+  // directory listing. Return true if one or more redo operations were done.
+  private boolean conditionalRedoFolderRenames(PartialListing listing)
+      throws IllegalArgumentException, IOException {
+    boolean renamed = false;
+    for (FileMetadata fileMetadata : listing.getFiles()) {
+      Path subpath = keyToPath(fileMetadata.getKey());
+      if (isRenamePendingFile(subpath)) {
+        FolderRenamePending pending =
+            new FolderRenamePending(subpath, this);
+        pending.redo();
+        renamed = true;
+      }
+    }
+    return renamed;
+  }
+
+  // True if this is a folder rename pending file, else false.
+  private boolean isRenamePendingFile(Path path) {
+    return path.toString().endsWith(FolderRenamePending.SUFFIX);
+  }
+
   private FileStatus newFile(FileMetadata meta, Path path) {
-    return new FileStatus(meta.getLength(), false, 1, blockSize,
-        meta.getLastModified(), 0, meta.getPermissionStatus().getPermission(),
-        meta.getPermissionStatus().getUserName(), meta.getPermissionStatus()
-            .getGroupName(),
+    return new FileStatus(
+        meta.getLength(),
+        false,
+        1,
+        blockSize,
+        meta.getLastModified(),
+        0,
+        meta.getPermissionStatus().getPermission(),
+        meta.getPermissionStatus().getUserName(),
+        meta.getPermissionStatus().getGroupName(),
         path.makeQualified(getUri(), getWorkingDirectory()));
   }
 
   private FileStatus newDirectory(FileMetadata meta, Path path) {
-    return new FileStatus(0, true, 1, blockSize, meta == null ? 0
-        : meta.getLastModified(), 0, meta == null ? FsPermission.getDefault()
-        : meta.getPermissionStatus().getPermission(), meta == null ? "" : meta
-        .getPermissionStatus().getUserName(), meta == null ? "" : meta
-        .getPermissionStatus().getGroupName(), path.makeQualified(getUri(),
-        getWorkingDirectory()));
+    return new FileStatus(
+        0,
+        true,
+        1,
+        blockSize,
+        meta == null ? 0 : meta.getLastModified(),
+        0,
+        meta == null ? FsPermission.getDefault() : meta.getPermissionStatus().getPermission(),
+        meta == null ? "" : meta.getPermissionStatus().getUserName(),
+        meta == null ? "" : meta.getPermissionStatus().getGroupName(),
+        path.makeQualified(getUri(), getWorkingDirectory()));
   }
 
   private static enum UMaskApplyMode {
-    NewFile, NewDirectory, ChangeExistingFile, ChangeExistingDirectory,
+    NewFile,
+    NewDirectory,
+    NewDirectoryNoUmask,
+    ChangeExistingFile,
+    ChangeExistingDirectory,
   }
 
   /**
@@ -958,13 +1820,19 @@ public class NativeAzureFileSystem extends FileSystem {
   private PermissionStatus createPermissionStatus(FsPermission permission)
       throws IOException {
     // Create the permission status for this file based on current user
-    return new PermissionStatus(UserGroupInformation.getCurrentUser()
-        .getShortUserName(), getConf().get(AZURE_DEFAULT_GROUP_PROPERTY_NAME,
-        AZURE_DEFAULT_GROUP_DEFAULT), permission);
+    return new PermissionStatus(
+        UserGroupInformation.getCurrentUser().getShortUserName(),
+        getConf().get(AZURE_DEFAULT_GROUP_PROPERTY_NAME,
+            AZURE_DEFAULT_GROUP_DEFAULT),
+        permission);
   }
 
   @Override
   public boolean mkdirs(Path f, FsPermission permission) throws IOException {
+    return mkdirs(f, permission, false);
+  }
+
+  public boolean mkdirs(Path f, FsPermission permission, boolean noUmask)
+      throws IOException {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Creating directory: " + f.toString());
     }
@@ -975,24 +1843,31 @@ public class NativeAzureFileSystem extends FileSystem {
     }
 
     Path absolutePath = makeAbsolute(f);
-    PermissionStatus permissionStatus = createPermissionStatus(applyUMask(
-        permission, UMaskApplyMode.NewDirectory));
+    PermissionStatus permissionStatus = null;
+    if (noUmask) {
+      // Ensure the owner still has wx permissions at the minimum.
+      permissionStatus = createPermissionStatus(
+          applyUMask(FsPermission.createImmutable(
+              (short) (permission.toShort() | USER_WX_PERMISSION)),
+              UMaskApplyMode.NewDirectoryNoUmask));
+    } else {
+      permissionStatus = createPermissionStatus(
+          applyUMask(permission, UMaskApplyMode.NewDirectory));
+    }
+
     ArrayList<String> keysToCreateAsFolder = new ArrayList<String>();
     ArrayList<String> keysToUpdateAsFolder = new ArrayList<String>();
     boolean childCreated = false;
     // Check that there is no file in the parent chain of the given path.
-    // Stop when you get to the root
-    for (Path current = absolutePath, parent = current.getParent(); parent != null; current = parent, parent = current
-        .getParent()) {
+    for (Path current = absolutePath, parent = current.getParent();
+        parent != null; // Stop when you get to the root
+        current = parent, parent = current.getParent()) {
       String currentKey = pathToKey(current);
       FileMetadata currentMetadata = store.retrieveMetadata(currentKey);
       if (currentMetadata != null && !currentMetadata.isDir()) {
-        throw new IOException("Cannot create directory " + f + " because "
-            + current + " is an existing file.");
-      } else if (currentMetadata == null
-          || (currentMetadata.isDir() && currentMetadata
-              .getBlobMaterialization() == BlobMaterialization.Implicit)) {
+        throw new IOException("Cannot create directory " + f + " because " +
+            current + " is an existing file.");
+      } else if (currentMetadata == null) {
         keysToCreateAsFolder.add(currentKey);
         childCreated = true;
       } else {
@@ -1009,18 +1884,8 @@ public class NativeAzureFileSystem extends FileSystem {
       store.storeEmptyFolder(currentKey, permissionStatus);
     }
 
-    // Take the time after finishing mkdirs as the modified time, and update all
-    // the existing directories' modified time to it uniformly.
-    final Calendar lastModifiedCalendar = Calendar
-        .getInstance(Utility.LOCALE_US);
-    lastModifiedCalendar.setTimeZone(Utility.UTC_ZONE);
-    Date lastModified = lastModifiedCalendar.getTime();
-    for (String key : keysToUpdateAsFolder) {
-      store.updateFolderLastModifiedTime(key, lastModified);
-    }
-
     instrumentation.directoryCreated();
-    
+
     // otherwise throws exception
     return true;
   }
@@ -1043,12 +1908,14 @@ public class NativeAzureFileSystem extends FileSystem {
     }
 
     return new FSDataInputStream(new BufferedFSInputStream(
-        new NativeAzureFsInputStream(store.retrieve(key), key), bufferSize));
+        new NativeAzureFsInputStream(store.retrieve(key), key, meta.getLength()), bufferSize));
   }
 
   @Override
   public boolean rename(Path src, Path dst) throws IOException {
 
+    FolderRenamePending renamePending = null;
+
     if (LOG.isDebugEnabled()) {
       LOG.debug("Moving " + src + " to " + dst);
     }
@@ -1065,91 +1932,28 @@ public class NativeAzureFileSystem extends FileSystem {
       return false;
     }
 
-    FileMetadata srcMetadata = store.retrieveMetadata(srcKey);
-    if (srcMetadata == null) {
-      // Source doesn't exist
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Source " + src + " doesn't exist, failing the rename.");
-      }
-      return false;
-    }
-
     // Figure out the final destination
     Path absoluteDst = makeAbsolute(dst);
     String dstKey = pathToKey(absoluteDst);
     FileMetadata dstMetadata = store.retrieveMetadata(dstKey);
-
-    // directory rename validations
-    if (srcMetadata.isDir()) {
-
-      // rename dir to self is an error
-      if (srcKey.equals(dstKey)) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Renaming directory to itself is disallowed. path=" + src);
-        }
-        return false;
-      }
-
-      // rename dir to (sub-)child of self is an error. see
-      // FileSystemContractBaseTest.testRenameChildDirForbidden
-      if (dstKey.startsWith(srcKey + PATH_DELIMITER)) {
-
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Renaming directory to itself is disallowed. src=" + src
-              + " dest=" + dst);
-        }
-        return false;
-      }
-    }
-
-    // file rename early checks
-    if (!srcMetadata.isDir()) {
-      if (srcKey.equals(dstKey)) {
-        // rename file to self is OK
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Renaming file to itself. This is allowed and is treated as no-op. path="
-              + src);
-        }
-        return true;
-      }
-    }
-
-    // More validations..
-    // If target is dir but target already exists, alter the dst to be a
-    // subfolder.
-    // eg move("/a/file.txt", "/b") where "/b" already exists causes the target
-    // to be "/c/file.txt
     if (dstMetadata != null && dstMetadata.isDir()) {
+      // It's an existing directory.
       dstKey = pathToKey(makeAbsolute(new Path(dst, src.getName())));
-      // Best would be to update dstMetadata, but it is not used further, so set
-      // it to null and skip the additional cost
-      dstMetadata = null;
-      // dstMetadata = store.retrieveMetadata(dstKey);
       if (LOG.isDebugEnabled()) {
         LOG.debug("Destination " + dst
             + " is a directory, adjusted the destination to be " + dstKey);
       }
-
-      // rename dir to self is an error
-      if (srcKey.equals(dstKey)) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Renaming directory to itself is disallowed. path=" + src);
-        }
-        return false;
-      }
-
     } else if (dstMetadata != null) {
-      // Otherwise, attempting to overwrite a file is error
+      // Attempting to overwrite a file using rename()
       if (LOG.isDebugEnabled()) {
         LOG.debug("Destination " + dst
             + " is an already existing file, failing the rename.");
       }
       return false;
     } else {
-      // Either dir or file and target doesn't exist.. Check that the parent
-      // directory exists.
-      FileMetadata parentOfDestMetadata = store
-          .retrieveMetadata(pathToKey(absoluteDst.getParent()));
+      // Check that the parent directory exists.
+      FileMetadata parentOfDestMetadata =
+          store.retrieveMetadata(pathToKey(absoluteDst.getParent()));
       if (parentOfDestMetadata == null) {
         if (LOG.isDebugEnabled()) {
           LOG.debug("Parent of the destination " + dst
@@ -1164,88 +1968,136 @@ public class NativeAzureFileSystem extends FileSystem {
         return false;
       }
     }
-
-    // Validations complete, do the move.
-    if (!srcMetadata.isDir()) {
+    FileMetadata srcMetadata = store.retrieveMetadata(srcKey);
+    if (srcMetadata == null) {
+      // Source doesn't exist
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Source " + src + " doesn't exist, failing the rename.");
+      }
+      return false;
+    } else if (!srcMetadata.isDir()) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Source " + src + " found as a file, renaming.");
       }
       store.rename(srcKey, dstKey);
     } else {
-      // Move everything inside the folder.
-      String priorLastKey = null;
 
-      // Calculate the index of the part of the string to be moved. That
-      // is everything on the path up to the folder name.
-      do {
-        // List all blobs rooted at the source folder.
-        PartialListing listing = store.listAll(srcKey, AZURE_LIST_ALL,
-            AZURE_UNBOUNDED_DEPTH, priorLastKey);
-
-        // Rename all the files in the folder.
-        for (FileMetadata file : listing.getFiles()) {
-          // Rename all materialized entries under the folder to point to the
-          // final destination.
-          if (file.getBlobMaterialization() == BlobMaterialization.Explicit) {
-            String srcName = file.getKey();
-            String suffix = srcName.substring(srcKey.length());
-            String dstName = dstKey + suffix;
-            store.rename(srcName, dstName);
-          }
-        }
-        priorLastKey = listing.getPriorLastKey();
-      } while (priorLastKey != null);
-      // Rename the top level empty blob for the folder.
-      if (srcMetadata.getBlobMaterialization() == BlobMaterialization.Explicit) {
-        store.rename(srcKey, dstKey);
+      // Prepare for, execute, and clean up after the rename of all files
+      // in the folder, and of the folder's root file, and update the last
+      // modified time of the source and target parent folders. The
+      // operation can be redone if it fails part way through, by applying
+      // the "Rename Pending" file.
+
+      // The following code (internally) only does atomic rename preparation
+      // and lease management for page blob folders, limiting the scope of the
+      // operation to HBase log file folders, where atomic rename is required.
+      // In the future, we could generalize it easily to all folders.
+      renamePending = prepareAtomicFolderRename(srcKey, dstKey);
+      renamePending.execute();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Renamed " + src + " to " + dst + " successfully.");
       }
+      renamePending.cleanup();
+      return true;
     }
 
-    // Update both source and destination parent folder last modified time.
-    Path srcParent = makeAbsolute(keyToPath(srcKey)).getParent();
-    if (srcParent != null && srcParent.getParent() != null) { // not root
-      String srcParentKey = pathToKey(srcParent);
+    // Update the last-modified time of the parent folders of both source
+    // and destination.
+    updateParentFolderLastModifiedTime(srcKey);
+    updateParentFolderLastModifiedTime(dstKey);
 
-      // ensure the srcParent is a materialized folder
-      FileMetadata srcParentMetadata = store.retrieveMetadata(srcParentKey);
-      if (srcParentMetadata.isDir()
-          && srcParentMetadata.getBlobMaterialization() == BlobMaterialization.Implicit) {
-        store.storeEmptyFolder(srcParentKey,
-            createPermissionStatus(FsPermission.getDefault()));
-      }
-
-      store.updateFolderLastModifiedTime(srcParentKey);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Renamed " + src + " to " + dst + " successfully.");
     }
+    return true;
+  }
 
-    Path destParent = makeAbsolute(keyToPath(dstKey)).getParent();
-    if (destParent != null && destParent.getParent() != null) { // not root
-      String dstParentKey = pathToKey(destParent);
+  /**
+   * Update the last-modified time of the parent folder of the file
+   * identified by key.
+   * @param key The store key of the file whose parent folder is updated.
+   * @throws IOException If updating the folder metadata fails.
+   */
+  private void updateParentFolderLastModifiedTime(String key)
+      throws IOException {
+    Path parent = makeAbsolute(keyToPath(key)).getParent();
+    if (parent != null && parent.getParent() != null) { // not root
+      String parentKey = pathToKey(parent);
 
-      // ensure the dstParent is a materialized folder
-      FileMetadata dstParentMetadata = store.retrieveMetadata(dstParentKey);
-      if (dstParentMetadata.isDir()
-          && dstParentMetadata.getBlobMaterialization() == BlobMaterialization.Implicit) {
-        store.storeEmptyFolder(dstParentKey,
-            createPermissionStatus(FsPermission.getDefault()));
-      }
+      // ensure the parent is a materialized folder
+      FileMetadata parentMetadata = store.retrieveMetadata(parentKey);
+      // The metadata could be null if the implicit folder only contains a
+      // single file. In this case, the parent folder no longer exists if the
+      // file is renamed; so we can safely ignore the null pointer case.
+      if (parentMetadata != null) {
+        if (parentMetadata.isDir()
+            && parentMetadata.getBlobMaterialization() == BlobMaterialization.Implicit) {
+          store.storeEmptyFolder(parentKey,
+              createPermissionStatus(FsPermission.getDefault()));
+        }
 
-      store.updateFolderLastModifiedTime(dstParentKey);
+        store.updateFolderLastModifiedTime(parentKey, null);
+      }
     }
+  }
 
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Renamed " + src + " to " + dst + " successfully.");
+  /**
+   * If the source is a page blob folder, prepare to rename it atomically:
+   * get exclusive access to the source folder, and record the actions to be
+   * performed for this rename in a "Rename Pending" file. This code was
+   * designed to meet the needs of HBase, which requires atomic rename of
+   * write-ahead log (WAL) folders for correctness.
+   *
+   * Before calling this method, the caller must ensure that the source is a
+   * folder.
+   *
+   * For non-page-blob directories, prepare the in-memory information needed,
+   * but don't take the lease or write the redo file. This is done to limit
+   * the scope of atomic folder rename to HBase, at least at the time of
+   * writing this code.
+   *
+   * @param srcKey Source folder name.
+   * @param dstKey Destination folder name.
+   * @return The pending rename, persisted only for atomic-rename folders.
+   * @throws IOException If the rename preparation fails.
+   */
+  private FolderRenamePending prepareAtomicFolderRename(
+      String srcKey, String dstKey) throws IOException {
+
+    if (store.isAtomicRenameKey(srcKey)) {
+
+      // Block unwanted concurrent access to source folder.
+      SelfRenewingLease lease = leaseSourceFolder(srcKey);
+
+      // Prepare in-memory information needed to do or redo a folder rename.
+      FolderRenamePending renamePending =
+          new FolderRenamePending(srcKey, dstKey, lease, this);
+
+      // Save it to persistent storage to help recover if the operation fails.
+      renamePending.writeFile(this);
+      return renamePending;
+    } else {
+      FolderRenamePending renamePending =
+          new FolderRenamePending(srcKey, dstKey, null, this);
+      return renamePending;
     }
-    return true;
   }
 
   /**
-   * Return an array containing hostnames, offset and size of portions of the
-   * given file. For WASB we'll just lie and give fake hosts to make sure we get
-   * many splits in MR jobs.
+   * Get a self-renewing Azure blob lease on the zero-byte blob that marks
+   * the source folder.
+   */
+  private SelfRenewingLease leaseSourceFolder(String srcKey)
+      throws AzureException {
+    return store.acquireLease(srcKey);
+  }
+
+  /**
+   * Return an array containing hostnames, offset and size of
+   * portions of the given file. For WASB we'll just lie and give
+   * fake hosts to make sure we get many splits in MR jobs.
    */
   @Override
-  public BlockLocation[] getFileBlockLocations(FileStatus file, long start,
-      long len) throws IOException {
+  public BlockLocation[] getFileBlockLocations(FileStatus file,
+      long start, long len) throws IOException {
     if (file == null) {
       return null;
     }
@@ -1306,11 +2158,12 @@ public class NativeAzureFileSystem extends FileSystem {
     if (metadata.getBlobMaterialization() == BlobMaterialization.Implicit) {
       // It's an implicit folder, need to materialize it.
       store.storeEmptyFolder(key, createPermissionStatus(permission));
-    } else if (!metadata.getPermissionStatus().getPermission()
-        .equals(permission)) {
-      store.changePermissionStatus(key, new PermissionStatus(metadata
-          .getPermissionStatus().getUserName(), metadata.getPermissionStatus()
-          .getGroupName(), permission));
+    } else if (!metadata.getPermissionStatus().getPermission().
+        equals(permission)) {
+      store.changePermissionStatus(key, new PermissionStatus(
+          metadata.getPermissionStatus().getUserName(),
+          metadata.getPermissionStatus().getGroupName(),
+          permission));
     }
   }
 
@@ -1324,10 +2177,11 @@ public class NativeAzureFileSystem extends FileSystem {
       throw new FileNotFoundException("File doesn't exist: " + p);
     }
     PermissionStatus newPermissionStatus = new PermissionStatus(
-        username == null ? metadata.getPermissionStatus().getUserName()
-            : username, groupname == null ? metadata.getPermissionStatus()
-            .getGroupName() : groupname, metadata.getPermissionStatus()
-            .getPermission());
+        username == null ?
+            metadata.getPermissionStatus().getUserName() : username,
+        groupname == null ?
+            metadata.getPermissionStatus().getGroupName() : groupname,
+        metadata.getPermissionStatus().getPermission());
     if (metadata.getBlobMaterialization() == BlobMaterialization.Implicit) {
       // It's an implicit folder, need to materialize it.
       store.storeEmptyFolder(key, newPermissionStatus);
@@ -1341,12 +2195,12 @@ public class NativeAzureFileSystem extends FileSystem {
     if (isClosed) {
       return;
     }
-    
+
     // Call the base close() to close any resources there.
     super.close();
-    // Close the store
+    // Close the store to close any resources there - e.g. the bandwidth
+    // updater thread would be stopped at this time.
     store.close();
-    
     // Notify the metrics system that this file system is closed, which may
     // trigger one final metrics push to get the accurate final file system
     // metrics out.
@@ -1364,16 +2218,17 @@ public class NativeAzureFileSystem extends FileSystem {
   }
 
   /**
-   * A handler that defines what to do with blobs whose upload was interrupted.
+   * A handler that defines what to do with blobs whose upload was
+   * interrupted.
    */
   private abstract class DanglingFileHandler {
     abstract void handleFile(FileMetadata file, FileMetadata tempFile)
-        throws IOException;
+      throws IOException;
   }
 
   /**
-   * Handler implementation for just deleting dangling files and cleaning them
-   * up.
+   * Handler implementation for just deleting dangling files and cleaning
+   * them up.
    */
   private class DanglingFileDeleter extends DanglingFileHandler {
     @Override
@@ -1388,8 +2243,8 @@ public class NativeAzureFileSystem extends FileSystem {
   }
 
   /**
-   * Handler implementation for just moving dangling files to recovery location
-   * (/lost+found).
+   * Handler implementation for just moving dangling files to recovery
+   * location (/lost+found).
    */
   private class DanglingFileRecoverer extends DanglingFileHandler {
     private final Path destination;
@@ -1405,8 +2260,8 @@ public class NativeAzureFileSystem extends FileSystem {
         LOG.debug("Recovering " + file.getKey());
       }
       // Move to the final destination
-      String finalDestinationKey = pathToKey(new Path(destination,
-          file.getKey()));
+      String finalDestinationKey =
+          pathToKey(new Path(destination, file.getKey()));
       store.rename(tempFile.getKey(), finalDestinationKey);
       if (!finalDestinationKey.equals(file.getKey())) {
         // Delete the empty link file now that we've restored it.

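The directory branch above delegates to FolderRenamePending: prepare,
execute, clean up, with the persisted "Rename Pending" file making a
half-finished rename redoable. A minimal standalone sketch of that
redo-log pattern, using hypothetical names throughout (PlanStore,
writePlan, deletePlan) rather than the patch's FolderRenamePending
implementation:

    import java.io.IOException;
    import java.util.List;

    final class RedoLogRenameSketch {
      interface PlanStore {
        List<String> listKeys(String folderKey) throws IOException;
        void rename(String srcKey, String dstKey) throws IOException;
        void writePlan(String srcKey, String dstKey) throws IOException;
        void deletePlan(String srcKey) throws IOException;
      }

      private final PlanStore store;

      RedoLogRenameSketch(PlanStore store) {
        this.store = store;
      }

      void renameFolder(String srcKey, String dstKey) throws IOException {
        // 1. Persist the plan first, so a crash mid-rename leaves enough
        //    state behind to finish the job later.
        store.writePlan(srcKey, dstKey);
        // 2. Execute: move every blob under the source prefix. On replay,
        //    a real implementation tolerates already-moved sources.
        for (String key : store.listKeys(srcKey)) {
          String suffix = key.substring(srcKey.length());
          store.rename(key, dstKey + suffix);
        }
        // 3. Remove the plan only after all the work is done.
        store.deletePlan(srcKey);
      }
    }
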
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2217e2f8/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java
index 4e1d0b6..0229cb7 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java
@@ -53,6 +53,10 @@ interface NativeFileSystemStore {
   DataOutputStream storefile(String key, PermissionStatus permissionStatus)
       throws AzureException;
 
+  /** Returns true if the given key names a blob stored in page blob format. */
+  boolean isPageBlobKey(String key);
+
+  /** Returns true if the given key names a folder requiring atomic rename. */
+  boolean isAtomicRenameKey(String key);
+
   void storeEmptyLinkFile(String key, String tempBlobKey,
       PermissionStatus permissionStatus) throws AzureException;
 
@@ -74,9 +78,12 @@ interface NativeFileSystemStore {
 
   void rename(String srcKey, String dstKey) throws IOException;
 
+  void rename(String srcKey, String dstKey, boolean acquireLease,
+      SelfRenewingLease existingLease) throws IOException;
+
   /**
    * Delete all keys with the given prefix. Used for testing.
-   * 
+   *
    * @throws IOException
    */
   @VisibleForTesting
@@ -84,15 +91,20 @@ interface NativeFileSystemStore {
 
   /**
    * Diagnostic method to dump state to the console.
-   * 
+   *
    * @throws IOException
    */
   void dump() throws IOException;
 
   void close();
 
-  void updateFolderLastModifiedTime(String key) throws AzureException;
-
-  void updateFolderLastModifiedTime(String key, Date lastModified)
+  void updateFolderLastModifiedTime(String key, SelfRenewingLease folderLease)
       throws AzureException;
+
+  void updateFolderLastModifiedTime(String key, Date lastModified,
+      SelfRenewingLease folderLease) throws AzureException;
+
+  /** Delete the blob for the given key while holding the given lease. */
+  void delete(String key, SelfRenewingLease lease) throws IOException;
+
+  /** Acquire a new self-renewing lease on the blob for the given key. */
+  SelfRenewingLease acquireLease(String key) throws AzureException;
 }
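
The lease-aware overloads above let a single operation hold exclusive access
across several store calls. A rough usage sketch against this interface
(illustrative only: it would have to live in the org.apache.hadoop.fs.azure
package since these types are package-private, and SelfRenewingLease.free()
is an assumed release method, as only acquireLease() appears in this diff):

    import java.io.IOException;

    // Sketch: acquire a lease on a folder key, run a lease-protected
    // delete, then release. free() is an assumption of this sketch.
    final class LeaseUsageSketch {
      static void deleteUnderLease(NativeFileSystemStore store,
          String folderKey) throws IOException {
        SelfRenewingLease lease = store.acquireLease(folderKey);
        try {
          store.delete(folderKey, lease);
        } finally {
          lease.free(); // assumed release call
        }
      }
    }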

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2217e2f8/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobFormatHelpers.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobFormatHelpers.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobFormatHelpers.java
new file mode 100644
index 0000000..ad11aac
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobFormatHelpers.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import java.nio.ByteBuffer;
+
+import com.microsoft.windowsazure.storage.blob.BlobRequestOptions;
+
+/**
+ * Constants and helper methods for ASV's custom data format in page blobs.
+ */
+final class PageBlobFormatHelpers {
+  public static final short PAGE_SIZE = 512;
+  public static final short PAGE_HEADER_SIZE = 2;
+  public static final short PAGE_DATA_SIZE = PAGE_SIZE - PAGE_HEADER_SIZE;
+
+  // Hide constructor for utility class.
+  private PageBlobFormatHelpers() {
+  }
+
+  /**
+   * Encodes the given short as a two-byte (big-endian) array.
+   */
+  public static byte[] fromShort(short s) {
+    return ByteBuffer.allocate(2).putShort(s).array();
+  }
+
+  /**
+   * Decodes a short from the given two bytes (big-endian).
+   */
+  public static short toShort(byte firstByte, byte secondByte) {
+    return ByteBuffer.wrap(new byte[] { firstByte, secondByte })
+        .getShort();
+  }
+
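+  /**
+   * Returns request options with transactional content-MD5 checking
+   * enabled, so individual storage requests are validated with an MD5
+   * checksum in transit.
+   */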
+  public static BlobRequestOptions withMD5Checking() {
+    BlobRequestOptions options = new BlobRequestOptions();
+    options.setUseTransactionalContentMD5(true);
+    return options;
+  }
+}
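
Each 512-byte page thus carries a 2-byte big-endian size header followed by
up to 510 bytes of data. A standalone sketch of the header round trip and
the per-page size arithmetic (illustrative; it mirrors the helpers above
rather than calling them):

    import java.nio.ByteBuffer;

    final class PageLayoutSketch {
      static final short PAGE_SIZE = 512;
      static final short PAGE_HEADER_SIZE = 2;
      static final short PAGE_DATA_SIZE = PAGE_SIZE - PAGE_HEADER_SIZE; // 510

      public static void main(String[] args) {
        short dataBytesInPage = 100;
        // Encode the size header the way fromShort() does (big-endian).
        byte[] header =
            ByteBuffer.allocate(2).putShort(dataBytesInPage).array();
        // Decode it back the way toShort() does.
        short decoded =
            ByteBuffer.wrap(new byte[] { header[0], header[1] }).getShort();
        assert decoded == dataBytesInPage;

        // Logical size of a 3-page blob whose last page holds 100 bytes:
        long pages = 3;
        long logicalSize = (pages - 1) * PAGE_DATA_SIZE + dataBytesInPage;
        System.out.println(logicalSize); // 2 * 510 + 100 = 1120
      }
    }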

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2217e2f8/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobInputStream.java
new file mode 100644
index 0000000..62b47ee
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobInputStream.java
@@ -0,0 +1,455 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import static org.apache.hadoop.fs.azure.PageBlobFormatHelpers.PAGE_DATA_SIZE;
+import static org.apache.hadoop.fs.azure.PageBlobFormatHelpers.PAGE_HEADER_SIZE;
+import static org.apache.hadoop.fs.azure.PageBlobFormatHelpers.PAGE_SIZE;
+import static org.apache.hadoop.fs.azure.PageBlobFormatHelpers.toShort;
+import static org.apache.hadoop.fs.azure.PageBlobFormatHelpers.withMD5Checking;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.azure.StorageInterface.CloudPageBlobWrapper;
+
+import com.microsoft.windowsazure.storage.OperationContext;
+import com.microsoft.windowsazure.storage.StorageException;
+import com.microsoft.windowsazure.storage.blob.BlobRequestOptions;
+import com.microsoft.windowsazure.storage.blob.PageRange;
+
+/**
+ * An input stream that reads file data from a page blob stored
+ * using ASV's custom format.
+ */
+final class PageBlobInputStream extends InputStream {
+  private static final Log LOG = LogFactory.getLog(PageBlobInputStream.class);
+
+  // The blob we're reading from.
+  private final CloudPageBlobWrapper blob;
+  // The operation context to use for storage requests.
+  private final OperationContext opContext;
+  // The number of pages remaining to be read from the server.
+  private long numberOfPagesRemaining;
+  // The current byte offset to start reading from the server next,
+  // equivalent to (total number of pages we've read) * (page size).
+  private long currentOffsetInBlob;
+  // The buffer holding the current data we last read from the server.
+  private byte[] currentBuffer;
+  // The current byte offset we're at in the buffer.
+  private int currentOffsetInBuffer;
+  // Maximum number of pages to fetch in any single download request (4 MB).
+  private static final int MAX_PAGES_PER_DOWNLOAD =
+      4 * 1024 * 1024 / PAGE_SIZE;
+  // Whether the stream has been closed.
+  private boolean closed = false;
+  // Total stream size, or -1 if not initialized.
+  long pageBlobSize = -1;
+  // Current position in stream of valid data.
+  long filePosition = 0;
+
+  /**
+   * Helper method to extract the actual data size of a page blob.
+   * This typically involves 2 service requests (one for page ranges, another
+   * for the last page's data).
+   *
+   * @param blob The blob to get the size from.
+   * @param opContext The operation context to use for the requests.
+   * @return The total data size of the blob in bytes.
+   * @throws IOException If the format is corrupt.
+   * @throws StorageException If anything goes wrong in the requests.
+   */
+  public static long getPageBlobSize(CloudPageBlobWrapper blob,
+      OperationContext opContext) throws IOException, StorageException {
+    // Get the page ranges for the blob. There should be one range starting
+    // at byte 0, but we tolerate (and ignore) ranges after the first one.
+    ArrayList<PageRange> pageRanges =
+        blob.downloadPageRanges(new BlobRequestOptions(), opContext);
+    if (pageRanges.size() == 0) {
+      return 0;
+    }
+    if (pageRanges.get(0).getStartOffset() != 0) {
+      // Not expected: we always upload our page blobs as a contiguous range
+      // starting at byte 0.
+      throw badStartRangeException(blob, pageRanges.get(0));
+    }
+    long totalRawBlobSize = pageRanges.get(0).getEndOffset() + 1;
+
+    // Get the last page.
+    long lastPageStart = totalRawBlobSize - PAGE_SIZE;
+    ByteArrayOutputStream baos =
+        new ByteArrayOutputStream(PAGE_SIZE);
+    blob.downloadRange(lastPageStart, PAGE_SIZE, baos,
+        new BlobRequestOptions(), opContext);
+
+    byte[] lastPage = baos.toByteArray();
+    short lastPageSize = getPageSize(blob, lastPage, 0);
+    long totalNumberOfPages = totalRawBlobSize / PAGE_SIZE;
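+    // Illustrative example: a single range covering 3 pages gives
+    // totalRawBlobSize = 1536 and totalNumberOfPages = 3; if the last
+    // page's header reads 100, the result is (3 - 1) * 510 + 100 = 1120
+    // bytes of user data.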
+    return (totalNumberOfPages - 1) * PAGE_DATA_SIZE + lastPageSize;
+  }
+
+  /**
+   * Constructs a stream over the given page blob.
+   */
+  public PageBlobInputStream(CloudPageBlobWrapper blob,
+      OperationContext opContext)
+      throws IOException {
+    this.blob = blob;
+    this.opContext = opContext;
+    ArrayList<PageRange> allRanges;
+    try {
+      allRanges =
+          blob.downloadPageRanges(new BlobRequestOptions(), opContext);
+    } catch (StorageException e) {
+      throw new IOException(e);
+    }
+    if (allRanges.size() > 0) {
+      if (allRanges.get(0).getStartOffset() != 0) {
+        throw badStartRangeException(blob, allRanges.get(0));
+      }
+      if (allRanges.size() > 1) {
+        LOG.warn(String.format(
+            "Blob %s has %d page ranges beyond the first range. " 
+            + "Only reading the first range.",
+            blob.getUri(), allRanges.size() - 1));
+      }
+      numberOfPagesRemaining =
+          (allRanges.get(0).getEndOffset() + 1) / PAGE_SIZE;
+    } else {
+      numberOfPagesRemaining = 0;
+    }
+  }
+
+  /** Return the number of remaining available bytes
+   * if that size is less than or equal to {@link Integer#MAX_VALUE};
+   * otherwise, return {@link Integer#MAX_VALUE}.
+   *
+   * This is to match the behavior of DFSInputStream.available(),
+   * which some clients may rely on (HBase write-ahead log reading in
+   * particular).
+   */
+  @Override
+  public synchronized int available() throws IOException {
+    if (closed) {
+      throw new IOException("Stream closed");
+    }
+    if (pageBlobSize == -1) {
+      try {
+        pageBlobSize = getPageBlobSize(blob, opContext);
+      } catch (StorageException e) {
+        throw new IOException("Unable to get page blob size.", e);
+      }
+    }
+
+    final long remaining = pageBlobSize - filePosition;
+    return remaining <= Integer.MAX_VALUE ?
+        (int) remaining : Integer.MAX_VALUE;
+  }
+
+  @Override
+  public synchronized void close() throws IOException {
+    closed = true;
+  }
+
+  private boolean dataAvailableInBuffer() {
+    return currentBuffer != null
+        && currentOffsetInBuffer < currentBuffer.length;
+  }
+
+  /**
+   * Check our buffer and download more from the server if needed.
+   * @return true if there's more data in the buffer, false if we're done.
+   * @throws IOException
+   */
+  private synchronized boolean ensureDataInBuffer() throws IOException {
+    if (dataAvailableInBuffer()) {
+      // We still have some data in our buffer.
+      return true;
+    }
+    currentBuffer = null;
+    if (numberOfPagesRemaining == 0) {
+      // No more data to read.
+      return false;
+    }
+    final long pagesToRead = Math.min(MAX_PAGES_PER_DOWNLOAD,
+        numberOfPagesRemaining);
+    final int bufferSize = (int) (pagesToRead * PAGE_SIZE);
+
+    // Download page to current buffer.
+    try {
+      // Create a byte array output stream to capture the results of the
+      // download.
+      ByteArrayOutputStream baos = new ByteArrayOutputStream(bufferSize);
+      blob.downloadRange(currentOffsetInBlob, bufferSize, baos,
+          withMD5Checking(), opContext);
+      currentBuffer = baos.toByteArray();
+    } catch (StorageException e) {
+      throw new IOException(e);
+    }
+    numberOfPagesRemaining -= pagesToRead;
+    currentOffsetInBlob += bufferSize;
+    currentOffsetInBuffer = PAGE_HEADER_SIZE;
+
+    // Since we just downloaded a new buffer, validate its consistency.
+    validateCurrentBufferConsistency();
+
+    return true;
+  }
+
+  private void validateCurrentBufferConsistency()
+      throws IOException {
+    if (currentBuffer.length % PAGE_SIZE != 0) {
+      throw new AssertionError("Unexpected buffer size: "
+          + currentBuffer.length);
+    }
+    int numberOfPages = currentBuffer.length / PAGE_SIZE;
+    for (int page = 0; page < numberOfPages; page++) {
+      short currentPageSize = getPageSize(blob, currentBuffer,
+          page * PAGE_SIZE);
+      // Calculate the number of pages that exist after this one
+      // in the blob.
+      long totalPagesAfterCurrent =
+          (numberOfPages - page - 1) + numberOfPagesRemaining;
+      // Only the last page is allowed to be not filled completely.
+      if (currentPageSize < PAGE_DATA_SIZE 
+          && totalPagesAfterCurrent > 0) {
+        throw fileCorruptException(blob, String.format(
+            "Page with partial data found in the middle (%d pages from the" 
+            + " end) that only has %d bytes of data.",
+            totalPagesAfterCurrent, currentPageSize));
+      }
+    }
+  }
+
+  // Reads the page size from the page header at the given offset.
+  private static short getPageSize(CloudPageBlobWrapper blob,
+      byte[] data, int offset) throws IOException {
+    short pageSize = toShort(data[offset], data[offset + 1]);
+    if (pageSize < 0 || pageSize > PAGE_DATA_SIZE) {
+      throw fileCorruptException(blob, String.format(
+          "Unexpected page size in the header: %d.",
+          pageSize));
+    }
+    return pageSize;
+  }
+
+  @Override
+  public synchronized int read(byte[] outputBuffer, int offset, int len)
+      throws IOException {
+    int numberOfBytesRead = 0;
+    while (len > 0) {
+      if (!ensureDataInBuffer()) {
+        filePosition += numberOfBytesRead;
+        // Per the InputStream contract, signal end-of-stream with -1 when
+        // no bytes could be read; otherwise return the partial count.
+        return numberOfBytesRead == 0 ? -1 : numberOfBytesRead;
+      }
+      int bytesRemainingInCurrentPage = getBytesRemainingInCurrentPage();
+      int numBytesToRead = Math.min(len, bytesRemainingInCurrentPage);
+      System.arraycopy(currentBuffer, currentOffsetInBuffer, outputBuffer,
+          offset, numBytesToRead);
+      numberOfBytesRead += numBytesToRead;
+      offset += numBytesToRead;
+      len -= numBytesToRead;
+      if (numBytesToRead == bytesRemainingInCurrentPage) {
+        // We've finished this page, move on to the next.
+        advancePagesInBuffer(1);
+      } else {
+        currentOffsetInBuffer += numBytesToRead;
+      }
+    }
+    filePosition += numberOfBytesRead;
+    return numberOfBytesRead;
+  }
+
+  @Override
+  public int read() throws IOException {
+    byte[] oneByte = new byte[1];
+    if (read(oneByte) <= 0) {
+      return -1;
+    }
+    // Mask to avoid sign extension: read() must return 0-255, or -1 at EOF.
+    return oneByte[0] & 0xFF;
+  }
+
+  /**
+   * Skips over and discards n bytes of data from this input stream.
+   * @param n the number of bytes to be skipped.
+   * @return the actual number of bytes skipped.
+   */
+  @Override
+  public synchronized long skip(long n) throws IOException {
+    long skipped = skipImpl(n);
+    filePosition += skipped; // track the position in the stream
+    return skipped;
+  }
+
+  private long skipImpl(long n) throws IOException {
+
+    if (n == 0) {
+      return 0;
+    }
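+
+    // Illustrative trace of this method: skipping n = 2000 bytes on a fresh
+    // stream (no buffered data, at least four pages remaining) skips 0 bytes
+    // within the buffer, jumps over 2000 / 510 = 3 whole pages (1530 data
+    // bytes) without downloading them, then downloads the next buffer and
+    // skips the remaining 470 bytes within it.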
+
+    // First skip within the current buffer as much as possible.
+    long skippedWithinBuffer = skipWithinBuffer(n);
+    if (skippedWithinBuffer > n) {
+      // TO CONSIDER: Using a contracts framework such as Google's cofoja for
+      // these post-conditions.
+      throw new AssertionError(String.format(
+          "Bug in skipWithinBuffer: it skipped over %d bytes when asked to "
+          + "skip %d bytes.", skippedWithinBuffer, n));
+    }
+    n -= skippedWithinBuffer;
+    long skipped = skippedWithinBuffer;
+
+    // Empty the current buffer, we're going beyond it.
+    currentBuffer = null;
+
+    // Skip over whole pages as necessary without retrieving them from the
+    // server.
+    // Leave the last page alone (it may be partial, so its header must be
+    // read), and clamp at zero so an exhausted stream doesn't go negative.
+    long pagesToSkipOver = Math.min(
+        n / PAGE_DATA_SIZE,
+        Math.max(0, numberOfPagesRemaining - 1));
+    numberOfPagesRemaining -= pagesToSkipOver;
+    currentOffsetInBlob += pagesToSkipOver * PAGE_SIZE;
+    skipped += pagesToSkipOver * PAGE_DATA_SIZE;
+    n -= pagesToSkipOver * PAGE_DATA_SIZE;
+    if (n == 0) {
+      return skipped;
+    }
+
+    // Now read in at the current position, and skip within current buffer.
+    if (!ensureDataInBuffer()) {
+      return skipped;
+    }
+    return skipped + skipWithinBuffer(n);
+  }
+
+  /**
+   * Skip over n bytes within the current buffer, or just skip over the
+   * whole buffer if n is greater than the bytes remaining in the buffer.
+   * @param n The number of data bytes to skip.
+   * @return The number of bytes actually skipped.
+   * @throws IOException if data corruption found in the buffer.
+   */
+  private long skipWithinBuffer(long n) throws IOException {
+    if (!dataAvailableInBuffer()) {
+      return 0;
+    }
+    long skipped = 0;
+    // First skip within the current page.
+    skipped = skipWithinCurrentPage(n);
+    if (skipped > n) {
+      throw new AssertionError(String.format(
+          "Bug in skipWithinCurrentPage: it skipped over %d bytes when asked" 
+          + " to skip %d bytes.", skipped, n));
+    }
+    n -= skipped;
+    if (n == 0 || !dataAvailableInBuffer()) {
+      return skipped;
+    }
+
+    // Calculate how many whole pages (pages before the possibly partially
+    // filled last page) remain.
+    int currentPageIndex = currentOffsetInBuffer / PAGE_SIZE;
+    int numberOfPagesInBuffer = currentBuffer.length / PAGE_SIZE;
+    int wholePagesRemaining = numberOfPagesInBuffer - currentPageIndex - 1;
+
+    if (n < (PAGE_DATA_SIZE * wholePagesRemaining)) {
+      // I'm within one of the whole pages remaining, skip in there.
+      advancePagesInBuffer((int) (n / PAGE_DATA_SIZE));
+      currentOffsetInBuffer += n % PAGE_DATA_SIZE;
+      return n + skipped;
+    }
+
+    // Skip over the whole pages.
+    advancePagesInBuffer(wholePagesRemaining);
+    skipped += wholePagesRemaining * PAGE_DATA_SIZE;
+    n -= wholePagesRemaining * PAGE_DATA_SIZE;
+
+    // At this point we know we need to skip to somewhere in the last page,
+    // or just go to the end.
+    return skipWithinCurrentPage(n) + skipped;
+  }
+
+  /**
+   * Skip over n bytes within the current page, or just skip over the whole
+   * page if n is greater than the bytes remaining in the page.
+   * @param n The number of data bytes to skip.
+   * @return The number of bytes actually skipped.
+   * @throws IOException if data corruption found in the buffer.
+   */
+  private long skipWithinCurrentPage(long n) throws IOException {
+    int remainingBytesInCurrentPage = getBytesRemainingInCurrentPage();
+    if (n < remainingBytesInCurrentPage) {
+      currentOffsetInBuffer += n;
+      return n;
+    } else {
+      advancePagesInBuffer(1);
+      return remainingBytesInCurrentPage;
+    }
+  }
+
+  /**
+   * Gets the number of bytes remaining within the current page in the buffer.
+   * @return The number of bytes remaining.
+   * @throws IOException if data corruption found in the buffer.
+   */
+  private int getBytesRemainingInCurrentPage() throws IOException {
+    if (!dataAvailableInBuffer()) {
+      return 0;
+    }
+    // Calculate our current position relative to the start of the current
+    // page.
+    int currentDataOffsetInPage =
+        (currentOffsetInBuffer % PAGE_SIZE) - PAGE_HEADER_SIZE;
+    int pageBoundary = getCurrentPageStartInBuffer();
+    // Get the data size of the current page from the header.
+    short sizeOfCurrentPage = getPageSize(blob, currentBuffer, pageBoundary);
+    return sizeOfCurrentPage - currentDataOffsetInPage;
+  }
+
+  private static IOException badStartRangeException(CloudPageBlobWrapper blob,
+      PageRange startRange) {
+    return fileCorruptException(blob, String.format(
+        "Page blobs for ASV should always use a page range starting at byte 0. " 
+        + "This starts at byte %d.",
+        startRange.getStartOffset()));
+  }
+
+  private void advancePagesInBuffer(int numberOfPages) {
+    currentOffsetInBuffer =
+        getCurrentPageStartInBuffer() 
+        + (numberOfPages * PAGE_SIZE) 
+        + PAGE_HEADER_SIZE;
+  }
+
+  private int getCurrentPageStartInBuffer() {
+    return PAGE_SIZE * (currentOffsetInBuffer / PAGE_SIZE);
+  }
+
+  private static IOException fileCorruptException(CloudPageBlobWrapper blob,
+      String reason) {
+    return new IOException(String.format(
+        "The page blob: '%s' is corrupt or has an unexpected format: %s.",
+        blob.getUri(), reason));
+  }
+}
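
From the caller's side, all of the page bookkeeping above is invisible: the
stream returns only data bytes, with the 2-byte headers stripped. A minimal
consumption sketch (constructing the stream from a CloudPageBlobWrapper and
OperationContext is elided, since both come from this patch's storage
layer):

    import java.io.IOException;
    import java.io.InputStream;

    // Sketch: drain an InputStream (e.g. a PageBlobInputStream) and count
    // the logical data bytes it yields.
    final class DrainSketch {
      static long countBytes(InputStream in) throws IOException {
        byte[] buffer = new byte[64 * 1024];
        long total = 0;
        int read;
        while ((read = in.read(buffer, 0, buffer.length)) > 0) {
          total += read;
        }
        return total;
      }
    }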