Posted to common-commits@hadoop.apache.org by cn...@apache.org on 2014/12/18 00:00:02 UTC

[16/24] hadoop git commit: HADOOP-10809. hadoop-azure: page blob support. Contributed by Dexter Bradshaw, Mostafa Elhemali, Eric Hanson, and Mike Liddell.

HADOOP-10809. hadoop-azure: page blob support. Contributed by Dexter Bradshaw, Mostafa Elhemali, Eric Hanson, and Mike Liddell.

(cherry picked from commit 2217e2f8ff418b88eac6ad36cafe3a9795a11f40)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/5a737026
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/5a737026
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/5a737026

Branch: refs/heads/branch-2
Commit: 5a737026cc365eb86c1d72c660fb13d8540a03b4
Parents: 9a2e4f4
Author: cnauroth <cn...@apache.org>
Authored: Wed Oct 8 14:20:23 2014 -0700
Committer: cnauroth <cn...@apache.org>
Committed: Wed Dec 17 14:57:13 2014 -0800

----------------------------------------------------------------------
 hadoop-common-project/hadoop-common/CHANGES.txt |    3 +
 hadoop-tools/hadoop-azure/README.txt            |   48 +
 .../dev-support/findbugs-exclude.xml            |   31 +
 .../hadoop-azure/src/config/checkstyle.xml      |    7 +-
 .../hadoop/fs/azure/AzureLinkedStack.java       |  217 +++
 .../fs/azure/AzureNativeFileSystemStore.java    |  740 +++++++---
 .../hadoop/fs/azure/NativeAzureFileSystem.java  | 1337 ++++++++++++++----
 .../hadoop/fs/azure/NativeFileSystemStore.java  |   22 +-
 .../hadoop/fs/azure/PageBlobFormatHelpers.java  |   58 +
 .../hadoop/fs/azure/PageBlobInputStream.java    |  455 ++++++
 .../hadoop/fs/azure/PageBlobOutputStream.java   |  497 +++++++
 .../apache/hadoop/fs/azure/PartialListing.java  |    2 +-
 .../hadoop/fs/azure/SelfRenewingLease.java      |  202 +++
 .../fs/azure/SelfThrottlingIntercept.java       |    4 +-
 .../fs/azure/ShellDecryptionKeyProvider.java    |    3 +-
 .../hadoop/fs/azure/SimpleKeyProvider.java      |    3 +-
 .../hadoop/fs/azure/StorageInterface.java       |  280 +++-
 .../hadoop/fs/azure/StorageInterfaceImpl.java   |  184 ++-
 .../fs/azure/SyncableDataOutputStream.java      |   56 +
 .../java/org/apache/hadoop/fs/azure/Wasb.java   |    1 -
 .../metrics/AzureFileSystemInstrumentation.java |    5 +-
 .../metrics/ResponseReceivedMetricUpdater.java  |    7 +-
 .../services/org.apache.hadoop.fs.FileSystem    |   17 +
 .../fs/azure/AzureBlobStorageTestAccount.java   |  155 +-
 .../hadoop/fs/azure/InMemoryBlockBlobStore.java |   70 +-
 .../hadoop/fs/azure/MockStorageInterface.java   |  233 ++-
 .../fs/azure/NativeAzureFileSystemBaseTest.java | 1016 ++++++++++++-
 .../hadoop/fs/azure/RunningLiveWasbTests.txt    |   22 +
 .../azure/TestAzureConcurrentOutOfBandIo.java   |   75 +-
 .../TestAzureFileSystemErrorConditions.java     |   57 +-
 .../hadoop/fs/azure/TestBlobDataValidation.java |   12 +-
 .../hadoop/fs/azure/TestBlobMetadata.java       |   35 +-
 .../fs/azure/TestBlobTypeSpeedDifference.java   |  160 +++
 .../fs/azure/TestNativeAzureFSPageBlobLive.java |   43 +
 .../TestNativeAzureFileSystemConcurrency.java   |    5 +-
 .../TestNativeAzureFileSystemContractLive.java  |   26 +
 ...TestNativeAzureFileSystemContractMocked.java |   25 +
 .../TestNativeAzureFileSystemFileNameCheck.java |   11 +-
 .../fs/azure/TestNativeAzureFileSystemLive.java |   75 +
 .../azure/TestNativeAzureFileSystemMocked.java  |   35 +
 ...stNativeAzureFileSystemOperationsMocked.java |   37 +-
 .../TestNativeAzureFileSystemUploadLogic.java   |  186 +++
 .../azure/TestOutOfBandAzureBlobOperations.java |   17 +-
 .../TestOutOfBandAzureBlobOperationsLive.java   |   21 +
 .../TestReadAndSeekPageBlobAfterWrite.java      |  333 +++++
 .../apache/hadoop/fs/azure/TestWasbFsck.java    |   33 +
 .../fs/azure/TestWasbUriAndConfiguration.java   |    9 +-
 .../fs/azure/metrics/AzureMetricsTestUtil.java  |    1 +
 .../TestAzureFileSystemInstrumentation.java     |   53 +-
 .../metrics/TestBandwidthGaugeUpdater.java      |   47 -
 .../src/test/resources/azure-test.xml           |   12 -
 51 files changed, 6046 insertions(+), 937 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a737026/hadoop-common-project/hadoop-common/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index 7450513..6509bcb 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -79,6 +79,9 @@ Release 2.7.0 - UNRELEASED
     HADOOP-11238. Update the NameNode's Group Cache in the background when
     possible (Chris Li via Colin P. McCabe)
 
+    HADOOP-10809. hadoop-azure: page blob support. (Dexter Bradshaw,
+    Mostafa Elhemali, Eric Hanson, and Mike Liddell via cnauroth)
+
   BUG FIXES
 
     HADOOP-11236. NFS: Fix javadoc warning in RpcProgram.java (Abhiraj Butala via harsh)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a737026/hadoop-tools/hadoop-azure/README.txt
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/README.txt b/hadoop-tools/hadoop-azure/README.txt
index 73306d3..a1d1a65 100644
--- a/hadoop-tools/hadoop-azure/README.txt
+++ b/hadoop-tools/hadoop-azure/README.txt
@@ -77,6 +77,54 @@ src\test\resources\azure-test.xml. These settings augment the hadoop configurati
 For live tests, set the following in azure-test.xml:
  1. "fs.azure.test.account.name -> {azureStorageAccountName} 
  2. "fs.azure.account.key.{AccountName} -> {fullStorageKey}"
+ 
+===================================
+Page Blob Support and Configuration
+===================================
+
+The Azure Blob Storage interface for Hadoop supports two kinds of blobs: block blobs
+and page blobs. Block blobs are the default kind of blob and are good for most
+big-data use cases, such as input data for Hive, Pig, and analytical map-reduce jobs.
+Page blob handling in hadoop-azure was introduced to support HBase log files.
+Page blobs can be written any number of times, whereas block blobs can only be
+appended to 50,000 times before you run out of blocks and further writes fail.
+That limit does not work for HBase logs, so page blob support was introduced to
+overcome it.
+
+Page blobs are not limited to HBase log files, though. They support the Hadoop
+FileSystem interface and can be up to 1TB in size, larger than the maximum
+200GB size for block blobs.
+
+To have the files you create stored as page blobs, you must set the
+configuration variable fs.azure.page.blob.dir to a comma-separated list of
+folder names, e.g.:
+
+    /hbase/WALs,/hbase/oldWALs,/data/mypageblobfiles
+    
+You can set this to simply / to make all files page blobs.
+
+The configuration option fs.azure.page.blob.size is the default initial 
+size for a page blob. It must be 128MB or greater, and no more than 1TB,
+specified as an integer number of bytes.
+
+====================
+Atomic Folder Rename
+====================
+
+Azure storage stores files as a flat key/value store without formal support
+for folders. The hadoop-azure file system layer simulates folders on top
+of Azure storage. By default, folder rename in the hadoop-azure file system
+layer is not atomic. That means that a failure during a folder rename 
+could, for example, leave some folders in the original directory and
+some in the new one.
+
+HBase depends on atomic folder rename. Hence, a configuration setting was
+introduced called fs.azure.atomic.rename.dir that allows you to specify a 
+comma-separated list of directories to receive special treatment so that 
+folder rename is made atomic. The default value of this setting is just /hbase.
+Redo is applied to finish a folder rename that fails. A file
+<folderName>-renamePending.json may appear temporarily; it records the
+intention of the rename operation so that redo is possible in the event of
+a failure.
 
 =============
 Findbugs
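
For reference, the settings documented above are ordinary Hadoop
configuration properties. A minimal core-site.xml sketch, with
illustrative values (the property names are the ones introduced by this
patch; the folder lists and sizes shown are examples only):

    <property>
      <name>fs.azure.page.blob.dir</name>
      <value>/hbase/WALs,/hbase/oldWALs,/data/mypageblobfiles</value>
    </property>
    <property>
      <!-- 128MB expressed in bytes; must be between 128MB and 1TB. -->
      <name>fs.azure.page.blob.size</name>
      <value>134217728</value>
    </property>
    <property>
      <!-- Folders that receive atomic rename treatment; default is /hbase. -->
      <name>fs.azure.atomic.rename.dir</name>
      <value>/hbase,/data</value>
    </property>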

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a737026/hadoop-tools/hadoop-azure/dev-support/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/dev-support/findbugs-exclude.xml b/hadoop-tools/hadoop-azure/dev-support/findbugs-exclude.xml
index cc63141..cde1734 100644
--- a/hadoop-tools/hadoop-azure/dev-support/findbugs-exclude.xml
+++ b/hadoop-tools/hadoop-azure/dev-support/findbugs-exclude.xml
@@ -15,5 +15,36 @@
    limitations under the License.
 -->
 <FindBugsFilter>
+     <!-- It is okay to skip up to end of file. No need to check return value. -->
+     <Match>
+       <Class name="org.apache.hadoop.fs.azure.AzureNativeFileSystemStore" />
+       <Method name="retrieve" />
+       <Bug pattern="SR_NOT_CHECKED" />
+       <Priority value="2" />
+     </Match>
+
+     <!-- Returning a fully loaded array to iterate through is a convenience
+	  and helps performance. -->
+     <Match>
+       <Class name="org.apache.hadoop.fs.azure.NativeAzureFileSystem$FolderRenamePending" />
+       <Method name="getFiles" />
+       <Bug pattern="EI_EXPOSE_REP" />
+       <Priority value="2" />
+     </Match>
+
+     <!-- Need to start keep-alive thread for SelfRenewingLease in constructor. -->
+     <Match>
+       <Class name="org.apache.hadoop.fs.azure.SelfRenewingLease" />
+       <Bug pattern="SC_START_IN_CTOR" />
+       <Priority value="2" />
+     </Match>
      
+     <!-- Using a key set iterator is fine because this is not a performance-critical
+	  method. -->
+     <Match>
+       <Class name="org.apache.hadoop.fs.azure.PageBlobOutputStream" />
+       <Method name="logAllStackTraces" />
+       <Bug pattern="WMI_WRONG_MAP_ITERATOR" />
+       <Priority value="2" />
+     </Match>
  </FindBugsFilter>
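
As background for the SR_NOT_CHECKED suppression above: InputStream.skip()
may skip fewer bytes than requested, but when the intent is simply to move
to the end of the stream the return value carries no information, because a
subsequent read() past the end just returns -1. A minimal sketch
(hypothetical stand-alone example, not part of the patch):

    import java.io.ByteArrayInputStream;
    import java.io.InputStream;

    public class SkipToEndSketch {
      public static void main(String[] args) throws Exception {
        InputStream in = new ByteArrayInputStream(new byte[] { 1, 2, 3 });
        in.skip(100);                  // may skip fewer bytes; value ignored
        System.out.println(in.read()); // prints -1: already at end of stream
      }
    }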

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a737026/hadoop-tools/hadoop-azure/src/config/checkstyle.xml
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/config/checkstyle.xml b/hadoop-tools/hadoop-azure/src/config/checkstyle.xml
index 3bfc23d..9df4f78 100644
--- a/hadoop-tools/hadoop-azure/src/config/checkstyle.xml
+++ b/hadoop-tools/hadoop-azure/src/config/checkstyle.xml
@@ -108,7 +108,10 @@
           <property name="max" value="3000"/>
         </module>
         
-        <module name="ParameterNumber"/>
+        <module name="ParameterNumber">
+          <property name="max" value="8"/>
+        </module>
+
 
 
         <!-- Checks for whitespace                               -->
@@ -152,7 +155,7 @@
         <module name="IllegalInstantiation"/>
         <module name="InnerAssignment"/>
         <module name="MagicNumber">
-            <property name="ignoreNumbers" value="-1, 0, 1, 2, 3, 4, 5, 6, 7, 8, 10, 1000"/>
+            <property name="ignoreNumbers" value="-1, 0, 1, 2, 3, 4, 5, 6, 7, 8, 10, 255, 1000, 1024"/>
         </module>
         <module name="MissingSwitchDefault"/>
         <module name="RedundantThrows"/>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a737026/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureLinkedStack.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureLinkedStack.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureLinkedStack.java
new file mode 100644
index 0000000..4c52ef0
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureLinkedStack.java
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+/**
+ * A simple generic stack implementation using linked lists. The stack
+ * implementation has five main operations:
+ * <ul>
+ * <li>push -- adds an element to the top of the stack</li>
+ * <li>pop -- removes an element from the top of the stack and returns a
+ * reference to it</li>
+ * <li>peek -- peek returns an element from the top of the stack without
+ * removing it</li>
+ * <li>isEmpty -- tests whether the stack is empty</li>
+ * <li>size -- returns the size of the stack</li>
+ * <li>toString -- returns a string representation of the stack.</li>
+ * </ul>
+ */
+
+public class AzureLinkedStack<E> {
+  /*
+   * Linked node for Azure stack collection.
+   */
+  private static class AzureLinkedNode<E> {
+    private E element; // Linked element on the list.
+    private AzureLinkedNode<E> next;// Reference to the next linked element on
+                                    // list.
+
+    /*
+     * The constructor builds the linked node with no successor
+     *
+     * @param element : The value of the element to be stored with this node.
+     */
+    private AzureLinkedNode(E anElement) {
+      element = anElement;
+      next = null;
+    }
+
+    /*
+     * Constructor builds a linked node with a specified successor. The
+     * successor may be null.
+     *
+     * @param anElement : new element to be created.
+     *
+     * @param nextElement: successor to the new element.
+     */
+    private AzureLinkedNode(E anElement, AzureLinkedNode<E> nextElement) {
+      element = anElement;
+      next = nextElement;
+    }
+
+    /*
+     * Get the element stored in the linked node.
+     *
+     * @return E : element stored in linked node.
+     */
+    private E getElement() {
+      return element;
+    }
+
+    /*
+     * Get the successor node to the element.
+     *
+     * @return E : reference to the succeeding node on the list.
+     */
+    private AzureLinkedNode<E> getNext() {
+      return next;
+    }
+  }
+
+  private int count; // The number of elements stored on the stack.
+  private AzureLinkedNode<E> top; // Top of the stack.
+
+  /*
+   * Constructor creating an empty stack.
+   */
+  public AzureLinkedStack() {
+    // Simply initialize the member variables.
+    //
+    count = 0;
+    top = null;
+  }
+
+  /*
+   * Adds an element to the top of the stack.
+   *
+   * @param element : element pushed to the top of the stack.
+   */
+  public void push(E element) {
+    // Create a new node containing a reference to be placed on the stack.
+    // Set the next reference to the new node to point to the current top
+    // of the stack. Set the top reference to point to the new node. Finally
+    // increment the count of nodes on the stack.
+    //
+    AzureLinkedNode<E> newNode = new AzureLinkedNode<E>(element, top);
+    top = newNode;
+    count++;
+  }
+
+  /*
+   * Removes the element at the top of the stack and returns a reference to it.
+   *
+   * @return E : element popped from the top of the stack.
+   *
+   * @throws Exception on pop from an empty stack.
+   */
+  public E pop() throws Exception {
+    // Make sure the stack is not empty. If it is empty, throw a StackEmpty
+    // exception.
+    //
+    if (isEmpty()) {
+      throw new Exception("AzureStackEmpty");
+    }
+
+    // Set a temporary reference equal to the element at the top of the stack,
+    // decrement the count of elements and return reference to the temporary.
+    //
+    E element = top.getElement();
+    top = top.getNext();
+    count--;
+
+    // Return the reference to the element that was at the top of the stack.
+    //
+    return element;
+  }
+
+  /*
+   * Return the top element of the stack without removing it.
+   *
+   * @return E
+   *
+   * @throws Exception on peek into an empty stack.
+   */
+  public E peek() throws Exception {
+    // Make sure the stack is not empty. If it is empty, throw a StackEmpty
+    // exception.
+    //
+    if (isEmpty()) {
+      throw new Exception("AzureStackEmpty");
+    }
+
+    // Set a temporary reference equal to the element at the top of the stack
+    // and return the temporary.
+    //
+    E element = top.getElement();
+    return element;
+  }
+
+  /*
+   * Determines whether the stack is empty
+   *
+   * @return boolean true if the stack is empty and false otherwise.
+   */
+  public boolean isEmpty() {
+    if (0 == size()) {
+      // Zero-sized stack so the stack is empty.
+      //
+      return true;
+    }
+
+    // The stack is not empty.
+    //
+    return false;
+  }
+
+  /*
+   * Determines the size of the stack
+   *
+   * @return int: Count of the number of elements in the stack.
+   */
+  public int size() {
+    return count;
+  }
+
+  /*
+   * Returns a string representation of the stack.
+   *
+   * @return String String representation of all elements in the stack.
+   */
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+
+    AzureLinkedNode<E> current = top;
+    for (int i = 0; i < size(); i++) {
+      E element = current.getElement();
+      sb.append(element.toString());
+      current = current.getNext();
+
+      // Insert commas between strings except after the last string.
+      //
+      if (size() - 1 > i) {
+        sb.append(", ");
+      }
+    }
+
+    // Return the string.
+    //
+    return sb.toString();
+  }
+}
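
A brief usage sketch for the stack above (hypothetical driver code, not
part of the patch; pop() and peek() declare a plain checked Exception, so
the caller must handle or propagate it):

    AzureLinkedStack<String> stack = new AzureLinkedStack<String>();
    stack.push("a");
    stack.push("b");
    System.out.println(stack.peek()); // b (still on the stack)
    System.out.println(stack.pop());  // b
    System.out.println(stack.size()); // 1
    System.out.println(stack);        // a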

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a737026/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
index 5afbbbe..c091767 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
@@ -20,31 +20,42 @@ package org.apache.hadoop.fs.azure;
 import static org.apache.hadoop.fs.azure.NativeAzureFileSystem.PATH_DELIMITER;
 
 import java.io.BufferedInputStream;
-import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.io.UnsupportedEncodingException;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.net.URLDecoder;
+import java.net.URLEncoder;
 import java.security.InvalidKeyException;
 import java.util.ArrayList;
 import java.util.Calendar;
+import java.util.Collections;
 import java.util.Date;
 import java.util.EnumSet;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
+import java.util.Locale;
 import java.util.Map;
+import java.util.Set;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.azure.StorageInterface.CloudBlobContainerWrapper;
 import org.apache.hadoop.fs.azure.StorageInterface.CloudBlobDirectoryWrapper;
+import org.apache.hadoop.fs.azure.StorageInterface.CloudBlobWrapper;
 import org.apache.hadoop.fs.azure.StorageInterface.CloudBlockBlobWrapper;
+import org.apache.hadoop.fs.azure.StorageInterface.CloudPageBlobWrapper;
+import org.apache.hadoop.fs.azure.StorageInterfaceImpl.CloudPageBlobWrapperImpl;
 import org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation;
 import org.apache.hadoop.fs.azure.metrics.BandwidthGaugeUpdater;
 import org.apache.hadoop.fs.azure.metrics.ErrorMetricUpdater;
@@ -72,7 +83,6 @@ import com.microsoft.windowsazure.storage.blob.DeleteSnapshotsOption;
 import com.microsoft.windowsazure.storage.blob.ListBlobItem;
 import com.microsoft.windowsazure.storage.core.Utility;
 
-
 /**
  * Core implementation of Windows Azure Filesystem for Hadoop.
  * Provides the bridging logic between Hadoop's abstract filesystem and Azure Storage 
@@ -140,6 +150,33 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
   static final String LINK_BACK_TO_UPLOAD_IN_PROGRESS_METADATA_KEY = "hdi_tmpupload";
   static final String OLD_LINK_BACK_TO_UPLOAD_IN_PROGRESS_METADATA_KEY = "asv_tmpupload";
 
+  /**
+   * Configuration key to indicate the set of directories in WASB where we
+   * should store files as page blobs instead of block blobs.
+   *
+   * Entries should be plain directory names (i.e. not URIs) with no leading or
+   * trailing slashes. Delimit the entries with commas.
+   */
+  public static final String KEY_PAGE_BLOB_DIRECTORIES =
+      "fs.azure.page.blob.dir";
+  /**
+   * The set of directories where we should store files as page blobs.
+   */
+  private Set<String> pageBlobDirs;
+  
+  /**
+   * Configuration key to indicate the set of directories in WASB where
+   * we should do atomic folder rename synchronized with createNonRecursive.
+   */
+  public static final String KEY_ATOMIC_RENAME_DIRECTORIES =
+      "fs.azure.atomic.rename.dir";
+
+  /**
+   * The set of directories where we should apply atomic folder rename
+   * synchronized with createNonRecursive.
+   */
+  private Set<String> atomicRenameDirs;
+
   private static final String HTTP_SCHEME = "http";
   private static final String HTTPS_SCHEME = "https";
   private static final String WASB_AUTHORITY_DELIMITER = "@";
@@ -148,6 +185,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
   private static final int DEFAULT_CONCURRENT_WRITES = 8;
 
  // Concurrent reads of data written out of band are disabled by default.
+  //
   private static final boolean DEFAULT_READ_TOLERATE_CONCURRENT_APPEND = false;
 
   // Default block sizes
@@ -155,6 +193,8 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
   public static final int DEFAULT_UPLOAD_BLOCK_SIZE = 4 * 1024 * 1024;
 
   // Retry parameter defaults.
+  //
+
   private static final int DEFAULT_MIN_BACKOFF_INTERVAL = 1 * 1000; // 1s
   private static final int DEFAULT_MAX_BACKOFF_INTERVAL = 30 * 1000; // 30s
   private static final int DEFAULT_BACKOFF_INTERVAL = 1 * 1000; // 1s
@@ -169,6 +209,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
 
   private static final int STORAGE_CONNECTION_TIMEOUT_DEFAULT = 90;
 
+
   /**
    * MEMBER VARIABLES
    */
@@ -181,7 +222,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
   private boolean connectingUsingSAS = false;
   private AzureFileSystemInstrumentation instrumentation;
   private BandwidthGaugeUpdater bandwidthGaugeUpdater;
-  private static final JSON PERMISSION_JSON_SERIALIZER = createPermissionJsonSerializer();
+  private final static JSON PERMISSION_JSON_SERIALIZER = createPermissionJsonSerializer();
 
   private boolean suppressRetryPolicy = false;
   private boolean canCreateOrModifyContainer = false;
@@ -317,7 +358,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
   public BandwidthGaugeUpdater getBandwidthGaugeUpdater() {
     return bandwidthGaugeUpdater;
   }
-  
+
   /**
    * Check if concurrent reads and writes on the same blob are allowed.
    * 
@@ -333,19 +374,18 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
    * session with an Azure session. It parses the scheme to ensure it matches
    * the storage protocol supported by this file system.
    * 
-   * @param uri
-   *          - URI for target storage blob.
-   * @param conf
-   *          - reference to configuration object.
+   * @param uri - URI for target storage blob.
+   * @param conf - reference to configuration object.
+   * @param instrumentation - the metrics source that will keep track of operations here.
    * 
-   * @throws IllegalArgumentException
-   *           if URI or job object is null, or invalid scheme.
+   * @throws IllegalArgumentException if URI or job object is null, or invalid scheme.
    */
   @Override
-  public void initialize(URI uri, Configuration conf, AzureFileSystemInstrumentation instrumentation) throws AzureException {
-
-    if (null == this.storageInteractionLayer) {
-      this.storageInteractionLayer = new StorageInterfaceImpl();
+  public void initialize(URI uri, Configuration conf, AzureFileSystemInstrumentation instrumentation)
+      throws IllegalArgumentException, AzureException, IOException  {
+    
+    if (null == instrumentation) {
+      throw new IllegalArgumentException("Null instrumentation");
     }
 
     this.instrumentation = instrumentation;
@@ -377,6 +417,40 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
     // Start an Azure storage session.
     //
     createAzureStorageSession();
+
+    // Extract the directories that should contain page blobs
+    pageBlobDirs = getDirectorySet(KEY_PAGE_BLOB_DIRECTORIES);
+    LOG.debug("Page blob directories:  " + setToString(pageBlobDirs));
+
+    // Extract directories that should have atomic rename applied.
+    atomicRenameDirs = getDirectorySet(KEY_ATOMIC_RENAME_DIRECTORIES);
+    String hbaseRoot;
+    try {
+
+      // Add to this the hbase root directory, or /hbase if that is not set.
+      hbaseRoot = verifyAndConvertToStandardFormat(
+          sessionConfiguration.get("hbase.rootdir", "hbase"));
+      atomicRenameDirs.add(hbaseRoot);
+    } catch (URISyntaxException e) {
+      LOG.warn("Unable to initialize HBase root as an atomic rename directory.");
+    }
+    LOG.debug("Atomic rename directories:  " + setToString(atomicRenameDirs));
+  }
+
+  /**
+   * Helper to format a string for log output from Set<String>
+   */
+  private String setToString(Set<String> set) {
+    StringBuilder sb = new StringBuilder();
+    int i = 1;
+    for (String s : set) {
+      sb.append("/" + s);
+      if (i != set.size()) {
+        sb.append(", ");
+      }
+      i++;
+    }
+    return sb.toString();
   }
 
   /**
@@ -400,8 +474,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
           "Expected URI with a valid authority");
     }
 
-    // Check if authority container the delimiter separating the account name
-    // from the
+    // Check if the authority contains the delimiter separating the account name from
     // the container.
     //
     if (!authority.contains(WASB_AUTHORITY_DELIMITER)) {
@@ -455,8 +528,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
     // The URI has a valid authority. Extract the container name. It is the
     // second component of the WASB URI authority.
     if (!authority.contains(WASB_AUTHORITY_DELIMITER)) {
-      // The authority does not have a container name. Use the default container
-      // by
+      // The authority does not have a container name. Use the default container by
       // setting the container name to the default Azure root container.
       //
       return AZURE_ROOT_CONTAINER;
@@ -491,9 +563,9 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
   private String getHTTPScheme() {
     String sessionScheme = sessionUri.getScheme();
     // Check if we're on a secure URI scheme: wasbs or the legacy asvs scheme.
-    if (sessionScheme != null
-        && (sessionScheme.equalsIgnoreCase("asvs") || sessionScheme
-            .equalsIgnoreCase("wasbs"))) {
+    if (sessionScheme != null &&
+        (sessionScheme.equalsIgnoreCase("asvs") ||
+         sessionScheme.equalsIgnoreCase("wasbs"))) {
       return HTTPS_SCHEME;
     } else {
       // At this point the scheme should be either null or asv or wasb.
@@ -565,20 +637,22 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
         Math.min(cpuCores, DEFAULT_CONCURRENT_WRITES));
 
     // Set up the exponential retry policy.
-    minBackoff = sessionConfiguration.getInt(KEY_MIN_BACKOFF_INTERVAL,
-        DEFAULT_MIN_BACKOFF_INTERVAL);
+    //
+    minBackoff = sessionConfiguration.getInt(
+        KEY_MIN_BACKOFF_INTERVAL, DEFAULT_MIN_BACKOFF_INTERVAL);
+
+    maxBackoff = sessionConfiguration.getInt(
+        KEY_MAX_BACKOFF_INTERVAL, DEFAULT_MAX_BACKOFF_INTERVAL);
 
-    maxBackoff = sessionConfiguration.getInt(KEY_MAX_BACKOFF_INTERVAL,
-        DEFAULT_MAX_BACKOFF_INTERVAL);
+    deltaBackoff = sessionConfiguration.getInt(
+        KEY_BACKOFF_INTERVAL, DEFAULT_BACKOFF_INTERVAL);
 
-    deltaBackoff = sessionConfiguration.getInt(KEY_BACKOFF_INTERVAL,
-        DEFAULT_BACKOFF_INTERVAL);
+    maxRetries = sessionConfiguration.getInt(
+        KEY_MAX_IO_RETRIES, DEFAULT_MAX_RETRY_ATTEMPTS);
 
-    maxRetries = sessionConfiguration.getInt(KEY_MAX_IO_RETRIES,
-        DEFAULT_MAX_RETRY_ATTEMPTS);
+    storageInteractionLayer.setRetryPolicyFactory(
+          new RetryExponentialRetry(minBackoff, deltaBackoff, maxBackoff, maxRetries));
 
-    storageInteractionLayer.setRetryPolicyFactory(new RetryExponentialRetry(
-        minBackoff, deltaBackoff, maxBackoff, maxRetries));
 
     // read the self-throttling config.
     selfThrottlingEnabled = sessionConfiguration.getBoolean(
@@ -659,13 +733,15 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
       StorageCredentials credentials, String containerName)
       throws URISyntaxException, StorageException, AzureException {
 
+    URI blobEndPoint;
     if (isStorageEmulatorAccount(accountName)) {
       isStorageEmulator = true;
-      CloudStorageAccount account = CloudStorageAccount
-          .getDevelopmentStorageAccount();
+      CloudStorageAccount account =
+          CloudStorageAccount.getDevelopmentStorageAccount();
       storageInteractionLayer.createBlobClient(account);
     } else {
-      URI blobEndPoint = new URI(getHTTPScheme() + "://" + accountName);
+      blobEndPoint = new URI(getHTTPScheme() + "://" +
+          accountName);
       storageInteractionLayer.createBlobClient(blobEndPoint, credentials);
     }
     suppressRetryPolicyInClientIfNeeded();
@@ -753,7 +829,8 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
    * @throws AzureException
    * @throws IOException
    */
-  private void createAzureStorageSession() throws AzureException {
+  private void createAzureStorageSession ()
+      throws AzureException, IOException {
 
     // Make sure this object was properly initialized with references to
     // the sessionUri and sessionConfiguration.
@@ -886,6 +963,106 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
   }
 
   /**
+   * Trims a suffix/prefix from the given string. For example if
+   * s is given as "/xy" and toTrim is "/", this method returns "xy"
+   */
+  private static String trim(String s, String toTrim) {
+    return StringUtils.removeEnd(StringUtils.removeStart(s, toTrim),
+        toTrim);
+  }
+
+  /**
+   * Checks if the given rawDir belongs to this account/container, and
+   * if so returns the canonicalized path for it. Otherwise return null.
+   */
+  private String verifyAndConvertToStandardFormat(String rawDir) throws URISyntaxException {
+    URI asUri = new URI(rawDir);
+    if (asUri.getAuthority() == null 
+        || asUri.getAuthority().toLowerCase(Locale.US).equalsIgnoreCase(
+        		sessionUri.getAuthority().toLowerCase(Locale.US))) {
+      // Applies to me.
+      return trim(asUri.getPath(), "/");
+    } else {
+      // Doesn't apply to me.
+      return null;
+    }
+  }
+
+  /**
+   * Take a comma-separated list of directories from a configuration variable
+   * and transform it to a set of directories.
+   */
+  private Set<String> getDirectorySet(final String configVar)
+      throws AzureException {
+    String[] rawDirs = sessionConfiguration.getStrings(configVar, new String[0]);
+    Set<String> directorySet = new HashSet<String>();
+    for (String currentDir : rawDirs) {
+      String myDir;
+      try {
+        myDir = verifyAndConvertToStandardFormat(currentDir);
+      } catch (URISyntaxException ex) {
+        throw new AzureException(String.format(
+            "The directory %s specified in the configuration entry %s is not" +
+            " a valid URI.",
+            currentDir, configVar));
+      }
+      if (myDir != null) {
+        directorySet.add(myDir);
+      }
+    }
+    return directorySet;
+  }
+
+  /**
+   * Checks if the given key in Azure Storage should be stored as a page
+   * blob instead of block blob.
+   * @throws URISyntaxException
+   */
+  public boolean isPageBlobKey(String key) {
+    return isKeyForDirectorySet(key, pageBlobDirs);
+  }
+
+  /**
+   * Checks if the given key in Azure storage should have synchronized
+   * atomic folder rename createNonRecursive implemented.
+   */
+  @Override
+  public boolean isAtomicRenameKey(String key) {
+    return isKeyForDirectorySet(key, atomicRenameDirs);
+  }
+
+  public boolean isKeyForDirectorySet(String key, Set<String> dirSet) {
+    String defaultFS = FileSystem.getDefaultUri(sessionConfiguration).toString();
+    for (String dir : dirSet) {
+      if (dir.isEmpty() ||
+          key.startsWith(dir + "/")) {
+        return true;
+      }
+
+      // Allow for blob directories with paths relative to the default file
+      // system.
+      //
+      try {
+        URI uriPageBlobDir = new URI (dir);
+        if (null == uriPageBlobDir.getAuthority()) {
+          // Concatenate the default file system prefix with the relative
+          // page blob directory path.
+          //
+          if (key.startsWith(trim(defaultFS, "/") + "/" + dir + "/")){
+            return true;
+          }
+        }
+      } catch (URISyntaxException e) {
+        LOG.info(String.format(
+                   "URI syntax error creating URI for %s", dir));
+      }
+    }
+    return false;
+  }
+
+  
+  
+  /**
    * This should be called from any method that does any modifications to the
    * underlying container: it makes sure to put the WASB current version in the
    * container's metadata if it's not already there.
@@ -1032,15 +1209,15 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
 
   private BlobRequestOptions getDownloadOptions() {
     BlobRequestOptions options = new BlobRequestOptions();
-    options.setRetryPolicyFactory(new RetryExponentialRetry(minBackoff,
-        deltaBackoff, maxBackoff, maxRetries));
+    options.setRetryPolicyFactory(
+          new RetryExponentialRetry(minBackoff, deltaBackoff, maxBackoff, maxRetries));
     options.setUseTransactionalContentMD5(getUseTransactionalContentMD5());
     return options;
   }
 
   @Override
-  public DataOutputStream storefile(String key,
-      PermissionStatus permissionStatus) throws AzureException {
+  public DataOutputStream storefile(String key, PermissionStatus permissionStatus)
+      throws AzureException {
     try {
 
       // Check if a session exists, if not create a session with the
@@ -1066,19 +1243,20 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
       checkContainer(ContainerAccessType.PureWrite);
 
       /**
-       * Note: Windows Azure Blob Storage does not allow the creation of
-       * arbitrary directory paths under the default $root directory. This is by
-       * design to eliminate ambiguity in specifying a implicit blob address. A
-       * blob in the $root container cannot include a / in its name and must be
-       * careful not to include a trailing '/' when referencing blobs in the
-       * $root container. A '/; in the $root container permits ambiguous blob
-       * names as in the following example involving two containers $root and
-       * mycontainer: http://myaccount.blob.core.windows.net/$root
-       * http://myaccount.blob.core.windows.net/mycontainer If the URL
-       * "mycontainer/somefile.txt were allowed in $root then the URL:
-       * http://myaccount.blob.core.windows.net/mycontainer/myblob.txt could
-       * mean either: (1) container=mycontainer; blob=myblob.txt (2)
-       * container=$root; blob=mycontainer/myblob.txt
+       * Note: Windows Azure Blob Storage does not allow the creation of arbitrary directory
+       *      paths under the default $root directory.  This is by design to eliminate
+       *      ambiguity in specifying an implicit blob address. A blob in the $root container
+       *      cannot include a '/' in its name, and callers must be careful not to include a
+       *      trailing '/' when referencing blobs in the $root container.
+       *      A '/' in the $root container permits ambiguous blob names as in the following
+       *      example involving two containers $root and mycontainer:
+       *                http://myaccount.blob.core.windows.net/$root
+       *                http://myaccount.blob.core.windows.net/mycontainer
+       *      If the URL "mycontainer/somefile.txt" were allowed in $root then the URL:
+       *                http://myaccount.blob.core.windows.net/mycontainer/myblob.txt
+       *      could mean either:
+       *        (1) container=mycontainer; blob=myblob.txt
+       *        (2) container=$root; blob=mycontainer/myblob.txt
        * 
        * To avoid this type of ambiguity the Azure blob storage prevents
        * arbitrary path under $root. For a simple and more consistent user
@@ -1097,17 +1275,15 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
         throw new AzureException(errMsg);
       }
 
-      // Get the block blob reference from the store's container and
+      // Get the blob reference from the store's container and
       // return it.
-      CloudBlockBlobWrapper blob = getBlobReference(key);
+      CloudBlobWrapper blob = getBlobReference(key);
       storePermissionStatus(blob, permissionStatus);
 
       // Create the output stream for the Azure blob.
-      OutputStream outputStream = blob.openOutputStream(getUploadOptions(),
-          getInstrumentedContext());
-
-      // Return to caller with DataOutput stream.
-      DataOutputStream dataOutStream = new DataOutputStream(outputStream);
+      //
+      OutputStream outputStream = openOutputStream(blob);
+      DataOutputStream dataOutStream = new SyncableDataOutputStream(outputStream);
       return dataOutStream;
     } catch (Exception e) {
       // Caught exception while attempting to open the blob output stream.
@@ -1117,6 +1293,40 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
   }
 
   /**
+   * Opens a new output stream to the given blob (page or block blob)
+   * to populate it from scratch with data.
+   */
+  private OutputStream openOutputStream(final CloudBlobWrapper blob)
+      throws StorageException {
+    if (blob instanceof CloudPageBlobWrapperImpl){
+      return new PageBlobOutputStream(
+          (CloudPageBlobWrapper)blob, getInstrumentedContext(), sessionConfiguration);
+    } else {
+
+      // Handle both ClouldBlockBlobWrapperImpl and (only for the test code path)
+      // MockCloudBlockBlobWrapper.
+      return ((CloudBlockBlobWrapper) blob).openOutputStream(getUploadOptions(),
+                getInstrumentedContext());
+    }
+  }
+
+  /**
+   * Opens a new input stream for the given blob (page or block blob)
+   * to read its data.
+   */
+  private InputStream openInputStream(CloudBlobWrapper blob)
+      throws StorageException, IOException {
+    if (blob instanceof CloudBlockBlobWrapper) {
+      return blob.openInputStream(getDownloadOptions(),
+          getInstrumentedContext(isConcurrentOOBAppendAllowed()));
+    } else {
+      return new PageBlobInputStream(
+          (CloudPageBlobWrapper) blob, getInstrumentedContext(
+              isConcurrentOOBAppendAllowed()));
+    }
+  }
+
+  /**
    * Default permission to use when no permission metadata is found.
    * 
    * @return The default permission to use.
@@ -1125,7 +1335,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
     return new PermissionStatus("", "", FsPermission.getDefault());
   }
 
-  private static void storeMetadataAttribute(CloudBlockBlobWrapper blob,
+  private static void storeMetadataAttribute(CloudBlobWrapper blob,
       String key, String value) {
     HashMap<String, String> metadata = blob.getMetadata();
     if (null == metadata) {
@@ -1135,7 +1345,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
     blob.setMetadata(metadata);
   }
 
-  private static String getMetadataAttribute(CloudBlockBlobWrapper blob,
+  private static String getMetadataAttribute(CloudBlobWrapper blob,
       String... keyAlternatives) {
     HashMap<String, String> metadata = blob.getMetadata();
     if (null == metadata) {
@@ -1149,7 +1359,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
     return null;
   }
 
-  private static void removeMetadataAttribute(CloudBlockBlobWrapper blob,
+  private static void removeMetadataAttribute(CloudBlobWrapper blob,
       String key) {
     HashMap<String, String> metadata = blob.getMetadata();
     if (metadata != null) {
@@ -1158,7 +1368,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
     }
   }
 
-  private void storePermissionStatus(CloudBlockBlobWrapper blob,
+  private static void storePermissionStatus(CloudBlobWrapper blob,
       PermissionStatus permissionStatus) {
     storeMetadataAttribute(blob, PERMISSION_METADATA_KEY,
         PERMISSION_JSON_SERIALIZER.toJSON(permissionStatus));
@@ -1166,39 +1376,55 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
     removeMetadataAttribute(blob, OLD_PERMISSION_METADATA_KEY);
   }
 
-  private PermissionStatus getPermissionStatus(CloudBlockBlobWrapper blob) {
+  private PermissionStatus getPermissionStatus(CloudBlobWrapper blob) {
     String permissionMetadataValue = getMetadataAttribute(blob,
         PERMISSION_METADATA_KEY, OLD_PERMISSION_METADATA_KEY);
     if (permissionMetadataValue != null) {
-      return PermissionStatusJsonSerializer
-          .fromJSONString(permissionMetadataValue);
+      return PermissionStatusJsonSerializer.fromJSONString(
+          permissionMetadataValue);
     } else {
       return defaultPermissionNoBlobMetadata();
     }
   }
 
-  private static void storeFolderAttribute(CloudBlockBlobWrapper blob) {
+  private static void storeFolderAttribute(CloudBlobWrapper blob) {
     storeMetadataAttribute(blob, IS_FOLDER_METADATA_KEY, "true");
     // Remove the old metadata key if present
     removeMetadataAttribute(blob, OLD_IS_FOLDER_METADATA_KEY);
   }
 
-  private static void storeLinkAttribute(CloudBlockBlobWrapper blob,
-      String linkTarget) {
-    storeMetadataAttribute(blob, LINK_BACK_TO_UPLOAD_IN_PROGRESS_METADATA_KEY,
-        linkTarget);
+  private static void storeLinkAttribute(CloudBlobWrapper blob,
+      String linkTarget) throws UnsupportedEncodingException {
+    // We have to URL encode the link attribute as the link URI could
+    // have URI special characters which unless encoded will result
+    // in 403 errors from the server. This is due to metadata properties
+    // being sent in the HTTP header of the request which is in turn used
+    // on the server side to authorize the request.
+    String encodedLinkTarget = null;
+    if (linkTarget != null) {
+      encodedLinkTarget = URLEncoder.encode(linkTarget, "UTF-8");
+    }
+    storeMetadataAttribute(blob,
+        LINK_BACK_TO_UPLOAD_IN_PROGRESS_METADATA_KEY,
+        encodedLinkTarget);
     // Remove the old metadata key if present
     removeMetadataAttribute(blob,
         OLD_LINK_BACK_TO_UPLOAD_IN_PROGRESS_METADATA_KEY);
   }
 
-  private static String getLinkAttributeValue(CloudBlockBlobWrapper blob) {
-    return getMetadataAttribute(blob,
+  private static String getLinkAttributeValue(CloudBlobWrapper blob)
+      throws UnsupportedEncodingException {
+    String encodedLinkTarget = getMetadataAttribute(blob,
         LINK_BACK_TO_UPLOAD_IN_PROGRESS_METADATA_KEY,
         OLD_LINK_BACK_TO_UPLOAD_IN_PROGRESS_METADATA_KEY);
+    String linkTarget = null;
+    if (encodedLinkTarget != null) {
+      linkTarget = URLDecoder.decode(encodedLinkTarget, "UTF-8");
+    }
+    return linkTarget;
   }
 
-  private static boolean retrieveFolderAttribute(CloudBlockBlobWrapper blob) {
+  private static boolean retrieveFolderAttribute(CloudBlobWrapper blob) {
     HashMap<String, String> metadata = blob.getMetadata();
     return null != metadata
         && (metadata.containsKey(IS_FOLDER_METADATA_KEY) || metadata
@@ -1255,11 +1481,10 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
     try {
       checkContainer(ContainerAccessType.PureWrite);
 
-      CloudBlockBlobWrapper blob = getBlobReference(key);
+      CloudBlobWrapper blob = getBlobReference(key);
       storePermissionStatus(blob, permissionStatus);
       storeFolderAttribute(blob);
-      blob.upload(new ByteArrayInputStream(new byte[0]),
-          getInstrumentedContext());
+      openOutputStream(blob).close();
     } catch (Exception e) {
       // Caught exception while attempting upload. Re-throw as an Azure
       // storage exception.
@@ -1293,11 +1518,10 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
     try {
       checkContainer(ContainerAccessType.PureWrite);
 
-      CloudBlockBlobWrapper blob = getBlobReference(key);
+      CloudBlobWrapper blob = getBlobReference(key);
       storePermissionStatus(blob, permissionStatus);
       storeLinkAttribute(blob, tempBlobKey);
-      blob.upload(new ByteArrayInputStream(new byte[0]),
-          getInstrumentedContext());
+      openOutputStream(blob).close();
     } catch (Exception e) {
       // Caught exception while attempting upload. Re-throw as an Azure
       // storage exception.
@@ -1322,7 +1546,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
     try {
       checkContainer(ContainerAccessType.PureRead);
 
-      CloudBlockBlobWrapper blob = getBlobReference(key);
+      CloudBlobWrapper blob = getBlobReference(key);
       blob.downloadAttributes(getInstrumentedContext());
       return getLinkAttributeValue(blob);
     } catch (Exception e) {
@@ -1366,10 +1590,12 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
   private Iterable<ListBlobItem> listRootBlobs(boolean includeMetadata)
       throws StorageException, URISyntaxException {
     return rootDirectory.listBlobs(
+        null, false,
+        includeMetadata ?
+            EnumSet.of(BlobListingDetails.METADATA) :
+              EnumSet.noneOf(BlobListingDetails.class),
         null,
-        false,
-        includeMetadata ? EnumSet.of(BlobListingDetails.METADATA) : EnumSet
-            .noneOf(BlobListingDetails.class), null, getInstrumentedContext());
+              getInstrumentedContext());
   }
 
   /**
@@ -1392,11 +1618,14 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
   private Iterable<ListBlobItem> listRootBlobs(String aPrefix,
       boolean includeMetadata) throws StorageException, URISyntaxException {
 
-    return rootDirectory.listBlobs(
-        aPrefix,
+    Iterable<ListBlobItem> list = rootDirectory.listBlobs(aPrefix,
         false,
-        includeMetadata ? EnumSet.of(BlobListingDetails.METADATA) : EnumSet
-            .noneOf(BlobListingDetails.class), null, getInstrumentedContext());
+        includeMetadata ?
+            EnumSet.of(BlobListingDetails.METADATA) :
+              EnumSet.noneOf(BlobListingDetails.class),
+              null,
+              getInstrumentedContext());
+    return list;
   }
 
   /**
@@ -1423,15 +1652,17 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
    * @throws URISyntaxException
    * 
    */
-  private Iterable<ListBlobItem> listRootBlobs(String aPrefix,
-      boolean useFlatBlobListing, EnumSet<BlobListingDetails> listingDetails,
-      BlobRequestOptions options, OperationContext opContext)
-      throws StorageException, URISyntaxException {
+  private Iterable<ListBlobItem> listRootBlobs(String aPrefix, boolean useFlatBlobListing,
+      EnumSet<BlobListingDetails> listingDetails, BlobRequestOptions options,
+      OperationContext opContext) throws StorageException, URISyntaxException {
 
-    CloudBlobDirectoryWrapper directory = this.container
-        .getDirectoryReference(aPrefix);
-    return directory.listBlobs(null, useFlatBlobListing, listingDetails,
-        options, opContext);
+    CloudBlobDirectoryWrapper directory =  this.container.getDirectoryReference(aPrefix);
+    return directory.listBlobs(
+        null,
+        useFlatBlobListing,
+        listingDetails,
+        options,
+        opContext);
   }
 
   /**
@@ -1447,15 +1678,18 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
    * @throws URISyntaxException
    * 
    */
-  private CloudBlockBlobWrapper getBlobReference(String aKey)
+  private CloudBlobWrapper getBlobReference(String aKey)
       throws StorageException, URISyntaxException {
 
-    CloudBlockBlobWrapper blob = this.container.getBlockBlobReference(aKey);
-
+    CloudBlobWrapper blob = null;
+    if (isPageBlobKey(aKey)) {
+      blob = this.container.getPageBlobReference(aKey);
+    } else {
+      blob = this.container.getBlockBlobReference(aKey);
     blob.setStreamMinimumReadSizeInBytes(downloadBlockSizeBytes);
     blob.setWriteBlockSizeInBytes(uploadBlockSizeBytes);
+    }
 
-    // Return with block blob.
     return blob;
   }
 
@@ -1492,7 +1726,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
    * 
    * @returns normKey
    */
-  private String normalizeKey(CloudBlockBlobWrapper blob) {
+  private String normalizeKey(CloudBlobWrapper blob) {
     return normalizeKey(blob.getUri());
   }
 
@@ -1552,20 +1786,19 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
         instrumentation,
         bandwidthGaugeUpdater);
     
-    // Bind operation context to receive send request callbacks on this
-    // operation.
-    // If reads concurrent to OOB writes are allowed, the interception will
-    // reset the conditional header on all Azure blob storage read requests.
+    // Bind operation context to receive send request callbacks on this operation.
+    // If reads concurrent to OOB writes are allowed, the interception will reset
+    // the conditional header on all Azure blob storage read requests.
     if (bindConcurrentOOBIo) {
       SendRequestIntercept.bind(storageInteractionLayer.getCredentials(),
           operationContext, true);
     }
 
     if (testHookOperationContext != null) {
-      operationContext = testHookOperationContext
-          .modifyOperationContext(operationContext);
+      operationContext =
+          testHookOperationContext.modifyOperationContext(operationContext);
     }
-    
+
     ErrorMetricUpdater.hook(operationContext, instrumentation);
 
     // Return the operation context.
@@ -1605,7 +1838,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
             BlobMaterialization.Implicit);
       }
 
-      CloudBlockBlobWrapper blob = getBlobReference(key);
+      CloudBlobWrapper blob = getBlobReference(key);
 
       // Download attributes and return file metadata only if the blob
       // exists.
@@ -1634,7 +1867,8 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
 
           return new FileMetadata(
               key, // Always return denormalized key with metadata.
-              properties.getLength(), properties.getLastModified().getTime(),
+              getDataLength(blob, properties),
+              properties.getLastModified().getTime(),
               getPermissionStatus(blob));
         }
       }
@@ -1642,17 +1876,23 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
       // There is no file with that key name, but maybe it is a folder.
       // Query the underlying folder/container to list the blobs stored
       // there under that key.
-      Iterable<ListBlobItem> objects = listRootBlobs(key, true,
-          EnumSet.of(BlobListingDetails.METADATA), null,
+      //
+      Iterable<ListBlobItem> objects =
+          listRootBlobs(
+              key,
+              true,
+              EnumSet.of(BlobListingDetails.METADATA),
+              null,
           getInstrumentedContext());
 
       // Check if the directory/container has the blob items.
       for (ListBlobItem blobItem : objects) {
-        if (blobItem instanceof CloudBlockBlobWrapper) {
+        if (blobItem instanceof CloudBlockBlobWrapper
+            || blobItem instanceof CloudPageBlobWrapper) {
           LOG.debug("Found blob as a directory-using this file under it to infer its properties "
               + blobItem.getUri());
 
-          blob = (CloudBlockBlobWrapper) blobItem;
+          blob = (CloudBlobWrapper) blobItem;
           // The key specifies a directory. Create a FileMetadata object which
           // specifies as such.
           BlobProperties properties = blob.getProperties();
@@ -1672,10 +1912,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
   }
 
   @Override
-  public DataInputStream retrieve(String key) throws AzureException {
-    InputStream inStream = null;
-    BufferedInputStream inBufStream = null;
-    try {
+  public DataInputStream retrieve(String key) throws AzureException, IOException {
       try {
         // Check if a session exists, if not create a session with the
         // Azure storage server.
@@ -1688,27 +1925,13 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
         checkContainer(ContainerAccessType.PureRead);
 
         // Get blob reference and open the input buffer stream.
-        CloudBlockBlobWrapper blob = getBlobReference(key);
-        inStream = blob.openInputStream(getDownloadOptions(),
-            getInstrumentedContext(isConcurrentOOBAppendAllowed()));
-
-        inBufStream = new BufferedInputStream(inStream);
+        CloudBlobWrapper blob = getBlobReference(key);
+      BufferedInputStream inBufStream = new BufferedInputStream(
+          openInputStream(blob));
 
         // Return a data input stream.
         DataInputStream inDataStream = new DataInputStream(inBufStream);
         return inDataStream;
-      }
-      catch (Exception e){
-        // close the streams on error.
-        // We use nested try-catch as stream.close() can throw IOException.
-        if(inBufStream != null){
-          inBufStream.close();
-        }
-        if(inStream != null){
-          inStream.close();
-        }
-        throw e;
-      }
     } catch (Exception e) {
       // Re-throw as an Azure storage exception.
       throw new AzureException(e);
@@ -1717,11 +1940,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
 
   @Override
   public DataInputStream retrieve(String key, long startByteOffset)
-      throws AzureException {
-
-    InputStream in = null;
-    DataInputStream inDataStream = null;
-    try {
+      throws AzureException, IOException {
       try {
         // Check if a session exists, if not create a session with the
         // Azure storage server.
@@ -1734,31 +1953,20 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
         checkContainer(ContainerAccessType.PureRead);
 
         // Get blob reference and open the input buffer stream.
-        CloudBlockBlobWrapper blob = getBlobReference(key);
+        CloudBlobWrapper blob = getBlobReference(key);
 
         // Open input stream and seek to the start offset.
-        in = blob.openInputStream(getDownloadOptions(),
-            getInstrumentedContext(isConcurrentOOBAppendAllowed()));
+        InputStream in = blob.openInputStream(getDownloadOptions(),
+            getInstrumentedContext(isConcurrentOOBAppendAllowed()));
 
         // Create a data input stream.
-        inDataStream = new DataInputStream(in);
-        long skippedBytes = inDataStream.skip(startByteOffset);
-        if (skippedBytes != startByteOffset) {
-          throw new IOException("Couldn't skip the requested number of bytes");
-        }
+        DataInputStream inDataStream = new DataInputStream(in);
+
+        // Skip bytes and ignore return value. This is okay
+        // because if you try to skip too far you will be positioned
+        // at the end and reads will not return data.
+        inDataStream.skip(startByteOffset);
         return inDataStream;
-      }
-      catch (Exception e){
-        // close the streams on error.
-        // We use nested try-catch as stream.close() can throw IOException.
-        if(inDataStream != null){
-          inDataStream.close();
-        }
-        if(in != null){
-          in.close();
-        }
-        throw e;
-      }
     } catch (Exception e) {
       // Re-throw as an Azure storage exception.
       throw new AzureException(e);
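
Note that InputStream.skip() may also skip fewer bytes than requested without
reaching end-of-stream. The comment above relies on over-skipping simply
leaving the stream positioned at EOF; a caller that instead needed an
exact-position guarantee over an arbitrary java.io stream could loop, along
these lines (an illustrative helper, not part of this patch):

    static void skipFully(InputStream in, long n) throws IOException {
      while (n > 0) {
        long skipped = in.skip(n);
        if (skipped > 0) {
          n -= skipped;
        } else if (in.read() == -1) {
          return;        // reached EOF; nothing left to skip
        } else {
          n--;           // read() consumed a single byte
        }
      }
    }
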
@@ -1825,13 +2033,14 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
       for (ListBlobItem blobItem : objects) {
         // Check that the maximum listing count is not exhausted.
         //
-        if (0 < maxListingCount && fileMetadata.size() >= maxListingCount) {
+        if (0 < maxListingCount
+            && fileMetadata.size() >= maxListingCount) {
           break;
         }
 
-        if (blobItem instanceof CloudBlockBlobWrapper) {
+        if (blobItem instanceof CloudBlockBlobWrapper
+            || blobItem instanceof CloudPageBlobWrapper) {
           String blobKey = null;
-          CloudBlockBlobWrapper blob = (CloudBlockBlobWrapper) blobItem;
+          CloudBlobWrapper blob = (CloudBlobWrapper) blobItem;
           BlobProperties properties = blob.getProperties();
 
           // Determine format of the blob name depending on whether an absolute
@@ -1840,11 +2049,14 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
 
           FileMetadata metadata;
           if (retrieveFolderAttribute(blob)) {
-            metadata = new FileMetadata(blobKey, properties.getLastModified()
-                .getTime(), getPermissionStatus(blob),
+            metadata = new FileMetadata(blobKey,
+                properties.getLastModified().getTime(),
+                getPermissionStatus(blob),
                 BlobMaterialization.Explicit);
           } else {
-            metadata = new FileMetadata(blobKey, properties.getLength(),
+            metadata = new FileMetadata(
+                blobKey,
+                getDataLength(blob, properties),
                 properties.getLastModified().getTime(),
                 getPermissionStatus(blob));
           }
@@ -1890,9 +2102,11 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
       }
       // Note: Original code indicated that this may be a hack.
       priorLastKey = null;
-      return new PartialListing(priorLastKey,
+      PartialListing listing = new PartialListing(priorLastKey,
           fileMetadata.toArray(new FileMetadata[] {}),
-          0 == fileMetadata.size() ? new String[] {} : new String[] { prefix });
+          0 == fileMetadata.size() ? new String[] {}
+              : new String[] { prefix });
+      return listing;
     } catch (Exception e) {
       // Re-throw as an Azure storage exception.
       //
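
When a non-null priorLastKey does come back, a caller pages through large
listings by feeding it into the next call. A rough sketch of that loop,
assuming a list(prefix, maxListingCount, priorLastKey)-style overload and the
getFiles()/getPriorLastKey() accessors on PartialListing; note the code above
currently always nulls out priorLastKey, so in practice this terminates after
one iteration:

    String priorLastKey = null;
    do {
      PartialListing listing = store.list(prefix, 1000, priorLastKey);
      for (FileMetadata file : listing.getFiles()) {
        // handle one file or folder entry
      }
      priorLastKey = listing.getPriorLastKey();
    } while (priorLastKey != null);
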
@@ -1919,7 +2133,9 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
       final int maxListingDepth) throws Exception {
 
     // Push the blob directory onto the stack.
-    LinkedList<Iterator<ListBlobItem>> dirIteratorStack = new LinkedList<Iterator<ListBlobItem>>();
+    //
+    AzureLinkedStack<Iterator<ListBlobItem>> dirIteratorStack =
+        new AzureLinkedStack<Iterator<ListBlobItem>>();
 
     Iterable<ListBlobItem> blobItems = aCloudBlobDirectory.listBlobs(null,
         false, EnumSet.of(BlobListingDetails.METADATA), null,
@@ -1958,9 +2174,10 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
 
         // Add the file metadata to the list if this is not a blob
         // directory item.
-        if (blobItem instanceof CloudBlockBlobWrapper) {
+        //
+        if (blobItem instanceof CloudBlockBlobWrapper
+            || blobItem instanceof CloudPageBlobWrapper) {
           String blobKey = null;
-          CloudBlockBlobWrapper blob = (CloudBlockBlobWrapper) blobItem;
+          CloudBlobWrapper blob = (CloudBlobWrapper) blobItem;
           BlobProperties properties = blob.getProperties();
 
           // Determine format of the blob name depending on whether an absolute
@@ -1969,11 +2186,14 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
 
           FileMetadata metadata;
           if (retrieveFolderAttribute(blob)) {
-            metadata = new FileMetadata(blobKey, properties.getLastModified()
-                .getTime(), getPermissionStatus(blob),
+            metadata = new FileMetadata(blobKey,
+                properties.getLastModified().getTime(),
+                getPermissionStatus(blob),
                 BlobMaterialization.Explicit);
           } else {
-            metadata = new FileMetadata(blobKey, properties.getLength(),
+            metadata = new FileMetadata(
+                blobKey,
+                getDataLength(blob, properties),
                 properties.getLastModified().getTime(),
                 getPermissionStatus(blob));
           }
@@ -2016,7 +2236,9 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
               // Note: Something smarter should be done about permissions. Maybe
               // inherit the permissions of the first non-directory blob.
               // Also, getting a proper value for last-modified is tricky.
-              FileMetadata directoryMetadata = new FileMetadata(dirKey, 0,
+              //
+              FileMetadata directoryMetadata = new FileMetadata(dirKey,
+                  0,
                   defaultPermissionNoBlobMetadata(),
                   BlobMaterialization.Implicit);
 
@@ -2050,26 +2272,48 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
   }
 
   /**
-   * Deletes the given blob, taking special care that if we get a blob-not-found
-   * exception upon retrying the operation, we just swallow the error since what
-   * most probably happened is that the first operation succeeded on the server.
-   * 
-   * @param blob
-   *          The blob to delete.
+   * Return the actual data length of the blob with the specified properties.
+   * For a page blob, the length reported in the properties reflects the
+   * allocated page extent rather than the bytes actually written, so the
+   * actual data length must be read from the blob itself. For any other
+   * blob type, the property value can be used directly.
+   */
+  private long getDataLength(CloudBlobWrapper blob, BlobProperties properties)
+    throws AzureException {
+    if (blob instanceof CloudPageBlobWrapper) {
+      try {
+        return PageBlobInputStream.getPageBlobSize((CloudPageBlobWrapper) blob,
+            getInstrumentedContext(
+                isConcurrentOOBAppendAllowed()));
+      } catch (Exception e) {
+        throw new AzureException(
+            "Unexpected exception getting page blob actual data size.", e);
+      }
+    }
+    return properties.getLength();
+  }
+
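
For background on why the property length is unreliable here: page blobs are
allocated in 512-byte pages, so the service-reported length covers the whole
allocated extent, while getPageBlobSize() reads the real data size out of the
blob's own format. A sketch of the alignment arithmetic:

    // A stream of n data bytes occupies the next page-aligned extent;
    // everything past the data is padding that the property length includes.
    static long toPageBoundary(long dataBytes) {
      final long PAGE_SIZE = 512L;
      return ((dataBytes + PAGE_SIZE - 1) / PAGE_SIZE) * PAGE_SIZE;
    }
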
+  /**
+   * Deletes the given blob, taking special care that if we get a
+   * blob-not-found exception upon retrying the operation, we just
+   * swallow the error since what most probably happened is that
+   * the first operation succeeded on the server.
+   * @param blob The blob to delete.
+   * @param leaseID A string identifying the lease, or null if no
+   *        lease is to be used.
    * @throws StorageException
    */
-  private void safeDelete(CloudBlockBlobWrapper blob) throws StorageException {
+  private void safeDelete(CloudBlobWrapper blob, SelfRenewingLease lease)
+      throws StorageException {
     OperationContext operationContext = getInstrumentedContext();
     try {
-      blob.delete(operationContext);
+      blob.delete(operationContext, lease);
     } catch (StorageException e) {
       // On exception, check that if:
       // 1. It's a BlobNotFound exception AND
       // 2. It got there after one-or-more retries THEN
       // we swallow the exception.
-      if (e.getErrorCode() != null && e.getErrorCode().equals("BlobNotFound")
-          && operationContext.getRequestResults().size() > 1
-          && operationContext.getRequestResults().get(0).getException() != null) {
+      if (e.getErrorCode() != null &&
+          e.getErrorCode().equals("BlobNotFound") &&
+          operationContext.getRequestResults().size() > 1 &&
+          operationContext.getRequestResults().get(0).getException() != null) {
         if (LOG.isDebugEnabled()) {
           LOG.debug("Swallowing delete exception on retry: " + e.getMessage());
         }
@@ -2077,21 +2321,25 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
       } else {
         throw e;
       }
+    } finally {
+      if (lease != null) {
+        lease.free();
+      }
     }
   }
 
   @Override
-  public void delete(String key) throws IOException {
+  public void delete(String key, SelfRenewingLease lease) throws IOException {
     try {
       if (checkContainer(ContainerAccessType.ReadThenWrite) == ContainerState.DoesntExist) {
         // Container doesn't exist, no need to do anything
         return;
       }
 
-      // Get the blob reference an delete it.
-      CloudBlockBlobWrapper blob = getBlobReference(key);
+      // Get the blob reference and delete it.
+      CloudBlobWrapper blob = getBlobReference(key);
       if (blob.exists(getInstrumentedContext())) {
-        safeDelete(blob);
+        safeDelete(blob, lease);
       }
     } catch (Exception e) {
       // Re-throw as an Azure storage exception.
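
This delete(key, lease) overload pairs with acquireLease() further below: the
caller obtains the lease, passes it in, and safeDelete() frees it in its
finally block whether or not the delete succeeds. In outline:

    // Delete a blob while holding a lease so no other writer can race us;
    // safeDelete() releases the lease on every path.
    SelfRenewingLease lease = store.acquireLease(key);
    store.delete(key, lease);
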
@@ -2100,12 +2348,27 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
   }
 
   @Override
+  public void delete(String key) throws IOException {
+    delete(key, null);
+  }
+
+  @Override
   public void rename(String srcKey, String dstKey) throws IOException {
+    rename(srcKey, dstKey, false, null);
+  }
+
+  @Override
+  public void rename(String srcKey, String dstKey, boolean acquireLease,
+      SelfRenewingLease existingLease) throws IOException {
 
     if (LOG.isDebugEnabled()) {
       LOG.debug("Moving " + srcKey + " to " + dstKey);
     }
 
+    if (acquireLease && existingLease != null) {
+      throw new IOException("Cannot acquire new lease if one already exists.");
+    }
+
     try {
       // Attempts rename may occur before opening any streams so first,
       // check if a session exists, if not create a session with the Azure
@@ -2120,52 +2383,76 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
       checkContainer(ContainerAccessType.ReadThenWrite);
       // Get the source blob and assert its existence. If the source key
       // needs to be normalized then normalize it.
-      CloudBlockBlobWrapper srcBlob = getBlobReference(srcKey);
+      //
+      CloudBlobWrapper srcBlob = getBlobReference(srcKey);
 
       if (!srcBlob.exists(getInstrumentedContext())) {
-        throw new AzureException("Source blob " + srcKey + " does not exist.");
+        throw new AzureException("Source blob " + srcKey +
+            " does not exist.");
+      }
+
+      /*
+       * Conditionally get a lease on the source blob to prevent other writers
+       * from changing it. This is used for correctness in HBase when log files
+       * are renamed. It generally should do no harm other than take a little
+       * more time for other rename scenarios. When the HBase master renames a
+       * log file folder, the lease locks out other writers.  This
+       * prevents a region server that the master thinks is dead, but is still
+       * alive, from committing additional updates.  This is different than
+       * when HBase runs on HDFS, where the region server recovers the lease
+       * on a log file, to gain exclusive access to it, before it splits it.
+       */
+      SelfRenewingLease lease = null;
+      if (acquireLease) {
+        lease = srcBlob.acquireLease();
+      } else if (existingLease != null) {
+        lease = existingLease;
       }
 
       // Get the destination blob. The destination key always needs to be
       // normalized.
-      CloudBlockBlobWrapper dstBlob = getBlobReference(dstKey);
+      //
+      CloudBlobWrapper dstBlob = getBlobReference(dstKey);
+
+      // TODO: Remove at the time when we move to Azure Java SDK 1.2+.
+      // This is the workaround provided by Azure Java SDK team to
+      // mitigate the issue with un-encoded x-ms-copy-source HTTP
+      // request header. Azure SDK versions before 1.2 do not encode this
+      // header, which causes URIs containing special (category "other")
+      // characters to fail with startCopyFromBlob when specified as the
+      // source (requests fail with HTTP 403).
+      URI srcUri = new URI(srcBlob.getUri().toASCIIString());
 
       // Rename the source blob to the destination blob by copying it to
       // the destination blob then deleting it.
       //
-      dstBlob.startCopyFromBlob(srcBlob, getInstrumentedContext());
+      dstBlob.startCopyFromBlob(srcUri, getInstrumentedContext());
       waitForCopyToComplete(dstBlob, getInstrumentedContext());
 
-      safeDelete(srcBlob);
+      safeDelete(srcBlob, lease);
     } catch (Exception e) {
       // Re-throw exception as an Azure storage exception.
       throw new AzureException(e);
     }
   }
 
-  private void waitForCopyToComplete(CloudBlockBlobWrapper blob,
-      OperationContext opContext) throws AzureException {
+  private void waitForCopyToComplete(CloudBlobWrapper blob,
+      OperationContext opContext) {
     boolean copyInProgress = true;
-    int exceptionCount = 0;
     while (copyInProgress) {
       try {
         blob.downloadAttributes(opContext);
       } catch (StorageException se) {
-        exceptionCount++;
-        if(exceptionCount > 10){
-          throw new AzureException("Too many storage exceptions during waitForCopyToComplete", se);
-        }
+        // Swallow the error and poll again; completion is judged solely
+        // from the copy state below.
       }
 
-      // test for null because mocked filesystem doesn't know about copystates
-      // yet.
-      copyInProgress = (blob.getCopyState() != null && blob.getCopyState()
-          .getStatus() == CopyStatus.PENDING);
+      // test for null because mocked filesystem doesn't know about
+      // copystates yet.
+      copyInProgress = (blob.getCopyState() != null
+          && blob.getCopyState().getStatus() == CopyStatus.PENDING);
       if (copyInProgress) {
         try {
           Thread.sleep(1000);
         } catch (InterruptedException ie) {
-          Thread.currentThread().interrupt();
+          // Ignore the interruption; polling simply resumes.
         }
       }
     }
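
With those pieces in place, the HBase log-rename scenario described in the
comment inside rename() reduces to a single call; the lease is acquired on
the source blob and released by safeDelete() once the copy completes:

    // Rename under a freshly acquired lease (the HBase log-file case) ...
    store.rename(srcKey, dstKey, true, null);
    // ... or under a lease the caller already holds.
    store.rename(srcKey, dstKey, false, existingLease);
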
@@ -2179,7 +2466,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
       throws AzureException {
     try {
       checkContainer(ContainerAccessType.ReadThenWrite);
-      CloudBlockBlobWrapper blob = getBlobReference(key);
+      CloudBlobWrapper blob = getBlobReference(key);
       blob.downloadAttributes(getInstrumentedContext());
       storePermissionStatus(blob, newPermission);
       blob.uploadMetadata(getInstrumentedContext());
@@ -2220,28 +2507,51 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
     }
   }
 
+  /**
+   * Get a lease on the blob identified by key. This lease will be renewed
+   * indefinitely by a background thread.
+   */
+  @Override
+  public SelfRenewingLease acquireLease(String key) throws AzureException {
+    LOG.debug("acquiring lease on " + key);
+    try {
+      checkContainer(ContainerAccessType.ReadThenWrite);
+      CloudBlobWrapper blob = getBlobReference(key);
+      return blob.acquireLease();
+    } catch (Exception e) {
+      // Caught exception while attempting to get lease. Re-throw as an
+      // Azure storage exception.
+      throw new AzureException(e);
+    }
+  }
+
   @Override
-  public void updateFolderLastModifiedTime(String key, Date lastModified)
+  public void updateFolderLastModifiedTime(String key, Date lastModified,
+      SelfRenewingLease folderLease)
       throws AzureException {
     try {
       checkContainer(ContainerAccessType.ReadThenWrite);
-      CloudBlockBlobWrapper blob = getBlobReference(key);
+      CloudBlobWrapper blob = getBlobReference(key);
       blob.getProperties().setLastModified(lastModified);
-      blob.uploadProperties(getInstrumentedContext());
+      blob.uploadProperties(getInstrumentedContext(), folderLease);
     } catch (Exception e) {
-      // Caught exception while attempting update the properties. Re-throw as an
+      // Caught exception while attempting to update the properties. Re-throw as an
       // Azure storage exception.
       throw new AzureException(e);
     }
   }
 
   @Override
-  public void updateFolderLastModifiedTime(String key) throws AzureException {
+  public void updateFolderLastModifiedTime(String key,
+      SelfRenewingLease folderLease) throws AzureException {
     final Calendar lastModifiedCalendar = Calendar
         .getInstance(Utility.LOCALE_US);
     lastModifiedCalendar.setTimeZone(Utility.UTC_ZONE);
     Date lastModified = lastModifiedCalendar.getTime();
-    updateFolderLastModifiedTime(key, lastModified);
+    updateFolderLastModifiedTime(key, lastModified, folderLease);
   }
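
A lease returned by acquireLease() remains valid until explicitly freed,
since a background thread keeps renewing it. A minimal lifecycle sketch
combining it with the overload above (the try/finally is the caller's
responsibility):

    SelfRenewingLease lease = store.acquireLease(folderKey);
    try {
      store.updateFolderLastModifiedTime(folderKey, lease);
    } finally {
      lease.free();  // stop the renewal thread and release the lease
    }
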
 
   @Override