Posted to common-commits@hadoop.apache.org by cn...@apache.org on 2014/12/18 00:00:06 UTC
[20/24] hadoop git commit: HADOOP-11188. hadoop-azure: automatically expand page blobs when they become full. Contributed by Eric Hanson.
HADOOP-11188. hadoop-azure: automatically expand page blobs when they become full. Contributed by Eric Hanson.
(cherry picked from commit f4b7e99f4ebac5b0295b7f7f42eb5705af41f079)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/1646cc9f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/1646cc9f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/1646cc9f
Branch: refs/heads/branch-2
Commit: 1646cc9f2931da101534b8288c1a2a0026f39a11
Parents: 5a73702
Author: cnauroth <cn...@apache.org>
Authored: Fri Oct 10 15:05:52 2014 -0700
Committer: cnauroth <cn...@apache.org>
Committed: Wed Dec 17 14:57:13 2014 -0800
----------------------------------------------------------------------
hadoop-common-project/hadoop-common/CHANGES.txt | 3 +
.../hadoop/fs/azure/PageBlobOutputStream.java | 95 +++++++++++++++++++-
.../TestReadAndSeekPageBlobAfterWrite.java | 32 +++++++
3 files changed, 126 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1646cc9f/hadoop-common-project/hadoop-common/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index 6509bcb..9870fc7 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -82,6 +82,9 @@ Release 2.7.0 - UNRELEASED
HADOOP-10809. hadoop-azure: page blob support. (Dexter Bradshaw,
Mostafa Elhemali, Eric Hanson, and Mike Liddell via cnauroth)
+ HADOOP-11188. hadoop-azure: automatically expand page blobs when they become
+ full. (Eric Hanson via cnauroth)
+
BUG FIXES
HADOOP-11236. NFS: Fix javadoc warning in RpcProgram.java (Abhiraj Butala via harsh)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1646cc9f/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobOutputStream.java
index 95f0c22..4d1d5c8 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobOutputStream.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobOutputStream.java
@@ -47,6 +47,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.microsoft.windowsazure.storage.OperationContext;
import com.microsoft.windowsazure.storage.StorageException;
import com.microsoft.windowsazure.storage.blob.BlobRequestOptions;
+import com.microsoft.windowsazure.storage.blob.CloudPageBlob;
/**
@@ -83,6 +84,11 @@ final class PageBlobOutputStream extends OutputStream implements Syncable {
private volatile IOException lastError;
/**
+ * Current size of the page blob in bytes. It may be extended if the file
+ * gets full.
+ */
+ private long currentBlobSize;
+ /**
* The current byte offset we're at in the blob (how many bytes we've
* uploaded to the server).
*/
@@ -114,13 +120,23 @@ final class PageBlobOutputStream extends OutputStream implements Syncable {
public static final Log LOG = LogFactory.getLog(AzureNativeFileSystemStore.class);
- // Set the minimum page blob file size to 128MB, which is >> the default block size of 32MB.
- // This default block size is often used as the hbase.regionserver.hlog.blocksize.
+ // Set the minimum page blob file size to 128MB, which is >> the default
+ // block size of 32MB. This default block size is often used as the
+ // hbase.regionserver.hlog.blocksize.
// The goal is to have a safe minimum size for HBase log files to allow them
- // to be filled and rolled without exceeding the minimum size. A larger size can be
- // used by setting the fs.azure.page.blob.size configuration variable.
+ // to be filled and rolled without exceeding the minimum size. A larger size
+ // can be used by setting the fs.azure.page.blob.size configuration variable.
public static final long PAGE_BLOB_MIN_SIZE = 128L * 1024L * 1024L;
+ // The default and minimum amount to extend a page blob by if it starts
+ // to get full.
+ public static final long
+ PAGE_BLOB_DEFAULT_EXTENSION_SIZE = 128L * 1024L * 1024L;
+
+ // The configured page blob extension size (either the default, or if greater,
+ // the value configured in fs.azure.page.blob.extension.size).
+ private long configuredPageBlobExtensionSize;
+
/**
* Constructs an output stream over the given page blob.
*
@@ -156,6 +172,21 @@ final class PageBlobOutputStream extends OutputStream implements Syncable {
pageBlobSize += PAGE_SIZE - pageBlobSize % PAGE_SIZE;
}
blob.create(pageBlobSize, new BlobRequestOptions(), opContext);
+ currentBlobSize = pageBlobSize;
+
+ // Set the page blob extension size. It must be a minimum of the default
+ // value.
+ configuredPageBlobExtensionSize =
+ conf.getLong("fs.azure.page.blob.extension.size", 0);
+ if (configuredPageBlobExtensionSize < PAGE_BLOB_DEFAULT_EXTENSION_SIZE) {
+ configuredPageBlobExtensionSize = PAGE_BLOB_DEFAULT_EXTENSION_SIZE;
+ }
+
+ // make sure it is a multiple of the page size
+ if (configuredPageBlobExtensionSize % PAGE_SIZE != 0) {
+ configuredPageBlobExtensionSize +=
+ PAGE_SIZE - configuredPageBlobExtensionSize % PAGE_SIZE;
+ }
}
private void checkStreamState() throws IOException {
@@ -308,6 +339,12 @@ final class PageBlobOutputStream extends OutputStream implements Syncable {
// It wasn't a partial page, we won't need to rewrite it.
previousLastPageDataWritten = new byte[0];
}
+
+ // Extend the file if we need more room in the file. This typically takes
+ // less than 200 milliseconds if it has to actually be done,
+ // so it is okay to include it in a write and won't cause a long pause.
+ // Other writes can be queued behind this write in any case.
+ conditionalExtendFile();
}
/**
@@ -347,6 +384,56 @@ final class PageBlobOutputStream extends OutputStream implements Syncable {
}
/**
+ * Extend the page blob file if we are close to the end.
+ */
+ private void conditionalExtendFile() {
+
+ // maximum allowed size of an Azure page blob (1 terabyte)
+ final long MAX_PAGE_BLOB_SIZE = 1024L * 1024L * 1024L * 1024L;
+
+ // If blob is already at the maximum size, then don't try to extend it.
+ if (currentBlobSize == MAX_PAGE_BLOB_SIZE) {
+ return;
+ }
+
+ // If we are within the maximum write size of the end of the file,
+ if (currentBlobSize - currentBlobOffset <= MAX_RAW_BYTES_PER_REQUEST) {
+
+ // Extend the file. Retry up to 3 times with back-off.
+ CloudPageBlob cloudPageBlob = (CloudPageBlob) blob.getBlob();
+ long newSize = currentBlobSize + configuredPageBlobExtensionSize;
+
+ // Make sure we don't exceed maximum blob size.
+ if (newSize > MAX_PAGE_BLOB_SIZE) {
+ newSize = MAX_PAGE_BLOB_SIZE;
+ }
+ final int MAX_RETRIES = 3;
+ int retries = 1;
+ boolean resizeDone = false;
+ while (!resizeDone && retries <= MAX_RETRIES) {
+ try {
+ cloudPageBlob.resize(newSize);
+ resizeDone = true;
+ currentBlobSize = newSize;
+ } catch (StorageException e) {
+ LOG.warn("Failed to extend size of " + cloudPageBlob.getUri());
+ try {
+
+ // sleep 2, 8, 18 seconds for up to 3 retries
+ Thread.sleep(2000 * retries * retries);
+ } catch (InterruptedException e1) {
+
+ // Restore the interrupted status
+ Thread.currentThread().interrupt();
+ }
+ } finally {
+ retries++;
+ }
+ }
+ }
+ }
+
+ /**
* Flushes this output stream and forces any buffered output bytes to be
* written out. If any data remains in the buffer it is committed to the
* service. Data is queued for writing but not forced out to the service
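For readers who want the sizing arithmetic outside the diff, here is a minimal, self-contained Java sketch of what the patch adds in the constructor and in conditionalExtendFile(). It assumes PAGE_SIZE is 512 bytes (the Azure page blob page granularity); the class and method names below are illustrative, not part of the patch.

public class PageBlobExtensionSketch {
  static final long PAGE_SIZE = 512L;                         // Azure page granularity (assumed)
  static final long DEFAULT_EXTENSION = 128L * 1024L * 1024L; // 128MB minimum extension step
  static final long MAX_PAGE_BLOB_SIZE =
      1024L * 1024L * 1024L * 1024L;                          // 1TB page blob ceiling

  // Clamp a configured extension size up to the default minimum, then
  // round it up to a whole number of pages, mirroring the constructor.
  static long effectiveExtension(long configured) {
    long size = Math.max(configured, DEFAULT_EXTENSION);
    long remainder = size % PAGE_SIZE;
    return remainder == 0 ? size : size + (PAGE_SIZE - remainder);
  }

  // The blob size after one extension, capped at the 1TB maximum,
  // mirroring the newSize computation in conditionalExtendFile().
  static long nextBlobSize(long current, long extension) {
    return Math.min(current + extension, MAX_PAGE_BLOB_SIZE);
  }

  public static void main(String[] args) {
    long ext = effectiveExtension(200L * 1024L * 1024L + 100L); // not page-aligned
    System.out.println(ext % PAGE_SIZE == 0);                   // true: rounded up
    System.out.println(nextBlobSize(MAX_PAGE_BLOB_SIZE - 1L, ext)
        == MAX_PAGE_BLOB_SIZE);                                 // true: capped
  }
}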
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1646cc9f/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestReadAndSeekPageBlobAfterWrite.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestReadAndSeekPageBlobAfterWrite.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestReadAndSeekPageBlobAfterWrite.java
index 7c60373..e6219df 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestReadAndSeekPageBlobAfterWrite.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestReadAndSeekPageBlobAfterWrite.java
@@ -31,6 +31,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azure.AzureException;
@@ -330,4 +331,35 @@ public class TestReadAndSeekPageBlobAfterWrite {
writeAndReadOneFile(numWrites, recordSize, syncInterval);
}
}
+
+ // Write to a file repeatedly to verify that it extends.
+ // The page blob file should start out at 128MB and finish at 256MB.
+ @Test(timeout=300000)
+ public void testFileSizeExtension() throws IOException {
+ final int writeSize = 1024 * 1024;
+ final int numWrites = 129;
+ final byte dataByte = 5;
+ byte[] data = new byte[writeSize];
+ Arrays.fill(data, dataByte);
+ FSDataOutputStream output = fs.create(PATH);
+ try {
+ for (int i = 0; i < numWrites; i++) {
+ output.write(data);
+ output.hflush();
+ LOG.debug("total writes = " + (i + 1));
+ }
+ } finally {
+ output.close();
+ }
+
+ // Show that we wrote more than the default page blob file size.
+ assertTrue(numWrites * writeSize > PageBlobOutputStream.PAGE_BLOB_MIN_SIZE);
+
+ // Verify we can list the new size. That will prove we expanded the file.
+ FileStatus[] status = fs.listStatus(PATH);
+ assertTrue(status[0].getLen() == numWrites * writeSize);
+ LOG.debug("Total bytes written to " + PATH + " = " + status[0].getLen());
+ fs.delete(PATH, false);
+ }
+
}
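A hypothetical usage sketch: raising the extension increment through configuration before creating files. The 256MB value and class name are illustrative only; per the constructor logic above, values below the 128MB default are clamped up to the default, and sizes that are not a multiple of the page size are rounded up.

import org.apache.hadoop.conf.Configuration;

public class ExtensionConfigExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Request a 256MB extension step for page blobs that fill up.
    conf.setLong("fs.azure.page.blob.extension.size", 256L * 1024L * 1024L);
    // This conf would then be passed to FileSystem.get(...) when opening
    // the wasb file system (not shown here).
  }
}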