Posted to common-commits@hadoop.apache.org by cn...@apache.org on 2014/12/18 00:00:00 UTC
[14/24] hadoop git commit: HADOOP-10809. hadoop-azure: page blob
support. Contributed by Dexter Bradshaw, Mostafa Elhemali, Eric Hanson,
and Mike Liddell.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a737026/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobOutputStream.java
new file mode 100644
index 0000000..95f0c22
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobOutputStream.java
@@ -0,0 +1,497 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import static org.apache.hadoop.fs.azure.PageBlobFormatHelpers.PAGE_DATA_SIZE;
+import static org.apache.hadoop.fs.azure.PageBlobFormatHelpers.PAGE_HEADER_SIZE;
+import static org.apache.hadoop.fs.azure.PageBlobFormatHelpers.PAGE_SIZE;
+import static org.apache.hadoop.fs.azure.PageBlobFormatHelpers.fromShort;
+import static org.apache.hadoop.fs.azure.PageBlobFormatHelpers.withMD5Checking;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.fs.Syncable;
+import org.apache.hadoop.fs.azure.StorageInterface.CloudPageBlobWrapper;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.microsoft.windowsazure.storage.OperationContext;
+import com.microsoft.windowsazure.storage.StorageException;
+import com.microsoft.windowsazure.storage.blob.BlobRequestOptions;
+
+
+/**
+ * An output stream that writes file data to a page blob stored using ASV's
+ * custom format.
+ */
+final class PageBlobOutputStream extends OutputStream implements Syncable {
+ /**
+ * The maximum number of raw bytes Azure Storage allows us to upload in a
+ * single request (4 MB).
+ */
+ private static final int MAX_RAW_BYTES_PER_REQUEST = 4 * 1024 * 1024;
+ /**
+ * The maximum number of pages Azure Storage allows us to upload in a
+ * single request.
+ */
+ private static final int MAX_PAGES_IN_REQUEST =
+ MAX_RAW_BYTES_PER_REQUEST / PAGE_SIZE;
+ /**
+ * The maximum number of data bytes (header not included) we can upload
+ * in a single request. I'm limiting it to (N - 1) pages to account for
+ * the possibility that we may have to rewrite the previous request's
+ * last page.
+ */
+ private static final int MAX_DATA_BYTES_PER_REQUEST =
+ PAGE_DATA_SIZE * (MAX_PAGES_IN_REQUEST - 1);
+
+ private final CloudPageBlobWrapper blob;
+ private final OperationContext opContext;
+
+ /**
+ * If the IO thread encounters an error, it'll store it here.
+ */
+ private volatile IOException lastError;
+
+ /**
+ * The current byte offset we're at in the blob (how many bytes we've
+ * uploaded to the server).
+ */
+ private long currentBlobOffset;
+ /**
+ * The data in the last page that we wrote to the server, in case we have to
+ * overwrite it in the new request.
+ */
+ private byte[] previousLastPageDataWritten = new byte[0];
+ /**
+ * The current buffer we're writing to before sending to the server.
+ */
+ private ByteArrayOutputStream outBuffer;
+ /**
+ * The task queue for writing to the server.
+ */
+ private final LinkedBlockingQueue<Runnable> ioQueue;
+ /**
+ * The thread pool we're using for writing to the server. Note that the IO
+ * write is NOT designed for parallelism, so there can only be one thread
+ * in that pool (I'm using the thread pool mainly for the lifetime management
+ * capabilities, otherwise I'd have just used a simple Thread).
+ */
+ private final ThreadPoolExecutor ioThreadPool;
+
+ // The last task given to the ioThreadPool to execute, to allow
+ // waiting until it's done.
+ private WriteRequest lastQueuedTask;
+
+ public static final Log LOG = LogFactory.getLog(AzureNativeFileSystemStore.class);
+
+ // Set the minimum page blob file size to 128MB, which is >> the default block size of 32MB.
+ // This default block size is often used as the hbase.regionserver.hlog.blocksize.
+ // The goal is to have a safe minimum size for HBase log files to allow them
+ // to be filled and rolled without exceeding the page blob's fixed size. A larger size can be
+ // used by setting the fs.azure.page.blob.size configuration variable.
+ public static final long PAGE_BLOB_MIN_SIZE = 128L * 1024L * 1024L;
+
+ /**
+ * Constructs an output stream over the given page blob.
+ *
+ * @param blob the blob that this stream is associated with.
+ * @param opContext an object used to track the execution of the operation
+ * @throws StorageException if anything goes wrong creating the blob.
+ */
+ public PageBlobOutputStream(final CloudPageBlobWrapper blob,
+ final OperationContext opContext,
+ final Configuration conf) throws StorageException {
+ this.blob = blob;
+ this.outBuffer = new ByteArrayOutputStream();
+ this.opContext = opContext;
+ this.lastQueuedTask = null;
+ this.ioQueue = new LinkedBlockingQueue<Runnable>();
+
+ // As explained above: the IO writes are not designed for parallelism,
+ // so we only have one thread in this thread pool.
+ this.ioThreadPool = new ThreadPoolExecutor(1, 1, 2, TimeUnit.SECONDS,
+ ioQueue);
+
+ // Make page blob files have a size that is the greater of a
+ // minimum size, or the value of fs.azure.page.blob.size from configuration.
+ long pageBlobConfigSize = conf.getLong("fs.azure.page.blob.size", 0);
+ LOG.debug("Read value of fs.azure.page.blob.size as " + pageBlobConfigSize
+ + " from configuration (0 if not present).");
+ long pageBlobSize = Math.max(PAGE_BLOB_MIN_SIZE, pageBlobConfigSize);
+
+ // Ensure that the pageBlobSize is a multiple of page size.
+ if (pageBlobSize % PAGE_SIZE != 0) {
+ pageBlobSize += PAGE_SIZE - pageBlobSize % PAGE_SIZE;
+ }
+ blob.create(pageBlobSize, new BlobRequestOptions(), opContext);
+ }
+
+ private void checkStreamState() throws IOException {
+ if (lastError != null) {
+ throw lastError;
+ }
+ }
+
+ /**
+ * Closes this output stream and releases any system resources associated with
+ * this stream. If any data remains in the buffer it is committed to the
+ * service.
+ */
+ @Override
+ public void close() throws IOException {
+ LOG.debug("Closing page blob output stream.");
+ flush();
+ checkStreamState();
+ ioThreadPool.shutdown();
+ try {
+ LOG.debug(ioThreadPool.toString());
+ if (!ioThreadPool.awaitTermination(10, TimeUnit.MINUTES)) {
+ LOG.debug("Timed out after 10 minutes waiting for IO requests to finish");
+ logAllStackTraces();
+ LOG.debug(ioThreadPool.toString());
+ throw new IOException("Timed out waiting for IO requests to finish");
+ }
+ } catch (InterruptedException e) {
+ LOG.debug("Caught InterruptedException");
+
+ // Restore the interrupted status
+ Thread.currentThread().interrupt();
+ }
+
+ this.lastError = new IOException("Stream is already closed.");
+ }
+
+ // Log the stacks of all threads.
+ private void logAllStackTraces() {
+    Map<Thread, StackTraceElement[]> liveThreads = Thread.getAllStackTraces();
+    for (Map.Entry<Thread, StackTraceElement[]> entry : liveThreads.entrySet()) {
+      LOG.debug("Thread " + entry.getKey().getName());
+      for (StackTraceElement frame : entry.getValue()) {
+        LOG.debug("\tat " + frame);
+      }
+    }
+ }
+
+ /**
+ * A single write request for data to write to Azure storage.
+ */
+ private class WriteRequest implements Runnable {
+ private final byte[] dataPayload;
+ private final CountDownLatch doneSignal = new CountDownLatch(1);
+
+ public WriteRequest(byte[] dataPayload) {
+ this.dataPayload = dataPayload;
+ }
+
+ public void waitTillDone() throws InterruptedException {
+ doneSignal.await();
+ }
+
+ @Override
+ public void run() {
+ try {
+ LOG.debug("before runInternal()");
+ runInternal();
+ LOG.debug("after runInternal()");
+ } finally {
+ doneSignal.countDown();
+ }
+ }
+
+ private void runInternal() {
+ if (lastError != null) {
+ // We're already in an error state, no point doing anything.
+ return;
+ }
+ if (dataPayload.length == 0) {
+ // Nothing to do.
+ return;
+ }
+
+ // Since we have to rewrite the last request's last page's data
+ // (may be empty), total data size is our data plus whatever was
+ // left from there.
+ final int totalDataBytes = dataPayload.length
+ + previousLastPageDataWritten.length;
+ // Calculate the total number of pages we're writing to the server.
+ final int numberOfPages = (totalDataBytes / PAGE_DATA_SIZE)
+ + (totalDataBytes % PAGE_DATA_SIZE == 0 ? 0 : 1);
+ // Fill up the raw bytes we're writing.
+ byte[] rawPayload = new byte[numberOfPages * PAGE_SIZE];
+ // Keep track of the size of the last page we uploaded.
+ int currentLastPageDataSize = -1;
+ for (int page = 0; page < numberOfPages; page++) {
+ // Our current byte offset in the data.
+ int dataOffset = page * PAGE_DATA_SIZE;
+ // Our current byte offset in the raw buffer.
+ int rawOffset = page * PAGE_SIZE;
+ // The size of the data in the current page.
+ final short currentPageDataSize = (short) Math.min(PAGE_DATA_SIZE,
+ totalDataBytes - dataOffset);
+ // Save off this page's size as the potential last page's size.
+ currentLastPageDataSize = currentPageDataSize;
+
+ // Write out the page size in the header.
+ final byte[] header = fromShort(currentPageDataSize);
+ System.arraycopy(header, 0, rawPayload, rawOffset, header.length);
+ rawOffset += header.length;
+
+ int bytesToCopyFromDataPayload = currentPageDataSize;
+ if (dataOffset < previousLastPageDataWritten.length) {
+ // First write out the last page's data.
+ final int bytesToCopyFromLastPage = Math.min(currentPageDataSize,
+ previousLastPageDataWritten.length - dataOffset);
+ System.arraycopy(previousLastPageDataWritten, dataOffset,
+ rawPayload, rawOffset, bytesToCopyFromLastPage);
+ bytesToCopyFromDataPayload -= bytesToCopyFromLastPage;
+ rawOffset += bytesToCopyFromLastPage;
+ dataOffset += bytesToCopyFromLastPage;
+ }
+
+ if (dataOffset >= previousLastPageDataWritten.length) {
+ // Then write the current payload's data.
+ System.arraycopy(dataPayload,
+ dataOffset - previousLastPageDataWritten.length,
+ rawPayload, rawOffset, bytesToCopyFromDataPayload);
+ }
+ }
+
+ // Raw payload constructed, ship it off to the server.
+ writePayloadToServer(rawPayload);
+
+ // Post-send bookkeeping.
+ currentBlobOffset += rawPayload.length;
+ if (currentLastPageDataSize < PAGE_DATA_SIZE) {
+ // Partial page, save it off so it's overwritten in the next request.
+ final int startOffset = (numberOfPages - 1) * PAGE_SIZE + PAGE_HEADER_SIZE;
+ previousLastPageDataWritten = Arrays.copyOfRange(rawPayload,
+ startOffset,
+ startOffset + currentLastPageDataSize);
+ // Since we're rewriting this page, set our current offset in the server
+ // to that page's beginning.
+ currentBlobOffset -= PAGE_SIZE;
+ } else {
+ // It wasn't a partial page, we won't need to rewrite it.
+ previousLastPageDataWritten = new byte[0];
+ }
+ }
+
+ /**
+ * Writes the given raw payload to Azure Storage at the current blob
+ * offset.
+ */
+ private void writePayloadToServer(byte[] rawPayload) {
+ final ByteArrayInputStream wrapperStream =
+ new ByteArrayInputStream(rawPayload);
+ LOG.debug("writing payload of " + rawPayload.length + " bytes to Azure page blob");
+ try {
+ long start = System.currentTimeMillis();
+ blob.uploadPages(wrapperStream, currentBlobOffset, rawPayload.length,
+ withMD5Checking(), PageBlobOutputStream.this.opContext);
+ long end = System.currentTimeMillis();
+ LOG.trace("Azure uploadPages time for " + rawPayload.length + " bytes = " + (end - start));
+ } catch (IOException ex) {
+ LOG.debug(ExceptionUtils.getStackTrace(ex));
+ lastError = ex;
+ } catch (StorageException ex) {
+ LOG.debug(ExceptionUtils.getStackTrace(ex));
+ lastError = new IOException(ex);
+ }
+ if (lastError != null) {
+ LOG.debug("Caught error in PageBlobOutputStream#writePayloadToServer()");
+ }
+ }
+ }
+
+ private synchronized void flushIOBuffers() {
+ if (outBuffer.size() == 0) {
+ return;
+ }
+ lastQueuedTask = new WriteRequest(outBuffer.toByteArray());
+ ioThreadPool.execute(lastQueuedTask);
+ outBuffer = new ByteArrayOutputStream();
+ }
+
+ /**
+ * Flushes this output stream and forces any buffered output bytes to be
+ * written out. If any data remains in the buffer it is committed to the
+ * service. Data is queued for writing but not forced out to the service
+ * before the call returns.
+ */
+ @Override
+ public void flush() throws IOException {
+ checkStreamState();
+ flushIOBuffers();
+ }
+
+ /**
+ * Writes b.length bytes from the specified byte array to this output stream.
+ *
+ * @param data
+ * the byte array to write.
+ *
+ * @throws IOException
+ * if an I/O error occurs. In particular, an IOException may be
+ * thrown if the output stream has been closed.
+ */
+ @Override
+ public void write(final byte[] data) throws IOException {
+ write(data, 0, data.length);
+ }
+
+ /**
+ * Writes length bytes from the specified byte array starting at offset to
+ * this output stream.
+ *
+ * @param data
+ * the byte array to write.
+ * @param offset
+ * the start offset in the data.
+ * @param length
+ * the number of bytes to write.
+ * @throws IOException
+ * if an I/O error occurs. In particular, an IOException may be
+ * thrown if the output stream has been closed.
+ */
+ @Override
+ public void write(final byte[] data, final int offset, final int length)
+ throws IOException {
+ if (offset < 0 || length < 0 || length > data.length - offset) {
+ throw new IndexOutOfBoundsException();
+ }
+
+ writeInternal(data, offset, length);
+ }
+
+ /**
+ * Writes the specified byte to this output stream. The general contract for
+ * write is that one byte is written to the output stream. The byte to be
+ * written is the eight low-order bits of the argument b. The 24 high-order
+ * bits of b are ignored.
+ *
+ * @param byteVal
+ * the byteValue to write.
+ * @throws IOException
+ * if an I/O error occurs. In particular, an IOException may be
+ * thrown if the output stream has been closed.
+ */
+ @Override
+ public void write(final int byteVal) throws IOException {
+ write(new byte[] { (byte) (byteVal & 0xFF) });
+ }
+
+ /**
+ * Writes the data to the buffer and triggers writes to the service as needed.
+ *
+ * @param data
+ * the byte array to write.
+ * @param offset
+ * the start offset in the data.
+ * @param length
+ * the number of bytes to write.
+ * @throws IOException
+ * if an I/O error occurs. In particular, an IOException may be
+ * thrown if the output stream has been closed.
+ */
+ private synchronized void writeInternal(final byte[] data, int offset,
+ int length) throws IOException {
+ while (length > 0) {
+ checkStreamState();
+ final int availableBufferBytes = MAX_DATA_BYTES_PER_REQUEST
+ - this.outBuffer.size();
+ final int nextWrite = Math.min(availableBufferBytes, length);
+
+ outBuffer.write(data, offset, nextWrite);
+ offset += nextWrite;
+ length -= nextWrite;
+
+ if (outBuffer.size() > MAX_DATA_BYTES_PER_REQUEST) {
+        throw new RuntimeException("Internal error: maximum write size "
+            + Integer.toString(MAX_DATA_BYTES_PER_REQUEST) + " exceeded.");
+ }
+
+ if (outBuffer.size() == MAX_DATA_BYTES_PER_REQUEST) {
+ flushIOBuffers();
+ }
+ }
+ }
+
+ /**
+ * Force all data in the output stream to be written to Azure storage.
+ * Wait to return until this is complete.
+ */
+ @Override
+ public synchronized void hsync() throws IOException {
+ LOG.debug("Entering PageBlobOutputStream#hsync().");
+ long start = System.currentTimeMillis();
+ flush();
+ LOG.debug(ioThreadPool.toString());
+ try {
+ if (lastQueuedTask != null) {
+ lastQueuedTask.waitTillDone();
+ }
+ } catch (InterruptedException e1) {
+
+ // Restore the interrupted status
+ Thread.currentThread().interrupt();
+ }
+ LOG.debug("Leaving PageBlobOutputStream#hsync(). Total hsync duration = "
+ + (System.currentTimeMillis() - start) + " msec.");
+ }
+
+  @Override
+  public void hflush() throws IOException {
+
+ // hflush is required to force data to storage, so call hsync,
+ // which does that.
+ hsync();
+ }
+
+ @Deprecated
+ public void sync() throws IOException {
+
+ // Sync has been deprecated in favor of hflush.
+ hflush();
+ }
+
+ // For unit testing purposes: kill the IO threads.
+ @VisibleForTesting
+ void killIoThreads() {
+ ioThreadPool.shutdownNow();
+ }
+}
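For reference, a minimal sketch of the page arithmetic described above, assuming the PageBlobFormatHelpers constants are PAGE_SIZE = 512, PAGE_HEADER_SIZE = 2 and PAGE_DATA_SIZE = 510 (values implied by Azure's 512-byte pages, not confirmed by this hunk). Each page stores a 2-byte header holding the count of valid data bytes, a partial last page is rewritten by the next request, and the blob is created at a fixed size rounded up to a page multiple.

    // Sketch under assumed constants; not part of the patch.
    public class PageLayoutSketch {
      static final int PAGE_SIZE = 512;                      // assumed
      static final int PAGE_HEADER_SIZE = 2;                 // assumed
      static final int PAGE_DATA_SIZE = PAGE_SIZE - PAGE_HEADER_SIZE;

      public static void main(String[] args) {
        // How a 1300-byte write maps onto pages.
        int totalDataBytes = 1300;                           // hypothetical payload
        int numberOfPages = (totalDataBytes + PAGE_DATA_SIZE - 1) / PAGE_DATA_SIZE;
        int rawBytes = numberOfPages * PAGE_SIZE;
        int lastPageData = totalDataBytes - (numberOfPages - 1) * PAGE_DATA_SIZE;
        // Prints: 3 pages, 1536 raw bytes, 280 data bytes in last page
        System.out.println(numberOfPages + " pages, " + rawBytes
            + " raw bytes, " + lastPageData + " data bytes in last page");

        // How the constructor rounds the configured blob size up to a
        // page multiple (mirrors the fs.azure.page.blob.size handling).
        long blobSize = Math.max(128L * 1024 * 1024, 150000000L); // hypothetical config
        if (blobSize % PAGE_SIZE != 0) {
          blobSize += PAGE_SIZE - blobSize % PAGE_SIZE;
        }
        System.out.println("rounded blob size = " + blobSize);   // 150000128
      }
    }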
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a737026/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PartialListing.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PartialListing.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PartialListing.java
index 9e49de8..4a80d2e 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PartialListing.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PartialListing.java
@@ -30,7 +30,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
* This listing may be returned in chunks, so a <code>priorLastKey</code> is
* provided so that the next chunk may be requested.
* </p>
- *
+ *
* @see NativeFileSystemStore#list(String, int, String)
*/
@InterfaceAudience.Private
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a737026/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SelfRenewingLease.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SelfRenewingLease.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SelfRenewingLease.java
new file mode 100644
index 0000000..2d5c0c8
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SelfRenewingLease.java
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.azure.StorageInterface.CloudBlobWrapper;
+
+import com.microsoft.windowsazure.storage.AccessCondition;
+import com.microsoft.windowsazure.storage.StorageException;
+import com.microsoft.windowsazure.storage.blob.CloudBlob;
+
+/**
+ * An Azure blob lease that automatically renews itself indefinitely
+ * using a background thread. Use it to synchronize distributed processes,
+ * or to prevent writes to the blob by other processes that don't
+ * have the lease.
+ *
+ * Creating a new Lease object blocks the caller until the Azure blob lease is
+ * acquired.
+ *
+ * Attempting to get a lease on a non-existent blob throws StorageException.
+ *
+ * Call free() to release the Lease.
+ *
+ * You can use this Lease like a distributed lock. If the holder process
+ * dies, the lease will time out since it won't be renewed.
+ */
+public class SelfRenewingLease {
+
+ private CloudBlobWrapper blobWrapper;
+ private Thread renewer;
+ private volatile boolean leaseFreed;
+ private String leaseID = null;
+ private static final int LEASE_TIMEOUT = 60; // Lease timeout in seconds
+
+ // Time to wait to renew lease in milliseconds
+ public static final int LEASE_RENEWAL_PERIOD = 40000;
+ private static final Log LOG = LogFactory.getLog(SelfRenewingLease.class);
+
+ // Used to allocate thread serial numbers in thread name
+ private static volatile int threadNumber = 0;
+
+
+ // Time to wait to retry getting the lease in milliseconds
+ private static final int LEASE_ACQUIRE_RETRY_INTERVAL = 2000;
+
+ public SelfRenewingLease(CloudBlobWrapper blobWrapper)
+ throws StorageException {
+
+ this.leaseFreed = false;
+ this.blobWrapper = blobWrapper;
+
+ // Keep trying to get the lease until you get it.
+ CloudBlob blob = blobWrapper.getBlob();
+    while (leaseID == null) {
+ try {
+ leaseID = blob.acquireLease(LEASE_TIMEOUT, null);
+ } catch (StorageException e) {
+
+ // Throw again if we don't want to keep waiting.
+ // We expect it to be that the lease is already present,
+ // or in some cases that the blob does not exist.
+ if (!e.getErrorCode().equals("LeaseAlreadyPresent")) {
+ LOG.info(
+ "Caught exception when trying to get lease on blob "
+ + blobWrapper.getUri().toString() + ". " + e.getMessage());
+ throw e;
+ }
+ }
+ if (leaseID == null) {
+ try {
+ Thread.sleep(LEASE_ACQUIRE_RETRY_INTERVAL);
+ } catch (InterruptedException e) {
+
+ // Restore the interrupted status
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+ renewer = new Thread(new Renewer());
+
+ // A Renewer running should not keep JVM from exiting, so make it a daemon.
+ renewer.setDaemon(true);
+ renewer.setName("AzureLeaseRenewer-" + threadNumber++);
+ renewer.start();
+ LOG.debug("Acquired lease " + leaseID + " on " + blob.getUri()
+ + " managed by thread " + renewer.getName());
+ }
+
+ /**
+ * Free the lease and stop the keep-alive thread.
+ * @throws StorageException
+ */
+ public void free() throws StorageException {
+ AccessCondition accessCondition = AccessCondition.generateEmptyCondition();
+ accessCondition.setLeaseID(leaseID);
+ try {
+ blobWrapper.getBlob().releaseLease(accessCondition);
+ } catch (StorageException e) {
+ if (e.getErrorCode().equals("BlobNotFound")) {
+
+ // Don't do anything -- it's okay to free a lease
+ // on a deleted file. The delete freed the lease
+ // implicitly.
+ } else {
+
+ // This error is not anticipated, so re-throw it.
+ LOG.warn("Unanticipated exception when trying to free lease " + leaseID
+ + " on " + blobWrapper.getStorageUri());
+ throw(e);
+ }
+ } finally {
+
+ // Even if releasing the lease fails (e.g. because the file was deleted),
+ // make sure to record that we freed the lease, to terminate the
+ // keep-alive thread.
+ leaseFreed = true;
+ LOG.debug("Freed lease " + leaseID + " on " + blobWrapper.getUri()
+ + " managed by thread " + renewer.getName());
+ }
+ }
+
+ public boolean isFreed() {
+ return leaseFreed;
+ }
+
+ public String getLeaseID() {
+ return leaseID;
+ }
+
+ public CloudBlob getCloudBlob() {
+ return blobWrapper.getBlob();
+ }
+
+ private class Renewer implements Runnable {
+
+ /**
+ * Start a keep-alive thread that will continue to renew
+ * the lease until it is freed or the process dies.
+ */
+ @Override
+ public void run() {
+ LOG.debug("Starting lease keep-alive thread.");
+ AccessCondition accessCondition =
+ AccessCondition.generateEmptyCondition();
+ accessCondition.setLeaseID(leaseID);
+
+      while (!leaseFreed) {
+ try {
+ Thread.sleep(LEASE_RENEWAL_PERIOD);
+ } catch (InterruptedException e) {
+ LOG.debug("Keep-alive thread for lease " + leaseID +
+ " interrupted.");
+
+ // Restore the interrupted status
+ Thread.currentThread().interrupt();
+ }
+ try {
+ if (!leaseFreed) {
+ blobWrapper.getBlob().renewLease(accessCondition);
+
+            // It'll be very rare to renew the lease (most leases are
+            // short-lived), so log that we did it, to help with system debugging.
+ LOG.info("Renewed lease " + leaseID + " on "
+ + getCloudBlob().getUri());
+ }
+ } catch (StorageException e) {
+ if (!leaseFreed) {
+
+ // Free the lease so we don't leave this thread running forever.
+ leaseFreed = true;
+
+ // Normally leases should be freed and there should be no
+ // exceptions, so log a warning.
+ LOG.warn("Attempt to renew lease " + leaseID + " on "
+ + getCloudBlob().getUri()
+ + " failed, but lease not yet freed. Reason: " +
+ e.getMessage());
+ }
+ }
+ }
+ }
+ }
+}
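A minimal usage sketch for the lease above, treating it as a coarse distributed lock via the acquireLease()/free() calls this patch adds to CloudBlobWrapper (the surrounding helper method is hypothetical; error handling elided):

    // Hypothetical helper: run a critical section under a blob lease.
    void withBlobLease(StorageInterface.CloudBlobWrapper lockBlob)
        throws StorageException {
      SelfRenewingLease lease = lockBlob.acquireLease(); // blocks until acquired
      try {
        // ... only the current lease holder reaches this point ...
      } finally {
        lease.free(); // stops the keep-alive thread and releases the lease
      }
    }

If the holding process dies, the renewer thread dies with it and the lease expires after LEASE_TIMEOUT seconds, so other processes are not blocked forever.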
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a737026/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SelfThrottlingIntercept.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SelfThrottlingIntercept.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SelfThrottlingIntercept.java
index 25f2883..d18a144 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SelfThrottlingIntercept.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SelfThrottlingIntercept.java
@@ -68,12 +68,14 @@ public class SelfThrottlingIntercept {
private final float readFactor;
private final float writeFactor;
+ private final OperationContext operationContext;
// Concurrency: access to non-final members must be thread-safe
private long lastE2Elatency;
- public SelfThrottlingIntercept(OperationContext operationContext,
+ public SelfThrottlingIntercept(OperationContext operationContext,
float readFactor, float writeFactor) {
+ this.operationContext = operationContext;
this.readFactor = readFactor;
this.writeFactor = writeFactor;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a737026/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/ShellDecryptionKeyProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/ShellDecryptionKeyProvider.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/ShellDecryptionKeyProvider.java
index 2ce8ebd..d9d6fc3 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/ShellDecryptionKeyProvider.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/ShellDecryptionKeyProvider.java
@@ -31,7 +31,8 @@ import org.apache.hadoop.util.Shell;
*/
@InterfaceAudience.Private
public class ShellDecryptionKeyProvider extends SimpleKeyProvider {
- static final String KEY_ACCOUNT_SHELLKEYPROVIDER_SCRIPT = "fs.azure.shellkeyprovider.script";
+ static final String KEY_ACCOUNT_SHELLKEYPROVIDER_SCRIPT =
+ "fs.azure.shellkeyprovider.script";
@Override
public String getStorageAccountKey(String accountName, Configuration conf)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a737026/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SimpleKeyProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SimpleKeyProvider.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SimpleKeyProvider.java
index ef44a85..3cd3eda 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SimpleKeyProvider.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SimpleKeyProvider.java
@@ -28,7 +28,8 @@ import org.apache.hadoop.conf.Configuration;
@InterfaceAudience.Private
public class SimpleKeyProvider implements KeyProvider {
- protected static final String KEY_ACCOUNT_KEY_PREFIX = "fs.azure.account.key.";
+ protected static final String KEY_ACCOUNT_KEY_PREFIX =
+ "fs.azure.account.key.";
@Override
public String getStorageAccountKey(String accountName, Configuration conf)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a737026/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterface.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterface.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterface.java
index 87cef86..8d0229d 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterface.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterface.java
@@ -23,6 +23,7 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.net.URISyntaxException;
+import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap;
@@ -36,15 +37,17 @@ import com.microsoft.windowsazure.storage.StorageException;
import com.microsoft.windowsazure.storage.blob.BlobListingDetails;
import com.microsoft.windowsazure.storage.blob.BlobProperties;
import com.microsoft.windowsazure.storage.blob.BlobRequestOptions;
+import com.microsoft.windowsazure.storage.blob.CloudBlob;
import com.microsoft.windowsazure.storage.blob.CopyState;
import com.microsoft.windowsazure.storage.blob.ListBlobItem;
+import com.microsoft.windowsazure.storage.blob.PageRange;
/**
* This is a very thin layer over the methods exposed by the Windows Azure
* Storage SDK that we need for WASB implementation. This base class has a real
* implementation that just simply redirects to the SDK, and a memory-backed one
* that's used for unit tests.
- *
+ *
* IMPORTANT: all the methods here must remain very simple redirects since code
* written here can't be properly unit tested.
*/
@@ -323,23 +326,39 @@ abstract class StorageInterface {
* @throws URISyntaxException
* If URI syntax exception occurred.
*/
- public abstract CloudBlockBlobWrapper getBlockBlobReference(
+ public abstract CloudBlobWrapper getBlockBlobReference(
String relativePath) throws URISyntaxException, StorageException;
+
+ /**
+ * Returns a wrapper for a CloudPageBlob.
+ *
+ * @param relativePath
+ * A <code>String</code> that represents the name of the blob, relative to the container
+ *
+ * @throws StorageException
+ * If a storage service error occurred.
+ *
+ * @throws URISyntaxException
+ * If URI syntax exception occurred.
+ */
+ public abstract CloudBlobWrapper getPageBlobReference(String relativePath)
+ throws URISyntaxException, StorageException;
}
-
+
+
/**
- * A thin wrapper over the {@link CloudBlockBlob} class that simply redirects
- * calls to the real object except in unit tests.
+ * A thin wrapper over the {@link CloudBlob} class that simply redirects calls
+ * to the real object except in unit tests.
*/
@InterfaceAudience.Private
- public abstract static class CloudBlockBlobWrapper implements ListBlobItem {
+ public interface CloudBlobWrapper extends ListBlobItem {
/**
* Returns the URI for this blob.
*
* @return A <code>java.net.URI</code> object that represents the URI for
* the blob.
*/
- public abstract URI getUri();
+ URI getUri();
/**
* Returns the metadata for the blob.
@@ -347,7 +366,7 @@ abstract class StorageInterface {
* @return A <code>java.util.HashMap</code> object that represents the
* metadata for the blob.
*/
- public abstract HashMap<String, String> getMetadata();
+ HashMap<String, String> getMetadata();
/**
* Sets the metadata for the blob.
@@ -356,37 +375,64 @@ abstract class StorageInterface {
* A <code>java.util.HashMap</code> object that contains the
* metadata being assigned to the blob.
*/
- public abstract void setMetadata(HashMap<String, String> metadata);
+ void setMetadata(HashMap<String, String> metadata);
/**
- * Copies an existing blob's contents, properties, and metadata to this
- * instance of the <code>CloudBlob</code> class, using the specified
- * operation context.
- *
- * @param sourceBlob
- * A <code>CloudBlob</code> object that represents the source blob
- * to copy.
+ * Copies an existing blob's contents, properties, and metadata to this instance of the <code>CloudBlob</code>
+ * class, using the specified operation context.
+ *
+ * @param source
+ * A <code>java.net.URI</code> The URI of a source blob.
* @param opContext
- * An {@link OperationContext} object that represents the context
- * for the current operation. This object is used to track requests
- * to the storage service, and to provide additional runtime
- * information about the operation.
- *
+ * An {@link OperationContext} object that represents the context for the current operation. This object
+ * is used to track requests to the storage service, and to provide additional runtime information about
+ * the operation.
+ *
* @throws StorageException
- * If a storage service error occurred.
+ * If a storage service error occurred.
* @throws URISyntaxException
- *
+ *
*/
- public abstract void startCopyFromBlob(CloudBlockBlobWrapper sourceBlob,
- OperationContext opContext) throws StorageException, URISyntaxException;
-
+ public abstract void startCopyFromBlob(URI source,
+ OperationContext opContext)
+ throws StorageException, URISyntaxException;
+
/**
* Returns the blob's copy state.
*
* @return A {@link CopyState} object that represents the copy state of the
* blob.
*/
- public abstract CopyState getCopyState();
+ CopyState getCopyState();
+
+ /**
+ * Downloads a range of bytes from the blob to the given byte buffer, using the specified request options and
+ * operation context.
+ *
+ * @param offset
+ * The byte offset to use as the starting point for the source.
+ * @param length
+ * The number of bytes to read.
+ * @param buffer
+ * The byte buffer, as an array of bytes, to which the blob bytes are downloaded.
+ * @param bufferOffset
+ * The byte offset to use as the starting point for the target.
+ * @param options
+ * A {@link BlobRequestOptions} object that specifies any additional options for the request. Specifying
+ * <code>null</code> will use the default request options from the associated service client (
+ * {@link CloudBlobClient}).
+ * @param opContext
+ * An {@link OperationContext} object that represents the context for the current operation. This object
+ * is used to track requests to the storage service, and to provide additional runtime information about
+ * the operation.
+ *
+ * @throws StorageException
+ * If a storage service error occurred.
+ */
+ void downloadRange(final long offset, final long length,
+ final OutputStream outStream, final BlobRequestOptions options,
+ final OperationContext opContext)
+ throws StorageException, IOException;
/**
* Deletes the blob using the specified operation context.
@@ -407,7 +453,7 @@ abstract class StorageInterface {
* @throws StorageException
* If a storage service error occurred.
*/
- public abstract void delete(OperationContext opContext)
+ void delete(OperationContext opContext, SelfRenewingLease lease)
throws StorageException;
/**
@@ -419,13 +465,13 @@ abstract class StorageInterface {
* to the storage service, and to provide additional runtime
* information about the operation.
*
- * @return <code>true</code> if the blob exists, other wise
+ * @return <code>true</code> if the blob exists, otherwise
* <code>false</code>.
*
* @throws StorageException
- * f a storage service error occurred.
+ * If a storage service error occurred.
*/
- public abstract boolean exists(OperationContext opContext)
+ boolean exists(OperationContext opContext)
throws StorageException;
/**
@@ -446,7 +492,7 @@ abstract class StorageInterface {
* @throws StorageException
* If a storage service error occurred.
*/
- public abstract void downloadAttributes(OperationContext opContext)
+ void downloadAttributes(OperationContext opContext)
throws StorageException;
/**
@@ -455,7 +501,7 @@ abstract class StorageInterface {
* @return A {@link BlobProperties} object that represents the properties of
* the blob.
*/
- public abstract BlobProperties getProperties();
+ BlobProperties getProperties();
/**
* Opens a blob input stream to download the blob using the specified
@@ -476,49 +522,10 @@ abstract class StorageInterface {
* @throws StorageException
* If a storage service error occurred.
*/
- public abstract InputStream openInputStream(BlobRequestOptions options,
+ InputStream openInputStream(BlobRequestOptions options,
OperationContext opContext) throws StorageException;
/**
- * Creates and opens an output stream to write data to the block blob using
- * the specified operation context.
- *
- * @param opContext
- * An {@link OperationContext} object that represents the context
- * for the current operation. This object is used to track requests
- * to the storage service, and to provide additional runtime
- * information about the operation.
- *
- * @return A {@link BlobOutputStream} object used to write data to the blob.
- *
- * @throws StorageException
- * If a storage service error occurred.
- */
- public abstract OutputStream openOutputStream(BlobRequestOptions options,
- OperationContext opContext) throws StorageException;
-
- /**
- * Uploads the source stream data to the blob, using the specified operation
- * context.
- *
- * @param sourceStream
- * An <code>InputStream</code> object that represents the input
- * stream to write to the block blob.
- * @param opContext
- * An {@link OperationContext} object that represents the context
- * for the current operation. This object is used to track requests
- * to the storage service, and to provide additional runtime
- * information about the operation.
- *
- * @throws IOException
- * If an I/O error occurred.
- * @throws StorageException
- * If a storage service error occurred.
- */
- public abstract void upload(InputStream sourceStream,
- OperationContext opContext) throws StorageException, IOException;
-
- /**
* Uploads the blob's metadata to the storage service using the specified
* lease ID, request options, and operation context.
*
@@ -531,12 +538,15 @@ abstract class StorageInterface {
* @throws StorageException
* If a storage service error occurred.
*/
- public abstract void uploadMetadata(OperationContext opContext)
+ void uploadMetadata(OperationContext opContext)
throws StorageException;
- public abstract void uploadProperties(OperationContext opContext)
+ void uploadProperties(OperationContext opContext,
+ SelfRenewingLease lease)
throws StorageException;
+ SelfRenewingLease acquireLease() throws StorageException;
+
/**
* Sets the minimum read block size to use with this Blob.
*
@@ -545,7 +555,7 @@ abstract class StorageInterface {
* while using a {@link BlobInputStream} object, ranging from 512
* bytes to 64 MB, inclusive.
*/
- public abstract void setStreamMinimumReadSizeInBytes(
+ void setStreamMinimumReadSizeInBytes(
int minimumReadSizeBytes);
/**
@@ -560,7 +570,121 @@ abstract class StorageInterface {
* If <code>writeBlockSizeInBytes</code> is less than 1 MB or
* greater than 4 MB.
*/
- public abstract void setWriteBlockSizeInBytes(int writeBlockSizeBytes);
+ void setWriteBlockSizeInBytes(int writeBlockSizeBytes);
+
+ CloudBlob getBlob();
+ }
+
+ /**
+ * A thin wrapper over the {@link CloudBlockBlob} class that simply redirects calls
+ * to the real object except in unit tests.
+ */
+ public abstract interface CloudBlockBlobWrapper
+ extends CloudBlobWrapper {
+ /**
+ * Creates and opens an output stream to write data to the block blob using the specified
+ * operation context.
+ *
+ * @param opContext
+ * An {@link OperationContext} object that represents the context for the current operation. This object
+ * is used to track requests to the storage service, and to provide additional runtime information about
+ * the operation.
+ *
+ * @return A {@link BlobOutputStream} object used to write data to the blob.
+ *
+ * @throws StorageException
+ * If a storage service error occurred.
+ */
+ OutputStream openOutputStream(
+ BlobRequestOptions options,
+ OperationContext opContext) throws StorageException;
+ }
+
+ /**
+ * A thin wrapper over the {@link CloudPageBlob} class that simply redirects calls
+ * to the real object except in unit tests.
+ */
+ public abstract interface CloudPageBlobWrapper
+ extends CloudBlobWrapper {
+ /**
+ * Creates a page blob using the specified request options and operation context.
+ *
+ * @param length
+ * The size, in bytes, of the page blob.
+ * @param options
+ * A {@link BlobRequestOptions} object that specifies any additional options for the request. Specifying
+ * <code>null</code> will use the default request options from the associated service client (
+ * {@link CloudBlobClient}).
+ * @param opContext
+ * An {@link OperationContext} object that represents the context for the current operation. This object
+ * is used to track requests to the storage service, and to provide additional runtime information about
+ * the operation.
+ *
+ * @throws IllegalArgumentException
+ * If the length is not a multiple of 512.
+ *
+ * @throws StorageException
+ * If a storage service error occurred.
+ */
+ void create(final long length, BlobRequestOptions options,
+ OperationContext opContext) throws StorageException;
+
+
+ /**
+ * Uploads a range of contiguous pages, up to 4 MB in size, at the specified offset in the page blob, using the
+ * specified lease ID, request options, and operation context.
+ *
+ * @param sourceStream
+ * An <code>InputStream</code> object that represents the input stream to write to the page blob.
+ * @param offset
+ * The offset, in number of bytes, at which to begin writing the data. This value must be a multiple of
+ * 512.
+ * @param length
+ * The length, in bytes, of the data to write. This value must be a multiple of 512.
+ * @param options
+ * A {@link BlobRequestOptions} object that specifies any additional options for the request. Specifying
+ * <code>null</code> will use the default request options from the associated service client (
+ * {@link CloudBlobClient}).
+ * @param opContext
+ * An {@link OperationContext} object that represents the context for the current operation. This object
+ * is used to track requests to the storage service, and to provide additional runtime information about
+ * the operation.
+ *
+ * @throws IllegalArgumentException
+ * If the offset or length are not multiples of 512, or if the length is greater than 4 MB.
+ * @throws IOException
+ * If an I/O exception occurred.
+ * @throws StorageException
+ * If a storage service error occurred.
+ */
+ void uploadPages(final InputStream sourceStream, final long offset,
+ final long length, BlobRequestOptions options,
+ OperationContext opContext) throws StorageException, IOException;
+
+ /**
+ * Returns a collection of page ranges and their starting and ending byte offsets using the specified request
+ * options and operation context.
+ *
+ * @param options
+ * A {@link BlobRequestOptions} object that specifies any additional options for the request. Specifying
+ * <code>null</code> will use the default request options from the associated service client (
+ * {@link CloudBlobClient}).
+ * @param opContext
+ * An {@link OperationContext} object that represents the context for the current operation. This object
+ * is used to track requests to the storage service, and to provide additional runtime information about
+ * the operation.
+ *
+ * @return An <code>ArrayList</code> object that represents the set of page ranges and their starting and ending
+ * byte offsets.
+ *
+ * @throws StorageException
+ * If a storage service error occurred.
+ */
+ ArrayList<PageRange> downloadPageRanges(BlobRequestOptions options,
+ OperationContext opContext) throws StorageException;
+
+ void uploadMetadata(OperationContext opContext)
+ throws StorageException;
}
-}
+}
\ No newline at end of file
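To make the new page blob surface concrete, a hedged sketch of the expected call sequence (assuming the enclosing container wrapper is CloudBlobContainerWrapper; the blob name and sizes are illustrative, and per the Javadoc above the blob length and the uploadPages offset/length must be multiples of 512):

    // Sketch, not part of the patch: create a page blob, write one page.
    void createAndWriteOnePage(StorageInterface.CloudBlobContainerWrapper container)
        throws Exception {
      StorageInterface.CloudPageBlobWrapper blob =
          (StorageInterface.CloudPageBlobWrapper)
              container.getPageBlobReference("sketch/pageblob");  // hypothetical path
      OperationContext ctx = new OperationContext();
      blob.create(128L * 1024 * 1024, new BlobRequestOptions(), ctx); // multiple of 512
      byte[] page = new byte[512];                                    // one full page
      blob.uploadPages(new java.io.ByteArrayInputStream(page), 0, page.length,
          new BlobRequestOptions(), ctx);
    }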
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a737026/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java
index 935bf71..e44823c 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java
@@ -23,6 +23,7 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.net.URISyntaxException;
+import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
@@ -39,13 +40,16 @@ import com.microsoft.windowsazure.storage.StorageUri;
import com.microsoft.windowsazure.storage.blob.BlobListingDetails;
import com.microsoft.windowsazure.storage.blob.BlobProperties;
import com.microsoft.windowsazure.storage.blob.BlobRequestOptions;
+import com.microsoft.windowsazure.storage.blob.CloudBlob;
import com.microsoft.windowsazure.storage.blob.CloudBlobClient;
import com.microsoft.windowsazure.storage.blob.CloudBlobContainer;
import com.microsoft.windowsazure.storage.blob.CloudBlobDirectory;
import com.microsoft.windowsazure.storage.blob.CloudBlockBlob;
+import com.microsoft.windowsazure.storage.blob.CloudPageBlob;
import com.microsoft.windowsazure.storage.blob.CopyState;
import com.microsoft.windowsazure.storage.blob.DeleteSnapshotsOption;
import com.microsoft.windowsazure.storage.blob.ListBlobItem;
+import com.microsoft.windowsazure.storage.blob.PageRange;
/**
* A real implementation of the Azure interaction layer that just redirects
@@ -129,6 +133,8 @@ class StorageInterfaceImpl extends StorageInterface {
return new CloudBlobDirectoryWrapperImpl((CloudBlobDirectory) unwrapped);
} else if (unwrapped instanceof CloudBlockBlob) {
return new CloudBlockBlobWrapperImpl((CloudBlockBlob) unwrapped);
+ } else if (unwrapped instanceof CloudPageBlob) {
+ return new CloudPageBlobWrapperImpl((CloudPageBlob) unwrapped);
} else {
return unwrapped;
}
@@ -244,129 +250,217 @@ class StorageInterfaceImpl extends StorageInterface {
}
@Override
- public CloudBlockBlobWrapper getBlockBlobReference(String relativePath)
+ public CloudBlobWrapper getBlockBlobReference(String relativePath)
throws URISyntaxException, StorageException {
- return new CloudBlockBlobWrapperImpl(
- container.getBlockBlobReference(relativePath));
+ return new CloudBlockBlobWrapperImpl(container.getBlockBlobReference(relativePath));
}
+
+ @Override
+ public CloudBlobWrapper getPageBlobReference(String relativePath)
+ throws URISyntaxException, StorageException {
+ return new CloudPageBlobWrapperImpl(
+ container.getPageBlobReference(relativePath));
+ }
+
}
+
+ abstract static class CloudBlobWrapperImpl implements CloudBlobWrapper {
+ private final CloudBlob blob;
- //
- // CloudBlockBlobWrapperImpl
- //
- @InterfaceAudience.Private
- static class CloudBlockBlobWrapperImpl extends CloudBlockBlobWrapper {
- private final CloudBlockBlob blob;
+ @Override
+ public CloudBlob getBlob() {
+ return blob;
+ }
public URI getUri() {
- return blob.getUri();
+ return getBlob().getUri();
}
- public CloudBlockBlobWrapperImpl(CloudBlockBlob blob) {
+ protected CloudBlobWrapperImpl(CloudBlob blob) {
this.blob = blob;
}
@Override
public HashMap<String, String> getMetadata() {
- return blob.getMetadata();
+ return getBlob().getMetadata();
}
@Override
- public void startCopyFromBlob(CloudBlockBlobWrapper sourceBlob,
- OperationContext opContext) throws StorageException, URISyntaxException {
-
- blob.startCopyFromBlob(((CloudBlockBlobWrapperImpl) sourceBlob).blob,
- null, null, null, opContext);
-
+ public void delete(OperationContext opContext, SelfRenewingLease lease)
+ throws StorageException {
+ getBlob().delete(DeleteSnapshotsOption.NONE, getLeaseCondition(lease),
+ null, opContext);
}
- @Override
- public void delete(OperationContext opContext) throws StorageException {
- blob.delete(DeleteSnapshotsOption.NONE, null, null, opContext);
+ /**
+     * Return an access condition for this lease, or null if
+     * there is no lease.
+ */
+ private AccessCondition getLeaseCondition(SelfRenewingLease lease) {
+ AccessCondition leaseCondition = null;
+ if (lease != null) {
+ leaseCondition = AccessCondition.generateLeaseCondition(lease.getLeaseID());
+ }
+ return leaseCondition;
}
@Override
- public boolean exists(OperationContext opContext) throws StorageException {
- return blob.exists(null, null, opContext);
+ public boolean exists(OperationContext opContext)
+ throws StorageException {
+ return getBlob().exists(null, null, opContext);
}
@Override
- public void downloadAttributes(OperationContext opContext)
- throws StorageException {
- blob.downloadAttributes(null, null, opContext);
+ public void downloadAttributes(
+ OperationContext opContext) throws StorageException {
+ getBlob().downloadAttributes(null, null, opContext);
}
@Override
public BlobProperties getProperties() {
- return blob.getProperties();
+ return getBlob().getProperties();
}
@Override
public void setMetadata(HashMap<String, String> metadata) {
- blob.setMetadata(metadata);
+ getBlob().setMetadata(metadata);
}
@Override
- public InputStream openInputStream(BlobRequestOptions options,
+ public InputStream openInputStream(
+ BlobRequestOptions options,
OperationContext opContext) throws StorageException {
- return blob.openInputStream(null, options, opContext);
+ return getBlob().openInputStream(null, options, opContext);
}
- @Override
- public OutputStream openOutputStream(BlobRequestOptions options,
+ public OutputStream openOutputStream(
+ BlobRequestOptions options,
OperationContext opContext) throws StorageException {
- return blob.openOutputStream(null, options, opContext);
+ return ((CloudBlockBlob) getBlob()).openOutputStream(null, options, opContext);
}
- @Override
public void upload(InputStream sourceStream, OperationContext opContext)
throws StorageException, IOException {
- blob.upload(sourceStream, 0, null, null, opContext);
+ getBlob().upload(sourceStream, 0, null, null, opContext);
}
@Override
public CloudBlobContainer getContainer() throws URISyntaxException,
StorageException {
- return blob.getContainer();
+ return getBlob().getContainer();
}
@Override
public CloudBlobDirectory getParent() throws URISyntaxException,
StorageException {
- return blob.getParent();
+ return getBlob().getParent();
}
@Override
public void uploadMetadata(OperationContext opContext)
throws StorageException {
- blob.uploadMetadata(null, null, opContext);
+ getBlob().uploadMetadata(null, null, opContext);
}
- @Override
- public void uploadProperties(OperationContext opContext)
+ public void uploadProperties(OperationContext opContext, SelfRenewingLease lease)
throws StorageException {
- blob.uploadProperties(null, null, opContext);
+
+ // Include lease in request if lease not null.
+ getBlob().uploadProperties(getLeaseCondition(lease), null, opContext);
}
@Override
public void setStreamMinimumReadSizeInBytes(int minimumReadSizeBytes) {
- blob.setStreamMinimumReadSizeInBytes(minimumReadSizeBytes);
+ getBlob().setStreamMinimumReadSizeInBytes(minimumReadSizeBytes);
}
@Override
public void setWriteBlockSizeInBytes(int writeBlockSizeBytes) {
- blob.setStreamWriteSizeInBytes(writeBlockSizeBytes);
+ getBlob().setStreamWriteSizeInBytes(writeBlockSizeBytes);
}
@Override
public StorageUri getStorageUri() {
- return blob.getStorageUri();
+ return getBlob().getStorageUri();
}
@Override
public CopyState getCopyState() {
- return blob.getCopyState();
+ return getBlob().getCopyState();
+ }
+
+ @Override
+ public void startCopyFromBlob(URI source,
+ OperationContext opContext)
+ throws StorageException, URISyntaxException {
+ getBlob().startCopyFromBlob(source,
+ null, null, null, opContext);
+ }
+
+ @Override
+ public void downloadRange(long offset, long length, OutputStream outStream,
+ BlobRequestOptions options, OperationContext opContext)
+ throws StorageException, IOException {
+
+ getBlob().downloadRange(offset, length, outStream, null, options, opContext);
+ }
+
+ @Override
+ public SelfRenewingLease acquireLease() throws StorageException {
+ return new SelfRenewingLease(this);
+ }
+ }
+
+
+ //
+ // CloudBlockBlobWrapperImpl
+ //
+
+ static class CloudBlockBlobWrapperImpl extends CloudBlobWrapperImpl implements CloudBlockBlobWrapper {
+ public CloudBlockBlobWrapperImpl(CloudBlockBlob blob) {
+ super(blob);
+ }
+
+ public OutputStream openOutputStream(
+ BlobRequestOptions options,
+ OperationContext opContext) throws StorageException {
+ return ((CloudBlockBlob) getBlob()).openOutputStream(null, options, opContext);
+ }
+
+ public void upload(InputStream sourceStream, OperationContext opContext)
+ throws StorageException, IOException {
+ getBlob().upload(sourceStream, 0, null, null, opContext);
+ }
+
+ public void uploadProperties(OperationContext opContext)
+ throws StorageException {
+ getBlob().uploadProperties(null, null, opContext);
+ }
+
+ }
+
+ static class CloudPageBlobWrapperImpl extends CloudBlobWrapperImpl implements CloudPageBlobWrapper {
+ public CloudPageBlobWrapperImpl(CloudPageBlob blob) {
+ super(blob);
+ }
+
+ public void create(final long length, BlobRequestOptions options,
+ OperationContext opContext) throws StorageException {
+ ((CloudPageBlob) getBlob()).create(length, null, options, opContext);
+ }
+
+ public void uploadPages(final InputStream sourceStream, final long offset,
+ final long length, BlobRequestOptions options, OperationContext opContext)
+ throws StorageException, IOException {
+ ((CloudPageBlob) getBlob()).uploadPages(sourceStream, offset, length, null,
+ options, opContext);
+ }
+
+ public ArrayList<PageRange> downloadPageRanges(BlobRequestOptions options,
+ OperationContext opContext) throws StorageException {
+ return ((CloudPageBlob) getBlob()).downloadPageRanges(
+ null, options, opContext);
}
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a737026/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SyncableDataOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SyncableDataOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SyncableDataOutputStream.java
new file mode 100644
index 0000000..9bec7a5
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SyncableDataOutputStream.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.hadoop.fs.Syncable;
+
+/**
+ * Support the Syncable interface on top of a DataOutputStream.
+ * This allows passing the sync/hflush/hsync calls through to the
+ * wrapped stream passed in to the constructor. This is required
+ * for HBase when wrapping a PageBlobOutputStream used as a write-ahead log.
+ */
+public class SyncableDataOutputStream extends DataOutputStream implements Syncable {
+
+ public SyncableDataOutputStream(OutputStream out) {
+ super(out);
+ }
+
+ @Override
+ public void hflush() throws IOException {
+ if (out instanceof Syncable) {
+ ((Syncable) out).hflush();
+ } else {
+ out.flush();
+ }
+ }
+
+ @Override
+ public void hsync() throws IOException {
+ if (out instanceof Syncable) {
+ ((Syncable) out).hsync();
+ } else {
+ out.flush();
+ }
+ }
+}
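A brief usage sketch of the wrapper above (class and method names illustrative): when the underlying stream does not implement Syncable, hflush()/hsync() quietly degrade to flush(), which is exactly what the fallback branches encode.

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;

    import org.apache.hadoop.fs.azure.SyncableDataOutputStream;

    class SyncableStreamSketch {
      static void demo() throws IOException {
        // ByteArrayOutputStream is not Syncable, so hsync() falls back to
        // flush(); wrapping a PageBlobOutputStream would instead pass the
        // call through to a real sync.
        SyncableDataOutputStream out =
            new SyncableDataOutputStream(new ByteArrayOutputStream());
        out.writeUTF("wal-entry");
        out.hsync();
        out.close();
      }
    }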
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a737026/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/Wasb.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/Wasb.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/Wasb.java
index e098cef..dd354d7 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/Wasb.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/Wasb.java
@@ -27,7 +27,6 @@ import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.DelegateToFileSystem;
-
/**
* WASB implementation of AbstractFileSystem.
* This impl delegates to the old FileSystem
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a737026/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/AzureFileSystemInstrumentation.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/AzureFileSystemInstrumentation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/AzureFileSystemInstrumentation.java
index e389d7c..a08ad71 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/AzureFileSystemInstrumentation.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/AzureFileSystemInstrumentation.java
@@ -41,11 +41,11 @@ import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
@InterfaceAudience.Public
@InterfaceStability.Evolving
public final class AzureFileSystemInstrumentation implements MetricsSource {
-
+
public static final String METRIC_TAG_FILESYSTEM_ID = "wasbFileSystemId";
public static final String METRIC_TAG_ACCOUNT_NAME = "accountName";
public static final String METRIC_TAG_CONTAINTER_NAME = "containerName";
-
+
public static final String WASB_WEB_RESPONSES = "wasb_web_responses";
public static final String WASB_BYTES_WRITTEN =
"wasb_bytes_written_last_second";
@@ -381,7 +381,6 @@ public final class AzureFileSystemInstrumentation implements MetricsSource {
*/
public long getCurrentMaximumDownloadBandwidth() {
return currentMaximumDownloadBytesPerSecond;
-
}
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a737026/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/ResponseReceivedMetricUpdater.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/ResponseReceivedMetricUpdater.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/ResponseReceivedMetricUpdater.java
index e3f5d44..676adb9 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/ResponseReceivedMetricUpdater.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/ResponseReceivedMetricUpdater.java
@@ -33,8 +33,7 @@ import com.microsoft.windowsazure.storage.StorageEvent;
/**
* An event listener to the ResponseReceived event from Azure Storage that will
- * update metrics appropriately.
- *
+ * update metrics appropriately when that event fires.
*/
@InterfaceAudience.Private
public final class ResponseReceivedMetricUpdater extends StorageEvent<ResponseReceivedEvent> {
@@ -43,7 +42,7 @@ public final class ResponseReceivedMetricUpdater extends StorageEvent<ResponseRe
private final AzureFileSystemInstrumentation instrumentation;
private final BandwidthGaugeUpdater blockUploadGaugeUpdater;
-
+
private ResponseReceivedMetricUpdater(OperationContext operationContext,
AzureFileSystemInstrumentation instrumentation,
BandwidthGaugeUpdater blockUploadGaugeUpdater) {
@@ -142,6 +141,6 @@ public final class ResponseReceivedMetricUpdater extends StorageEvent<ResponseRe
instrumentation.rawBytesDownloaded(length);
instrumentation.blockDownloaded(requestLatency);
}
- }
+ }
}
}
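The hunk above only shows the private constructor, but the general wiring pattern is worth sketching: a StorageEvent listener is attached to an OperationContext, and every Azure Storage request issued under that context then fires eventOccurred(). A minimal sketch, assuming the SDK version this patch builds against; the class and method names here are illustrative:

    import com.microsoft.windowsazure.storage.OperationContext;
    import com.microsoft.windowsazure.storage.ResponseReceivedEvent;
    import com.microsoft.windowsazure.storage.StorageEvent;

    class ResponseListenerSketch {
      // Returns an OperationContext whose ResponseReceived event is hooked.
      static OperationContext hookedContext() {
        OperationContext opContext = new OperationContext();
        opContext.getResponseReceivedEventHandler().addListener(
            new StorageEvent<ResponseReceivedEvent>() {
              @Override
              public void eventOccurred(ResponseReceivedEvent event) {
                // e.g. inspect event.getRequestResult() and bump counters,
                // as ResponseReceivedMetricUpdater does with the
                // instrumentation and gauge updater it holds.
              }
            });
        return opContext;
      }
    }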
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a737026/hadoop-tools/hadoop-azure/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem b/hadoop-tools/hadoop-azure/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem
new file mode 100644
index 0000000..9f4922b
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.hadoop.fs.azure.NativeAzureFileSystem
+org.apache.hadoop.fs.azure.NativeAzureFileSystem$Secure
\ No newline at end of file
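This service file is what lets Hadoop resolve wasb:// and wasbs:// URIs without an explicit fs.wasb.impl mapping: FileSystem implementations listed here are discovered through java.util.ServiceLoader and indexed by the scheme they report. A minimal sketch of that discovery (the class name is illustrative):

    import java.util.ServiceLoader;

    import org.apache.hadoop.fs.FileSystem;

    class FileSystemDiscoverySketch {
      public static void main(String[] args) {
        // Iterate every FileSystem registered via META-INF/services and
        // print the scheme it claims.
        for (FileSystem fs : ServiceLoader.load(FileSystem.class)) {
          try {
            System.out.println(fs.getScheme() + " -> "
                + fs.getClass().getName());
          } catch (UnsupportedOperationException e) {
            // Implementations that predate getScheme() may not report one.
          }
        }
      }
    }

This is also consistent with the deletion of the test-only addWasbToConfiguration() helper further down: the explicit fs.wasb.impl/fs.wasbs.impl settings become redundant once the implementations are service-registered.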
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a737026/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/AzureBlobStorageTestAccount.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/AzureBlobStorageTestAccount.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/AzureBlobStorageTestAccount.java
index 80e8e43..a323237 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/AzureBlobStorageTestAccount.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/AzureBlobStorageTestAccount.java
@@ -78,6 +78,8 @@ public final class AzureBlobStorageTestAccount {
private static final String KEY_DISABLE_THROTTLING = "fs.azure.disable.bandwidth.throttling";
private static final String KEY_READ_TOLERATE_CONCURRENT_APPEND = "fs.azure.io.read.tolerate.concurrent.append";
+ public static final String DEFAULT_PAGE_BLOB_DIRECTORY = "pageBlobs";
+ public static final String DEFAULT_ATOMIC_RENAME_DIRECTORIES = "/atomicRenameDir1,/atomicRenameDir2";
private CloudStorageAccount account;
private CloudBlobContainer container;
@@ -85,12 +87,14 @@ public final class AzureBlobStorageTestAccount {
private NativeAzureFileSystem fs;
private AzureNativeFileSystemStore storage;
private MockStorageInterface mockStorage;
+ private String pageBlobDirectory;
private static final ConcurrentLinkedQueue<MetricsRecord> allMetrics =
new ConcurrentLinkedQueue<MetricsRecord>();
-
+ private static boolean metricsConfigSaved = false;
private AzureBlobStorageTestAccount(NativeAzureFileSystem fs,
- CloudStorageAccount account, CloudBlobContainer container) {
+ CloudStorageAccount account,
+ CloudBlobContainer container) {
this.account = account;
this.container = container;
this.fs = fs;
@@ -158,6 +162,14 @@ public final class AzureBlobStorageTestAccount {
return toMockUri(path.toUri().getRawPath().substring(1));
}
+ public static Path pageBlobPath() {
+ return new Path("/" + DEFAULT_PAGE_BLOB_DIRECTORY);
+ }
+
+ public static Path pageBlobPath(String fileName) {
+ return new Path(pageBlobPath(), fileName);
+ }
+
public Number getLatestMetricValue(String metricName, Number defaultValue)
throws IndexOutOfBoundsException {
boolean found = false;
@@ -206,8 +218,10 @@ public final class AzureBlobStorageTestAccount {
* The blob key (no initial slash).
* @return The blob reference.
*/
- public CloudBlockBlob getBlobReference(String blobKey) throws Exception {
- return container.getBlockBlobReference(String.format(blobKey));
+ public CloudBlockBlob getBlobReference(String blobKey)
+ throws Exception {
+ // Use the key verbatim; it is not a format string.
+ return container.getBlockBlobReference(blobKey);
}
/**
@@ -233,45 +247,79 @@ public final class AzureBlobStorageTestAccount {
getBlobReference(blobKey).releaseLease(accessCondition);
}
+ private static void saveMetricsConfigFile() {
+ if (!metricsConfigSaved) {
+ new org.apache.hadoop.metrics2.impl.ConfigBuilder()
+ .add("azure-file-system.sink.azuretestcollector.class",
+ StandardCollector.class.getName())
+ .save("hadoop-metrics2-azure-file-system.properties");
+ metricsConfigSaved = true;
+ }
+ }
+
public static AzureBlobStorageTestAccount createMock() throws Exception {
return createMock(new Configuration());
}
- public static AzureBlobStorageTestAccount createMock(Configuration conf)
- throws Exception {
+ public static AzureBlobStorageTestAccount createMock(Configuration conf) throws Exception {
+ saveMetricsConfigFile();
+ configurePageBlobDir(conf);
+ configureAtomicRenameDir(conf);
AzureNativeFileSystemStore store = new AzureNativeFileSystemStore();
MockStorageInterface mockStorage = new MockStorageInterface();
store.setAzureStorageInteractionLayer(mockStorage);
NativeAzureFileSystem fs = new NativeAzureFileSystem(store);
- addWasbToConfiguration(conf);
setMockAccountKey(conf);
// register the fs provider.
fs.initialize(new URI(MOCK_WASB_URI), conf);
- AzureBlobStorageTestAccount testAcct = new AzureBlobStorageTestAccount(fs,
- mockStorage);
+ AzureBlobStorageTestAccount testAcct =
+ new AzureBlobStorageTestAccount(fs, mockStorage);
return testAcct;
}
/**
+ * Set the page blob directories configuration to the default if it is not
+ * already set. Some tests may set it differently (e.g. the page blob
+ * tests in TestNativeAzureFSPageBlobLive).
+ * @param conf The configuration to conditionally update.
+ */
+ private static void configurePageBlobDir(Configuration conf) {
+ if (conf.get(AzureNativeFileSystemStore.KEY_PAGE_BLOB_DIRECTORIES) == null) {
+ conf.set(AzureNativeFileSystemStore.KEY_PAGE_BLOB_DIRECTORIES,
+ "/" + DEFAULT_PAGE_BLOB_DIRECTORY);
+ }
+ }
+
+ /**
+ * Set the atomic rename directories configuration to the default if it
+ * is not already set.
+ */
+ private static void configureAtomicRenameDir(Configuration conf) {
+ if (conf.get(AzureNativeFileSystemStore.KEY_ATOMIC_RENAME_DIRECTORIES) == null) {
+ conf.set(AzureNativeFileSystemStore.KEY_ATOMIC_RENAME_DIRECTORIES,
+ DEFAULT_ATOMIC_RENAME_DIRECTORIES);
+ }
+ }
+
+ /**
* Creates a test account that goes against the storage emulator.
*
* @return The test account, or null if the emulator isn't set up.
*/
public static AzureBlobStorageTestAccount createForEmulator()
throws Exception {
+ saveMetricsConfigFile();
NativeAzureFileSystem fs = null;
CloudBlobContainer container = null;
Configuration conf = createTestConfiguration();
if (!conf.getBoolean(USE_EMULATOR_PROPERTY_NAME, false)) {
// Not configured to test against the storage emulator.
- System.out.println("Skipping emulator Azure test because configuration "
- + "doesn't indicate that it's running."
- + " Please see README.txt for guidance.");
+ System.out
+ .println("Skipping emulator Azure test because configuration " +
+ "doesn't indicate that it's running." +
+ " Please see RunningLiveWasbTests.txt for guidance.");
return null;
}
- CloudStorageAccount account = CloudStorageAccount
- .getDevelopmentStorageAccount();
+ CloudStorageAccount account =
+ CloudStorageAccount.getDevelopmentStorageAccount();
fs = new NativeAzureFileSystem();
String containerName = String.format("wasbtests-%s-%tQ",
System.getProperty("user.name"), new Date());
@@ -285,14 +333,18 @@ public final class AzureBlobStorageTestAccount {
fs.initialize(accountUri, conf);
// Create test account initializing the appropriate member variables.
- AzureBlobStorageTestAccount testAcct = new AzureBlobStorageTestAccount(fs,
- account, container);
+ //
+ AzureBlobStorageTestAccount testAcct =
+ new AzureBlobStorageTestAccount(fs, account, container);
return testAcct;
}
public static AzureBlobStorageTestAccount createOutOfBandStore(
int uploadBlockSize, int downloadBlockSize) throws Exception {
+
+ saveMetricsConfigFile();
+
CloudBlobContainer container = null;
Configuration conf = createTestConfiguration();
CloudStorageAccount account = createTestAccount(conf);
@@ -337,8 +389,9 @@ public final class AzureBlobStorageTestAccount {
testStorage.initialize(accountUri, conf, instrumentation);
// Create test account initializing the appropriate member variables.
- AzureBlobStorageTestAccount testAcct = new AzureBlobStorageTestAccount(
- testStorage, account, container);
+ //
+ AzureBlobStorageTestAccount testAcct =
+ new AzureBlobStorageTestAccount(testStorage, account, container);
return testAcct;
}
@@ -416,11 +469,11 @@ public final class AzureBlobStorageTestAccount {
}
}
- private static Configuration createTestConfiguration() {
+ public static Configuration createTestConfiguration() {
return createTestConfiguration(null);
}
- protected static Configuration createTestConfiguration(Configuration conf) {
+ private static Configuration createTestConfiguration(Configuration conf) {
if (conf == null) {
conf = new Configuration();
}
@@ -429,16 +482,9 @@ public final class AzureBlobStorageTestAccount {
return conf;
}
- // for programmatic setting of the wasb configuration.
- // note that tests can also get the
- public static void addWasbToConfiguration(Configuration conf) {
- conf.set("fs.wasb.impl", "org.apache.hadoop.fs.azure.NativeAzureFileSystem");
- conf.set("fs.wasbs.impl",
- "org.apache.hadoop.fs.azure.NativeAzureFileSystem");
- }
-
- static CloudStorageAccount createTestAccount() throws URISyntaxException,
- KeyProviderException {
+ static CloudStorageAccount createTestAccount()
+ throws URISyntaxException, KeyProviderException {
return createTestAccount(createTestConfiguration());
}
@@ -447,8 +493,8 @@ public final class AzureBlobStorageTestAccount {
String testAccountName = conf.get(TEST_ACCOUNT_NAME_PROPERTY_NAME);
if (testAccountName == null) {
System.out
- .println("Skipping live Azure test because of missing test account."
- + " Please see README.txt for guidance.");
+ .println("Skipping live Azure test because of missing test account." +
+ " Please see RunningLiveWasbTests.txt for guidance.");
return null;
}
return createStorageAccount(testAccountName, conf, false);
@@ -466,9 +512,12 @@ public final class AzureBlobStorageTestAccount {
public static AzureBlobStorageTestAccount create(String containerNameSuffix,
EnumSet<CreateOptions> createOptions, Configuration initialConfiguration)
throws Exception {
+ saveMetricsConfigFile();
NativeAzureFileSystem fs = null;
CloudBlobContainer container = null;
Configuration conf = createTestConfiguration(initialConfiguration);
+ configurePageBlobDir(conf);
+ configureAtomicRenameDir(conf);
CloudStorageAccount account = createTestAccount(conf);
if (account == null) {
return null;
@@ -510,15 +559,18 @@ public final class AzureBlobStorageTestAccount {
fs.initialize(accountUri, conf);
// Create test account initializing the appropriate member variables.
- AzureBlobStorageTestAccount testAcct = new AzureBlobStorageTestAccount(fs,
- account, container);
+ //
+ AzureBlobStorageTestAccount testAcct =
+ new AzureBlobStorageTestAccount(fs, account, container);
return testAcct;
}
private static String generateContainerName() throws Exception {
- String containerName = String.format("wasbtests-%s-%tQ",
- System.getProperty("user.name"), new Date());
+ String containerName =
+ String.format("wasbtests-%s-%tQ",
+ System.getProperty("user.name"),
+ new Date());
return containerName;
}
@@ -548,12 +600,16 @@ public final class AzureBlobStorageTestAccount {
if (readonly) {
// Set READ permissions
- sasPolicy.setPermissions(EnumSet.of(SharedAccessBlobPermissions.READ,
+ sasPolicy.setPermissions(EnumSet.of(
+ SharedAccessBlobPermissions.READ,
SharedAccessBlobPermissions.LIST));
} else {
// Set READ and WRITE permissions.
- sasPolicy.setPermissions(EnumSet.of(SharedAccessBlobPermissions.READ,
- SharedAccessBlobPermissions.WRITE, SharedAccessBlobPermissions.LIST));
+ //
+ sasPolicy.setPermissions(EnumSet.of(
+ SharedAccessBlobPermissions.READ,
+ SharedAccessBlobPermissions.WRITE,
+ SharedAccessBlobPermissions.LIST));
}
// Create the container permissions.
@@ -590,8 +646,11 @@ public final class AzureBlobStorageTestAccount {
SharedAccessBlobPolicy sasPolicy = new SharedAccessBlobPolicy();
// Set READ and WRITE permissions.
- sasPolicy.setPermissions(EnumSet.of(SharedAccessBlobPermissions.READ,
- SharedAccessBlobPermissions.WRITE, SharedAccessBlobPermissions.LIST,
+ //
+ sasPolicy.setPermissions(EnumSet.of(
+ SharedAccessBlobPermissions.READ,
+ SharedAccessBlobPermissions.WRITE,
+ SharedAccessBlobPermissions.LIST,
SharedAccessBlobPermissions.DELETE));
// Create the container permissions.
@@ -725,8 +784,9 @@ public final class AzureBlobStorageTestAccount {
// Create test account initializing the appropriate member variables.
// Set the container value to null for the default root container.
- AzureBlobStorageTestAccount testAcct = new AzureBlobStorageTestAccount(fs,
- account, blobRoot);
+ //
+ AzureBlobStorageTestAccount testAcct = new AzureBlobStorageTestAccount(
+ fs, account, blobRoot);
// Return to caller with test account.
return testAcct;
@@ -805,5 +865,12 @@ public final class AzureBlobStorageTestAccount {
public void flush() {
}
}
-
+
+ public void setPageBlobDirectory(String directory) {
+ this.pageBlobDirectory = directory;
+ }
+
+ public String getPageBlobDirectory() {
+ return pageBlobDirectory;
+ }
}
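Because configurePageBlobDir() and configureAtomicRenameDir() above only fill in defaults when the keys are absent, a test can pin its own directories before creating the account. A minimal sketch, placed in the same package as the test account; the literal paths are illustrative:

    package org.apache.hadoop.fs.azure;

    import org.apache.hadoop.conf.Configuration;

    class PageBlobConfigSketch {
      static AzureBlobStorageTestAccount customMock() throws Exception {
        Configuration conf = new Configuration();
        // Pre-set the keys so the configure*Dir() helpers leave them alone.
        conf.set(AzureNativeFileSystemStore.KEY_PAGE_BLOB_DIRECTORIES,
            "/myPageBlobs");
        conf.set(AzureNativeFileSystemStore.KEY_ATOMIC_RENAME_DIRECTORIES,
            "/myAtomicRenameDir1,/myAtomicRenameDir2");
        return AzureBlobStorageTestAccount.createMock(conf);
      }
    }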
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a737026/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/InMemoryBlockBlobStore.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/InMemoryBlockBlobStore.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/InMemoryBlockBlobStore.java
index ab35961..b8971c4 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/InMemoryBlockBlobStore.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/InMemoryBlockBlobStore.java
@@ -41,12 +41,15 @@ public class InMemoryBlockBlobStore {
private final String key;
private final HashMap<String, String> metadata;
private final int contentLength;
+ private final boolean isPageBlob;
+
ListBlobEntry(String key, HashMap<String, String> metadata,
- int contentLength) {
+ int contentLength, boolean isPageBlob) {
this.key = key;
this.metadata = metadata;
this.contentLength = contentLength;
+ this.isPageBlob = isPageBlob;
}
public String getKey() {
@@ -60,6 +63,10 @@ public class InMemoryBlockBlobStore {
public int getContentLength() {
return contentLength;
}
+
+ public boolean isPageBlob() {
+ return isPageBlob;
+ }
}
/**
@@ -77,10 +84,13 @@ public class InMemoryBlockBlobStore {
ArrayList<ListBlobEntry> list = new ArrayList<ListBlobEntry>();
for (Map.Entry<String, Entry> entry : blobs.entrySet()) {
if (entry.getKey().startsWith(prefix)) {
- list.add(new ListBlobEntry(entry.getKey(),
- includeMetadata ? new HashMap<String, String>(
- entry.getValue().metadata) : null,
- entry.getValue().content.length));
+ list.add(new ListBlobEntry(
+ entry.getKey(),
+ includeMetadata ?
+ new HashMap<String, String>(entry.getValue().metadata) :
+ null,
+ entry.getValue().content.length,
+ entry.getValue().isPageBlob));
}
}
return list;
@@ -92,19 +102,49 @@ public class InMemoryBlockBlobStore {
@SuppressWarnings("unchecked")
public synchronized void setContent(String key, byte[] value,
+ HashMap<String, String> metadata, boolean isPageBlob,
+ long length) {
+ blobs.put(key, new Entry(value, (HashMap<String, String>)metadata.clone(),
+ isPageBlob, length));
+ }
+
+ @SuppressWarnings("unchecked")
+ public synchronized void setMetadata(String key,
HashMap<String, String> metadata) {
- blobs
- .put(key, new Entry(value, (HashMap<String, String>) metadata.clone()));
+ blobs.get(key).metadata = (HashMap<String, String>) metadata.clone();
}
- public OutputStream upload(final String key,
+ public OutputStream uploadBlockBlob(final String key,
final HashMap<String, String> metadata) {
- setContent(key, new byte[0], metadata);
+ setContent(key, new byte[0], metadata, false, 0);
+ return new ByteArrayOutputStream() {
+ @Override
+ public void flush()
+ throws IOException {
+ super.flush();
+ byte[] tempBytes = toByteArray();
+ setContent(key, tempBytes, metadata, false, tempBytes.length);
+ }
+ @Override
+ public void close()
+ throws IOException {
+ super.close();
+ byte[] tempBytes = toByteArray();
+ setContent(key, tempBytes, metadata, false, tempBytes.length);
+ }
+ };
+ }
+
+ public OutputStream uploadPageBlob(final String key,
+ final HashMap<String, String> metadata,
+ final long length) {
+ setContent(key, new byte[0], metadata, true, length);
return new ByteArrayOutputStream() {
@Override
- public void flush() throws IOException {
+ public void flush()
+ throws IOException {
super.flush();
- setContent(key, toByteArray(), metadata);
+ setContent(key, toByteArray(), metadata, true, length);
}
};
}
@@ -137,10 +177,16 @@ public class InMemoryBlockBlobStore {
private static class Entry {
private byte[] content;
private HashMap<String, String> metadata;
+ private boolean isPageBlob;
+ @SuppressWarnings("unused") // TODO: use it
+ private long length;
- public Entry(byte[] content, HashMap<String, String> metadata) {
+ public Entry(byte[] content, HashMap<String, String> metadata,
+ boolean isPageBlob, long length) {
this.content = content;
this.metadata = metadata;
+ this.isPageBlob = isPageBlob;
+ this.length = length;
}
}
}
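A short usage sketch of the in-memory store above (names and key values illustrative): uploadBlockBlob() hands back a ByteArrayOutputStream subclass, and nothing becomes visible in the store until flush() or close() republishes the buffered bytes through setContent().

    import java.io.IOException;
    import java.io.OutputStream;
    import java.util.HashMap;

    import org.apache.hadoop.fs.azure.InMemoryBlockBlobStore;

    class InMemoryStoreSketch {
      static void demo(InMemoryBlockBlobStore store) throws IOException {
        OutputStream out = store.uploadBlockBlob("container/folder/blob",
            new HashMap<String, String>());
        out.write(new byte[] { 1, 2, 3 });
        out.close();   // publishes the final content and its length
      }
    }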