You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2017/09/11 13:16:50 UTC
[2/2] hadoop git commit: HADOOP-14520. WASB: Block compaction for
Azure Block Blobs. Contributed by Georgi Chalakov
HADOOP-14520. WASB: Block compaction for Azure Block Blobs.
Contributed by Georgi Chalakov
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/87af3f49
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/87af3f49
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/87af3f49
Branch: refs/heads/branch-2
Commit: 87af3f4991a553d34735b4dc9e052adc421edd01
Parents: 416a440
Author: Steve Loughran <st...@apache.org>
Authored: Mon Sep 11 14:15:58 2017 +0100
Committer: Steve Loughran <st...@apache.org>
Committed: Mon Sep 11 14:15:58 2017 +0100
----------------------------------------------------------------------
.../fs/azure/AzureNativeFileSystemStore.java | 73 +-
.../hadoop/fs/azure/BlockBlobAppendStream.java | 1312 +++++++++++-------
.../hadoop/fs/azure/NativeAzureFileSystem.java | 86 +-
.../hadoop/fs/azure/NativeFileSystemStore.java | 5 +-
.../fs/azure/SecureStorageInterfaceImpl.java | 10 +-
.../hadoop/fs/azure/SelfRenewingLease.java | 10 +-
.../hadoop/fs/azure/StorageInterface.java | 3 +-
.../hadoop/fs/azure/StorageInterfaceImpl.java | 12 +-
.../fs/azure/SyncableDataOutputStream.java | 13 +-
.../hadoop-azure/src/site/markdown/index.md | 34 +
.../hadoop/fs/azure/MockStorageInterface.java | 3 +-
.../azure/TestAzureConcurrentOutOfBandIo.java | 6 +-
...estNativeAzureFileSystemBlockCompaction.java | 266 ++++
13 files changed, 1293 insertions(+), 540 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/87af3f49/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
index 038d160..ae9fa6c 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
@@ -203,6 +203,23 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
private Set<String> pageBlobDirs;
/**
+ * Configuration key to indicate the set of directories in WASB where we
+ * should store files as block blobs with block compaction enabled.
+ *
+ * Entries can be directory paths relative to the container (e.g. "/path") or
+ * fully qualified wasb:// URIs (e.g.
+ * wasb://container@example.blob.core.windows.net/path)
+ */
+ public static final String KEY_BLOCK_BLOB_WITH_COMPACTION_DIRECTORIES =
+ "fs.azure.block.blob.with.compaction.dir";
+
+ /**
+ * The set of directories where we should store files as block blobs with
+ * block compaction enabled.
+ */
+ private Set<String> blockBlobWithCompationDirs;
+
+ /**
* Configuration key to indicate the set of directories in WASB where
* we should do atomic folder rename synchronized with createNonRecursive.
*/
@@ -527,6 +544,12 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
// User-agent
userAgentId = conf.get(USER_AGENT_ID_KEY, USER_AGENT_ID_DEFAULT);
+ // Extract the directories that should contain block blobs with compaction
+ blockBlobWithCompationDirs = getDirectorySet(
+ KEY_BLOCK_BLOB_WITH_COMPACTION_DIRECTORIES);
+ LOG.debug("Block blobs with compaction directories: {}",
+ setToString(blockBlobWithCompationDirs));
+
// Extract directories that should have atomic rename applied.
atomicRenameDirs = getDirectorySet(KEY_ATOMIC_RENAME_DIRECTORIES);
String hbaseRoot;
@@ -1165,6 +1188,17 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
}
/**
+ * Checks if the given key in Azure Storage should be stored as a block blob
+ * with compaction enabled instead of a normal block blob.
+ *
+ * @param key blob name
+ * @return true, if the file is in directory with block compaction enabled.
+ */
+ public boolean isBlockBlobWithCompactionKey(String key) {
+ return isKeyForDirectorySet(key, blockBlobWithCompationDirs);
+ }
+
+ /**
* Checks if the given key in Azure storage should have synchronized
* atomic folder rename createNonRecursive implemented.
*/
@@ -1356,7 +1390,9 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
}
@Override
- public DataOutputStream storefile(String key, PermissionStatus permissionStatus)
+ public DataOutputStream storefile(String keyEncoded,
+ PermissionStatus permissionStatus,
+ String key)
throws AzureException {
try {
@@ -1417,12 +1453,26 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
// Get the blob reference from the store's container and
// return it.
- CloudBlobWrapper blob = getBlobReference(key);
+ CloudBlobWrapper blob = getBlobReference(keyEncoded);
storePermissionStatus(blob, permissionStatus);
// Create the output stream for the Azure blob.
//
- OutputStream outputStream = openOutputStream(blob);
+ OutputStream outputStream;
+
+ if (isBlockBlobWithCompactionKey(key)) {
+ BlockBlobAppendStream blockBlobOutputStream = new BlockBlobAppendStream(
+ (CloudBlockBlobWrapper) blob,
+ keyEncoded,
+ this.uploadBlockSizeBytes,
+ true,
+ getInstrumentedContext());
+
+ outputStream = blockBlobOutputStream;
+ } else {
+ outputStream = openOutputStream(blob);
+ }
+
DataOutputStream dataOutStream = new SyncableDataOutputStream(outputStream);
return dataOutStream;
} catch (Exception e) {
@@ -2863,10 +2913,21 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
CloudBlobWrapper blob = this.container.getBlockBlobReference(key);
- BlockBlobAppendStream appendStream = new BlockBlobAppendStream((CloudBlockBlobWrapper) blob, key, bufferSize, getInstrumentedContext());
- appendStream.initialize();
+ OutputStream outputStream;
+
+ BlockBlobAppendStream blockBlobOutputStream = new BlockBlobAppendStream(
+ (CloudBlockBlobWrapper) blob,
+ key,
+ bufferSize,
+ isBlockBlobWithCompactionKey(key),
+ getInstrumentedContext());
+
+ outputStream = blockBlobOutputStream;
+
+ DataOutputStream dataOutStream = new SyncableDataOutputStream(
+ outputStream);
- return new DataOutputStream(appendStream);
+ return dataOutStream;
} catch(Exception ex) {
throw new AzureException(ex);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/87af3f49/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobAppendStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobAppendStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobAppendStream.java
index e419a3b..eaada1b 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobAppendStream.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobAppendStream.java
@@ -22,122 +22,256 @@ import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
+import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
-import java.util.Calendar;
-import java.util.HashMap;
-import java.util.Locale;
+import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import java.util.Random;
-import java.util.TimeZone;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.fs.StreamCapabilities;
+import org.apache.hadoop.fs.Syncable;
import org.apache.hadoop.fs.azure.StorageInterface.CloudBlockBlobWrapper;
-import org.mortbay.log.Log;
+import org.apache.hadoop.io.ElasticByteBufferPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.microsoft.azure.storage.AccessCondition;
import com.microsoft.azure.storage.OperationContext;
import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.StorageErrorCodeStrings;
import com.microsoft.azure.storage.blob.BlobRequestOptions;
import com.microsoft.azure.storage.blob.BlockEntry;
import com.microsoft.azure.storage.blob.BlockListingFilter;
+import com.microsoft.azure.storage.blob.BlockSearchMode;
+
+import static org.apache.hadoop.fs.StreamCapabilities.StreamCapability.HFLUSH;
+import static org.apache.hadoop.fs.StreamCapabilities.StreamCapability.HSYNC;
/**
- * Stream object that implememnts append for Block Blobs in WASB.
+ * Stream object that implements append for Block Blobs in WASB.
+ *
+ * The stream object implements hflush/hsync and block compaction. Block
+ * compaction is the process of replacing a sequence of small blocks with one
+ * big block. Azure Block blobs support up to 50000 blocks and every
+ * hflush/hsync generates one block. When the number of blocks is above 32000,
+ * the process of compaction decreases the total number of blocks, if possible.
+ * If compaction is disabled, hflush/hsync are empty functions.
+ *
+ * The stream object uses background threads for uploading the blocks and the
+ * block blob list. Blocks can be uploaded concurrently. However, when the block
+ * list is uploaded, block uploading should stop. If a block is uploaded before
+ * the block list and the block id is not in the list, the block will be lost.
+ * If the block is uploaded after the block list and the block id is in the
+ * list, the block list upload will fail. The exclusive access for the block
+ * list upload is managed by uploadingSemaphore.
*/
-public class BlockBlobAppendStream extends OutputStream {
+public class BlockBlobAppendStream extends OutputStream implements Syncable,
+ StreamCapabilities {
+
+ /**
+ * The name of the blob/file.
+ */
private final String key;
- private final int bufferSize;
- private ByteArrayOutputStream outBuffer;
- private final CloudBlockBlobWrapper blob;
- private final OperationContext opContext;
/**
- * Variable to track if the stream has been closed.
+ * This variable tracks if this is new blob or existing one.
*/
- private boolean closed = false;
+ private boolean blobExist;
/**
- * Variable to track if the append lease is released.
+ * When the blob exists, to prevent concurrent writes we take a lease.
+ * Taking a lease is not necessary for new blobs.
*/
+ private SelfRenewingLease lease = null;
- private volatile boolean leaseFreed;
+ /**
+ * The support for process of compaction is optional.
+ */
+ private final boolean compactionEnabled;
/**
- * Variable to track if the append stream has been
- * initialized.
+ * The number of blocks above which block compaction is triggered.
*/
+ private static final int DEFAULT_ACTIVATE_COMPACTION_BLOCK_COUNT = 32000;
- private boolean initialized = false;
+ /**
+ * The number of blocks above which block compaction is triggered.
+ */
+ private int activateCompactionBlockCount
+ = DEFAULT_ACTIVATE_COMPACTION_BLOCK_COUNT;
/**
- * Last IOException encountered
+ * The size of the output buffer. Writes store the data in outBuffer until
+ * either the size is above maxBlockSize or hflush/hsync is called.
*/
- private volatile IOException lastError = null;
+ private final AtomicInteger maxBlockSize;
/**
- * List to keep track of the uncommitted azure storage
- * block ids
+ * The current buffer where writes are stored.
*/
- private final List<BlockEntry> uncommittedBlockEntries;
+ private ByteBuffer outBuffer;
- private static final int UNSET_BLOCKS_COUNT = -1;
+ /**
+ * The size of the blob that has been successfully stored in the Azure Blob
+ * service.
+ */
+ private final AtomicLong committedBlobLength = new AtomicLong(0);
/**
- * Variable to hold the next block id to be used for azure
- * storage blocks.
+ * Position of last block in the blob.
*/
- private long nextBlockCount = UNSET_BLOCKS_COUNT;
+ private volatile long blobLength = 0;
/**
- * Variable to hold the block id prefix to be used for azure
- * storage blocks from azure-storage-java sdk version 4.2.0 onwards
+ * Minutes to wait before the close operation times out.
*/
- private String blockIdPrefix = null;
+ private static final int CLOSE_UPLOAD_DELAY = 10;
- private final Random sequenceGenerator = new Random();
+ /**
+ * Keep alive time for the threadpool.
+ */
+ private static final int THREADPOOL_KEEP_ALIVE = 30;
+ /**
+ * Azure Block Blob used for the stream.
+ */
+ private final CloudBlockBlobWrapper blob;
+
+ /**
+ * Azure Storage operation context.
+ */
+ private final OperationContext opContext;
/**
- * Time to wait to renew lease in milliseconds
+ * Commands sent from client calls to the background thread pool.
*/
- private static final int LEASE_RENEWAL_PERIOD = 10000;
+ private abstract class UploadCommand {
+
+ // the blob offset for the command
+ private final long commandBlobOffset;
+
+ // command completion latch
+ private final CountDownLatch completed = new CountDownLatch(1);
+
+ UploadCommand(long offset) {
+ this.commandBlobOffset = offset;
+ }
+
+ long getCommandBlobOffset() {
+ return commandBlobOffset;
+ }
+
+ void await() throws InterruptedException {
+ completed.await();
+ }
+
+ void awaitAsDependent() throws InterruptedException {
+ await();
+ }
+
+ void setCompleted() {
+ completed.countDown();
+ }
+
+ void execute() throws InterruptedException, IOException {}
+
+ void dump() {}
+ }
/**
- * Number of times to retry for lease renewal
+ * The list of recent commands. Before the block list is committed, all the
+ * blocks listed in the list must be uploaded. activeBlockCommands is used for
+ * enumerating the blocks and waiting on the latch until the block is
+ * uploaded.
*/
- private static final int MAX_LEASE_RENEWAL_RETRY_COUNT = 3;
+ private final ConcurrentLinkedQueue<UploadCommand> activeBlockCommands
+ = new ConcurrentLinkedQueue<>();
/**
- * Time to wait before retrying to set the lease
+ * Variable to track if the stream has been closed.
*/
- private static final int LEASE_RENEWAL_RETRY_SLEEP_PERIOD = 500;
+ private volatile boolean closed = false;
/**
- * Metadata key used on the blob to indicate append lease is active
+ * First IOException encountered.
*/
- public static final String APPEND_LEASE = "append_lease";
+ private final AtomicReference<IOException> firstError
+ = new AtomicReference<>();
/**
- * Timeout value for the append lease in millisecs. If the lease is not
- * renewed within 30 seconds then another thread can acquire the append lease
- * on the blob
+ * Flag set when the first error has been thrown.
*/
- public static final int APPEND_LEASE_TIMEOUT = 30000;
+ private boolean firstErrorThrown = false;
/**
- * Metdata key used on the blob to indicate last modified time of append lease
+ * Semaphore for serializing block uploads with NativeAzureFileSystem.
+ *
+ * The semaphore starts with number of permits equal to the number of block
+ * upload threads. Each block upload thread needs one permit to start the
+ * upload. The put block list acquires all the permits before the block list
+ * is committed.
*/
- public static final String APPEND_LEASE_LAST_MODIFIED = "append_lease_last_modified";
+ private final Semaphore uploadingSemaphore = new Semaphore(
+ MAX_NUMBER_THREADS_IN_THREAD_POOL,
+ true);
+
+ /**
+ * Queue storing buffers with the size of the Azure block ready for
+ * reuse. The pool allows reusing the blocks instead of allocating new
+ * blocks. After the data is sent to the service, the buffer is returned
+ * to the queue.
+ */
+ private final ElasticByteBufferPool poolReadyByteBuffers
+ = new ElasticByteBufferPool();
+
+ /**
+ * The blob's block list.
+ */
+ private final List<BlockEntry> blockEntries = new ArrayList<>(
+ DEFAULT_CAPACITY_BLOCK_ENTRIES);
+
+ private static final int DEFAULT_CAPACITY_BLOCK_ENTRIES = 1024;
+
+ /**
+ * The uncommitted blob's block list.
+ */
+ private final ConcurrentLinkedDeque<BlockEntry> uncommittedBlockEntries
+ = new ConcurrentLinkedDeque<>();
+
+ /**
+ * Variable to hold the next block id to be used for azure storage blocks.
+ */
+ private static final int UNSET_BLOCKS_COUNT = -1;
+ private long nextBlockCount = UNSET_BLOCKS_COUNT;
+
+ /**
+ * Variable to hold the block id prefix to be used for azure storage blocks.
+ */
+ private String blockIdPrefix = null;
+
+ /**
+ * Maximum number of threads in block upload thread pool.
+ */
+ private static final int MAX_NUMBER_THREADS_IN_THREAD_POOL = 4;
/**
* Number of times a block upload is retried.
@@ -145,17 +279,33 @@ public class BlockBlobAppendStream extends OutputStream {
private static final int MAX_BLOCK_UPLOAD_RETRIES = 3;
/**
- * Wait time between block upload retries in millisecs.
+ * Wait time between block upload retries in milliseconds.
*/
private static final int BLOCK_UPLOAD_RETRY_INTERVAL = 1000;
- private static final Logger LOG = LoggerFactory.getLogger(BlockBlobAppendStream.class);
+ /**
+ * Logger.
+ */
+ private static final Logger LOG =
+ LoggerFactory.getLogger(BlockBlobAppendStream.class);
+ /**
+ * The absolute maximum of blocks for a blob. It includes committed and
+ * temporary blocks.
+ */
private static final int MAX_BLOCK_COUNT = 100000;
+ /**
+ * The upload thread pool executor.
+ */
private ThreadPoolExecutor ioThreadPool;
/**
+ * Azure Storage access conditions for the blob.
+ */
+ private final AccessCondition accessCondition = new AccessCondition();
+
+ /**
* Atomic integer to provide thread id for thread names for uploader threads.
*/
private final AtomicInteger threadSequenceNumber;
@@ -163,106 +313,123 @@ public class BlockBlobAppendStream extends OutputStream {
/**
* Prefix to be used for thread names for uploader threads.
*/
- private static final String THREAD_ID_PREFIX = "BlockBlobAppendStream";
-
- private static final String UTC_STR = "UTC";
+ private static final String THREAD_ID_PREFIX = "append-blockblob";
+ /**
+ * BlockBlobAppendStream constructor.
+ *
+ * @param blob
+ * Azure Block Blob
+ * @param aKey
+ * blob's name
+ * @param bufferSize
+ * the maximum size of a blob block.
+ * @param compactionEnabled
+ * is the compaction process enabled for this blob
+ * @param opContext
+ * Azure Store operation context for the blob
+ * @throws IOException
+ * if an I/O error occurs. In particular, an IOException may be
+ * thrown if the output stream cannot be used for append operations
+ */
public BlockBlobAppendStream(final CloudBlockBlobWrapper blob,
- final String aKey, final int bufferSize, final OperationContext opContext)
+ final String aKey,
+ final int bufferSize,
+ final boolean compactionEnabled,
+ final OperationContext opContext)
throws IOException {
- if (null == aKey || 0 == aKey.length()) {
- throw new IllegalArgumentException(
- "Illegal argument: The key string is null or empty");
- }
-
- if (0 >= bufferSize) {
- throw new IllegalArgumentException(
- "Illegal argument bufferSize cannot be zero or negative");
- }
-
+ Preconditions.checkArgument(StringUtils.isNotEmpty(aKey));
+ Preconditions.checkArgument(bufferSize >= 0);
this.blob = blob;
this.opContext = opContext;
this.key = aKey;
- this.bufferSize = bufferSize;
+ this.maxBlockSize = new AtomicInteger(bufferSize);
this.threadSequenceNumber = new AtomicInteger(0);
this.blockIdPrefix = null;
- setBlocksCountAndBlockIdPrefix();
-
- this.outBuffer = new ByteArrayOutputStream(bufferSize);
- this.uncommittedBlockEntries = new ArrayList<BlockEntry>();
+ this.compactionEnabled = compactionEnabled;
+ this.blobExist = true;
+ this.outBuffer = poolReadyByteBuffers.getBuffer(false, maxBlockSize.get());
- // Acquire append lease on the blob.
try {
- //Set the append lease if the value of the append lease is false
- if (!updateBlobAppendMetadata(true, false)) {
- LOG.error("Unable to set Append Lease on the Blob : {} "
- + "Possibly because another client already has a create or append stream open on the Blob", key);
- throw new IOException("Unable to set Append lease on the Blob. "
- + "Possibly because another client already had an append stream open on the Blob.");
- }
- } catch (StorageException ex) {
- LOG.error("Encountered Storage exception while acquiring append "
- + "lease on blob : {}. Storage Exception : {} ErrorCode : {}",
- key, ex, ex.getErrorCode());
+ // download the block list
+ blockEntries.addAll(
+ blob.downloadBlockList(
+ BlockListingFilter.COMMITTED,
+ new BlobRequestOptions(),
+ opContext));
- throw new IOException(ex);
+ blobLength = blob.getProperties().getLength();
+
+ committedBlobLength.set(blobLength);
+
+ // Acquiring lease on the blob.
+ lease = new SelfRenewingLease(blob, true);
+ accessCondition.setLeaseID(lease.getLeaseID());
+
+ } catch (StorageException ex) {
+ if (ex.getErrorCode().equals(StorageErrorCodeStrings.BLOB_NOT_FOUND)) {
+ blobExist = false;
+ }
+ else if (ex.getErrorCode().equals(
+ StorageErrorCodeStrings.LEASE_ALREADY_PRESENT)) {
+ throw new AzureException(
+ "Unable to set Append lease on the Blob: " + ex, ex);
+ }
+ else {
+ LOG.debug(
+ "Encountered storage exception."
+ + " StorageException : {} ErrorCode : {}",
+ ex,
+ ex.getErrorCode());
+ throw new AzureException(ex);
+ }
}
- leaseFreed = false;
+ setBlocksCountAndBlockIdPrefix(blockEntries);
+
+ this.ioThreadPool = new ThreadPoolExecutor(
+ MAX_NUMBER_THREADS_IN_THREAD_POOL,
+ MAX_NUMBER_THREADS_IN_THREAD_POOL,
+ THREADPOOL_KEEP_ALIVE,
+ TimeUnit.SECONDS,
+ new LinkedBlockingQueue<Runnable>(),
+ new UploaderThreadFactory());
}
/**
- * Helper method that starts an Append Lease renewer thread and the
- * thread pool.
+ * Set payload size of the stream.
+ * It is intended to be used for unit testing purposes only.
*/
- public synchronized void initialize() {
+ @VisibleForTesting
+ synchronized void setMaxBlockSize(int size) {
+ maxBlockSize.set(size);
- if (initialized) {
- return;
- }
- /*
- * Start the thread for Append lease renewer.
- */
- Thread appendLeaseRenewer = new Thread(new AppendRenewer());
- appendLeaseRenewer.setDaemon(true);
- appendLeaseRenewer.setName(String.format("%s-AppendLeaseRenewer", key));
- appendLeaseRenewer.start();
-
- /*
- * Parameters to ThreadPoolExecutor:
- * corePoolSize : the number of threads to keep in the pool, even if they are idle,
- * unless allowCoreThreadTimeOut is set
- * maximumPoolSize : the maximum number of threads to allow in the pool
- * keepAliveTime - when the number of threads is greater than the core,
- * this is the maximum time that excess idle threads will
- * wait for new tasks before terminating.
- * unit - the time unit for the keepAliveTime argument
- * workQueue - the queue to use for holding tasks before they are executed
- * This queue will hold only the Runnable tasks submitted by the execute method.
- */
- this.ioThreadPool = new ThreadPoolExecutor(4, 4, 2, TimeUnit.SECONDS,
- new LinkedBlockingQueue<Runnable>(), new UploaderThreadFactory());
-
- initialized = true;
+ // it is for testing only so we can abandon the previously allocated
+ // payload
+ this.outBuffer = ByteBuffer.allocate(maxBlockSize.get());
}
/**
- * Get the blob name.
- *
- * @return String Blob name.
+ * Set compaction parameters.
+ * It is intended to be used for unit testing purposes only.
*/
- public String getKey() {
- return key;
+ @VisibleForTesting
+ void setCompactionBlockCount(int activationCount) {
+ activateCompactionBlockCount = activationCount;
}
/**
- * Get the backing blob.
- * @return buffer size of the stream.
+ * Get the list of block entries. It is used for testing purposes only.
+ * @return List of block entries.
*/
- public int getBufferSize() {
- return bufferSize;
+ @VisibleForTesting
+ List<BlockEntry> getBlockList() throws StorageException, IOException {
+ return blob.downloadBlockList(
+ BlockListingFilter.COMMITTED,
+ new BlobRequestOptions(),
+ opContext);
}
/**
@@ -283,21 +450,6 @@ public class BlockBlobAppendStream extends OutputStream {
}
/**
- * Writes b.length bytes from the specified byte array to this output stream.
- *
- * @param data
- * the byte array to write.
- *
- * @throws IOException
- * if an I/O error occurs. In particular, an IOException may be
- * thrown if the output stream has been closed.
- */
- @Override
- public void write(final byte[] data) throws IOException {
- write(data, 0, data.length);
- }
-
- /**
* Writes length bytes from the specified byte array starting at offset to
* this output stream.
*
@@ -312,529 +464,691 @@ public class BlockBlobAppendStream extends OutputStream {
* thrown if the output stream has been closed.
*/
@Override
- public void write(final byte[] data, final int offset, final int length)
+ public synchronized void write(final byte[] data, int offset, int length)
throws IOException {
+ Preconditions.checkArgument(data != null, "null data");
if (offset < 0 || length < 0 || length > data.length - offset) {
- throw new IndexOutOfBoundsException("write API in append stream called with invalid arguments");
- }
-
- writeInternal(data, offset, length);
- }
-
- @Override
- public synchronized void close() throws IOException {
-
- if (!initialized) {
- throw new IOException("Trying to close an uninitialized Append stream");
+ throw new IndexOutOfBoundsException();
}
if (closed) {
- return;
+ throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
}
- if (leaseFreed) {
- throw new IOException(String.format("Attempting to close an append stream on blob : %s "
- + " that does not have lease on the Blob. Failing close", key));
- }
+ while (outBuffer.remaining() < length) {
+
+ int remaining = outBuffer.remaining();
+ outBuffer.put(data, offset, remaining);
- if (outBuffer.size() > 0) {
- uploadBlockToStorage(outBuffer.toByteArray());
+ // upload payload to azure storage
+ addBlockUploadCommand();
+
+ offset += remaining;
+ length -= remaining;
}
- ioThreadPool.shutdown();
+ outBuffer.put(data, offset, length);
+ }
- try {
- if (!ioThreadPool.awaitTermination(10, TimeUnit.MINUTES)) {
- LOG.error("Time out occured while waiting for IO request to finish in append"
- + " for blob : {}", key);
- NativeAzureFileSystemHelper.logAllLiveStackTraces();
- throw new IOException("Timed out waiting for IO requests to finish");
- }
- } catch(InterruptedException intrEx) {
- // Restore the interrupted status
- Thread.currentThread().interrupt();
- LOG.error("Upload block operation in append interrupted for blob {}. Failing close", key);
- throw new IOException("Append Commit interrupted.");
- }
+ /**
+ * Flushes this output stream and forces any buffered output bytes to be
+ * written out. If any data remains in the payload it is committed to the
+ * service. Data is queued for writing and forced out to the service
+ * before the call returns.
+ */
+ @Override
+ public void flush() throws IOException {
- // Calling commit after all blocks are succesfully uploaded.
- if (lastError == null) {
- commitAppendBlocks();
+ if (closed) {
+ // calling close() after the stream is closed starts with call to flush()
+ return;
}
- // Perform cleanup.
- cleanup();
+ addBlockUploadCommand();
- if (lastError != null) {
- throw lastError;
+ if (committedBlobLength.get() < blobLength) {
+ try {
+ // wait until the block list is committed
+ addFlushCommand().await();
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ }
}
}
/**
- * Helper method that cleans up the append stream.
+ * Force all data in the output stream to be written to Azure storage.
+ * Wait to return until this is complete.
*/
- private synchronized void cleanup() {
-
- closed = true;
+ @Override
+ public void sync() throws IOException {
+ // when block compaction is disabled, hsync is empty function
+ if (compactionEnabled) {
+ flush();
+ }
+ }
- try {
- // Set the value of append lease to false if the value is set to true.
- updateBlobAppendMetadata(false, true);
- } catch(StorageException ex) {
- LOG.debug("Append metadata update on the Blob : {} encountered Storage Exception : {} "
- + "Error Code : {}",
- key, ex, ex.getErrorCode());
- lastError = new IOException(ex);
+ /**
+ * Force all data in the output stream to be written to Azure storage.
+ * Wait to return until this is complete.
+ */
+ @Override
+ public void hsync() throws IOException {
+ // when block compaction is disabled, hsync is empty function
+ if (compactionEnabled) {
+ flush();
}
+ }
- leaseFreed = true;
+ /**
+ * Force all data in the output stream to be written to Azure storage.
+ * Wait to return until this is complete.
+ */
+ @Override
+ public void hflush() throws IOException {
+ // when block compaction is disabled, hflush is empty function
+ if (compactionEnabled) {
+ flush();
+ }
}
/**
- * Method to commit all the uncommited blocks to azure storage.
- * If the commit fails then blocks are automatically cleaned up
- * by Azure storage.
- * @throws IOException
+ * The synchronization capabilities of this stream depend upon the compaction
+ * policy.
+ * @param capability string to query the stream support for.
+ * @return true for hsync and hflush when compaction is enabled.
*/
- private synchronized void commitAppendBlocks() throws IOException {
+ @Override
+ public boolean hasCapability(String capability) {
+ return compactionEnabled
+ && (capability.equalsIgnoreCase(HSYNC.getValue())
+ || capability.equalsIgnoreCase((HFLUSH.getValue())));
+ }
- SelfRenewingLease lease = null;
+ /**
+ * Force all data in the output stream to be written to Azure storage.
+ * Wait to return until this is complete. Close the access to the stream and
+ * shutdown the upload thread pool.
+ * If the blob was created, its lease will be released.
+ * Any error encountered caught in threads and stored will be rethrown here
+ * after cleanup.
+ */
+ @Override
+ public synchronized void close() throws IOException {
- try {
- if (uncommittedBlockEntries.size() > 0) {
+ LOG.debug("close {} ", key);
- //Acquiring lease on the blob.
- lease = new SelfRenewingLease(blob);
+ if (closed) {
+ return;
+ }
- // Downloading existing blocks
- List<BlockEntry> blockEntries = blob.downloadBlockList(BlockListingFilter.COMMITTED,
- new BlobRequestOptions(), opContext);
+ // Upload the last block regardless of compactionEnabled flag
+ flush();
- // Adding uncommitted blocks.
- blockEntries.addAll(uncommittedBlockEntries);
+ // Initiates an orderly shutdown in which previously submitted tasks are
+ // executed.
+ ioThreadPool.shutdown();
- AccessCondition accessCondition = new AccessCondition();
- accessCondition.setLeaseID(lease.getLeaseID());
- blob.commitBlockList(blockEntries, accessCondition, new BlobRequestOptions(), opContext);
- uncommittedBlockEntries.clear();
+ try {
+ // wait up to CLOSE_UPLOAD_DELAY minutes to upload all the blocks
+ if (!ioThreadPool.awaitTermination(CLOSE_UPLOAD_DELAY, TimeUnit.MINUTES)) {
+ LOG.error("Time out occured while close() is waiting for IO request to"
+ + " finish in append"
+ + " for blob : {}",
+ key);
+ NativeAzureFileSystemHelper.logAllLiveStackTraces();
+ throw new AzureException("Timed out waiting for IO requests to finish");
}
- } catch(StorageException ex) {
- LOG.error("Storage exception encountered during block commit phase of append for blob"
- + " : {} Storage Exception : {} Error Code: {}", key, ex, ex.getErrorCode());
- throw new IOException("Encountered Exception while committing append blocks", ex);
- } finally {
- if (lease != null) {
+ } catch(InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ }
+
+ // release the lease
+ if (firstError.get() == null && blobExist) {
try {
lease.free();
- } catch(StorageException ex) {
- LOG.debug("Exception encountered while releasing lease for "
- + "blob : {} StorageException : {} ErrorCode : {}", key, ex, ex.getErrorCode());
- // Swallowing exception here as the lease is cleaned up by the SelfRenewingLease object.
+ } catch (StorageException ex) {
+ LOG.debug("Lease free update blob {} encountered Storage Exception:"
+ + " {} Error Code : {}",
+ key,
+ ex,
+ ex.getErrorCode());
+ maybeSetFirstError(new AzureException(ex));
}
- }
+ }
+
+ closed = true;
+
+ // finally, throw the first exception raised if it has not
+ // been thrown elsewhere.
+ if (firstError.get() != null && !firstErrorThrown) {
+ throw firstError.get();
}
}
/**
- * Helper method used to generate the blockIDs. The algorithm used is similar to the Azure
- * storage SDK.
+ * Helper method used to generate the blockIDs. The algorithm used is similar
+ * to the Azure storage SDK.
*/
- private void setBlocksCountAndBlockIdPrefix() throws IOException {
-
- try {
+ private void setBlocksCountAndBlockIdPrefix(List<BlockEntry> blockEntries) {
- if (nextBlockCount == UNSET_BLOCKS_COUNT && blockIdPrefix==null) {
+ if (nextBlockCount == UNSET_BLOCKS_COUNT && blockIdPrefix == null) {
- List<BlockEntry> blockEntries =
- blob.downloadBlockList(BlockListingFilter.COMMITTED, new BlobRequestOptions(), opContext);
+ Random sequenceGenerator = new Random();
- String blockZeroBlockId = (blockEntries.size() > 0) ? blockEntries.get(0).getId() : "";
- String prefix = UUID.randomUUID().toString() + "-";
- String sampleNewerVersionBlockId = generateNewerVersionBlockId(prefix, 0);
+ String blockZeroBlockId = (!blockEntries.isEmpty())
+ ? blockEntries.get(0).getId()
+ : "";
+ String prefix = UUID.randomUUID().toString() + "-";
+ String sampleNewerVersionBlockId = generateNewerVersionBlockId(prefix,
+ 0);
- if (blockEntries.size() > 0 && blockZeroBlockId.length() < sampleNewerVersionBlockId.length()) {
-
- // If blob has already been created with 2.2.0, append subsequent blocks with older version (2.2.0) blockId
- // compute nextBlockCount, the way it was done before; and don't use blockIdPrefix
- this.blockIdPrefix = "";
- nextBlockCount = (long) (sequenceGenerator.nextInt(Integer.MAX_VALUE))
- + sequenceGenerator.nextInt(Integer.MAX_VALUE - MAX_BLOCK_COUNT);
- nextBlockCount += blockEntries.size();
-
- } else {
+ if (!blockEntries.isEmpty()
+ && blockZeroBlockId.length() < sampleNewerVersionBlockId.length()) {
- // If there are no existing blocks, create the first block with newer version (4.2.0) blockId
- // If blob has already been created with 4.2.0, append subsequent blocks with newer version (4.2.0) blockId
- this.blockIdPrefix = prefix;
- nextBlockCount = blockEntries.size();
+ // If blob has already been created with 2.2.0, append subsequent blocks
+ // with older version (2.2.0) blockId; compute nextBlockCount the way it
+ // was done before, and don't use blockIdPrefix
+ this.blockIdPrefix = "";
+ nextBlockCount = (long) (sequenceGenerator.nextInt(Integer.MAX_VALUE))
+ + sequenceGenerator.nextInt(
+ Integer.MAX_VALUE - MAX_BLOCK_COUNT);
+ nextBlockCount += blockEntries.size();
- }
+ } else {
+ // If there are no existing blocks, create the first block with newer
+ // version (4.2.0) blockId. If blob has already been created with 4.2.0,
+ // append subsequent blocks with newer version (4.2.0) blockId
+ this.blockIdPrefix = prefix;
+ nextBlockCount = blockEntries.size();
}
-
- } catch (StorageException ex) {
- LOG.debug("Encountered storage exception during setting next Block Count and BlockId prefix."
- + " StorageException : {} ErrorCode : {}", ex, ex.getErrorCode());
- throw new IOException(ex);
}
}
/**
- * Helper method that generates the next block id for uploading a block to azure storage.
+ * Helper method that generates the next block id for uploading a block to
+ * azure storage.
* @return String representing the block ID generated.
- * @throws IOException
+ * @throws IOException if the stream is in invalid state
*/
private String generateBlockId() throws IOException {
- if (nextBlockCount == UNSET_BLOCKS_COUNT) {
- throw new IOException("Append Stream in invalid state. nextBlockCount not set correctly");
- }
-
- if (this.blockIdPrefix == null) {
- throw new IOException("Append Stream in invalid state. blockIdPrefix not set correctly");
- }
-
- if (!this.blockIdPrefix.equals("")) {
-
- return generateNewerVersionBlockId(this.blockIdPrefix, nextBlockCount++);
-
- } else {
-
- return generateOlderVersionBlockId(nextBlockCount++);
-
+ if (nextBlockCount == UNSET_BLOCKS_COUNT || blockIdPrefix == null) {
+ throw new AzureException(
+ "Append Stream in invalid state. nextBlockCount not set correctly");
}
+ return (!blockIdPrefix.isEmpty())
+ ? generateNewerVersionBlockId(blockIdPrefix, nextBlockCount++)
+ : generateOlderVersionBlockId(nextBlockCount++);
}
/**
- * Helper method that generates an older (2.2.0) version blockId
+ * Helper method that generates an older (2.2.0) version blockId.
* @return String representing the block ID generated.
*/
private String generateOlderVersionBlockId(long id) {
- byte[] blockIdInBytes = getBytesFromLong(id);
- return new String(Base64.encodeBase64(blockIdInBytes), StandardCharsets.UTF_8);
+ byte[] blockIdInBytes = new byte[8];
+ for (int m = 0; m < 8; m++) {
+ blockIdInBytes[7 - m] = (byte) ((id >> (8 * m)) & 0xFF);
+ }
+
+ return new String(
+ Base64.encodeBase64(blockIdInBytes),
+ StandardCharsets.UTF_8);
}
/**
+ * Helper method that generates a newer (4.2.0) version blockId.
+ * Helper method that generates an newer (4.2.0) version blockId.
* @return String representing the block ID generated.
*/
private String generateNewerVersionBlockId(String prefix, long id) {
String blockIdSuffix = String.format("%06d", id);
- byte[] blockIdInBytes = (prefix + blockIdSuffix).getBytes(StandardCharsets.UTF_8);
+ byte[] blockIdInBytes =
+ (prefix + blockIdSuffix).getBytes(StandardCharsets.UTF_8);
return new String(Base64.encodeBase64(blockIdInBytes), StandardCharsets.UTF_8);
}
/**
- * Returns a byte array that represents the data of a <code>long</code> value. This
- * utility method is copied from com.microsoft.azure.storage.core.Utility class.
- * This class is marked as internal, hence we clone the method here and not express
- * dependency on the Utility Class
- *
- * @param value
- * The value from which the byte array will be returned.
- *
- * @return A byte array that represents the data of the specified <code>long</code> value.
+ * This is shared between upload block Runnable and CommitBlockList. The
+ * method captures retry logic
+ * @param blockId block name
+ * @param dataPayload block content
*/
- private static byte[] getBytesFromLong(final long value) {
+ private void writeBlockRequestInternal(String blockId,
+ ByteBuffer dataPayload,
+ boolean bufferPoolBuffer) {
+ IOException lastLocalException = null;
+
+ int uploadRetryAttempts = 0;
+ while (uploadRetryAttempts < MAX_BLOCK_UPLOAD_RETRIES) {
+ try {
+ long startTime = System.nanoTime();
+
+ blob.uploadBlock(blockId, accessCondition, new ByteArrayInputStream(
+ dataPayload.array()), dataPayload.position(),
+ new BlobRequestOptions(), opContext);
- final byte[] tempArray = new byte[8];
+ LOG.debug("upload block finished for {} ms. block {} ",
+ TimeUnit.NANOSECONDS.toMillis(
+ System.nanoTime() - startTime), blockId);
+ break;
+
+ } catch(Exception ioe) {
+ LOG.debug("Encountered exception during uploading block for Blob {}"
+ + " Exception : {}", key, ioe);
+ uploadRetryAttempts++;
+ lastLocalException = new AzureException(
+ "Encountered Exception while uploading block: " + ioe, ioe);
+ try {
+ Thread.sleep(
+ BLOCK_UPLOAD_RETRY_INTERVAL * (uploadRetryAttempts + 1));
+ } catch(InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ break;
+ }
+ }
+ }
- for (int m = 0; m < 8; m++) {
- tempArray[7 - m] = (byte) ((value >> (8 * m)) & 0xFF);
+ if (bufferPoolBuffer) {
+ poolReadyByteBuffers.putBuffer(dataPayload);
}
- return tempArray;
+ if (uploadRetryAttempts == MAX_BLOCK_UPLOAD_RETRIES) {
+ maybeSetFirstError(lastLocalException);
+ }
}
/**
- * Helper method that creates a thread to upload a block to azure storage.
- * @param payload
- * @throws IOException
+ * Set {@link #firstError} to the exception if it is not already set.
+ * @param exception exception to save
*/
- private synchronized void uploadBlockToStorage(byte[] payload)
- throws IOException {
-
- // upload payload to azure storage
- String blockId = generateBlockId();
-
- // Since uploads of the Azure storage are done in parallel threads, we go ahead
- // add the blockId in the uncommitted list. If the upload of the block fails
- // we don't commit the blockIds.
- BlockEntry blockEntry = new BlockEntry(blockId);
- blockEntry.setSize(payload.length);
- uncommittedBlockEntries.add(blockEntry);
- ioThreadPool.execute(new WriteRequest(payload, blockId));
+ private void maybeSetFirstError(IOException exception) {
+ firstError.compareAndSet(null, exception);
}
/**
- * Helper method to updated the Blob metadata during Append lease operations.
- * Blob metadata is updated to holdLease value only if the current lease
- * status is equal to testCondition and the last update on the blob metadata
- * is less that 30 secs old.
- * @param holdLease
- * @param testCondition
- * @return true if the updated lease operation was successful or false otherwise
- * @throws StorageException
+ * Throw the first error caught if it has not been raised already.
+ * @throws IOException if one is caught and needs to be thrown.
*/
- private boolean updateBlobAppendMetadata(boolean holdLease, boolean testCondition)
- throws StorageException {
-
- SelfRenewingLease lease = null;
- StorageException lastStorageException = null;
- int leaseRenewalRetryCount = 0;
-
- /*
- * Updating the Blob metadata honours following algorithm based on
- * 1) If the append lease metadata is present
- * 2) Last updated time of the append lease
- * 3) Previous value of the Append lease metadata.
- *
- * The algorithm:
- * 1) If append lease metadata is not part of the Blob. In this case
- * this is the first client to Append so we update the metadata.
- * 2) If append lease metadata is present and timeout has occurred.
- * In this case irrespective of what the value of the append lease is we update the metadata.
- * 3) If append lease metadata is present and is equal to testCondition value (passed as parameter)
- * and timeout has not occurred, we update the metadata.
- * 4) If append lease metadata is present and is not equal to testCondition value (passed as parameter)
- * and timeout has not occurred, we do not update metadata and return false.
- *
- */
- while (leaseRenewalRetryCount < MAX_LEASE_RENEWAL_RETRY_COUNT) {
-
- lastStorageException = null;
-
- synchronized(this) {
- try {
-
- final Calendar currentCalendar = Calendar
- .getInstance(Locale.US);
- currentCalendar.setTimeZone(TimeZone.getTimeZone(UTC_STR));
- long currentTime = currentCalendar.getTime().getTime();
-
- // Acquire lease on the blob.
- lease = new SelfRenewingLease(blob);
+ private void maybeThrowFirstError() throws IOException {
+ if (firstError.get() != null) {
+ firstErrorThrown = true;
+ throw firstError.get();
+ }
+ }
- blob.downloadAttributes(opContext);
- HashMap<String, String> metadata = blob.getMetadata();
+ /**
+ * Write block list. The method captures retry logic
+ */
+ private void writeBlockListRequestInternal() {
- if (metadata.containsKey(APPEND_LEASE)
- && currentTime - Long.parseLong(
- metadata.get(APPEND_LEASE_LAST_MODIFIED)) <= BlockBlobAppendStream.APPEND_LEASE_TIMEOUT
- && !metadata.get(APPEND_LEASE).equals(Boolean.toString(testCondition))) {
- return false;
- }
+ IOException lastLocalException = null;
- metadata.put(APPEND_LEASE, Boolean.toString(holdLease));
- metadata.put(APPEND_LEASE_LAST_MODIFIED, Long.toString(currentTime));
- blob.setMetadata(metadata);
- AccessCondition accessCondition = new AccessCondition();
- accessCondition.setLeaseID(lease.getLeaseID());
- blob.uploadMetadata(accessCondition, null, opContext);
- return true;
+ int uploadRetryAttempts = 0;
+ while (uploadRetryAttempts < MAX_BLOCK_UPLOAD_RETRIES) {
+ try {
- } catch (StorageException ex) {
+ long startTime = System.nanoTime();
- lastStorageException = ex;
- LOG.debug("Lease renewal for Blob : {} encountered Storage Exception : {} "
- + "Error Code : {}",
- key, ex, ex.getErrorCode());
- leaseRenewalRetryCount++;
-
- } finally {
-
- if (lease != null) {
- try {
- lease.free();
- } catch(StorageException ex) {
- LOG.debug("Encountered Storage exception while releasing lease for Blob {} "
- + "during Append metadata operation. Storage Exception {} "
- + "Error Code : {} ", key, ex, ex.getErrorCode());
- } finally {
- lease = null;
- }
- }
- }
- }
+ blob.commitBlockList(blockEntries, accessCondition,
+ new BlobRequestOptions(), opContext);
- if (leaseRenewalRetryCount == MAX_LEASE_RENEWAL_RETRY_COUNT) {
- throw lastStorageException;
- } else {
+ LOG.debug("Upload block list took {} ms for blob {} ",
+ TimeUnit.NANOSECONDS.toMillis(
+ System.nanoTime() - startTime), key);
+ break;
+
+ } catch(Exception ioe) {
+ LOG.debug("Encountered exception during uploading block for Blob {}"
+ + " Exception : {}", key, ioe);
+ uploadRetryAttempts++;
+ lastLocalException = new AzureException(
+ "Encountered Exception while uploading block: " + ioe, ioe);
try {
- Thread.sleep(LEASE_RENEWAL_RETRY_SLEEP_PERIOD);
- } catch(InterruptedException ex) {
- LOG.debug("Blob append metadata updated method interrupted");
+ Thread.sleep(
+ BLOCK_UPLOAD_RETRY_INTERVAL * (uploadRetryAttempts + 1));
+ } catch(InterruptedException ie) {
Thread.currentThread().interrupt();
+ break;
}
}
}
- // The code should not enter here because the while loop will
- // always be executed and if the while loop is executed we
- // would returning from the while loop.
- return false;
+ if (uploadRetryAttempts == MAX_BLOCK_UPLOAD_RETRIES) {
+ maybeSetFirstError(lastLocalException);
+ }
}
/**
- * This is the only method that should be writing to outBuffer to maintain consistency of the outBuffer.
- * @param data
- * @param offset
- * @param length
- * @throws IOException
+ * A ThreadFactory that creates uploader thread with
+ * meaningful names helpful for debugging purposes.
*/
- private synchronized void writeInternal(final byte[] data, final int offset, final int length)
- throws IOException {
+ class UploaderThreadFactory implements ThreadFactory {
- if (!initialized) {
- throw new IOException("Trying to write to an un-initialized Append stream");
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread t = new Thread(r);
+ t.setName(String.format("%s-%d", THREAD_ID_PREFIX,
+ threadSequenceNumber.getAndIncrement()));
+ return t;
}
+ }
- if (closed) {
- throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
- }
+ /**
+ * Upload block commands.
+ */
+ private class UploadBlockCommand extends UploadCommand {
- if (leaseFreed) {
- throw new IOException(String.format("Write called on a append stream not holding lease. Failing Write"));
- }
+ // the block content for upload
+ private final ByteBuffer payload;
- byte[] currentData = new byte[length];
- System.arraycopy(data, offset, currentData, 0, length);
+ // description of the block
+ private final BlockEntry entry;
- // check to see if the data to be appended exceeds the
- // buffer size. If so we upload a block to azure storage.
- while ((outBuffer.size() + currentData.length) > bufferSize) {
+ UploadBlockCommand(String blockId, ByteBuffer payload) {
- byte[] payload = new byte[bufferSize];
+ super(blobLength);
- // Add data from the existing buffer
- System.arraycopy(outBuffer.toByteArray(), 0, payload, 0, outBuffer.size());
+ BlockEntry blockEntry = new BlockEntry(blockId);
+ blockEntry.setSize(payload.position());
+ blockEntry.setSearchMode(BlockSearchMode.LATEST);
- // Updating the available size in the payload
- int availableSpaceInPayload = bufferSize - outBuffer.size();
+ this.payload = payload;
+ this.entry = blockEntry;
- // Adding data from the current call
- System.arraycopy(currentData, 0, payload, outBuffer.size(), availableSpaceInPayload);
+ uncommittedBlockEntries.add(blockEntry);
+ }
- uploadBlockToStorage(payload);
+ /**
+ * Execute command.
+ */
+ void execute() throws InterruptedException {
+
+ uploadingSemaphore.acquire(1);
+ writeBlockRequestInternal(entry.getId(), payload, true);
+ uploadingSemaphore.release(1);
- // updating the currentData buffer
- byte[] tempBuffer = new byte[currentData.length - availableSpaceInPayload];
- System.arraycopy(currentData, availableSpaceInPayload,
- tempBuffer, 0, currentData.length - availableSpaceInPayload);
- currentData = tempBuffer;
- outBuffer = new ByteArrayOutputStream(bufferSize);
}
- outBuffer.write(currentData);
+ void dump() {
+ LOG.debug("upload block {} size: {} for blob {}",
+ entry.getId(),
+ entry.getSize(),
+ key);
+ }
}
/**
- * Runnable instance that uploads the block of data to azure storage.
- *
- *
+ * Upload blob block list commands.
*/
- private class WriteRequest implements Runnable {
- private final byte[] dataPayload;
- private final String blockId;
+ private class UploadBlockListCommand extends UploadCommand {
+
+ private BlockEntry lastBlock = null;
- public WriteRequest(byte[] dataPayload, String blockId) {
- this.dataPayload = dataPayload;
- this.blockId = blockId;
+ UploadBlockListCommand() {
+ super(blobLength);
+
+ if (!uncommittedBlockEntries.isEmpty()) {
+ lastBlock = uncommittedBlockEntries.getLast();
+ }
}
- @Override
- public void run() {
+ void awaitAsDependent() throws InterruptedException {
+ // empty. A later commit-block-list command does not need to wait for
+ // previous commit-block-list commands to finish.
+ }
- int uploadRetryAttempts = 0;
- IOException lastLocalException = null;
- while (uploadRetryAttempts < MAX_BLOCK_UPLOAD_RETRIES) {
- try {
+ void dump() {
+ LOG.debug("commit block list with {} blocks for blob {}",
+ uncommittedBlockEntries.size(), key);
+ }
+
+ /**
+ * Execute command.
+ */
+ public void execute() throws InterruptedException, IOException {
+
+ if (committedBlobLength.get() >= getCommandBlobOffset()) {
+ LOG.debug("commit already applied for {}", key);
+ return;
+ }
+
+ if (lastBlock == null) {
+ LOG.debug("nothing to commit for {}", key);
+ return;
+ }
+
+ LOG.debug("active commands: {} for {}", activeBlockCommands.size(), key);
+
+ for (UploadCommand activeCommand : activeBlockCommands) {
+ if (activeCommand.getCommandBlobOffset() < getCommandBlobOffset()) {
+ activeCommand.dump();
+ activeCommand.awaitAsDependent();
+ } else {
+ break;
+ }
+ }
- blob.uploadBlock(blockId, new ByteArrayInputStream(dataPayload),
- dataPayload.length, new BlobRequestOptions(), opContext);
+ // stop all uploads until the block list is committed
+ uploadingSemaphore.acquire(MAX_NUMBER_THREADS_IN_THREAD_POOL);
+
+ BlockEntry uncommittedBlock;
+ do {
+ uncommittedBlock = uncommittedBlockEntries.poll();
+ blockEntries.add(uncommittedBlock);
+ } while (uncommittedBlock != lastBlock);
+
+ if (blockEntries.size() > activateCompactionBlockCount) {
+ LOG.debug("Block compaction: activated with {} blocks for {}",
+ blockEntries.size(), key);
+
+ // Block compaction
+ long startCompaction = System.nanoTime();
+ blockCompaction();
+ LOG.debug("Block compaction finished for {} ms with {} blocks for {}",
+ TimeUnit.NANOSECONDS.toMillis(
+ System.nanoTime() - startCompaction),
+ blockEntries.size(), key);
+ }
+
+ writeBlockListRequestInternal();
+
+ uploadingSemaphore.release(MAX_NUMBER_THREADS_IN_THREAD_POOL);
+
+ // remove blocks previous commands
+ for (Iterator<UploadCommand> it = activeBlockCommands.iterator();
+ it.hasNext();) {
+ UploadCommand activeCommand = it.next();
+ if (activeCommand.getCommandBlobOffset() <= getCommandBlobOffset()) {
+ it.remove();
+ } else {
break;
- } catch(Exception ioe) {
- Log.debug("Encountered exception during uploading block for Blob : {} Exception : {}", key, ioe);
- uploadRetryAttempts++;
- lastLocalException = new IOException("Encountered Exception while uploading block", ioe);
- try {
- Thread.sleep(BLOCK_UPLOAD_RETRY_INTERVAL);
- } catch(InterruptedException ie) {
- Thread.currentThread().interrupt();
- break;
+ }
+ }
+
+ committedBlobLength.set(getCommandBlobOffset());
+ }
+
+ /**
+ * Internal output stream with read access to the internal buffer.
+ */
+ private class ByteArrayOutputStreamInternal extends ByteArrayOutputStream {
+
+ ByteArrayOutputStreamInternal(int size) {
+ super(size);
+ }
+
+ byte[] getByteArray() {
+ return buf;
+ }
+ }
+
+ /**
+ * Block compaction process.
+ *
+ * Block compaction is only enabled when the number of blocks exceeds
+ * activateCompactionBlockCount. The algorithm searches for the longest
+ * segment [b..e) with (e-b) > 2 such that the combined size
+ * size(b) + size(b+1) + ... + size(e-1) is below maxBlockSize.
+ * It then downloads the blocks in the sequence, concatenates the data to
+ * form a single block, uploads this new block, and updates the block
+ * list to replace the sequence of blocks with the new block.
+ */
+ private void blockCompaction() throws IOException {
+ //current segment [segmentBegin, segmentEnd) and file offset/size of the
+ // current segment
+ int segmentBegin = 0, segmentEnd = 0;
+ long segmentOffsetBegin = 0, segmentOffsetEnd = 0;
+
+ //longest segment [maxSegmentBegin, maxSegmentEnd) and file offset/size of
+ // the longest segment
+ int maxSegmentBegin = 0, maxSegmentEnd = 0;
+ long maxSegmentOffsetBegin = 0, maxSegmentOffsetEnd = 0;
+
+ for (BlockEntry block : blockEntries) {
+ segmentEnd++;
+ segmentOffsetEnd += block.getSize();
+ if (segmentOffsetEnd - segmentOffsetBegin > maxBlockSize.get()) {
+ if (segmentEnd - segmentBegin > 2) {
+ if (maxSegmentEnd - maxSegmentBegin < segmentEnd - segmentBegin) {
+ maxSegmentBegin = segmentBegin;
+ maxSegmentEnd = segmentEnd;
+ maxSegmentOffsetBegin = segmentOffsetBegin;
+ maxSegmentOffsetEnd = segmentOffsetEnd - block.getSize();
+ }
}
+ segmentBegin = segmentEnd - 1;
+ segmentOffsetBegin = segmentOffsetEnd - block.getSize();
}
}
- if (uploadRetryAttempts == MAX_BLOCK_UPLOAD_RETRIES) {
- lastError = lastLocalException;
+ if (maxSegmentEnd - maxSegmentBegin > 1) {
+
+ LOG.debug("Block compaction: {} blocks for {}",
+ maxSegmentEnd - maxSegmentBegin, key);
+
+ // download synchronously all the blocks from the azure storage
+ ByteArrayOutputStreamInternal blockOutputStream
+ = new ByteArrayOutputStreamInternal(maxBlockSize.get());
+
+ try {
+ long length = maxSegmentOffsetEnd - maxSegmentOffsetBegin;
+ blob.downloadRange(maxSegmentOffsetBegin, length, blockOutputStream,
+ new BlobRequestOptions(), opContext);
+ } catch(StorageException ex) {
+ LOG.error(
+ "Storage exception encountered during block compaction phase"
+ + " : {} Storage Exception : {} Error Code: {}",
+ key, ex, ex.getErrorCode());
+ throw new AzureException(
+ "Encountered Exception while committing append blocks " + ex, ex);
+ }
+
+ // upload synchronously new block to the azure storage
+ String blockId = generateBlockId();
+
+ ByteBuffer byteBuffer = ByteBuffer.wrap(
+ blockOutputStream.getByteArray());
+ byteBuffer.position(blockOutputStream.size());
+
+ writeBlockRequestInternal(blockId, byteBuffer, false);
+
+ // replace blocks from the longest segment with new block id
+ blockEntries.subList(maxSegmentBegin + 1, maxSegmentEnd - 1).clear();
+ BlockEntry newBlock = blockEntries.get(maxSegmentBegin);
+ newBlock.setId(blockId);
+ newBlock.setSearchMode(BlockSearchMode.LATEST);
+ newBlock.setSize(maxSegmentOffsetEnd - maxSegmentOffsetBegin);
}
}
}
/**
- * A ThreadFactory that creates uploader thread with
- * meaningful names helpful for debugging purposes.
+ * Prepare block upload command and queue the command in thread pool executor.
*/
- class UploaderThreadFactory implements ThreadFactory {
+ private synchronized void addBlockUploadCommand() throws IOException {
+
+ maybeThrowFirstError();
+
+ if (blobExist && lease.isFreed()) {
+ throw new AzureException(String.format(
+ "Attempting to upload a block on blob : %s "
+ + " that does not have lease on the Blob. Failing upload", key));
+ }
+
+ int blockSize = outBuffer.position();
+ if (blockSize > 0) {
+ UploadCommand command = new UploadBlockCommand(generateBlockId(),
+ outBuffer);
+ activeBlockCommands.add(command);
+
+ blobLength += blockSize;
+ outBuffer = poolReadyByteBuffers.getBuffer(false, maxBlockSize.get());
+ outBuffer.clear();
+
+ ioThreadPool.execute(new WriteRequest(command));
- @Override
- public Thread newThread(Runnable r) {
- Thread t = new Thread(r);
- t.setName(String.format("%s-%s-%d", THREAD_ID_PREFIX, key,
- threadSequenceNumber.getAndIncrement()));
- return t;
}
}
/**
- * A deamon thread that renews the Append lease on the blob.
- * The thread sleeps for LEASE_RENEWAL_PERIOD time before renewing
- * the lease. If an error is encountered while renewing the lease
- * then an lease is released by this thread, which fails all other
- * operations.
+ * Prepare block list commit command and queue the command in thread pool
+ * executor.
*/
- private class AppendRenewer implements Runnable {
+ private synchronized UploadCommand addFlushCommand() throws IOException {
- @Override
- public void run() {
+ maybeThrowFirstError();
- while (!leaseFreed) {
+ if (blobExist && lease.isFreed()) {
+ throw new AzureException(
+ String.format("Attempting to upload block list on blob : %s"
+ + " that does not have lease on the Blob. Failing upload", key));
+ }
- try {
- Thread.sleep(LEASE_RENEWAL_PERIOD);
- } catch (InterruptedException ie) {
- LOG.debug("Appender Renewer thread interrupted");
- Thread.currentThread().interrupt();
- }
+ UploadCommand command = new UploadBlockListCommand();
+ activeBlockCommands.add(command);
- Log.debug("Attempting to renew append lease on {}", key);
+ ioThreadPool.execute(new WriteRequest(command));
- try {
- if (!leaseFreed) {
- // Update the blob metadata to renew the append lease
- if (!updateBlobAppendMetadata(true, true)) {
- LOG.error("Unable to re-acquire append lease on the Blob {} ", key);
- leaseFreed = true;
- }
- }
- } catch (StorageException ex) {
+ return command;
+ }
- LOG.debug("Lease renewal for Blob : {} encountered "
- + "Storage Exception : {} Error Code : {}", key, ex, ex.getErrorCode());
+ /**
+ * Runnable instance that uploads the block of data to azure storage.
+ */
+ private class WriteRequest implements Runnable {
+ private final UploadCommand command;
- // We swallow the exception here because if the blob metadata is not updated for
- // APPEND_LEASE_TIMEOUT period, another thread would be able to detect this and
- // continue forward if it needs to append.
- leaseFreed = true;
- }
+ WriteRequest(UploadCommand command) {
+ this.command = command;
+ }
+
+ @Override
+ public void run() {
+
+ try {
+ command.dump();
+ long startTime = System.nanoTime();
+ command.execute();
+ command.setCompleted();
+ LOG.debug("command finished for {} ms",
+ TimeUnit.NANOSECONDS.toMillis(
+ System.nanoTime() - startTime));
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ } catch (Exception ex) {
+ LOG.debug(
+ "Encountered exception during execution of command for Blob :"
+ + " {} Exception : {}", key, ex);
+ firstError.compareAndSet(null, new AzureException(ex));
}
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/87af3f49/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
index 01b957d..dcaa076 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
@@ -57,6 +57,8 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.fs.StreamCapabilities;
+import org.apache.hadoop.fs.Syncable;
import org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation;
import org.apache.hadoop.fs.azure.metrics.AzureFileSystemMetricsSystem;
import org.apache.hadoop.fs.azure.security.Constants;
@@ -350,9 +352,9 @@ public class NativeAzureFileSystem extends FileSystem {
}
/**
- * This is an exact copy of org.codehaus.jettison.json.JSONObject.quote
+ * This is an exact copy of org.codehaus.jettison.json.JSONObject.quote
* method.
- *
+ *
* Produce a string in double quotes with backslash sequences in all the
* right places. A backslash will be inserted within </, allowing JSON
* text to be delivered in HTML. In JSON text, a string cannot contain a
@@ -945,11 +947,11 @@ public class NativeAzureFileSystem extends FileSystem {
}
}
- private class NativeAzureFsOutputStream extends OutputStream {
- // We should not override flush() to actually close current block and flush
- // to DFS, this will break applications that assume flush() is a no-op.
- // Applications are advised to use Syncable.hflush() for that purpose.
- // NativeAzureFsOutputStream needs to implement Syncable if needed.
+ /**
+ * Azure output stream; wraps an inner stream of different types.
+ */
+ public class NativeAzureFsOutputStream extends OutputStream
+ implements Syncable, StreamCapabilities {
private String key;
private String keyEncoded;
private OutputStream out;
@@ -981,6 +983,57 @@ public class NativeAzureFileSystem extends FileSystem {
setEncodedKey(anEncodedKey);
}
+ /**
+ * Get a reference to the wrapped output stream.
+ *
+ * @return the underlying output stream
+ */
+ @InterfaceAudience.LimitedPrivate({"HDFS"})
+ public OutputStream getOutStream() {
+ return out;
+ }
+
+ @Override // Syncable
+ public void sync() throws IOException {
+ if (out instanceof Syncable) {
+ ((Syncable) out).hflush();
+ } else {
+ flush();
+ }
+ }
+
+ @Override // Syncable
+ public void hflush() throws IOException {
+ if (out instanceof Syncable) {
+ ((Syncable) out).hflush();
+ } else {
+ flush();
+ }
+ }
+
+ @Override // Syncable
+ public void hsync() throws IOException {
+ if (out instanceof Syncable) {
+ ((Syncable) out).hsync();
+ } else {
+ flush();
+ }
+ }
+
+ /**
+ * Propagate probe of stream capabilities to nested stream
+ * (if supported), else return false.
+ * @param capability string to query the stream support for.
+ * @return true if the nested stream supports the specific capability.
+ */
+ @Override // StreamCapability
+ public boolean hasCapability(String capability) {
+ if (out instanceof StreamCapabilities) {
+ return ((StreamCapabilities) out).hasCapability(capability);
+ }
+ return false;
+ }
+
@Override
public synchronized void close() throws IOException {
if (out != null) {
@@ -988,8 +1041,11 @@ public class NativeAzureFileSystem extends FileSystem {
// before returning to the caller.
//
out.close();
- restoreKey();
- out = null;
+ try {
+ restoreKey();
+ } finally {
+ out = null;
+ }
}
}
@@ -1043,10 +1099,10 @@ public class NativeAzureFileSystem extends FileSystem {
/**
* Writes <code>len</code> from the specified byte array starting at offset
* <code>off</code> to the output stream. The general contract for write(b,
- * off, len) is that some of the bytes in the array <code>
- * b</code b> are written to the output stream in order; element
- * <code>b[off]</code> is the first byte written and
- * <code>b[off+len-1]</code> is the last byte written by this operation.
+ * off, len) is that some of the bytes in the array <code>b</code>
+ * are written to the output stream in order; element <code>b[off]</code>
+ * is the first byte written and <code>b[off+len-1]</code> is the last
+ * byte written by this operation.
*
* @param b
* Byte array to be written.
@@ -1747,7 +1803,7 @@ public class NativeAzureFileSystem extends FileSystem {
OutputStream bufOutStream;
if (store.isPageBlobKey(key)) {
// Store page blobs directly in-place without renames.
- bufOutStream = store.storefile(key, permissionStatus);
+ bufOutStream = store.storefile(key, permissionStatus, key);
} else {
// This is a block blob, so open the output blob stream based on the
// encoded key.
@@ -1775,7 +1831,7 @@ public class NativeAzureFileSystem extends FileSystem {
// these
// blocks.
bufOutStream = new NativeAzureFsOutputStream(store.storefile(
- keyEncoded, permissionStatus), key, keyEncoded);
+ keyEncoded, permissionStatus, key), key, keyEncoded);
}
// Construct the data output stream from the buffered output stream.
FSDataOutputStream fsOut = new FSDataOutputStream(bufOutStream, statistics);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/87af3f49/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java
index 1c7309f..57a729d 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java
@@ -50,8 +50,9 @@ interface NativeFileSystemStore {
InputStream retrieve(String key, long byteRangeStart) throws IOException;
- DataOutputStream storefile(String key, PermissionStatus permissionStatus)
- throws AzureException;
+ DataOutputStream storefile(String keyEncoded,
+ PermissionStatus permissionStatus,
+ String key) throws AzureException;
boolean isPageBlobKey(String key);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/87af3f49/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SecureStorageInterfaceImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SecureStorageInterfaceImpl.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SecureStorageInterfaceImpl.java
index 5dbb6bc..7c2722e 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SecureStorageInterfaceImpl.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SecureStorageInterfaceImpl.java
@@ -519,7 +519,7 @@ public class SecureStorageInterfaceImpl extends StorageInterface {
@Override
public SelfRenewingLease acquireLease() throws StorageException {
- return new SelfRenewingLease(this);
+ return new SelfRenewingLease(this, false);
}
}
@@ -557,10 +557,12 @@ public class SecureStorageInterfaceImpl extends StorageInterface {
}
@Override
- public void uploadBlock(String blockId, InputStream sourceStream,
+ public void uploadBlock(String blockId, AccessCondition accessCondition,
+ InputStream sourceStream,
long length, BlobRequestOptions options,
OperationContext opContext) throws IOException, StorageException {
- ((CloudBlockBlob) getBlob()).uploadBlock(blockId, sourceStream, length, null, options, opContext);
+ ((CloudBlockBlob) getBlob()).uploadBlock(blockId, sourceStream, length,
+ accessCondition, options, opContext);
}
@Override
@@ -593,4 +595,4 @@ public class SecureStorageInterfaceImpl extends StorageInterface {
null, options, opContext);
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/87af3f49/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SelfRenewingLease.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SelfRenewingLease.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SelfRenewingLease.java
index 00d5e99..10956f7 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SelfRenewingLease.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SelfRenewingLease.java
@@ -30,6 +30,8 @@ import com.microsoft.azure.storage.blob.CloudBlob;
import java.util.concurrent.atomic.AtomicInteger;
+import static com.microsoft.azure.storage.StorageErrorCodeStrings.LEASE_ALREADY_PRESENT;
+
/**
* An Azure blob lease that automatically renews itself indefinitely
* using a background thread. Use it to synchronize distributed processes,
@@ -66,7 +68,7 @@ public class SelfRenewingLease {
@VisibleForTesting
static final int LEASE_ACQUIRE_RETRY_INTERVAL = 2000;
- public SelfRenewingLease(CloudBlobWrapper blobWrapper)
+ public SelfRenewingLease(CloudBlobWrapper blobWrapper, boolean throwIfPresent)
throws StorageException {
this.leaseFreed = false;
@@ -79,10 +81,14 @@ public class SelfRenewingLease {
leaseID = blob.acquireLease(LEASE_TIMEOUT, null);
} catch (StorageException e) {
+ if (throwIfPresent && e.getErrorCode().equals(LEASE_ALREADY_PRESENT)) {
+ throw e;
+ }
+
// Throw again if we don't want to keep waiting.
// We expect it to be that the lease is already present,
// or in some cases that the blob does not exist.
- if (!"LeaseAlreadyPresent".equals(e.getErrorCode())) {
+ if (!LEASE_ALREADY_PRESENT.equals(e.getErrorCode())) {
LOG.info(
"Caught exception when trying to get lease on blob "
+ blobWrapper.getUri().toString() + ". " + e.getMessage());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/87af3f49/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterface.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterface.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterface.java
index 8b6b082..e03d731 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterface.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterface.java
@@ -665,6 +665,7 @@ abstract class StorageInterface {
*
* @param blockId A String that represents the Base-64 encoded block ID. Note for a given blob
* the length of all Block IDs must be identical.
+ * @param accessCondition An {@link AccessCondition} object that represents the access conditions for the blob.
* @param sourceStream An {@link InputStream} object that represents the input stream to write to the
* block blob.
* @param length A long which represents the length, in bytes, of the stream data,
@@ -678,7 +679,7 @@ abstract class StorageInterface {
* @throws IOException If an I/O error occurred.
* @throws StorageException If a storage service error occurred.
*/
- void uploadBlock(String blockId, InputStream sourceStream,
+ void uploadBlock(String blockId, AccessCondition accessCondition, InputStream sourceStream,
long length, BlobRequestOptions options,
OperationContext opContext) throws IOException, StorageException;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/87af3f49/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java
index d3d0370..41a4dbb 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java
@@ -277,7 +277,7 @@ class StorageInterfaceImpl extends StorageInterface {
return new CloudBlockBlobWrapperImpl(container.getBlockBlobReference(relativePath));
}
-
+
@Override
public CloudBlobWrapper getPageBlobReference(String relativePath)
throws URISyntaxException, StorageException {
@@ -286,7 +286,7 @@ class StorageInterfaceImpl extends StorageInterface {
}
}
-
+
abstract static class CloudBlobWrapperImpl implements CloudBlobWrapper {
private final CloudBlob blob;
@@ -441,10 +441,10 @@ class StorageInterfaceImpl extends StorageInterface {
@Override
public SelfRenewingLease acquireLease() throws StorageException {
- return new SelfRenewingLease(this);
+ return new SelfRenewingLease(this, false);
}
}
-
+
//
// CloudBlockBlobWrapperImpl
@@ -479,10 +479,10 @@ class StorageInterfaceImpl extends StorageInterface {
}
@Override
- public void uploadBlock(String blockId, InputStream sourceStream,
+ public void uploadBlock(String blockId, AccessCondition accessCondition, InputStream sourceStream,
long length, BlobRequestOptions options,
OperationContext opContext) throws IOException, StorageException {
- ((CloudBlockBlob) getBlob()).uploadBlock(blockId, sourceStream, length, null, options, opContext);
+ ((CloudBlockBlob) getBlob()).uploadBlock(blockId, sourceStream, length, accessCondition, options, opContext);
}
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/87af3f49/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SyncableDataOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SyncableDataOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SyncableDataOutputStream.java
index b377f95..04545ac 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SyncableDataOutputStream.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SyncableDataOutputStream.java
@@ -24,6 +24,7 @@ import java.io.OutputStream;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.Syncable;
+import org.apache.hadoop.classification.InterfaceAudience;
/**
* Support the Syncable interface on top of a DataOutputStream.
@@ -38,6 +39,16 @@ public class SyncableDataOutputStream extends DataOutputStream
super(out);
}
+ /**
+ * Get a reference to the wrapped output stream.
+ *
+ * @return the underlying output stream
+ */
+ @InterfaceAudience.LimitedPrivate({"HDFS"})
+ public OutputStream getOutStream() {
+ return out;
+ }
+
@Override
public boolean hasCapability(String capability) {
if (out instanceof StreamCapabilities) {
@@ -47,9 +58,7 @@ public class SyncableDataOutputStream extends DataOutputStream
}
@Override
- @Deprecated
public void sync() throws IOException {
- hflush();
}
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/87af3f49/hadoop-tools/hadoop-azure/src/site/markdown/index.md
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/site/markdown/index.md b/hadoop-tools/hadoop-azure/src/site/markdown/index.md
index 3b42fc2..97807a3 100644
--- a/hadoop-tools/hadoop-azure/src/site/markdown/index.md
+++ b/hadoop-tools/hadoop-azure/src/site/markdown/index.md
@@ -150,6 +150,40 @@ line argument:
```
+### Block Blob with Compaction Support and Configuration
+
+Block blobs are the default kind of blob and are good for most big-data use
+cases. However, block blobs have a strict limit of 50,000 blocks per blob.
+To prevent reaching the limit, WASB, by default, does not upload a new block
+to the service after every `hflush()` or `hsync()`.
+
+In most cases, combining data from multiple `write()` calls into
+blocks of 4 MB is a good optimization. But in other cases, like HBase log
+files, every call to `hflush()` or `hsync()` must upload the data to the service.
+
+Block blobs with compaction upload the data to the cloud service after every
+`hflush()`/`hsync()`. To mitigate the limit of 50,000 blocks,
+`hflush()`/`hsync()` runs the compaction process once the number of blocks
+in the blob exceeds 32,000.
+
+Block compaction searches for and replaces a sequence of small blocks with one
+big block. That means there is a cost associated with block compaction: reading
+the small blocks back to the client and writing them again as one big block.
+
+In order to have the files you create be block blobs with block compaction
+enabled, the client must set the configuration variable
+`fs.azure.block.blob.with.compaction.dir` to a comma-separated list of
+folder names.
+
+For example:
+
+```xml
+<property>
+ <name>fs.azure.block.blob.with.compaction.dir</name>
+ <value>/hbase/WALs,/data/myblobfiles</value>
+</property>
+```
+
### Page Blob Support and Configuration
The Azure Blob Storage interface for Hadoop supports two kinds of blobs,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/87af3f49/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockStorageInterface.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockStorageInterface.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockStorageInterface.java
index 4f26d9f..e0ae7b4 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockStorageInterface.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockStorageInterface.java
@@ -551,7 +551,8 @@ public class MockStorageInterface extends StorageInterface {
throw new UnsupportedOperationException("downloadBlockList not used in Mock Tests");
}
@Override
- public void uploadBlock(String blockId, InputStream sourceStream,
+ public void uploadBlock(String blockId, AccessCondition accessCondition,
+ InputStream sourceStream,
long length, BlobRequestOptions options,
OperationContext opContext) throws IOException, StorageException {
throw new UnsupportedOperationException("uploadBlock not used in Mock Tests");
http://git-wip-us.apache.org/repos/asf/hadoop/blob/87af3f49/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestAzureConcurrentOutOfBandIo.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestAzureConcurrentOutOfBandIo.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestAzureConcurrentOutOfBandIo.java
index 7ea7534..a10a366 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestAzureConcurrentOutOfBandIo.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestAzureConcurrentOutOfBandIo.java
@@ -107,7 +107,8 @@ public class TestAzureConcurrentOutOfBandIo {
//
outputStream = writerStorageAccount.getStore().storefile(
key,
- new PermissionStatus("", "", FsPermission.getDefault()));
+ new PermissionStatus("", "", FsPermission.getDefault()),
+ key);
Arrays.fill(dataBlockWrite, (byte) (i % 256));
for (int j = 0; j < NUMBER_OF_BLOCKS; j++) {
@@ -141,7 +142,8 @@ public class TestAzureConcurrentOutOfBandIo {
// reading. This eliminates the race between the reader and writer threads.
OutputStream outputStream = testAccount.getStore().storefile(
"WASB_String.txt",
- new PermissionStatus("", "", FsPermission.getDefault()));
+ new PermissionStatus("", "", FsPermission.getDefault()),
+ "WASB_String.txt");
Arrays.fill(dataBlockWrite, (byte) 255);
for (int i = 0; i < NUMBER_OF_BLOCKS; i++) {
outputStream.write(dataBlockWrite);
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org