Posted to common-commits@hadoop.apache.org by cn...@apache.org on 2014/10/08 23:24:40 UTC

[3/5] HADOOP-10809. hadoop-azure: page blob support. Contributed by Dexter Bradshaw, Mostafa Elhemali, Eric Hanson, and Mike Liddell.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2217e2f8/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobOutputStream.java
new file mode 100644
index 0000000..95f0c22
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobOutputStream.java
@@ -0,0 +1,497 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import static org.apache.hadoop.fs.azure.PageBlobFormatHelpers.PAGE_DATA_SIZE;
+import static org.apache.hadoop.fs.azure.PageBlobFormatHelpers.PAGE_HEADER_SIZE;
+import static org.apache.hadoop.fs.azure.PageBlobFormatHelpers.PAGE_SIZE;
+import static org.apache.hadoop.fs.azure.PageBlobFormatHelpers.fromShort;
+import static org.apache.hadoop.fs.azure.PageBlobFormatHelpers.withMD5Checking;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.fs.Syncable;
+import org.apache.hadoop.fs.azure.StorageInterface.CloudPageBlobWrapper;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.microsoft.windowsazure.storage.OperationContext;
+import com.microsoft.windowsazure.storage.StorageException;
+import com.microsoft.windowsazure.storage.blob.BlobRequestOptions;
+
+
+/**
+ * An output stream that writes file data to a page blob stored using ASV's
+ * custom format.
+ */
+final class PageBlobOutputStream extends OutputStream implements Syncable {
+  /**
+   * The maximum number of raw bytes Azure Storage allows us to upload in a
+   * single request (4 MB).
+   */
+  private static final int MAX_RAW_BYTES_PER_REQUEST = 4 * 1024 * 1024;
+  /**
+   * The maximum number of pages Azure Storage allows us to upload in a
+   * single request.
+   */
+  private static final int MAX_PAGES_IN_REQUEST =
+      MAX_RAW_BYTES_PER_REQUEST / PAGE_SIZE;
+  /**
+   * The maximum number of data bytes (header not included) we can upload
+   * in a single request. I'm limiting it to (N - 1) pages to account for
+   * the possibility that we may have to rewrite the previous request's
+   * last page.
+   */
+  private static final int MAX_DATA_BYTES_PER_REQUEST =
+      PAGE_DATA_SIZE * (MAX_PAGES_IN_REQUEST - 1);
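+  // For concreteness: assuming the standard 512-byte Azure page with a
+  // 2-byte header (so PAGE_DATA_SIZE = 510), MAX_PAGES_IN_REQUEST is
+  // 4 MB / 512 = 8192 and MAX_DATA_BYTES_PER_REQUEST is
+  // 510 * 8191 = 4,177,410 bytes of payload per request.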
+
+  private final CloudPageBlobWrapper blob;
+  private final OperationContext opContext;
+
+  /**
+   * If the IO thread encounters an error, it'll store it here.
+   */
+  private volatile IOException lastError;
+
+  /**
+   * The current byte offset we're at in the blob (how many bytes we've
+   * uploaded to the server).
+   */
+  private long currentBlobOffset;
+  /**
+   * The data in the last page that we wrote to the server, in case we have to
+   * overwrite it in the new request.
+   */
+  private byte[] previousLastPageDataWritten = new byte[0];
+  /**
+   * The current buffer we're writing to before sending to the server.
+   */
+  private ByteArrayOutputStream outBuffer;
+  /**
+   * The task queue for writing to the server.
+   */
+  private final LinkedBlockingQueue<Runnable> ioQueue;
+  /**
+   * The thread pool we're using for writing to the server. Note that the IO
+   * write is NOT designed for parallelism, so there can only be one thread
+   * in that pool (I'm using the thread pool mainly for the lifetime management
+   * capabilities, otherwise I'd have just used a simple Thread).
+   */
+  private final ThreadPoolExecutor ioThreadPool;
+
+  // The last task given to the ioThreadPool to execute, to allow
+  // waiting until it's done.
+  private WriteRequest lastQueuedTask;
+
+  public static final Log LOG = LogFactory.getLog(AzureNativeFileSystemStore.class);
+
+  // Set the minimum page blob file size to 128MB, which is >> the default block size of 32MB.
+  // This default block size is often used as the hbase.regionserver.hlog.blocksize.
+  // The goal is to have a safe minimum size for HBase log files to allow them
+  // to be filled and rolled without exceeding the minimum size. A larger size can be
+  // used by setting the fs.azure.page.blob.size configuration variable.
+  public static final long PAGE_BLOB_MIN_SIZE = 128L * 1024L * 1024L;
+
+  /**
+   * Constructs an output stream over the given page blob.
+   *
+   * @param blob the blob that this stream is associated with.
+   * @param opContext an object used to track the execution of the operation
+   * @throws StorageException if anything goes wrong creating the blob.
+   */
+  public PageBlobOutputStream(final CloudPageBlobWrapper blob,
+      final OperationContext opContext,
+      final Configuration conf) throws StorageException {
+    this.blob = blob;
+    this.outBuffer = new ByteArrayOutputStream();
+    this.opContext = opContext;
+    this.lastQueuedTask = null;
+    this.ioQueue = new LinkedBlockingQueue<Runnable>();
+
+    // As explained above: the IO writes are not designed for parallelism,
+    // so we only have one thread in this thread pool.
+    this.ioThreadPool = new ThreadPoolExecutor(1, 1, 2, TimeUnit.SECONDS,
+        ioQueue);
+
+    // Make page blob files have a size that is the greater of a
+    // minimum size, or the value of fs.azure.page.blob.size from configuration.
+    long pageBlobConfigSize = conf.getLong("fs.azure.page.blob.size", 0);
+    LOG.debug("Read value of fs.azure.page.blob.size as " + pageBlobConfigSize
+        + " from configuration (0 if not present).");
+    long pageBlobSize = Math.max(PAGE_BLOB_MIN_SIZE, pageBlobConfigSize);
+
+    // Ensure that the pageBlobSize is a multiple of page size.
+    if (pageBlobSize % PAGE_SIZE != 0) {
+      pageBlobSize += PAGE_SIZE - pageBlobSize % PAGE_SIZE;
+    }
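+    // For example, assuming PAGE_SIZE is 512: a configured size of
+    // 135,000,000 bytes leaves a remainder of 448, so it is rounded up by
+    // 512 - 448 = 64 bytes to 135,000,064, the next multiple of 512.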
+    blob.create(pageBlobSize, new BlobRequestOptions(), opContext);
+  }
+
+  private void checkStreamState() throws IOException {
+    if (lastError != null) {
+      throw lastError;
+    }
+  }
+
+  /**
+   * Closes this output stream and releases any system resources associated with
+   * this stream. If any data remains in the buffer it is committed to the
+   * service.
+   */
+  @Override
+  public void close() throws IOException {
+    LOG.debug("Closing page blob output stream.");
+    flush();
+    checkStreamState();
+    ioThreadPool.shutdown();
+    try {
+      LOG.debug(ioThreadPool.toString());
+      if (!ioThreadPool.awaitTermination(10, TimeUnit.MINUTES)) {
+        LOG.debug("Timed out after 10 minutes waiting for IO requests to finish");
+        logAllStackTraces();
+        LOG.debug(ioThreadPool.toString());
+        throw new IOException("Timed out waiting for IO requests to finish");
+      }
+    } catch (InterruptedException e) {
+      LOG.debug("Caught InterruptedException");
+
+      // Restore the interrupted status
+      Thread.currentThread().interrupt();
+    }
+
+    this.lastError = new IOException("Stream is already closed.");
+  }
+
+  // Log the stacks of all threads.
+  private void logAllStackTraces() {
+    Map<Thread, StackTraceElement[]> liveThreads = Thread.getAllStackTraces();
+    for (Map.Entry<Thread, StackTraceElement[]> entry : liveThreads.entrySet()) {
+      LOG.debug("Thread " + entry.getKey().getName());
+      for (StackTraceElement element : entry.getValue()) {
+        LOG.debug("\tat " + element);
+      }
+    }
+  }
+
+  /**
+   * A single write request for data to write to Azure storage.
+   */
+  private class WriteRequest implements Runnable {
+    private final byte[] dataPayload;
+    private final CountDownLatch doneSignal = new CountDownLatch(1);
+
+    public WriteRequest(byte[] dataPayload) {
+      this.dataPayload = dataPayload;
+    }
+
+    public void waitTillDone() throws InterruptedException {
+      doneSignal.await();
+    }
+
+    @Override
+    public void run() {
+      try {
+        LOG.debug("before runInternal()");
+        runInternal();
+        LOG.debug("after runInternal()");
+      } finally {
+        doneSignal.countDown();
+      }
+    }
+
+    private void runInternal() {
+      if (lastError != null) {
+        // We're already in an error state, no point doing anything.
+        return;
+      }
+      if (dataPayload.length == 0) {
+        // Nothing to do.
+        return;
+      }
+
+      // Since we have to rewrite the last request's last page's data
+      // (may be empty), total data size is our data plus whatever was
+      // left from there.
+      final int totalDataBytes = dataPayload.length 
+          + previousLastPageDataWritten.length;
+      // Calculate the total number of pages we're writing to the server.
+      final int numberOfPages = (totalDataBytes / PAGE_DATA_SIZE) 
+          + (totalDataBytes % PAGE_DATA_SIZE == 0 ? 0 : 1);
+      // Fill up the raw bytes we're writing.
+      byte[] rawPayload = new byte[numberOfPages * PAGE_SIZE];
+      // Keep track of the size of the last page we uploaded.
+      int currentLastPageDataSize = -1;
+      for (int page = 0; page < numberOfPages; page++) {
+        // Our current byte offset in the data.
+        int dataOffset = page * PAGE_DATA_SIZE;
+        // Our current byte offset in the raw buffer.
+        int rawOffset = page * PAGE_SIZE;
+        // The size of the data in the current page.
+        final short currentPageDataSize = (short) Math.min(PAGE_DATA_SIZE,
+            totalDataBytes - dataOffset);
+        // Save off this page's size as the potential last page's size.
+        currentLastPageDataSize = currentPageDataSize;
+
+        // Write out the page size in the header.
+        final byte[] header = fromShort(currentPageDataSize);
+        System.arraycopy(header, 0, rawPayload, rawOffset, header.length);
+        rawOffset += header.length;
+
+        int bytesToCopyFromDataPayload = currentPageDataSize;
+        if (dataOffset < previousLastPageDataWritten.length) {
+          // First write out the last page's data.
+          final int bytesToCopyFromLastPage = Math.min(currentPageDataSize,
+              previousLastPageDataWritten.length - dataOffset);
+          System.arraycopy(previousLastPageDataWritten, dataOffset,
+              rawPayload, rawOffset, bytesToCopyFromLastPage);
+          bytesToCopyFromDataPayload -= bytesToCopyFromLastPage;
+          rawOffset += bytesToCopyFromLastPage;
+          dataOffset += bytesToCopyFromLastPage;
+        }
+
+        if (dataOffset >= previousLastPageDataWritten.length) {
+          // Then write the current payload's data.
+          System.arraycopy(dataPayload,
+              dataOffset - previousLastPageDataWritten.length,
+              rawPayload, rawOffset, bytesToCopyFromDataPayload);
+        }
+      }
+
+      // Raw payload constructed, ship it off to the server.
+      writePayloadToServer(rawPayload);
+
+      // Post-send bookkeeping.
+      currentBlobOffset += rawPayload.length;
+      if (currentLastPageDataSize < PAGE_DATA_SIZE) {
+        // Partial page, save it off so it's overwritten in the next request.
+        final int startOffset = (numberOfPages - 1) * PAGE_SIZE + PAGE_HEADER_SIZE;
+        previousLastPageDataWritten = Arrays.copyOfRange(rawPayload,
+            startOffset,
+            startOffset + currentLastPageDataSize);
+        // Since we're rewriting this page, set our current offset in the server
+        // to that page's beginning.
+        currentBlobOffset -= PAGE_SIZE;
+      } else {
+        // It wasn't a partial page, we won't need to rewrite it.
+        previousLastPageDataWritten = new byte[0];
+      }
+    }
+
+    /**
+     * Writes the given raw payload to Azure Storage at the current blob
+     * offset.
+     */
+    private void writePayloadToServer(byte[] rawPayload) {
+      final ByteArrayInputStream wrapperStream =
+                  new ByteArrayInputStream(rawPayload);
+      LOG.debug("writing payload of " + rawPayload.length + " bytes to Azure page blob");
+      try {
+        long start = System.currentTimeMillis();
+        blob.uploadPages(wrapperStream, currentBlobOffset, rawPayload.length,
+            withMD5Checking(), PageBlobOutputStream.this.opContext);
+        long end = System.currentTimeMillis();
+        LOG.trace("Azure uploadPages time for " + rawPayload.length + " bytes = " + (end - start));
+      } catch (IOException ex) {
+        LOG.debug(ExceptionUtils.getStackTrace(ex));
+        lastError = ex;
+      } catch (StorageException ex) {
+        LOG.debug(ExceptionUtils.getStackTrace(ex));
+        lastError = new IOException(ex);
+      }
+      if (lastError != null) {
+        LOG.debug("Caught error in PageBlobOutputStream#writePayloadToServer()");
+      }
+    }
+  }
+
+  private synchronized void flushIOBuffers()  {
+    if (outBuffer.size() == 0) {
+      return;
+    }
+    lastQueuedTask = new WriteRequest(outBuffer.toByteArray());
+    ioThreadPool.execute(lastQueuedTask);
+    outBuffer = new ByteArrayOutputStream();
+  }
+
+  /**
+   * Flushes this output stream and forces any buffered output bytes to be
+   * written out. If any data remains in the buffer it is committed to the
+   * service. Data is queued for writing but not forced out to the service
+   * before the call returns.
+   */
+  @Override
+  public void flush() throws IOException {
+    checkStreamState();
+    flushIOBuffers();
+  }
+
+  /**
+   * Writes b.length bytes from the specified byte array to this output stream.
+   *
+   * @param data
+   *          the byte array to write.
+   *
+   * @throws IOException
+   *           if an I/O error occurs. In particular, an IOException may be
+   *           thrown if the output stream has been closed.
+   */
+  @Override
+  public void write(final byte[] data) throws IOException {
+    write(data, 0, data.length);
+  }
+
+  /**
+   * Writes length bytes from the specified byte array starting at offset to
+   * this output stream.
+   *
+   * @param data
+   *          the byte array to write.
+   * @param offset
+   *          the start offset in the data.
+   * @param length
+   *          the number of bytes to write.
+   * @throws IOException
+   *           if an I/O error occurs. In particular, an IOException may be
+   *           thrown if the output stream has been closed.
+   */
+  @Override
+  public void write(final byte[] data, final int offset, final int length)
+      throws IOException {
+    if (offset < 0 || length < 0 || length > data.length - offset) {
+      throw new IndexOutOfBoundsException();
+    }
+
+    writeInternal(data, offset, length);
+  }
+
+  /**
+   * Writes the specified byte to this output stream. The general contract for
+   * write is that one byte is written to the output stream. The byte to be
+   * written is the eight low-order bits of the argument b. The 24 high-order
+   * bits of b are ignored.
+   *
+   * @param byteVal
+   *          the byteValue to write.
+   * @throws IOException
+   *           if an I/O error occurs. In particular, an IOException may be
+   *           thrown if the output stream has been closed.
+   */
+  @Override
+  public void write(final int byteVal) throws IOException {
+    write(new byte[] { (byte) (byteVal & 0xFF) });
+  }
+
+  /**
+   * Writes the data to the buffer and triggers writes to the service as needed.
+   *
+   * @param data
+   *          the byte array to write.
+   * @param offset
+   *          the start offset in the data.
+   * @param length
+   *          the number of bytes to write.
+   * @throws IOException
+   *           if an I/O error occurs. In particular, an IOException may be
+   *           thrown if the output stream has been closed.
+   */
+  private synchronized void writeInternal(final byte[] data, int offset,
+      int length) throws IOException {
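+    // Fill the buffer up to MAX_DATA_BYTES_PER_REQUEST bytes, handing a
+    // write request to the IO thread each time it fills. For example,
+    // assuming MAX_DATA_BYTES_PER_REQUEST = 4,177,410, a 10,000,000-byte
+    // write queues two full requests and leaves 1,645,180 bytes buffered
+    // until the next flush.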
+    while (length > 0) {
+      checkStreamState();
+      final int availableBufferBytes = MAX_DATA_BYTES_PER_REQUEST
+          - this.outBuffer.size();
+      final int nextWrite = Math.min(availableBufferBytes, length);
+
+      outBuffer.write(data, offset, nextWrite);
+      offset += nextWrite;
+      length -= nextWrite;
+
+      if (outBuffer.size() > MAX_DATA_BYTES_PER_REQUEST) {
+        throw new RuntimeException("Internal error: maximum write size " +
+            Integer.toString(MAX_DATA_BYTES_PER_REQUEST) + " exceeded.");
+      }
+
+      if (outBuffer.size() == MAX_DATA_BYTES_PER_REQUEST) {
+        flushIOBuffers();
+      }
+    }
+  }
+
+  /**
+   * Force all data in the output stream to be written to Azure storage.
+   * Wait to return until this is complete.
+   */
+  @Override
+  public synchronized void hsync() throws IOException {
+    LOG.debug("Entering PageBlobOutputStream#hsync().");
+    long start = System.currentTimeMillis();
+    flush();
+    LOG.debug(ioThreadPool.toString());
+    try {
+      if (lastQueuedTask != null) {
+        lastQueuedTask.waitTillDone();
+      }
+    } catch (InterruptedException e1) {
+
+      // Restore the interrupted status
+      Thread.currentThread().interrupt();
+    }
+    LOG.debug("Leaving PageBlobOutputStream#hsync(). Total hsync duration = "
+  	  + (System.currentTimeMillis() - start) + " msec.");
+  }
+
+  @Override
+  public void hflush() throws IOException {
+
+    // hflush is required to force data to storage, so call hsync,
+    // which does that.
+    hsync();
+  }
+
+  @Deprecated
+  public void sync() throws IOException {
+
+    // Sync has been deprecated in favor of hflush.
+    hflush();
+  }
+
+  // For unit testing purposes: kill the IO threads.
+  @VisibleForTesting
+  void killIoThreads() {
+    ioThreadPool.shutdownNow();
+  }
+}
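
For readers tracing the format: runInternal() above packs data into fixed-size
pages, each holding a 2-byte size header followed by that page's data bytes. A
minimal standalone sketch of the packing, assuming PAGE_SIZE = 512 and
PAGE_HEADER_SIZE = 2 (the values implied by PageBlobFormatHelpers) and a
big-endian header (the assumed behavior of fromShort):

    // Sketch only -- mirrors the layout produced by WriteRequest#runInternal.
    static byte[] packIntoPages(byte[] data) {
      final int pageSize = 512, headerSize = 2, dataPerPage = pageSize - headerSize;
      final int pages = (data.length + dataPerPage - 1) / dataPerPage;
      byte[] raw = new byte[pages * pageSize];
      for (int p = 0; p < pages; p++) {
        int dataOffset = p * dataPerPage;
        short n = (short) Math.min(dataPerPage, data.length - dataOffset);
        raw[p * pageSize] = (byte) ((n >> 8) & 0xFF);  // assumed big-endian header
        raw[p * pageSize + 1] = (byte) (n & 0xFF);
        System.arraycopy(data, dataOffset, raw, p * pageSize + headerSize, n);
      }
      return raw;  // a short final page is left zero-padded
    }

A reader can recover the logical stream by walking 512-byte pages and taking
the header's count of data bytes from each page.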

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2217e2f8/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PartialListing.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PartialListing.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PartialListing.java
index 9e49de8..4a80d2e 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PartialListing.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PartialListing.java
@@ -30,7 +30,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
  * This listing may be returned in chunks, so a <code>priorLastKey</code> is
  * provided so that the next chunk may be requested.
  * </p>
- * 
+ *
  * @see NativeFileSystemStore#list(String, int, String)
  */
 @InterfaceAudience.Private

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2217e2f8/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SelfRenewingLease.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SelfRenewingLease.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SelfRenewingLease.java
new file mode 100644
index 0000000..2d5c0c8
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SelfRenewingLease.java
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.azure.StorageInterface.CloudBlobWrapper;
+
+import com.microsoft.windowsazure.storage.AccessCondition;
+import com.microsoft.windowsazure.storage.StorageException;
+import com.microsoft.windowsazure.storage.blob.CloudBlob;
+
+/**
+ * An Azure blob lease that automatically renews itself indefinitely
+ * using a background thread. Use it to synchronize distributed processes,
+ * or to prevent writes to the blob by other processes that don't
+ * have the lease.
+ *
+ * Creating a new Lease object blocks the caller until the Azure blob lease is
+ * acquired.
+ *
+ * Attempting to get a lease on a non-existent blob throws StorageException.
+ *
+ * Call free() to release the Lease.
+ *
+ * You can use this Lease like a distributed lock. If the holder process
+ * dies, the lease will time out since it won't be renewed.
+ */
+public class SelfRenewingLease {
+
+  private CloudBlobWrapper blobWrapper;
+  private Thread renewer;
+  private volatile boolean leaseFreed;
+  private String leaseID = null;
+  private static final int LEASE_TIMEOUT = 60;  // Lease timeout in seconds
+
+  // Time to wait to renew lease in milliseconds
+  public static final int LEASE_RENEWAL_PERIOD = 40000;
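+  // Renewing every 40 seconds against the 60-second lease timeout leaves a
+  // 20-second margin for a slow or failed renewal before the lease expires.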
+  private static final Log LOG = LogFactory.getLog(SelfRenewingLease.class);
+
+  // Used to allocate thread serial numbers in thread name
+  private static volatile int threadNumber = 0;
+
+
+  // Time to wait to retry getting the lease in milliseconds
+  private static final int LEASE_ACQUIRE_RETRY_INTERVAL = 2000;
+
+  public SelfRenewingLease(CloudBlobWrapper blobWrapper)
+      throws StorageException {
+
+    this.leaseFreed = false;
+    this.blobWrapper = blobWrapper;
+
+    // Keep trying to get the lease until you get it.
+    CloudBlob blob = blobWrapper.getBlob();
+    while (leaseID == null) {
+      try {
+        leaseID = blob.acquireLease(LEASE_TIMEOUT, null);
+      } catch (StorageException e) {
+
+        // Throw again if we don't want to keep waiting.
+        // We expect it to be that the lease is already present,
+        // or in some cases that the blob does not exist.
+        if (!e.getErrorCode().equals("LeaseAlreadyPresent")) {
+          LOG.info(
+            "Caught exception when trying to get lease on blob "
+            + blobWrapper.getUri().toString() + ". " + e.getMessage());
+          throw e;
+        }
+      }
+      if (leaseID == null) {
+        try {
+          Thread.sleep(LEASE_ACQUIRE_RETRY_INTERVAL);
+        } catch (InterruptedException e) {
+
+          // Restore the interrupted status
+          Thread.currentThread().interrupt();
+        }
+      }
+    }
+    renewer = new Thread(new Renewer());
+
+    // A Renewer running should not keep JVM from exiting, so make it a daemon.
+    renewer.setDaemon(true);
+    renewer.setName("AzureLeaseRenewer-" + threadNumber++);
+    renewer.start();
+    LOG.debug("Acquired lease " + leaseID + " on " + blob.getUri()
+        + " managed by thread " + renewer.getName());
+  }
+
+  /**
+   * Free the lease and stop the keep-alive thread.
+   * @throws StorageException
+   */
+  public void free() throws StorageException {
+    AccessCondition accessCondition = AccessCondition.generateEmptyCondition();
+    accessCondition.setLeaseID(leaseID);
+    try {
+      blobWrapper.getBlob().releaseLease(accessCondition);
+    } catch (StorageException e) {
+      if (e.getErrorCode().equals("BlobNotFound")) {
+
+        // Don't do anything -- it's okay to free a lease
+        // on a deleted file. The delete freed the lease
+        // implicitly.
+      } else {
+
+        // This error is not anticipated, so re-throw it.
+        LOG.warn("Unanticipated exception when trying to free lease " + leaseID
+            + " on " +  blobWrapper.getStorageUri());
+        throw(e);
+      }
+    } finally {
+
+      // Even if releasing the lease fails (e.g. because the file was deleted),
+      // make sure to record that we freed the lease, to terminate the
+      // keep-alive thread.
+      leaseFreed = true;
+      LOG.debug("Freed lease " + leaseID + " on " + blobWrapper.getUri()
+          + " managed by thread " + renewer.getName());
+    }
+  }
+
+  public boolean isFreed() {
+    return leaseFreed;
+  }
+
+  public String getLeaseID() {
+    return leaseID;
+  }
+
+  public CloudBlob getCloudBlob() {
+    return blobWrapper.getBlob();
+  }
+
+  private class Renewer implements Runnable {
+
+    /**
+     * Start a keep-alive thread that will continue to renew
+     * the lease until it is freed or the process dies.
+     */
+    @Override
+    public void run() {
+      LOG.debug("Starting lease keep-alive thread.");
+      AccessCondition accessCondition =
+          AccessCondition.generateEmptyCondition();
+      accessCondition.setLeaseID(leaseID);
+
+      while (!leaseFreed) {
+        try {
+          Thread.sleep(LEASE_RENEWAL_PERIOD);
+        } catch (InterruptedException e) {
+          LOG.debug("Keep-alive thread for lease " + leaseID +
+              " interrupted.");
+
+          // Restore the interrupted status
+          Thread.currentThread().interrupt();
+        }
+        try {
+          if (!leaseFreed) {
+            blobWrapper.getBlob().renewLease(accessCondition);
+
+            // It'll be very rare to renew the lease (most leases are
+            // short-lived), so log that we did it, to help with system
+            // debugging.
+            LOG.info("Renewed lease " + leaseID + " on "
+                + getCloudBlob().getUri());
+          }
+        } catch (StorageException e) {
+          if (!leaseFreed) {
+
+            // Free the lease so we don't leave this thread running forever.
+            leaseFreed = true;
+
+            // Normally leases should be freed and there should be no
+            // exceptions, so log a warning.
+            LOG.warn("Attempt to renew lease " + leaseID + " on "
+                + getCloudBlob().getUri()
+                + " failed, but lease not yet freed. Reason: " +
+                e.getMessage());
+          }
+        }
+      }
+    }
+  }
+}
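
A hedged usage sketch of the lease as a coarse distributed lock
('blobWrapper' is an assumed CloudBlobWrapper for an existing blob; the patch
also adds a CloudBlobWrapper#acquireLease() convenience that does the same
thing). Since renewal fires every 40 seconds against the 60-second timeout, a
crashed holder loses the lock within about a minute:

    // Hypothetical caller; the constructor blocks until the lease is acquired.
    SelfRenewingLease lease = new SelfRenewingLease(blobWrapper);
    try {
      // Exclusive section: the background renewer keeps the lease alive
      // for as long as this work takes.
      doExclusiveWork();   // placeholder for the caller's critical section
    } finally {
      lease.free();        // stops the keep-alive thread, releases the lease
    }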

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2217e2f8/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SelfThrottlingIntercept.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SelfThrottlingIntercept.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SelfThrottlingIntercept.java
index 25f2883..d18a144 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SelfThrottlingIntercept.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SelfThrottlingIntercept.java
@@ -68,12 +68,14 @@ public class SelfThrottlingIntercept {
 
   private final float readFactor;
   private final float writeFactor;
+  private final OperationContext operationContext;
 
   // Concurrency: access to non-final members must be thread-safe
   private long lastE2Elatency;
 
-  public SelfThrottlingIntercept(OperationContext operationContext,
+  public SelfThrottlingIntercept(OperationContext operationContext, 
       float readFactor, float writeFactor) {
+    this.operationContext = operationContext;
     this.readFactor = readFactor;
     this.writeFactor = writeFactor;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2217e2f8/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/ShellDecryptionKeyProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/ShellDecryptionKeyProvider.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/ShellDecryptionKeyProvider.java
index 2ce8ebd..d9d6fc3 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/ShellDecryptionKeyProvider.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/ShellDecryptionKeyProvider.java
@@ -31,7 +31,8 @@ import org.apache.hadoop.util.Shell;
  */
 @InterfaceAudience.Private
 public class ShellDecryptionKeyProvider extends SimpleKeyProvider {
-  static final String KEY_ACCOUNT_SHELLKEYPROVIDER_SCRIPT = "fs.azure.shellkeyprovider.script";
+  static final String KEY_ACCOUNT_SHELLKEYPROVIDER_SCRIPT =
+      "fs.azure.shellkeyprovider.script";
 
   @Override
   public String getStorageAccountKey(String accountName, Configuration conf)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2217e2f8/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SimpleKeyProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SimpleKeyProvider.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SimpleKeyProvider.java
index ef44a85..3cd3eda 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SimpleKeyProvider.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SimpleKeyProvider.java
@@ -28,7 +28,8 @@ import org.apache.hadoop.conf.Configuration;
 @InterfaceAudience.Private
 public class SimpleKeyProvider implements KeyProvider {
 
-  protected static final String KEY_ACCOUNT_KEY_PREFIX = "fs.azure.account.key.";
+  protected static final String KEY_ACCOUNT_KEY_PREFIX =
+      "fs.azure.account.key.";
 
   @Override
   public String getStorageAccountKey(String accountName, Configuration conf)
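
For context, the reformatted constant is the prefix under which WASB looks up
a storage account key. A minimal sketch of configuring and retrieving a key
(the account host name and key value below are placeholders, not from this
patch):

    Configuration conf = new Configuration();
    conf.set("fs.azure.account.key.myaccount.blob.core.windows.net",
        "<base64-account-key>");
    KeyProvider provider = new SimpleKeyProvider();
    String key = provider.getStorageAccountKey(
        "myaccount.blob.core.windows.net", conf);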

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2217e2f8/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterface.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterface.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterface.java
index 87cef86..8d0229d 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterface.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterface.java
@@ -23,6 +23,7 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.HashMap;
 
@@ -36,15 +37,17 @@ import com.microsoft.windowsazure.storage.StorageException;
 import com.microsoft.windowsazure.storage.blob.BlobListingDetails;
 import com.microsoft.windowsazure.storage.blob.BlobProperties;
 import com.microsoft.windowsazure.storage.blob.BlobRequestOptions;
+import com.microsoft.windowsazure.storage.blob.CloudBlob;
 import com.microsoft.windowsazure.storage.blob.CopyState;
 import com.microsoft.windowsazure.storage.blob.ListBlobItem;
+import com.microsoft.windowsazure.storage.blob.PageRange;
 
 /**
  * This is a very thin layer over the methods exposed by the Windows Azure
  * Storage SDK that we need for WASB implementation. This base class has a real
  * implementation that just simply redirects to the SDK, and a memory-backed one
  * that's used for unit tests.
- * 
+ *
  * IMPORTANT: all the methods here must remain very simple redirects since code
  * written here can't be properly unit tested.
  */
@@ -323,23 +326,39 @@ abstract class StorageInterface {
      * @throws URISyntaxException
      *           If URI syntax exception occurred.
      */
-    public abstract CloudBlockBlobWrapper getBlockBlobReference(
+    public abstract CloudBlobWrapper getBlockBlobReference(
         String relativePath) throws URISyntaxException, StorageException;
+  
+    /**
+     * Returns a wrapper for a CloudPageBlob.
+     *
+     * @param relativePath
+     *            A <code>String</code> that represents the name of the blob, relative to the container 
+     *
+     * @throws StorageException
+     *             If a storage service error occurred.
+     * 
+     * @throws URISyntaxException
+     *             If URI syntax exception occurred.            
+     */
+    public abstract CloudBlobWrapper getPageBlobReference(String relativePath)
+        throws URISyntaxException, StorageException;
   }
-
+  
+  
   /**
-   * A thin wrapper over the {@link CloudBlockBlob} class that simply redirects
-   * calls to the real object except in unit tests.
+   * A thin wrapper over the {@link CloudBlob} class that simply redirects calls
+   * to the real object except in unit tests.
    */
   @InterfaceAudience.Private
-  public abstract static class CloudBlockBlobWrapper implements ListBlobItem {
+  public interface CloudBlobWrapper extends ListBlobItem {
     /**
      * Returns the URI for this blob.
      * 
      * @return A <code>java.net.URI</code> object that represents the URI for
      *         the blob.
      */
-    public abstract URI getUri();
+    URI getUri();
 
     /**
      * Returns the metadata for the blob.
@@ -347,7 +366,7 @@ abstract class StorageInterface {
      * @return A <code>java.util.HashMap</code> object that represents the
      *         metadata for the blob.
      */
-    public abstract HashMap<String, String> getMetadata();
+    HashMap<String, String> getMetadata();
 
     /**
      * Sets the metadata for the blob.
@@ -356,37 +375,64 @@ abstract class StorageInterface {
      *          A <code>java.util.HashMap</code> object that contains the
      *          metadata being assigned to the blob.
      */
-    public abstract void setMetadata(HashMap<String, String> metadata);
+    void setMetadata(HashMap<String, String> metadata);
 
     /**
-     * Copies an existing blob's contents, properties, and metadata to this
-     * instance of the <code>CloudBlob</code> class, using the specified
-     * operation context.
-     * 
-     * @param sourceBlob
-     *          A <code>CloudBlob</code> object that represents the source blob
-     *          to copy.
+     * Copies an existing blob's contents, properties, and metadata to this instance of the <code>CloudBlob</code>
+     * class, using the specified operation context.
+     *
+     * @param source
+     *            A <code>java.net.URI</code> The URI of a source blob.
      * @param opContext
-     *          An {@link OperationContext} object that represents the context
-     *          for the current operation. This object is used to track requests
-     *          to the storage service, and to provide additional runtime
-     *          information about the operation.
-     * 
+     *            An {@link OperationContext} object that represents the context for the current operation. This object
+     *            is used to track requests to the storage service, and to provide additional runtime information about
+     *            the operation.
+     *
      * @throws StorageException
-     *           If a storage service error occurred.
+     *             If a storage service error occurred.
      * @throws URISyntaxException
-     * 
+     *
      */
-    public abstract void startCopyFromBlob(CloudBlockBlobWrapper sourceBlob,
-        OperationContext opContext) throws StorageException, URISyntaxException;
-
+    void startCopyFromBlob(URI source, OperationContext opContext)
+        throws StorageException, URISyntaxException;
+    
     /**
      * Returns the blob's copy state.
      * 
      * @return A {@link CopyState} object that represents the copy state of the
      *         blob.
      */
-    public abstract CopyState getCopyState();
+    CopyState getCopyState();
+
+    /**
+     * Downloads a range of bytes from the blob to the given byte buffer, using the specified request options and
+     * operation context.
+     *
+     * @param offset
+     *            The byte offset to use as the starting point for the source.
+     * @param length
+     *            The number of bytes to read.
+     * @param outStream
+     *            The output stream to which the blob bytes are downloaded.
+     * @param options
+     *            A {@link BlobRequestOptions} object that specifies any additional options for the request. Specifying
+     *            <code>null</code> will use the default request options from the associated service client (
+     *            {@link CloudBlobClient}).
+     * @param opContext
+     *            An {@link OperationContext} object that represents the context for the current operation. This object
+     *            is used to track requests to the storage service, and to provide additional runtime information about
+     *            the operation.
+     *
+     * @throws StorageException
+     *             If a storage service error occurred.
+     */
+    void downloadRange(final long offset, final long length,
+        final OutputStream outStream, final BlobRequestOptions options,
+        final OperationContext opContext)
+            throws StorageException, IOException;
 
     /**
      * Deletes the blob using the specified operation context.
@@ -407,7 +453,7 @@ abstract class StorageInterface {
      * @throws StorageException
      *           If a storage service error occurred.
      */
-    public abstract void delete(OperationContext opContext)
+    void delete(OperationContext opContext, SelfRenewingLease lease)
         throws StorageException;
 
     /**
@@ -419,13 +465,13 @@ abstract class StorageInterface {
      *          to the storage service, and to provide additional runtime
      *          information about the operation.
      * 
-     * @return <code>true</code> if the blob exists, other wise
+     * @return <code>true</code> if the blob exists, otherwise
      *         <code>false</code>.
      * 
      * @throws StorageException
-     *           f a storage service error occurred.
+     *           If a storage service error occurred.
      */
-    public abstract boolean exists(OperationContext opContext)
+    boolean exists(OperationContext opContext)
         throws StorageException;
 
     /**
@@ -446,7 +492,7 @@ abstract class StorageInterface {
      * @throws StorageException
      *           If a storage service error occurred.
      */
-    public abstract void downloadAttributes(OperationContext opContext)
+    void downloadAttributes(OperationContext opContext)
         throws StorageException;
 
     /**
@@ -455,7 +501,7 @@ abstract class StorageInterface {
      * @return A {@link BlobProperties} object that represents the properties of
      *         the blob.
      */
-    public abstract BlobProperties getProperties();
+    BlobProperties getProperties();
 
     /**
      * Opens a blob input stream to download the blob using the specified
@@ -476,49 +522,10 @@ abstract class StorageInterface {
      * @throws StorageException
      *           If a storage service error occurred.
      */
-    public abstract InputStream openInputStream(BlobRequestOptions options,
+    InputStream openInputStream(BlobRequestOptions options,
         OperationContext opContext) throws StorageException;
 
     /**
-     * Creates and opens an output stream to write data to the block blob using
-     * the specified operation context.
-     * 
-     * @param opContext
-     *          An {@link OperationContext} object that represents the context
-     *          for the current operation. This object is used to track requests
-     *          to the storage service, and to provide additional runtime
-     *          information about the operation.
-     * 
-     * @return A {@link BlobOutputStream} object used to write data to the blob.
-     * 
-     * @throws StorageException
-     *           If a storage service error occurred.
-     */
-    public abstract OutputStream openOutputStream(BlobRequestOptions options,
-        OperationContext opContext) throws StorageException;
-
-    /**
-     * Uploads the source stream data to the blob, using the specified operation
-     * context.
-     * 
-     * @param sourceStream
-     *          An <code>InputStream</code> object that represents the input
-     *          stream to write to the block blob.
-     * @param opContext
-     *          An {@link OperationContext} object that represents the context
-     *          for the current operation. This object is used to track requests
-     *          to the storage service, and to provide additional runtime
-     *          information about the operation.
-     * 
-     * @throws IOException
-     *           If an I/O error occurred.
-     * @throws StorageException
-     *           If a storage service error occurred.
-     */
-    public abstract void upload(InputStream sourceStream,
-        OperationContext opContext) throws StorageException, IOException;
-
-    /**
      * Uploads the blob's metadata to the storage service using the specified
      * lease ID, request options, and operation context.
      * 
@@ -531,12 +538,15 @@ abstract class StorageInterface {
      * @throws StorageException
      *           If a storage service error occurred.
      */
-    public abstract void uploadMetadata(OperationContext opContext)
+    void uploadMetadata(OperationContext opContext)
         throws StorageException;
 
-    public abstract void uploadProperties(OperationContext opContext)
+    void uploadProperties(OperationContext opContext,
+        SelfRenewingLease lease)
         throws StorageException;
 
+    SelfRenewingLease acquireLease() throws StorageException;
+    
     /**
      * Sets the minimum read block size to use with this Blob.
      * 
@@ -545,7 +555,7 @@ abstract class StorageInterface {
      *          while using a {@link BlobInputStream} object, ranging from 512
      *          bytes to 64 MB, inclusive.
      */
-    public abstract void setStreamMinimumReadSizeInBytes(
+    void setStreamMinimumReadSizeInBytes(
         int minimumReadSizeBytes);
 
     /**
@@ -560,7 +570,121 @@ abstract class StorageInterface {
      *           If <code>writeBlockSizeInBytes</code> is less than 1 MB or
      *           greater than 4 MB.
      */
-    public abstract void setWriteBlockSizeInBytes(int writeBlockSizeBytes);
+    void setWriteBlockSizeInBytes(int writeBlockSizeBytes);
+
+    CloudBlob getBlob();
+  }
+
+  /**
+   * A thin wrapper over the {@link CloudBlockBlob} class that simply redirects calls
+   * to the real object except in unit tests.
+   */
+  public interface CloudBlockBlobWrapper
+      extends CloudBlobWrapper {
+    /**
+     * Creates and opens an output stream to write data to the block blob using the specified 
+     * operation context.
+     * 
+     * @param opContext
+     *            An {@link OperationContext} object that represents the context for the current operation. This object
+     *            is used to track requests to the storage service, and to provide additional runtime information about
+     *            the operation.
+     * 
+     * @return A {@link BlobOutputStream} object used to write data to the blob.
+     * 
+     * @throws StorageException
+     *             If a storage service error occurred.
+     */
+    OutputStream openOutputStream(
+        BlobRequestOptions options,
+        OperationContext opContext) throws StorageException;
+  }
+
+  /**
+   * A thin wrapper over the {@link CloudPageBlob} class that simply redirects calls
+   * to the real object except in unit tests.
+   */
+  public interface CloudPageBlobWrapper
+      extends CloudBlobWrapper {
+    /**
+     * Creates a page blob using the specified request options and operation context.
+     *
+     * @param length
+     *            The size, in bytes, of the page blob.
+     * @param options
+     *            A {@link BlobRequestOptions} object that specifies any additional options for the request. Specifying
+     *            <code>null</code> will use the default request options from the associated service client (
+     *            {@link CloudBlobClient}).
+     * @param opContext
+     *            An {@link OperationContext} object that represents the context for the current operation. This object
+     *            is used to track requests to the storage service, and to provide additional runtime information about
+     *            the operation.
+     *
+     * @throws IllegalArgumentException
+     *             If the length is not a multiple of 512.
+     *
+     * @throws StorageException
+     *             If a storage service error occurred.
+     */
+    void create(final long length, BlobRequestOptions options,
+            OperationContext opContext) throws StorageException;
+    
+
+    /**
+     * Uploads a range of contiguous pages, up to 4 MB in size, at the specified offset in the page blob, using the
+     * specified lease ID, request options, and operation context.
+     * 
+     * @param sourceStream
+     *            An <code>InputStream</code> object that represents the input stream to write to the page blob.
+     * @param offset
+     *            The offset, in number of bytes, at which to begin writing the data. This value must be a multiple of
+     *            512.
+     * @param length
+     *            The length, in bytes, of the data to write. This value must be a multiple of 512.
+     * @param options
+     *            A {@link BlobRequestOptions} object that specifies any additional options for the request. Specifying
+     *            <code>null</code> will use the default request options from the associated service client (
+     *            {@link CloudBlobClient}).
+     * @param opContext
+     *            An {@link OperationContext} object that represents the context for the current operation. This object
+     *            is used to track requests to the storage service, and to provide additional runtime information about
+     *            the operation.
+     * 
+     * @throws IllegalArgumentException
+     *             If the offset or length are not multiples of 512, or if the length is greater than 4 MB.
+     * @throws IOException
+     *             If an I/O exception occurred.
+     * @throws StorageException
+     *             If a storage service error occurred.
+     */
+    void uploadPages(final InputStream sourceStream, final long offset,
+        final long length, BlobRequestOptions options,
+        OperationContext opContext) throws StorageException, IOException;
+
+    /**
+     * Returns a collection of page ranges and their starting and ending byte offsets using the specified request
+     * options and operation context.
+     *
+     * @param options
+     *            A {@link BlobRequestOptions} object that specifies any additional options for the request. Specifying
+     *            <code>null</code> will use the default request options from the associated service client (
+     *            {@link CloudBlobClient}).
+     * @param opContext
+     *            An {@link OperationContext} object that represents the context for the current operation. This object
+     *            is used to track requests to the storage service, and to provide additional runtime information about
+     *            the operation.
+     *
+     * @return An <code>ArrayList</code> object that represents the set of page ranges and their starting and ending
+     *         byte offsets.
+     *
+     * @throws StorageException
+     *             If a storage service error occurred.
+     */
 
+    ArrayList<PageRange> downloadPageRanges(BlobRequestOptions options,
+            OperationContext opContext) throws StorageException;
+    
+    void uploadMetadata(OperationContext opContext)
+        throws StorageException; 
   }
-}
+}
\ No newline at end of file
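
Pulling the new page-blob surface together: a hedged sketch of creating a
page blob and uploading one page through the wrappers ('container' is an
assumed CloudBlobContainerWrapper, and the blob path is illustrative; per the
javadoc above, lengths and offsets must be multiples of 512):

    OperationContext ctx = new OperationContext();
    CloudPageBlobWrapper blob = (CloudPageBlobWrapper)
        container.getPageBlobReference("hbase/wal-00001");
    // Size must be a multiple of 512; PageBlobOutputStream uses a 128 MB minimum.
    blob.create(128L * 1024 * 1024, new BlobRequestOptions(), ctx);
    // Upload exactly one 512-byte page at offset 0.
    byte[] page = new byte[512];
    blob.uploadPages(new java.io.ByteArrayInputStream(page), 0, 512,
        new BlobRequestOptions(), ctx);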

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2217e2f8/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java
index 935bf71..e44823c 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java
@@ -23,6 +23,7 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -39,13 +40,16 @@ import com.microsoft.windowsazure.storage.StorageUri;
 import com.microsoft.windowsazure.storage.blob.BlobListingDetails;
 import com.microsoft.windowsazure.storage.blob.BlobProperties;
 import com.microsoft.windowsazure.storage.blob.BlobRequestOptions;
+import com.microsoft.windowsazure.storage.blob.CloudBlob;
 import com.microsoft.windowsazure.storage.blob.CloudBlobClient;
 import com.microsoft.windowsazure.storage.blob.CloudBlobContainer;
 import com.microsoft.windowsazure.storage.blob.CloudBlobDirectory;
 import com.microsoft.windowsazure.storage.blob.CloudBlockBlob;
+import com.microsoft.windowsazure.storage.blob.CloudPageBlob;
 import com.microsoft.windowsazure.storage.blob.CopyState;
 import com.microsoft.windowsazure.storage.blob.DeleteSnapshotsOption;
 import com.microsoft.windowsazure.storage.blob.ListBlobItem;
+import com.microsoft.windowsazure.storage.blob.PageRange;
 
 /**
  * A real implementation of the Azure interaction layer that just redirects
@@ -129,6 +133,8 @@ class StorageInterfaceImpl extends StorageInterface {
         return new CloudBlobDirectoryWrapperImpl((CloudBlobDirectory) unwrapped);
       } else if (unwrapped instanceof CloudBlockBlob) {
         return new CloudBlockBlobWrapperImpl((CloudBlockBlob) unwrapped);
+      } else if (unwrapped instanceof CloudPageBlob) {
+        return new CloudPageBlobWrapperImpl((CloudPageBlob) unwrapped);
       } else {
         return unwrapped;
       }
@@ -244,129 +250,217 @@ class StorageInterfaceImpl extends StorageInterface {
     }
 
     @Override
-    public CloudBlockBlobWrapper getBlockBlobReference(String relativePath)
+    public CloudBlobWrapper getBlockBlobReference(String relativePath)
         throws URISyntaxException, StorageException {
 
-      return new CloudBlockBlobWrapperImpl(
-          container.getBlockBlobReference(relativePath));
+      return new CloudBlockBlobWrapperImpl(container.getBlockBlobReference(relativePath));
     }
+    
+    @Override
+    public CloudBlobWrapper getPageBlobReference(String relativePath)
+        throws URISyntaxException, StorageException {
+      return new CloudPageBlobWrapperImpl(
+          container.getPageBlobReference(relativePath));
+    }
+
   }
+  
+  abstract static class CloudBlobWrapperImpl implements CloudBlobWrapper {
+    private final CloudBlob blob;
 
-  //
-  // CloudBlockBlobWrapperImpl
-  //
-  @InterfaceAudience.Private
-  static class CloudBlockBlobWrapperImpl extends CloudBlockBlobWrapper {
-    private final CloudBlockBlob blob;
+    @Override
+    public CloudBlob getBlob() {
+      return blob;
+    }
 
     public URI getUri() {
-      return blob.getUri();
+      return getBlob().getUri();
     }
 
-    public CloudBlockBlobWrapperImpl(CloudBlockBlob blob) {
+    protected CloudBlobWrapperImpl(CloudBlob blob) {
       this.blob = blob;
     }
 
     @Override
     public HashMap<String, String> getMetadata() {
-      return blob.getMetadata();
+      return getBlob().getMetadata();
     }
 
     @Override
-    public void startCopyFromBlob(CloudBlockBlobWrapper sourceBlob,
-        OperationContext opContext) throws StorageException, URISyntaxException {
-
-      blob.startCopyFromBlob(((CloudBlockBlobWrapperImpl) sourceBlob).blob,
-          null, null, null, opContext);
-
+    public void delete(OperationContext opContext, SelfRenewingLease lease)
+        throws StorageException {
+      getBlob().delete(DeleteSnapshotsOption.NONE, getLeaseCondition(lease),
+          null, opContext);
     }
 
-    @Override
-    public void delete(OperationContext opContext) throws StorageException {
-      blob.delete(DeleteSnapshotsOption.NONE, null, null, opContext);
+    /**
+     * Return an access condition for this lease, or else null if
+     * there's no lease.
+     */
+    private AccessCondition getLeaseCondition(SelfRenewingLease lease) {
+      AccessCondition leaseCondition = null;
+      if (lease != null) {
+        leaseCondition = AccessCondition.generateLeaseCondition(lease.getLeaseID());
+      }
+      return leaseCondition;
     }
 
     @Override
-    public boolean exists(OperationContext opContext) throws StorageException {
-      return blob.exists(null, null, opContext);
+    public boolean exists(OperationContext opContext)
+        throws StorageException {
+      return getBlob().exists(null, null, opContext);
     }
 
     @Override
-    public void downloadAttributes(OperationContext opContext)
-        throws StorageException {
-      blob.downloadAttributes(null, null, opContext);
+    public void downloadAttributes(
+        OperationContext opContext) throws StorageException {
+      getBlob().downloadAttributes(null, null, opContext);
     }
 
     @Override
     public BlobProperties getProperties() {
-      return blob.getProperties();
+      return getBlob().getProperties();
     }
 
     @Override
     public void setMetadata(HashMap<String, String> metadata) {
-      blob.setMetadata(metadata);
+      getBlob().setMetadata(metadata);
     }
 
     @Override
-    public InputStream openInputStream(BlobRequestOptions options,
+    public InputStream openInputStream(
+        BlobRequestOptions options,
         OperationContext opContext) throws StorageException {
-      return blob.openInputStream(null, options, opContext);
+      return getBlob().openInputStream(null, options, opContext);
     }
 
-    @Override
-    public OutputStream openOutputStream(BlobRequestOptions options,
+    public OutputStream openOutputStream(
+        BlobRequestOptions options,
         OperationContext opContext) throws StorageException {
-      return blob.openOutputStream(null, options, opContext);
+      return ((CloudBlockBlob) getBlob()).openOutputStream(null, options, opContext);
     }
 
-    @Override
     public void upload(InputStream sourceStream, OperationContext opContext)
         throws StorageException, IOException {
-      blob.upload(sourceStream, 0, null, null, opContext);
+      getBlob().upload(sourceStream, 0, null, null, opContext);
     }
 
     @Override
     public CloudBlobContainer getContainer() throws URISyntaxException,
         StorageException {
-      return blob.getContainer();
+      return getBlob().getContainer();
     }
 
     @Override
     public CloudBlobDirectory getParent() throws URISyntaxException,
         StorageException {
-      return blob.getParent();
+      return getBlob().getParent();
     }
 
     @Override
     public void uploadMetadata(OperationContext opContext)
         throws StorageException {
-      blob.uploadMetadata(null, null, opContext);
+      getBlob().uploadMetadata(null, null, opContext);
     }
 
-    @Override
-    public void uploadProperties(OperationContext opContext)
+    public void uploadProperties(OperationContext opContext, SelfRenewingLease lease)
         throws StorageException {
-      blob.uploadProperties(null, null, opContext);
+      // Attach the lease's access condition to the request when a lease is held.
+      getBlob().uploadProperties(getLeaseCondition(lease), null, opContext);
     }
 
     @Override
     public void setStreamMinimumReadSizeInBytes(int minimumReadSizeBytes) {
-      blob.setStreamMinimumReadSizeInBytes(minimumReadSizeBytes);
+      getBlob().setStreamMinimumReadSizeInBytes(minimumReadSizeBytes);
     }
 
     @Override
     public void setWriteBlockSizeInBytes(int writeBlockSizeBytes) {
-      blob.setStreamWriteSizeInBytes(writeBlockSizeBytes);
+      getBlob().setStreamWriteSizeInBytes(writeBlockSizeBytes);
     }
 
     @Override
     public StorageUri getStorageUri() {
-      return blob.getStorageUri();
+      return getBlob().getStorageUri();
     }
 
     @Override
     public CopyState getCopyState() {
-      return blob.getCopyState();
+      return getBlob().getCopyState();
+    }
+
+    @Override
+    public void startCopyFromBlob(URI source,
+        OperationContext opContext)
+            throws StorageException, URISyntaxException {
+      getBlob().startCopyFromBlob(source,
+          null, null, null, opContext);
+    }
+
+    @Override
+    public void downloadRange(long offset, long length, OutputStream outStream,
+        BlobRequestOptions options, OperationContext opContext)
+            throws StorageException, IOException {
+      getBlob().downloadRange(offset, length, outStream, null, options, opContext);
+    }
+
+    @Override
+    public SelfRenewingLease acquireLease() throws StorageException {
+      return new SelfRenewingLease(this);
+    }
+  }
+
+  //
+  // CloudBlockBlobWrapperImpl
+  //
+
+  static class CloudBlockBlobWrapperImpl extends CloudBlobWrapperImpl
+      implements CloudBlockBlobWrapper {
+    public CloudBlockBlobWrapperImpl(CloudBlockBlob blob) {
+      super(blob);
+    }
+
+    public OutputStream openOutputStream(
+        BlobRequestOptions options,
+        OperationContext opContext) throws StorageException {
+      return ((CloudBlockBlob) getBlob()).openOutputStream(null, options, opContext);
+    }
+
+    public void upload(InputStream sourceStream, OperationContext opContext)
+        throws StorageException, IOException {
+      getBlob().upload(sourceStream, 0, null, null, opContext);
+    }
+
+    public void uploadProperties(OperationContext opContext)
+        throws StorageException {
+      getBlob().uploadProperties(null, null, opContext);
+    }
+
+  }
+
+  static class CloudPageBlobWrapperImpl extends CloudBlobWrapperImpl
+      implements CloudPageBlobWrapper {
+    public CloudPageBlobWrapperImpl(CloudPageBlob blob) {
+      super(blob);
+    }
+
+    public void create(final long length, BlobRequestOptions options,
+        OperationContext opContext) throws StorageException {
+      ((CloudPageBlob) getBlob()).create(length, null, options, opContext);
+    }
+
+    public void uploadPages(final InputStream sourceStream, final long offset,
+        final long length, BlobRequestOptions options, OperationContext opContext)
+        throws StorageException, IOException {
+      ((CloudPageBlob) getBlob()).uploadPages(sourceStream, offset, length, null,
+          options, opContext);
+    }
+
+    public ArrayList<PageRange> downloadPageRanges(BlobRequestOptions options,
+        OperationContext opContext) throws StorageException {
+      return ((CloudPageBlob) getBlob()).downloadPageRanges(
+          null, options, opContext);
     }
   }
 }
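
The lease-aware overloads above are meant to be used together: a caller
acquires a SelfRenewingLease from the wrapper, then hands it to delete() or
uploadProperties(), which getLeaseCondition() turns into an AccessCondition
on the request. A minimal sketch of that flow, assuming code in the same
package, a live wrapper named blob, an OperationContext named opContext, and
a SelfRenewingLease.free() release method:

    SelfRenewingLease lease = blob.acquireLease();
    try {
      // The request carries generateLeaseCondition(lease.getLeaseID()),
      // so the update fails fast if the lease has been lost.
      blob.uploadProperties(opContext, lease);
    } finally {
      lease.free(); // assumed release method
    }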

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2217e2f8/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SyncableDataOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SyncableDataOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SyncableDataOutputStream.java
new file mode 100644
index 0000000..9bec7a5
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SyncableDataOutputStream.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.hadoop.fs.Syncable;
+
+/**
+ * Support the Syncable interface on top of a DataOutputStream.
+ * This allows passing the sync/hflush/hsync calls through to the
+ * wrapped stream passed in to the constructor. This is required
+ * for HBase when wrapping a PageBlobOutputStream used as a write-ahead log.
+ */
+public class SyncableDataOutputStream extends DataOutputStream implements Syncable {
+
+  public SyncableDataOutputStream(OutputStream out) {
+    super(out);
+  }
+
+  @Override
+  public void hflush() throws IOException {
+    if (out instanceof Syncable) {
+      ((Syncable) out).hflush();
+    } else {
+      out.flush();
+    }
+  }
+
+  @Override
+  public void hsync() throws IOException {
+    if (out instanceof Syncable) {
+      ((Syncable) out).hsync();
+    } else {
+      out.flush();
+    }
+  }
+}
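
Because the fallback above only needs a target that is not Syncable, the
pass-through behavior is easy to exercise in isolation. A self-contained
sketch (the demo class name is illustrative, not part of the patch):

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;

    import org.apache.hadoop.fs.azure.SyncableDataOutputStream;

    public class SyncableWrapDemo {
      public static void main(String[] args) throws IOException {
        ByteArrayOutputStream raw = new ByteArrayOutputStream(); // not Syncable
        SyncableDataOutputStream out = new SyncableDataOutputStream(raw);
        out.writeBytes("wal-entry");
        out.hflush(); // raw is not Syncable, so this falls back to raw.flush()
        out.hsync();  // same fallback
        out.close();
        System.out.println(raw.size() + " bytes captured"); // prints 9
      }
    }

When the wrapped stream is a PageBlobOutputStream, the same two calls reach
its real hflush()/hsync() implementations, which is what gives HBase a
durable write-ahead log on page blobs.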

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2217e2f8/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/Wasb.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/Wasb.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/Wasb.java
index e098cef..dd354d7 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/Wasb.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/Wasb.java
@@ -27,7 +27,6 @@ import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.DelegateToFileSystem;
 
-
 /**
  * WASB implementation of AbstractFileSystem.
  * This impl delegates to the old FileSystem

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2217e2f8/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/AzureFileSystemInstrumentation.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/AzureFileSystemInstrumentation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/AzureFileSystemInstrumentation.java
index e389d7c..a08ad71 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/AzureFileSystemInstrumentation.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/AzureFileSystemInstrumentation.java
@@ -41,11 +41,11 @@ import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
 public final class AzureFileSystemInstrumentation implements MetricsSource {
-  
+
   public static final String METRIC_TAG_FILESYSTEM_ID = "wasbFileSystemId";
   public static final String METRIC_TAG_ACCOUNT_NAME = "accountName";
   public static final String METRIC_TAG_CONTAINTER_NAME = "containerName";
-  
+
   public static final String WASB_WEB_RESPONSES = "wasb_web_responses";
   public static final String WASB_BYTES_WRITTEN =
       "wasb_bytes_written_last_second";
@@ -381,7 +381,6 @@ public final class AzureFileSystemInstrumentation implements MetricsSource {
    */
   public long getCurrentMaximumDownloadBandwidth() {
     return currentMaximumDownloadBytesPerSecond;
-
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2217e2f8/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/ResponseReceivedMetricUpdater.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/ResponseReceivedMetricUpdater.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/ResponseReceivedMetricUpdater.java
index e3f5d44..676adb9 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/ResponseReceivedMetricUpdater.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/ResponseReceivedMetricUpdater.java
@@ -33,8 +33,7 @@ import com.microsoft.windowsazure.storage.StorageEvent;
 
 /**
  * An event listener to the ResponseReceived event from Azure Storage that will
- * update metrics appropriately.
- *
+ * update metrics appropriately as each response is received.
  */
 @InterfaceAudience.Private
 public final class ResponseReceivedMetricUpdater extends StorageEvent<ResponseReceivedEvent> {
@@ -43,7 +42,7 @@ public final class ResponseReceivedMetricUpdater extends StorageEvent<ResponseRe
 
   private final AzureFileSystemInstrumentation instrumentation;
   private final BandwidthGaugeUpdater blockUploadGaugeUpdater;
-  
+
   private ResponseReceivedMetricUpdater(OperationContext operationContext,
       AzureFileSystemInstrumentation instrumentation,
       BandwidthGaugeUpdater blockUploadGaugeUpdater) {
@@ -142,6 +141,6 @@ public final class ResponseReceivedMetricUpdater extends StorageEvent<ResponseRe
         instrumentation.rawBytesDownloaded(length);
         instrumentation.blockDownloaded(requestLatency);
       }
-    } 
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2217e2f8/hadoop-tools/hadoop-azure/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem b/hadoop-tools/hadoop-azure/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem
new file mode 100644
index 0000000..9f4922b
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.hadoop.fs.azure.NativeAzureFileSystem
+org.apache.hadoop.fs.azure.NativeAzureFileSystem$Secure
\ No newline at end of file
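
With this service file on the classpath, Hadoop's FileSystem service loader
resolves the wasb and wasbs schemes on its own, which is why the explicit
fs.wasb.impl / fs.wasbs.impl settings (addWasbToConfiguration) are removed
from the test account below. A hedged sketch of the lookup; the account and
container names are placeholders, and the account key is assumed to be
configured separately:

    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;

    public class WasbLookupDemo { // illustrative class name
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Credentials assumed set elsewhere, e.g.
        // fs.azure.account.key.myaccount.blob.core.windows.net
        FileSystem fs = FileSystem.get(
            URI.create("wasb://mycontainer@myaccount.blob.core.windows.net/"),
            conf);
        System.out.println(fs.getClass().getName()); // NativeAzureFileSystem
      }
    }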

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2217e2f8/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/AzureBlobStorageTestAccount.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/AzureBlobStorageTestAccount.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/AzureBlobStorageTestAccount.java
index 80e8e43..a323237 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/AzureBlobStorageTestAccount.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/AzureBlobStorageTestAccount.java
@@ -78,6 +78,8 @@ public final class AzureBlobStorageTestAccount {
 
   private static final String KEY_DISABLE_THROTTLING = "fs.azure.disable.bandwidth.throttling";
   private static final String KEY_READ_TOLERATE_CONCURRENT_APPEND = "fs.azure.io.read.tolerate.concurrent.append";
+  public static final String DEFAULT_PAGE_BLOB_DIRECTORY = "pageBlobs";
+  public static final String DEFAULT_ATOMIC_RENAME_DIRECTORIES = "/atomicRenameDir1,/atomicRenameDir2";
 
   private CloudStorageAccount account;
   private CloudBlobContainer container;
@@ -85,12 +87,14 @@ public final class AzureBlobStorageTestAccount {
   private NativeAzureFileSystem fs;
   private AzureNativeFileSystemStore storage;
   private MockStorageInterface mockStorage;
+  private String pageBlobDirectory;
   private static final ConcurrentLinkedQueue<MetricsRecord> allMetrics =
       new ConcurrentLinkedQueue<MetricsRecord>();
-  
+  private static boolean metricsConfigSaved = false;
   
   private AzureBlobStorageTestAccount(NativeAzureFileSystem fs,
-      CloudStorageAccount account, CloudBlobContainer container) {
+      CloudStorageAccount account,
+      CloudBlobContainer container) {
     this.account = account;
     this.container = container;
     this.fs = fs;
@@ -158,6 +162,14 @@ public final class AzureBlobStorageTestAccount {
     return toMockUri(path.toUri().getRawPath().substring(1)); 
   }
   
+  public static Path pageBlobPath() {
+    return new Path("/" + DEFAULT_PAGE_BLOB_DIRECTORY);
+  }
+
+  public static Path pageBlobPath(String fileName) {
+    return new Path(pageBlobPath(), fileName);
+  }
+
   public Number getLatestMetricValue(String metricName, Number defaultValue)
       throws IndexOutOfBoundsException{
     boolean found = false;
@@ -206,8 +218,10 @@ public final class AzureBlobStorageTestAccount {
    *          The blob key (no initial slash).
    * @return The blob reference.
    */
-  public CloudBlockBlob getBlobReference(String blobKey) throws Exception {
-    return container.getBlockBlobReference(String.format(blobKey));
+  public CloudBlockBlob getBlobReference(String blobKey)
+      throws Exception {
+    return container.getBlockBlobReference(blobKey);
   }
 
   /**
@@ -233,45 +247,79 @@ public final class AzureBlobStorageTestAccount {
     getBlobReference(blobKey).releaseLease(accessCondition);
   }
 
+  private static void saveMetricsConfigFile() {
+    if (!metricsConfigSaved) {
+      new org.apache.hadoop.metrics2.impl.ConfigBuilder()
+          .add("azure-file-system.sink.azuretestcollector.class",
+              StandardCollector.class.getName())
+          .save("hadoop-metrics2-azure-file-system.properties");
+      metricsConfigSaved = true;
+    }
+  }
+
   public static AzureBlobStorageTestAccount createMock() throws Exception {
     return createMock(new Configuration());
   }
 
-  public static AzureBlobStorageTestAccount createMock(Configuration conf)
-      throws Exception {
+  public static AzureBlobStorageTestAccount createMock(Configuration conf)
+      throws Exception {
+    saveMetricsConfigFile();
+    configurePageBlobDir(conf);
+    configureAtomicRenameDir(conf);
     AzureNativeFileSystemStore store = new AzureNativeFileSystemStore();
     MockStorageInterface mockStorage = new MockStorageInterface();
     store.setAzureStorageInteractionLayer(mockStorage);
     NativeAzureFileSystem fs = new NativeAzureFileSystem(store);
-    addWasbToConfiguration(conf);
     setMockAccountKey(conf);
     // register the fs provider.
 
     fs.initialize(new URI(MOCK_WASB_URI), conf);
-    AzureBlobStorageTestAccount testAcct = new AzureBlobStorageTestAccount(fs,
-        mockStorage);
+    AzureBlobStorageTestAccount testAcct =
+        new AzureBlobStorageTestAccount(fs, mockStorage);
     return testAcct;
   }
 
   /**
+   * Set the page blob directories configuration to the default if it is not
+   * already set. Some tests may set it differently (e.g. the page blob
+   * tests in TestNativeAzureFSPageBlobLive).
+   * @param conf The configuration to conditionally update.
+   */
+  private static void configurePageBlobDir(Configuration conf) {
+    if (conf.get(AzureNativeFileSystemStore.KEY_PAGE_BLOB_DIRECTORIES) == null) {
+      conf.set(AzureNativeFileSystemStore.KEY_PAGE_BLOB_DIRECTORIES,
+          "/" + DEFAULT_PAGE_BLOB_DIRECTORY);
+    }
+  }
+
+  /**
+   * Likewise, set the atomic rename directories configuration to its
+   * default if it is not already set.
+   */
+  private static void configureAtomicRenameDir(Configuration conf) {
+    if (conf.get(AzureNativeFileSystemStore.KEY_ATOMIC_RENAME_DIRECTORIES) == null) {
+      conf.set(AzureNativeFileSystemStore.KEY_ATOMIC_RENAME_DIRECTORIES,
+          DEFAULT_ATOMIC_RENAME_DIRECTORIES);
+    }
+  }
+
+  /**
    * Creates a test account that goes against the storage emulator.
    * 
    * @return The test account, or null if the emulator isn't setup.
    */
   public static AzureBlobStorageTestAccount createForEmulator()
       throws Exception {
+    saveMetricsConfigFile();
     NativeAzureFileSystem fs = null;
     CloudBlobContainer container = null;
     Configuration conf = createTestConfiguration();
     if (!conf.getBoolean(USE_EMULATOR_PROPERTY_NAME, false)) {
       // Not configured to test against the storage emulator.
-      System.out.println("Skipping emulator Azure test because configuration "
-          + "doesn't indicate that it's running."
-          + " Please see README.txt for guidance.");
+      System.out
+        .println("Skipping emulator Azure test because configuration " +
+            "doesn't indicate that it's running." +
+            " Please see RunningLiveWasbTests.txt for guidance.");
       return null;
     }
-    CloudStorageAccount account = CloudStorageAccount
-        .getDevelopmentStorageAccount();
+    CloudStorageAccount account =
+        CloudStorageAccount.getDevelopmentStorageAccount();
     fs = new NativeAzureFileSystem();
     String containerName = String.format("wasbtests-%s-%tQ",
         System.getProperty("user.name"), new Date());
@@ -285,14 +333,18 @@ public final class AzureBlobStorageTestAccount {
     fs.initialize(accountUri, conf);
 
     // Create test account initializing the appropriate member variables.
-    AzureBlobStorageTestAccount testAcct = new AzureBlobStorageTestAccount(fs,
-        account, container);
+    //
+    AzureBlobStorageTestAccount testAcct =
+        new AzureBlobStorageTestAccount(fs, account, container);
 
     return testAcct;
   }
 
   public static AzureBlobStorageTestAccount createOutOfBandStore(
       int uploadBlockSize, int downloadBlockSize) throws Exception {
+
+    saveMetricsConfigFile();
+
     CloudBlobContainer container = null;
     Configuration conf = createTestConfiguration();
     CloudStorageAccount account = createTestAccount(conf);
@@ -337,8 +389,9 @@ public final class AzureBlobStorageTestAccount {
     testStorage.initialize(accountUri, conf, instrumentation);
 
     // Create test account initializing the appropriate member variables.
-    AzureBlobStorageTestAccount testAcct = new AzureBlobStorageTestAccount(
-        testStorage, account, container);
+    //
+    AzureBlobStorageTestAccount testAcct =
+        new AzureBlobStorageTestAccount(testStorage, account, container);
 
     return testAcct;
   }
@@ -416,11 +469,11 @@ public final class AzureBlobStorageTestAccount {
     }
   }
 
-  private static Configuration createTestConfiguration() {
+  public static Configuration createTestConfiguration() {
     return createTestConfiguration(null);
   }
 
-  protected static Configuration createTestConfiguration(Configuration conf) {
+  private static Configuration createTestConfiguration(Configuration conf) {
     if (conf == null) {
       conf = new Configuration();
     }
@@ -429,16 +482,9 @@ public final class AzureBlobStorageTestAccount {
     return conf;
   }
 
-  // for programmatic setting of the wasb configuration.
-  // note that tests can also get the
-  public static void addWasbToConfiguration(Configuration conf) {
-    conf.set("fs.wasb.impl", "org.apache.hadoop.fs.azure.NativeAzureFileSystem");
-    conf.set("fs.wasbs.impl",
-        "org.apache.hadoop.fs.azure.NativeAzureFileSystem");
-  }
-
-  static CloudStorageAccount createTestAccount() throws URISyntaxException,
-      KeyProviderException {
+  static CloudStorageAccount createTestAccount()
+      throws URISyntaxException, KeyProviderException {
     return createTestAccount(createTestConfiguration());
   }
 
@@ -447,8 +493,8 @@ public final class AzureBlobStorageTestAccount {
     String testAccountName = conf.get(TEST_ACCOUNT_NAME_PROPERTY_NAME);
     if (testAccountName == null) {
       System.out
-          .println("Skipping live Azure test because of missing test account."
-              + " Please see README.txt for guidance.");
+        .println("Skipping live Azure test because of missing test account." +
+                 " Please see RunningLiveWasbTests.txt for guidance.");
       return null;
     }
     return createStorageAccount(testAccountName, conf, false);
@@ -466,9 +512,12 @@ public final class AzureBlobStorageTestAccount {
   public static AzureBlobStorageTestAccount create(String containerNameSuffix,
       EnumSet<CreateOptions> createOptions, Configuration initialConfiguration)
       throws Exception {
+    saveMetricsConfigFile();
     NativeAzureFileSystem fs = null;
     CloudBlobContainer container = null;
     Configuration conf = createTestConfiguration(initialConfiguration);
+    configurePageBlobDir(conf);
+    configureAtomicRenameDir(conf);
     CloudStorageAccount account = createTestAccount(conf);
     if (account == null) {
       return null;
@@ -510,15 +559,18 @@ public final class AzureBlobStorageTestAccount {
     fs.initialize(accountUri, conf);
 
     // Create test account initializing the appropriate member variables.
-    AzureBlobStorageTestAccount testAcct = new AzureBlobStorageTestAccount(fs,
-        account, container);
+    //
+    AzureBlobStorageTestAccount testAcct =
+        new AzureBlobStorageTestAccount(fs, account, container);
 
     return testAcct;
   }
 
   private static String generateContainerName() throws Exception {
-    String containerName = String.format("wasbtests-%s-%tQ",
-        System.getProperty("user.name"), new Date());
+    String containerName =
+        String.format("wasbtests-%s-%tQ",
+            System.getProperty("user.name"),
+            new Date());
     return containerName;
   }
 
@@ -548,12 +600,16 @@ public final class AzureBlobStorageTestAccount {
 
     if (readonly) {
       // Set READ permissions
-      sasPolicy.setPermissions(EnumSet.of(SharedAccessBlobPermissions.READ,
+      sasPolicy.setPermissions(EnumSet.of(
+          SharedAccessBlobPermissions.READ,
           SharedAccessBlobPermissions.LIST));
     } else {
       // Set READ and WRITE permissions.
-      sasPolicy.setPermissions(EnumSet.of(SharedAccessBlobPermissions.READ,
-          SharedAccessBlobPermissions.WRITE, SharedAccessBlobPermissions.LIST));
+      //
+      sasPolicy.setPermissions(EnumSet.of(
+          SharedAccessBlobPermissions.READ,
+          SharedAccessBlobPermissions.WRITE,
+          SharedAccessBlobPermissions.LIST));
     }
 
     // Create the container permissions.
@@ -590,8 +646,11 @@ public final class AzureBlobStorageTestAccount {
     SharedAccessBlobPolicy sasPolicy = new SharedAccessBlobPolicy();
 
     // Set READ and WRITE permissions.
-    sasPolicy.setPermissions(EnumSet.of(SharedAccessBlobPermissions.READ,
-        SharedAccessBlobPermissions.WRITE, SharedAccessBlobPermissions.LIST,
+    //
+    sasPolicy.setPermissions(EnumSet.of(
+        SharedAccessBlobPermissions.READ,
+        SharedAccessBlobPermissions.WRITE,
+        SharedAccessBlobPermissions.LIST,
         SharedAccessBlobPermissions.DELETE));
 
     // Create the container permissions.
@@ -725,8 +784,9 @@ public final class AzureBlobStorageTestAccount {
 
     // Create test account initializing the appropriate member variables.
     // Set the container value to null for the default root container.
-    AzureBlobStorageTestAccount testAcct = new AzureBlobStorageTestAccount(fs,
-        account, blobRoot);
+    //
+    AzureBlobStorageTestAccount testAcct = new AzureBlobStorageTestAccount(
+        fs, account, blobRoot);
 
     // Return to caller with test account.
     return testAcct;
@@ -805,5 +865,12 @@ public final class AzureBlobStorageTestAccount {
     public void flush() {
     }
   }
- 
+
+  public void setPageBlobDirectory(String directory) {
+    this.pageBlobDirectory = directory;
+  }
+
+  public String getPageBlobDirectory() {
+    return pageBlobDirectory;
+  }
 }
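
The conditional defaults above give every test a page blob root without extra
setup: files created under pageBlobPath() land in "/pageBlobs" and are backed
by page blobs. A short sketch, assuming the usual getFileSystem() accessor on
the test account:

    AzureBlobStorageTestAccount acct = AzureBlobStorageTestAccount.createMock();
    FileSystem fs = acct.getFileSystem();
    Path wal = AzureBlobStorageTestAccount.pageBlobPath("region-wal");
    fs.create(wal).close(); // stored as a page blob, not a block blob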

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2217e2f8/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/InMemoryBlockBlobStore.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/InMemoryBlockBlobStore.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/InMemoryBlockBlobStore.java
index ab35961..b8971c4 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/InMemoryBlockBlobStore.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/InMemoryBlockBlobStore.java
@@ -41,12 +41,15 @@ public class InMemoryBlockBlobStore {
     private final String key;
     private final HashMap<String, String> metadata;
     private final int contentLength;
+    private final boolean isPageBlob;
 
     ListBlobEntry(String key, HashMap<String, String> metadata,
-        int contentLength) {
+        int contentLength, boolean isPageBlob) {
       this.key = key;
       this.metadata = metadata;
       this.contentLength = contentLength;
+      this.isPageBlob = isPageBlob;
     }
 
     public String getKey() {
@@ -60,6 +63,10 @@ public class InMemoryBlockBlobStore {
     public int getContentLength() {
       return contentLength;
     }
+
+    public boolean isPageBlob() {
+      return isPageBlob;
+    }
   }
 
   /**
@@ -77,10 +84,13 @@ public class InMemoryBlockBlobStore {
     ArrayList<ListBlobEntry> list = new ArrayList<ListBlobEntry>();
     for (Map.Entry<String, Entry> entry : blobs.entrySet()) {
       if (entry.getKey().startsWith(prefix)) {
-        list.add(new ListBlobEntry(entry.getKey(),
-            includeMetadata ? new HashMap<String, String>(
-                entry.getValue().metadata) : null,
-            entry.getValue().content.length));
+        list.add(new ListBlobEntry(
+            entry.getKey(),
+            includeMetadata ?
+                new HashMap<String, String>(entry.getValue().metadata) :
+                null,
+            entry.getValue().content.length,
+            entry.getValue().isPageBlob));
       }
     }
     return list;
@@ -92,19 +102,49 @@ public class InMemoryBlockBlobStore {
 
   @SuppressWarnings("unchecked")
   public synchronized void setContent(String key, byte[] value,
+      HashMap<String, String> metadata, boolean isPageBlob,
+      long length) {
+    blobs.put(key, new Entry(value, (HashMap<String, String>) metadata.clone(),
+        isPageBlob, length));
+  }
+
+  @SuppressWarnings("unchecked")
+  public synchronized void setMetadata(String key,
       HashMap<String, String> metadata) {
-    blobs
-        .put(key, new Entry(value, (HashMap<String, String>) metadata.clone()));
+    blobs.get(key).metadata = (HashMap<String, String>) metadata.clone();
   }
 
-  public OutputStream upload(final String key,
+  public OutputStream uploadBlockBlob(final String key,
       final HashMap<String, String> metadata) {
-    setContent(key, new byte[0], metadata);
+    setContent(key, new byte[0], metadata, false, 0);
+    return new ByteArrayOutputStream() {
+      @Override
+      public void flush()
+          throws IOException {
+        super.flush();
+        byte[] tempBytes = toByteArray();
+        setContent(key, tempBytes, metadata, false, tempBytes.length);
+      }
+      @Override
+      public void close()
+          throws IOException {
+        super.close();
+        byte[] tempBytes = toByteArray();
+        setContent(key, tempBytes, metadata, false, tempBytes.length);
+      }
+    };
+  }
+
+  public OutputStream uploadPageBlob(final String key,
+      final HashMap<String, String> metadata,
+      final long length) {
+    setContent(key, new byte[0], metadata, true, length);
     return new ByteArrayOutputStream() {
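+      // Unlike uploadBlockBlob above, only flush() snapshots the bytes into
+      // the store here, so callers are expected to flush before closing.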
       @Override
-      public void flush() throws IOException {
+      public void flush()
+          throws IOException {
         super.flush();
-        setContent(key, toByteArray(), metadata);
+        setContent(key, toByteArray(), metadata, true, length);
       }
     };
   }
@@ -137,10 +177,16 @@ public class InMemoryBlockBlobStore {
   private static class Entry {
     private byte[] content;
     private HashMap<String, String> metadata;
+    private boolean isPageBlob;
+    @SuppressWarnings("unused") // TODO: not yet read; retained to track page blob length
+    private long length;
 
-    public Entry(byte[] content, HashMap<String, String> metadata) {
+    public Entry(byte[] content, HashMap<String, String> metadata,
+        boolean isPageBlob, long length) {
       this.content = content;
       this.metadata = metadata;
+      this.isPageBlob = isPageBlob;
+      this.length = length;
     }
   }
 }
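
A note on the mock's contract: bytes written to the streams returned by
uploadBlockBlob()/uploadPageBlob() only become visible in the store once the
stream is flushed (block blobs also snapshot on close), mirroring how the
real service only sees data after an upload completes. A minimal sketch; the
key and length are illustrative:

    InMemoryBlockBlobStore store = new InMemoryBlockBlobStore();
    OutputStream out = store.uploadPageBlob("container/wal",
        new HashMap<String, String>(), 512);
    out.write(new byte[42]);
    // Not yet visible in the store; only flush() publishes the bytes.
    out.flush();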