Posted to common-commits@hadoop.apache.org by st...@apache.org on 2014/10/10 21:01:27 UTC

[48/50] [abbrv] git commit: Monarch HADOOP-918: automatically expand page blobs when they become full

Monarch HADOOP-918: automatically expand page blobs when they become full


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b7f34c78
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b7f34c78
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b7f34c78

Branch: refs/heads/champlain
Commit: b7f34c78fde5d7b4519564e4b932be654883a508
Parents: 30c8199
Author: Eric Hanson <ehans@microsoft.com>
Authored: Fri Oct 10 10:53:33 2014 -0700
Committer: Eric Hanson <ehans@microsoft.com>
Committed: Fri Oct 10 10:53:33 2014 -0700

----------------------------------------------------------------------
 .../fs/azurenative/PageBlobOutputStream.java    | 98 ++++++++++++++++++--
 .../TestReadAndSeekPageBlobAfterWrite.java      | 31 +++++++
 2 files changed, 123 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/b7f34c78/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/azurenative/PageBlobOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/azurenative/PageBlobOutputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/azurenative/PageBlobOutputStream.java
index 6c8a855..d62e606 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/azurenative/PageBlobOutputStream.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/azurenative/PageBlobOutputStream.java
@@ -16,6 +16,8 @@ import org.apache.hadoop.conf.Configuration;
 import com.microsoft.windowsazure.storage.OperationContext;
 import com.microsoft.windowsazure.storage.StorageException;
 import com.microsoft.windowsazure.storage.blob.BlobRequestOptions;
+import com.microsoft.windowsazure.storage.blob.CloudBlob;
+import com.microsoft.windowsazure.storage.blob.CloudPageBlob;
 
 import static org.apache.hadoop.fs.azurenative.PageBlobFormatHelpers.*;
 
@@ -62,6 +64,11 @@ final class PageBlobOutputStream extends OutputStream implements Syncable {
   private volatile IOException lastError;
 
   /**
+   * Current size of the page blob in bytes. It may be extended as the file
+   * approaches full.
+   */
+  private long currentBlobSize;
+  /**
    * The current byte offset we're at in the blob (how many bytes we've
    * uploaded to the server).
    */
@@ -93,13 +100,23 @@ final class PageBlobOutputStream extends OutputStream implements Syncable {
 
   public static final Log LOG = LogFactory.getLog(AzureNativeFileSystemStore.class);
 
-  // Set the minimum page blob file size to 128MB, which is >> the default block size of 32MB.
-  // This default block size is often used as the hbase.regionserver.hlog.blocksize.
+  // Set the minimum page blob file size to 128MB, which is >> the default
+  // block size of 32MB. This default block size is often used as the
+  // hbase.regionserver.hlog.blocksize.
   // The goal is to have a safe minimum size for HBase log files to allow them
-  // to be filled and rolled without exceeding the minimum size. A larger size can be
-  // used by setting the fs.azure.page.blob.size configuration variable.
+  // to be filled and rolled without exceeding the minimum size. A larger size
+  // can be used by setting the fs.azure.page.blob.size configuration variable.
   public static final long PAGE_BLOB_MIN_SIZE = 128L * 1024L * 1024L;
 
+  // The default and minimum amount to extend a page blob by if it starts
+  // to get full.
+  public static final long
+    PAGE_BLOB_DEFAULT_EXTENSION_SIZE = 128L * 1024L * 1024L;
+
+  // The configured page blob extension size (either the default or, if
+  // greater, the value configured in fs.azure.page.blob.extension.size).
+  private long configuredPageBlobExtensionSize;
+
   /**
    * Constructs an output stream over the given page blob.
    *
@@ -122,8 +139,6 @@ final class PageBlobOutputStream extends OutputStream implements Syncable {
     this.ioThreadPool = new ThreadPoolExecutor(1, 1, 2, TimeUnit.SECONDS,
         ioQueue);
 
-
-
     // Make page blob files have a size that is the greater of a
     // minimum size, or the value of fs.azure.page.blob.size from configuration.
     long pageBlobConfigSize = conf.getLong("fs.azure.page.blob.size", 0);
@@ -136,6 +151,21 @@ final class PageBlobOutputStream extends OutputStream implements Syncable {
       pageBlobSize += PAGE_SIZE - pageBlobSize % PAGE_SIZE;
     }
     blob.create(pageBlobSize, new BlobRequestOptions(), opContext);
+    currentBlobSize = pageBlobSize;
+
+    // Set the page blob extension size. It must be at least the default
+    // value.
+    configuredPageBlobExtensionSize =
+        conf.getLong("fs.azure.page.blob.extension.size", 0);
+    if (configuredPageBlobExtensionSize < PAGE_BLOB_DEFAULT_EXTENSION_SIZE) {
+      configuredPageBlobExtensionSize = PAGE_BLOB_DEFAULT_EXTENSION_SIZE;
+    }
+
+    // make sure it is a multiple of the page size
+    if (configuredPageBlobExtensionSize % PAGE_SIZE != 0) {
+      configuredPageBlobExtensionSize +=
+          PAGE_SIZE - configuredPageBlobExtensionSize % PAGE_SIZE;
+    }
   }
 
   private void checkStreamState() throws IOException {
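
Both the initial blob size and the extension size set up above are rounded up
to a page boundary before use. A minimal standalone sketch of that rounding,
assuming a 512-byte page (the real PAGE_SIZE constant comes from
PageBlobFormatHelpers and its value is not visible in this diff):

    public class PageAlignSketch {
      // Assumed page size; the patch uses PageBlobFormatHelpers.PAGE_SIZE,
      // which this hunk does not show.
      static final long PAGE_SIZE = 512L;

      // Round size up to the next multiple of PAGE_SIZE, mirroring the
      // rounding applied to pageBlobSize and configuredPageBlobExtensionSize.
      static long roundUpToPage(long size) {
        if (size % PAGE_SIZE != 0) {
          size += PAGE_SIZE - size % PAGE_SIZE;
        }
        return size;
      }

      public static void main(String[] args) {
        System.out.println(roundUpToPage(1000)); // 1024
        System.out.println(roundUpToPage(1024)); // 1024 (already aligned)
      }
    }
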
@@ -287,6 +317,12 @@ final class PageBlobOutputStream extends OutputStream implements Syncable {
         // It wasn't a partial page, we won't need to rewrite it.
         previousLastPageDataWritten = new byte[0];
       }
+
+      // Extend the file if we need more room in the file. This typically takes
+      // less than 200 milliseconds if it has to actually be done,
+      // so it is okay to include it in a write and won't cause a long pause.
+      // Other writes can be queued behind this write in any case.
+      conditionalExtendFile();
     }
 
     /**
@@ -326,6 +362,56 @@ final class PageBlobOutputStream extends OutputStream implements Syncable {
   }
 
   /**
+   * Extend the page blob file if we are close to the end.
+   */
+  private void conditionalExtendFile() {
+
+    // maximum allowed size of an Azure page blob (1 terabyte)
+    final long MAX_PAGE_BLOB_SIZE = 1024L * 1024L * 1024L * 1024L;
+
+    // If blob is already at the maximum size, then don't try to extend it.
+    if (currentBlobSize == MAX_PAGE_BLOB_SIZE) {
+      return;
+    }
+
+    // If we are within the maximum write size of the end of the file,
+    // extend it.
+    if (currentBlobSize - currentBlobOffset <= MAX_RAW_BYTES_PER_REQUEST) {
+
+      // Extend the file. Retry up to 3 times with back-off.
+      CloudPageBlob cloudPageBlob = (CloudPageBlob) blob.getBlob();
+      long newSize = currentBlobSize + configuredPageBlobExtensionSize;
+
+      // Make sure we don't exceed maximum blob size.
+      if (newSize > MAX_PAGE_BLOB_SIZE) {
+        newSize = MAX_PAGE_BLOB_SIZE;
+      }
+      final int MAX_RETRIES = 3;
+      int retries = 1;
+      boolean resizeDone = false;
+      while (!resizeDone && retries <= MAX_RETRIES) {
+        try {
+          cloudPageBlob.resize(newSize);
+          resizeDone = true;
+          currentBlobSize = newSize;
+        } catch (StorageException e) {
+          LOG.warn("Failed to extend size of " + cloudPageBlob.getUri(), e);
+          try {
+
+            // sleep 2, 8, 18 seconds for up to 3 retries
+            Thread.sleep(2000 * retries * retries);
+          } catch (InterruptedException e1) {
+
+            // Restore the interrupted status
+            Thread.currentThread().interrupt();
+          }
+        } finally {
+          retries++;
+        }
+      }
+    }
+  }
+
+  /**
    * Flushes this output stream and forces any buffered output bytes to be
    * written out. If any data remains in the buffer it is committed to the
    * service. Data is queued for writing but not forced out to the service

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b7f34c78/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/azurenative/TestReadAndSeekPageBlobAfterWrite.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/azurenative/TestReadAndSeekPageBlobAfterWrite.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/azurenative/TestReadAndSeekPageBlobAfterWrite.java
index 6226655..01271e3 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/azurenative/TestReadAndSeekPageBlobAfterWrite.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/azurenative/TestReadAndSeekPageBlobAfterWrite.java
@@ -35,6 +35,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.BufferedFSInputStream;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.azure.AzureException;
@@ -334,4 +335,34 @@ public class TestReadAndSeekPageBlobAfterWrite {
       writeAndReadOneFile(numWrites, recordSize, syncInterval);
     }
   }
+
+  // Write to a file repeatedly to verify that it extends.
+  // The page blob file should start out at 128MB and finish at 256MB.
+  @Test(timeout=300000)
+  public void testFileSizeExtension() throws IOException {
+    final int writeSize = 1024 * 1024;
+    final int numWrites = 129;
+    final byte dataByte = 5;
+    byte[] data = new byte[writeSize];
+    Arrays.fill(data, dataByte);
+    FSDataOutputStream output = fs.create(PATH);
+    try {
+      for (int i = 0; i < numWrites; i++) {
+        output.write(data);
+        output.hflush();
+        LOG.debug("total writes = " + (i + 1));
+      }
+    } finally {
+      output.close();
+    }
+
+    // Show that we wrote more than the default page blob file size.
+    assertTrue(numWrites * writeSize > PageBlobOutputStream.PAGE_BLOB_MIN_SIZE);
+
+    // Verify we can list the new size. That will prove we expanded the file.
+    FileStatus[] status = fs.listStatus(PATH);
+    assertTrue(status[0].getLen() == numWrites * writeSize);
+    LOG.debug("Total bytes written to " + PATH + " = " + status[0].getLen());
+    fs.delete(PATH, false);
+  }
 }
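
For operators, the extension step is tunable the same way the initial size
is. A hedged usage sketch of the two configuration keys this patch reads
(the key names are taken from the diff; the values and class name here are
only illustrative):

    import org.apache.hadoop.conf.Configuration;

    public class PageBlobConfigSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Initial size; the constructor uses the greater of this value and
        // PAGE_BLOB_MIN_SIZE (128MB).
        conf.setLong("fs.azure.page.blob.size", 256L * 1024L * 1024L);
        // Extension step; values below PAGE_BLOB_DEFAULT_EXTENSION_SIZE
        // (128MB) are raised to the default, and non-page-aligned values
        // are rounded up to the next page boundary.
        conf.setLong("fs.azure.page.blob.extension.size",
            256L * 1024L * 1024L);
        System.out.println(
            conf.getLong("fs.azure.page.blob.extension.size", 0));
      }
    }
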