You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cn...@apache.org on 2014/12/18 00:00:06 UTC

[20/24] hadoop git commit: HADOOP-11188. hadoop-azure: automatically expand page blobs when they become full. Contributed by Eric Hanson.

HADOOP-11188. hadoop-azure: automatically expand page blobs when they become full. Contributed by Eric Hanson.

(cherry picked from commit f4b7e99f4ebac5b0295b7f7f42eb5705af41f079)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/1646cc9f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/1646cc9f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/1646cc9f

Branch: refs/heads/branch-2
Commit: 1646cc9f2931da101534b8288c1a2a0026f39a11
Parents: 5a73702
Author: cnauroth <cn...@apache.org>
Authored: Fri Oct 10 15:05:52 2014 -0700
Committer: cnauroth <cn...@apache.org>
Committed: Wed Dec 17 14:57:13 2014 -0800

----------------------------------------------------------------------
 hadoop-common-project/hadoop-common/CHANGES.txt |  3 +
 .../hadoop/fs/azure/PageBlobOutputStream.java   | 95 +++++++++++++++++++-
 .../TestReadAndSeekPageBlobAfterWrite.java      | 32 +++++++
 3 files changed, 126 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/1646cc9f/hadoop-common-project/hadoop-common/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index 6509bcb..9870fc7 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -82,6 +82,9 @@ Release 2.7.0 - UNRELEASED
     HADOOP-10809. hadoop-azure: page blob support. (Dexter Bradshaw,
     Mostafa Elhemali, Eric Hanson, and Mike Liddell via cnauroth)
 
+    HADOOP-11188. hadoop-azure: automatically expand page blobs when they become
+    full. (Eric Hanson via cnauroth)
+
   BUG FIXES
 
     HADOOP-11236. NFS: Fix javadoc warning in RpcProgram.java (Abhiraj Butala via harsh)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1646cc9f/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobOutputStream.java
index 95f0c22..4d1d5c8 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobOutputStream.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobOutputStream.java
@@ -47,6 +47,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.microsoft.windowsazure.storage.OperationContext;
 import com.microsoft.windowsazure.storage.StorageException;
 import com.microsoft.windowsazure.storage.blob.BlobRequestOptions;
+import com.microsoft.windowsazure.storage.blob.CloudPageBlob;
 
 
 /**
@@ -83,6 +84,11 @@ final class PageBlobOutputStream extends OutputStream implements Syncable {
   private volatile IOException lastError;
 
   /**
+   * Current size of the page blob in bytes. It may be extended if the file
+   * gets full.
+   */
+  private long currentBlobSize;
+  /**
    * The current byte offset we're at in the blob (how many bytes we've
    * uploaded to the server).
    */
@@ -114,13 +120,23 @@ final class PageBlobOutputStream extends OutputStream implements Syncable {
 
   public static final Log LOG = LogFactory.getLog(AzureNativeFileSystemStore.class);
 
-  // Set the minimum page blob file size to 128MB, which is >> the default block size of 32MB.
-  // This default block size is often used as the hbase.regionserver.hlog.blocksize.
+  // Set the minimum page blob file size to 128MB, which is >> the default
+  // block size of 32MB. This default block size is often used as the
+  // hbase.regionserver.hlog.blocksize.
   // The goal is to have a safe minimum size for HBase log files to allow them
-  // to be filled and rolled without exceeding the minimum size. A larger size can be
-  // used by setting the fs.azure.page.blob.size configuration variable.
+  // to be filled and rolled without exceeding the minimum size. A larger size
+  // can be used by setting the fs.azure.page.blob.size configuration variable.
   public static final long PAGE_BLOB_MIN_SIZE = 128L * 1024L * 1024L;
 
+  // The default and minimum amount to extend a page blob by if it starts
+  // to get full.
+  public static final long
+    PAGE_BLOB_DEFAULT_EXTENSION_SIZE = 128L * 1024L * 1024L;
+
+  // The configured page blob extension size (either the default or, if greater,
+  // the value configured in fs.azure.page.blob.extension.size).
+  private long configuredPageBlobExtensionSize;
+
   /**
    * Constructs an output stream over the given page blob.
    *
@@ -156,6 +172,21 @@ final class PageBlobOutputStream extends OutputStream implements Syncable {
       pageBlobSize += PAGE_SIZE - pageBlobSize % PAGE_SIZE;
     }
     blob.create(pageBlobSize, new BlobRequestOptions(), opContext);
+    currentBlobSize = pageBlobSize;
+
+    // Set the page blob extension size. It must be at least the default
+    // value.
+    configuredPageBlobExtensionSize =
+        conf.getLong("fs.azure.page.blob.extension.size", 0);
+    if (configuredPageBlobExtensionSize < PAGE_BLOB_DEFAULT_EXTENSION_SIZE) {
+      configuredPageBlobExtensionSize = PAGE_BLOB_DEFAULT_EXTENSION_SIZE;
+    }
+
+    // make sure it is a multiple of the page size
+    if (configuredPageBlobExtensionSize % PAGE_SIZE != 0) {
+      configuredPageBlobExtensionSize +=
+          PAGE_SIZE - configuredPageBlobExtensionSize % PAGE_SIZE;
+    }
   }
 
   private void checkStreamState() throws IOException {
@@ -308,6 +339,12 @@ final class PageBlobOutputStream extends OutputStream implements Syncable {
         // It wasn't a partial page, we won't need to rewrite it.
         previousLastPageDataWritten = new byte[0];
       }
+
+      // Extend the file if we need more room in the file. This typically takes
+      // less than 200 milliseconds if it has to actually be done,
+      // so it is okay to include it in a write and won't cause a long pause.
+      // Other writes can be queued behind this write in any case.
+      conditionalExtendFile();
     }
 
     /**
@@ -347,6 +384,56 @@ final class PageBlobOutputStream extends OutputStream implements Syncable {
   }
 
   /**
+   * Extend the page blob file if we are close to the end.
+   */
+  private void conditionalExtendFile() {
+
+    // maximum allowed size of an Azure page blob (1 terabyte)
+    final long MAX_PAGE_BLOB_SIZE = 1024L * 1024L * 1024L * 1024L;
+
+    // If blob is already at the maximum size, then don't try to extend it.
+    if (currentBlobSize == MAX_PAGE_BLOB_SIZE) {
+      return;
+    }
+
+    // If we are within the maximum write size of the end of the file,
+    if (currentBlobSize - currentBlobOffset <= MAX_RAW_BYTES_PER_REQUEST) {
+
+      // Extend the file. Try up to 3 times, backing off after a failure.
+      CloudPageBlob cloudPageBlob = (CloudPageBlob) blob.getBlob();
+      long newSize = currentBlobSize + configuredPageBlobExtensionSize;
+
+      // Make sure we don't exceed maximum blob size.
+      if (newSize > MAX_PAGE_BLOB_SIZE) {
+        newSize = MAX_PAGE_BLOB_SIZE;
+      }
+      final int MAX_RETRIES = 3;
+      int retries = 1;
+      boolean resizeDone = false;
+      while(!resizeDone && retries <= MAX_RETRIES) {
+        try {
+          cloudPageBlob.resize(newSize);
+          resizeDone = true;
+          currentBlobSize = newSize;
+        } catch (StorageException e) {
+          LOG.warn("Failed to extend size of " + cloudPageBlob.getUri());
+          try {
+
+            // sleep 2, 8, 18 seconds after the 1st, 2nd, 3rd failed attempt
+            Thread.sleep(2000 * retries * retries);
+          } catch (InterruptedException e1) {
+
+            // Restore the interrupted status
+            Thread.currentThread().interrupt();
+          }
+        } finally {
+          retries++;
+        }
+      }
+    }
+  }
+
+  /**
    * Flushes this output stream and forces any buffered output bytes to be
    * written out. If any data remains in the buffer it is committed to the
    * service. Data is queued for writing but not forced out to the service

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1646cc9f/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestReadAndSeekPageBlobAfterWrite.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestReadAndSeekPageBlobAfterWrite.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestReadAndSeekPageBlobAfterWrite.java
index 7c60373..e6219df 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestReadAndSeekPageBlobAfterWrite.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestReadAndSeekPageBlobAfterWrite.java
@@ -31,6 +31,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.azure.AzureException;
@@ -330,4 +331,35 @@ public class TestReadAndSeekPageBlobAfterWrite {
       writeAndReadOneFile(numWrites, recordSize, syncInterval);
     }
   }
+  
+  // Write to a file repeatedly to verify that it extends.
+  // The page blob file should start out at 128MB and finish at 256MB.
+  @Test(timeout=300000)
+  public void testFileSizeExtension() throws IOException {
+    final int writeSize = 1024 * 1024;
+    final int numWrites = 129;
+    final byte dataByte = 5;
+    byte[] data = new byte[writeSize];
+    Arrays.fill(data, dataByte);
+    FSDataOutputStream output = fs.create(PATH);
+    try {
+      for (int i = 0; i < numWrites; i++) {
+        output.write(data);
+        output.hflush();
+        LOG.debug("total writes = " + (i + 1));
+      }
+    } finally {
+      output.close();
+    }
+
+    // Show that we wrote more than the default page blob file size.
+    assertTrue(numWrites * writeSize > PageBlobOutputStream.PAGE_BLOB_MIN_SIZE);
+
+    // Verify we can list the new size. That will prove we expanded the file.
+    FileStatus[] status = fs.listStatus(PATH);
+    assertTrue(status[0].getLen() == numWrites * writeSize);
+    LOG.debug("Total bytes written to " + PATH + " = " + status[0].getLen());
+    fs.delete(PATH, false);
+  }
+
 }