Posted to common-commits@hadoop.apache.org by cn...@apache.org on 2016/01/18 18:22:10 UTC
[1/6] hadoop git commit: HADOOP-12635. Adding Append API support for
WASB. Contributed by Dushyanth.
Repository: hadoop
Updated Branches:
refs/heads/branch-2 da887e425 -> 62d616621
refs/heads/branch-2.8 23d729f84 -> ffc0d9888
refs/heads/trunk d40859fab -> 8bc93db2e
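For reviewers who want to see the feature from the client side, here is a
minimal, hedged sketch of how the new append support is exercised once the
patch below is applied. The configuration key matches
APPEND_SUPPORT_ENABLE_PROPERTY_NAME introduced in NativeAzureFileSystem; the
wasb:// URI, container, and path are illustrative placeholders, not part of
the patch.

    import java.net.URI;
    import java.nio.charset.StandardCharsets;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class WasbAppendSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Append is off by default; the patch gates it behind this flag.
        conf.setBoolean("fs.azure.enable.append.support", true);
        // Placeholder account and container; substitute a real WASB URI.
        FileSystem fs = FileSystem.get(
            URI.create("wasb://mycontainer@myaccount.blob.core.windows.net/"), conf);
        Path file = new Path("/data/events.log");
        FSDataOutputStream out = fs.append(file, 4096);
        try {
          out.write("one more record\n".getBytes(StandardCharsets.UTF_8));
        } finally {
          out.close();
        }
      }
    }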
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8bc93db2/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestNativeAzureFileSystemAppend.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestNativeAzureFileSystemAppend.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestNativeAzureFileSystemAppend.java
new file mode 100644
index 0000000..de51990
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestNativeAzureFileSystemAppend.java
@@ -0,0 +1,362 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Arrays;
+
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestNativeAzureFileSystemAppend extends NativeAzureFileSystemBaseTest {
+
+ private static final String TEST_FILE = "test.dat";
+ private static final Path TEST_PATH = new Path(TEST_FILE);
+
+ private AzureBlobStorageTestAccount testAccount = null;
+
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ testAccount = createTestAccount();
+ fs = testAccount.getFileSystem();
+ Configuration conf = fs.getConf();
+ conf.setBoolean(NativeAzureFileSystem.APPEND_SUPPORT_ENABLE_PROPERTY_NAME, true);
+ URI uri = fs.getUri();
+ fs.initialize(uri, conf);
+ }
+
+ /*
+ * Helper method that creates test data of size provided by the
+ * "size" parameter.
+ */
+ private static byte[] getTestData(int size) {
+ byte[] testData = new byte[size];
+ System.arraycopy(RandomStringUtils.randomAlphabetic(size).getBytes(), 0, testData, 0, size);
+ return testData;
+ }
+
+ // Helper method to create a file and write fileSize bytes of data to it.
+ private byte[] createBaseFileWithData(int fileSize, Path testPath) throws Throwable {
+
+ FSDataOutputStream createStream = null;
+ try {
+ createStream = fs.create(testPath);
+ byte[] fileData = null;
+
+ if (fileSize != 0) {
+ fileData = getTestData(fileSize);
+ createStream.write(fileData);
+ }
+ return fileData;
+ } finally {
+ if (createStream != null) {
+ createStream.close();
+ }
+ }
+ }
+
+ /*
+ * Helper method to verify that the next "dataLength" bytes of the file match the expected test data
+ */
+ private boolean verifyFileData(int dataLength, byte[] testData, int testDataIndex,
+ FSDataInputStream srcStream) {
+
+ try {
+
+ byte[] fileBuffer = new byte[dataLength];
+ byte[] testDataBuffer = new byte[dataLength];
+
+ int fileBytesRead = srcStream.read(fileBuffer);
+
+ if (fileBytesRead < dataLength) {
+ return false;
+ }
+
+ System.arraycopy(testData, testDataIndex, testDataBuffer, 0, dataLength);
+
+ if (!Arrays.equals(fileBuffer, testDataBuffer)) {
+ return false;
+ }
+
+ return true;
+
+ } catch (Exception ex) {
+ return false;
+ }
+
+ }
+
+ /*
+ * Helper method to verify Append on a testFile.
+ */
+ private boolean verifyAppend(byte[] testData, Path testFile) {
+
+ FSDataInputStream srcStream = null;
+ try {
+
+ srcStream = fs.open(testFile);
+ int baseBufferSize = 2048;
+ int testDataSize = testData.length;
+ int testDataIndex = 0;
+
+ while (testDataSize > baseBufferSize) {
+
+ if (!verifyFileData(baseBufferSize, testData, testDataIndex, srcStream)) {
+ return false;
+ }
+ testDataIndex += baseBufferSize;
+ testDataSize -= baseBufferSize;
+ }
+
+ if (!verifyFileData(testDataSize, testData, testDataIndex, srcStream)) {
+ return false;
+ }
+
+ return true;
+ } catch(Exception ex) {
+ return false;
+ } finally {
+ if (srcStream != null) {
+ try {
+ srcStream.close();
+ } catch(IOException ioe) {
+ // Swallowing
+ }
+ }
+ }
+ }
+
+ /*
+ * Test case to verify that appending a small amount of data works. This
+ * tests append end to end (E2E)
+ */
+ @Test
+ public void testSingleAppend() throws Throwable{
+
+ FSDataOutputStream appendStream = null;
+ try {
+ int baseDataSize = 50;
+ byte[] baseDataBuffer = createBaseFileWithData(baseDataSize, TEST_PATH);
+
+ int appendDataSize = 20;
+ byte[] appendDataBuffer = getTestData(appendDataSize);
+ appendStream = fs.append(TEST_PATH, 10);
+ appendStream.write(appendDataBuffer);
+ appendStream.close();
+ byte[] testData = new byte[baseDataSize + appendDataSize];
+ System.arraycopy(baseDataBuffer, 0, testData, 0, baseDataSize);
+ System.arraycopy(appendDataBuffer, 0, testData, baseDataSize, appendDataSize);
+
+ Assert.assertTrue(verifyAppend(testData, TEST_PATH));
+ } finally {
+ if (appendStream != null) {
+ appendStream.close();
+ }
+ }
+ }
+
+ /*
+ * Test case to verify append to an empty file.
+ */
+ @Test
+ public void testSingleAppendOnEmptyFile() throws Throwable {
+
+ FSDataOutputStream appendStream = null;
+
+ try {
+ createBaseFileWithData(0, TEST_PATH);
+
+ int appendDataSize = 20;
+ byte[] appendDataBuffer = getTestData(appendDataSize);
+ appendStream = fs.append(TEST_PATH, 10);
+ appendStream.write(appendDataBuffer);
+ appendStream.close();
+
+ Assert.assertTrue(verifyAppend(appendDataBuffer, TEST_PATH));
+ } finally {
+ if (appendStream != null) {
+ appendStream.close();
+ }
+ }
+ }
+
+ /*
+ * Test to verify that we can open only one Append stream on a File.
+ */
+ @Test
+ public void testSingleAppenderScenario() throws Throwable {
+
+ FSDataOutputStream appendStream1 = null;
+ FSDataOutputStream appendStream2 = null;
+ IOException ioe = null;
+ try {
+ createBaseFileWithData(0, TEST_PATH);
+ appendStream1 = fs.append(TEST_PATH, 10);
+ boolean encounteredException = false;
+ try {
+ appendStream2 = fs.append(TEST_PATH, 10);
+ } catch(IOException ex) {
+ encounteredException = true;
+ ioe = ex;
+ }
+
+ appendStream1.close();
+
+ Assert.assertTrue(encounteredException);
+ GenericTestUtils.assertExceptionContains("Unable to set Append lease on the Blob", ioe);
+ } finally {
+ if (appendStream1 != null) {
+ appendStream1.close();
+ }
+
+ if (appendStream2 != null) {
+ appendStream2.close();
+ }
+ }
+ }
+
+ /*
+ * Tests to verify multiple appends on a Blob.
+ */
+ @Test
+ public void testMultipleAppends() throws Throwable {
+
+ int baseDataSize = 50;
+ byte[] baseDataBuffer = createBaseFileWithData(baseDataSize, TEST_PATH);
+
+ int appendDataSize = 100;
+ int targetAppendCount = 50;
+ byte[] testData = new byte[baseDataSize + (appendDataSize*targetAppendCount)];
+ int testDataIndex = 0;
+ System.arraycopy(baseDataBuffer, 0, testData, testDataIndex, baseDataSize);
+ testDataIndex += baseDataSize;
+
+ int appendCount = 0;
+
+ FSDataOutputStream appendStream = null;
+
+ try {
+ while (appendCount < targetAppendCount) {
+
+ byte[] appendDataBuffer = getTestData(appendDataSize);
+ appendStream = fs.append(TEST_PATH, 30);
+ appendStream.write(appendDataBuffer);
+ appendStream.close();
+
+ System.arraycopy(appendDataBuffer, 0, testData, testDataIndex, appendDataSize);
+ testDataIndex += appendDataSize;
+ appendCount++;
+ }
+
+ Assert.assertTrue(verifyAppend(testData, TEST_PATH));
+
+ } finally {
+ if (appendStream != null) {
+ appendStream.close();
+ }
+ }
+ }
+
+ /*
+ * Test to verify multiple appends on the same stream.
+ */
+ @Test
+ public void testMultipleAppendsOnSameStream() throws Throwable {
+
+ int baseDataSize = 50;
+ byte[] baseDataBuffer = createBaseFileWithData(baseDataSize, TEST_PATH);
+ int appendDataSize = 100;
+ int targetAppendCount = 50;
+ byte[] testData = new byte[baseDataSize + (appendDataSize*targetAppendCount)];
+ int testDataIndex = 0;
+ System.arraycopy(baseDataBuffer, 0, testData, testDataIndex, baseDataSize);
+ testDataIndex += baseDataSize;
+ int appendCount = 0;
+
+ FSDataOutputStream appendStream = null;
+
+ try {
+
+ while (appendCount < targetAppendCount) {
+
+ appendStream = fs.append(TEST_PATH, 50);
+
+ int singleAppendChunkSize = 20;
+ int appendRunSize = 0;
+ while (appendRunSize < appendDataSize) {
+
+ byte[] appendDataBuffer = getTestData(singleAppendChunkSize);
+ appendStream.write(appendDataBuffer);
+ System.arraycopy(appendDataBuffer, 0, testData,
+ testDataIndex + appendRunSize, singleAppendChunkSize);
+
+ appendRunSize += singleAppendChunkSize;
+ }
+
+ appendStream.close();
+ testDataIndex += appendDataSize;
+ appendCount++;
+ }
+
+ Assert.assertTrue(verifyAppend(testData, TEST_PATH));
+ } finally {
+ if (appendStream != null) {
+ appendStream.close();
+ }
+ }
+ }
+
+ @Test(expected=UnsupportedOperationException.class)
+ /*
+ * Test to verify the behavior when the Append Support configuration flag is set to false
+ */
+ public void testFalseConfigurationFlagBehavior() throws Throwable {
+
+ fs = testAccount.getFileSystem();
+ Configuration conf = fs.getConf();
+ conf.setBoolean(NativeAzureFileSystem.APPEND_SUPPORT_ENABLE_PROPERTY_NAME, false);
+ URI uri = fs.getUri();
+ fs.initialize(uri, conf);
+
+ FSDataOutputStream appendStream = null;
+
+ try {
+ createBaseFileWithData(0, TEST_PATH);
+ appendStream = fs.append(TEST_PATH, 10);
+ } finally {
+ if (appendStream != null) {
+ appendStream.close();
+ }
+ }
+ }
+
+ @Override
+ protected AzureBlobStorageTestAccount createTestAccount() throws Exception {
+ return AzureBlobStorageTestAccount.create();
+ }
+}
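The single-appender guarantee that testSingleAppenderScenario exercises above
reduces to the following pattern. A hedged sketch: fs and path are assumed to
be set up as in the test, and the exception message is the one the test
asserts on.

    // Only one append stream may be open on a WASB file at a time. A second
    // fs.append() on the same path is expected to fail with an IOException
    // containing "Unable to set Append lease on the Blob".
    FSDataOutputStream first = fs.append(path, 4096);
    try {
      FSDataOutputStream second = fs.append(path, 4096); // expected to throw
      second.close(); // not reached while the first lease is held
    } catch (IOException expected) {
      // The first writer still holds the append lease recorded in blob metadata.
    } finally {
      first.close(); // releases the append lease for subsequent writers
    }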
[4/6] hadoop git commit: HADOOP-12635. Adding Append API support for
WASB. Contributed by Dushyanth.
Posted by cn...@apache.org.
HADOOP-12635. Adding Append API support for WASB. Contributed by Dushyanth.
(cherry picked from commit 8bc93db2e7c64830b6a662f28c8917a9eef4e7c9)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/62d61662
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/62d61662
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/62d61662
Branch: refs/heads/branch-2
Commit: 62d616621134cc5e68f4d5fd32f49ea4d731417c
Parents: da887e4
Author: cnauroth <cn...@apache.org>
Authored: Mon Jan 18 09:08:53 2016 -0800
Committer: cnauroth <cn...@apache.org>
Committed: Mon Jan 18 09:09:05 2016 -0800
----------------------------------------------------------------------
hadoop-common-project/hadoop-common/CHANGES.txt | 2 +
.../fs/azure/AzureNativeFileSystemStore.java | 23 +-
.../hadoop/fs/azure/BlockBlobAppendStream.java | 775 +++++++++++++++++++
.../hadoop/fs/azure/NativeAzureFileSystem.java | 216 +++---
.../fs/azure/NativeAzureFileSystemHelper.java | 107 +++
.../hadoop/fs/azure/NativeFileSystemStore.java | 2 +
.../hadoop/fs/azure/PageBlobOutputStream.java | 17 +-
.../hadoop/fs/azure/StorageInterface.java | 89 ++-
.../hadoop/fs/azure/StorageInterfaceImpl.java | 33 +-
.../hadoop-azure/src/site/markdown/index.md | 20 +-
.../hadoop/fs/azure/MockStorageInterface.java | 34 +-
.../azure/TestNativeAzureFileSystemAppend.java | 362 +++++++++
12 files changed, 1550 insertions(+), 130 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/62d61662/hadoop-common-project/hadoop-common/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index 3e79878..fb66b9f 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -99,6 +99,8 @@ Release 2.8.0 - UNRELEASED
HADOOP-12691. Add CSRF Filter for REST APIs to Hadoop Common.
(Larry McCay via cnauroth)
+ HADOOP-12635. Adding Append API support for WASB. (Dushyanth via cnauroth)
+
IMPROVEMENTS
HADOOP-12458. Retries is typoed to spell Retires in parts of
http://git-wip-us.apache.org/repos/asf/hadoop/blob/62d61662/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
index a936cd6..0097912 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
@@ -33,13 +33,11 @@ import java.net.URLEncoder;
import java.security.InvalidKeyException;
import java.util.ArrayList;
import java.util.Calendar;
-import java.util.Collections;
import java.util.Date;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
-import java.util.LinkedList;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
@@ -64,6 +62,7 @@ import org.apache.hadoop.io.IOUtils;
import org.mortbay.util.ajax.JSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import com.google.common.annotations.VisibleForTesting;
import com.microsoft.azure.storage.CloudStorageAccount;
import com.microsoft.azure.storage.OperationContext;
@@ -2681,4 +2680,24 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
close();
super.finalize();
}
+
+ @Override
+ public DataOutputStream retrieveAppendStream(String key, int bufferSize) throws IOException {
+
+ try {
+
+ if (isPageBlobKey(key)) {
+ throw new UnsupportedOperationException("Append not supported for Page Blobs");
+ }
+
+ CloudBlobWrapper blob = this.container.getBlockBlobReference(key);
+
+ BlockBlobAppendStream appendStream = new BlockBlobAppendStream((CloudBlockBlobWrapper) blob, key, bufferSize, getInstrumentedContext());
+ appendStream.initialize();
+
+ return new DataOutputStream(appendStream);
+ } catch(Exception ex) {
+ throw new AzureException(ex);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/62d61662/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobAppendStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobAppendStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobAppendStream.java
new file mode 100644
index 0000000..d1ec8df
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobAppendStream.java
@@ -0,0 +1,775 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.List;
+import java.util.Random;
+import java.util.TimeZone;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.fs.azure.StorageInterface.CloudBlockBlobWrapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.microsoft.azure.storage.AccessCondition;
+import com.microsoft.azure.storage.OperationContext;
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.BlobRequestOptions;
+import com.microsoft.azure.storage.blob.BlockEntry;
+import com.microsoft.azure.storage.blob.BlockListingFilter;
+
+/**
+ * Stream object that implements append for Block Blobs in WASB.
+ */
+public class BlockBlobAppendStream extends OutputStream {
+
+ private final String key;
+ private final int bufferSize;
+ private ByteArrayOutputStream outBuffer;
+ private final CloudBlockBlobWrapper blob;
+ private final OperationContext opContext;
+
+ /**
+ * Variable to track if the stream has been closed.
+ */
+ private boolean closed = false;
+
+ /**
+ * Variable to track if the append lease is released.
+ */
+
+ private volatile boolean leaseFreed;
+
+ /**
+ * Variable to track if the append stream has been
+ * initialized.
+ */
+
+ private boolean initialized = false;
+
+ /**
+ * Last IOException encountered
+ */
+ private volatile IOException lastError = null;
+
+ /**
+ * List to keep track of the uncommitted azure storage
+ * block ids
+ */
+ private final List<BlockEntry> uncommittedBlockEntries;
+
+ private static final int UNSET_BLOCKS_COUNT = -1;
+
+ /**
+ * Variable to hold the next block id to be used for azure
+ * storage blocks.
+ */
+ private long nextBlockCount = UNSET_BLOCKS_COUNT;
+
+ private final Random sequenceGenerator = new Random();
+
+ /**
+ * Time to wait to renew lease in milliseconds
+ */
+ private static final int LEASE_RENEWAL_PERIOD = 10000;
+
+ /**
+ * Number of times to retry for lease renewal
+ */
+ private static final int MAX_LEASE_RENEWAL_RETRY_COUNT = 3;
+
+ /**
+ * Time to wait before retrying to set the lease
+ */
+ private static final int LEASE_RENEWAL_RETRY_SLEEP_PERIOD = 500;
+
+ /**
+ * Metadata key used on the blob to indicate append lease is active
+ */
+ public static final String APPEND_LEASE = "append_lease";
+
+ /**
+ * Timeout value for the append lease in millisecs. If the lease is not
+ * renewed within 30 seconds then another thread can acquire the append lease
+ * on the blob
+ */
+ public static final int APPEND_LEASE_TIMEOUT = 30000;
+
+ /**
+ * Metadata key used on the blob to indicate the last modified time of the append lease
+ */
+ public static final String APPEND_LEASE_LAST_MODIFIED = "append_lease_last_modified";
+
+ /**
+ * Number of times a block upload is retried.
+ */
+ private static final int MAX_BLOCK_UPLOAD_RETRIES = 3;
+
+ /**
+ * Wait time between block upload retries in millisecs.
+ */
+ private static final int BLOCK_UPLOAD_RETRY_INTERVAL = 1000;
+
+ private static final Logger LOG = LoggerFactory.getLogger(BlockBlobAppendStream.class);
+
+ private static final int MAX_BLOCK_COUNT = 100000;
+
+ private ThreadPoolExecutor ioThreadPool;
+
+ /**
+ * Atomic integer to provide thread id for thread names for uploader threads.
+ */
+ private final AtomicInteger threadSequenceNumber;
+
+ /**
+ * Prefix to be used for thread names for uploader threads.
+ */
+ private static final String THREAD_ID_PREFIX = "BlockBlobAppendStream";
+
+ private static final String UTC_STR = "UTC";
+
+ public BlockBlobAppendStream(final CloudBlockBlobWrapper blob,
+ final String aKey, final int bufferSize, final OperationContext opContext)
+ throws IOException {
+
+ if (null == aKey || 0 == aKey.length()) {
+ throw new IllegalArgumentException(
+ "Illegal argument: The key string is null or empty");
+ }
+
+ if (0 >= bufferSize) {
+ throw new IllegalArgumentException(
+ "Illegal argument bufferSize cannot be zero or negative");
+ }
+
+ this.blob = blob;
+ this.opContext = opContext;
+ this.key = aKey;
+ this.bufferSize = bufferSize;
+ this.threadSequenceNumber = new AtomicInteger(0);
+ setBlocksCount();
+
+ this.outBuffer = new ByteArrayOutputStream(bufferSize);
+ this.uncommittedBlockEntries = new ArrayList<BlockEntry>();
+
+ // Acquire append lease on the blob.
+ try {
+ //Set the append lease if the value of the append lease is false
+ if (!updateBlobAppendMetadata(true, false)) {
+ LOG.error("Unable to set Append Lease on the Blob : {} "
+ + "Possibly because another client already has a create or append stream open on the Blob", key);
+ throw new IOException("Unable to set Append lease on the Blob. "
+ + "Possibly because another client already had an append stream open on the Blob.");
+ }
+ } catch (StorageException ex) {
+ LOG.error("Encountered Storage exception while acquiring append "
+ + "lease on blob : {}. Storage Exception : {} ErrorCode : {}",
+ key, ex, ex.getErrorCode());
+
+ throw new IOException(ex);
+ }
+
+ leaseFreed = false;
+ }
+
+ /**
+ * Helper method that starts an Append Lease renewer thread and the
+ * thread pool.
+ */
+ public synchronized void initialize() {
+
+ if (initialized) {
+ return;
+ }
+ /*
+ * Start the thread for Append lease renewer.
+ */
+ Thread appendLeaseRenewer = new Thread(new AppendRenewer());
+ appendLeaseRenewer.setDaemon(true);
+ appendLeaseRenewer.setName(String.format("%s-AppendLeaseRenewer", key));
+ appendLeaseRenewer.start();
+
+ /*
+ * Parameters to ThreadPoolExecutor:
+ * corePoolSize : the number of threads to keep in the pool, even if they are idle,
+ * unless allowCoreThreadTimeOut is set
+ * maximumPoolSize : the maximum number of threads to allow in the pool
+ * keepAliveTime - when the number of threads is greater than the core,
+ * this is the maximum time that excess idle threads will
+ * wait for new tasks before terminating.
+ * unit - the time unit for the keepAliveTime argument
+ * workQueue - the queue to use for holding tasks before they are executed
+ * This queue will hold only the Runnable tasks submitted by the execute method.
+ */
+ this.ioThreadPool = new ThreadPoolExecutor(4, 4, 2, TimeUnit.SECONDS,
+ new LinkedBlockingQueue<Runnable>(), new UploaderThreadFactory());
+
+ initialized = true;
+ }
+
+ /**
+ * Get the blob name.
+ *
+ * @return String Blob name.
+ */
+ public String getKey() {
+ return key;
+ }
+
+ /**
+ * Get the buffer size of the stream.
+ * @return buffer size of the stream.
+ */
+ public int getBufferSize() {
+ return bufferSize;
+ }
+
+ /**
+ * Writes the specified byte to this output stream. The general contract for
+ * write is that one byte is written to the output stream. The byte to be
+ * written is the eight low-order bits of the argument b. The 24 high-order
+ * bits of b are ignored.
+ *
+ * @param byteVal
+ * the byteValue to write.
+ * @throws IOException
+ * if an I/O error occurs. In particular, an IOException may be
+ * thrown if the output stream has been closed.
+ */
+ @Override
+ public void write(final int byteVal) throws IOException {
+ write(new byte[] { (byte) (byteVal & 0xFF) });
+ }
+
+ /**
+ * Writes b.length bytes from the specified byte array to this output stream.
+ *
+ * @param data
+ * the byte array to write.
+ *
+ * @throws IOException
+ * if an I/O error occurs. In particular, an IOException may be
+ * thrown if the output stream has been closed.
+ */
+ @Override
+ public void write(final byte[] data) throws IOException {
+ write(data, 0, data.length);
+ }
+
+ /**
+ * Writes length bytes from the specified byte array starting at offset to
+ * this output stream.
+ *
+ * @param data
+ * the byte array to write.
+ * @param offset
+ * the start offset in the data.
+ * @param length
+ * the number of bytes to write.
+ * @throws IOException
+ * if an I/O error occurs. In particular, an IOException may be
+ * thrown if the output stream has been closed.
+ */
+ @Override
+ public void write(final byte[] data, final int offset, final int length)
+ throws IOException {
+
+ if (offset < 0 || length < 0 || length > data.length - offset) {
+ throw new IndexOutOfBoundsException("write API in append stream called with invalid arguments");
+ }
+
+ writeInternal(data, offset, length);
+ }
+
+ @Override
+ public synchronized void close() throws IOException {
+
+ if (!initialized) {
+ throw new IOException("Trying to close an uninitialized Append stream");
+ }
+
+ if (closed) {
+ return;
+ }
+
+ if (leaseFreed) {
+ throw new IOException(String.format("Attempting to close an append stream on blob : %s "
+ + " that does not have lease on the Blob. Failing close", key));
+ }
+
+ if (outBuffer.size() > 0) {
+ uploadBlockToStorage(outBuffer.toByteArray());
+ }
+
+ ioThreadPool.shutdown();
+
+ try {
+ if (!ioThreadPool.awaitTermination(10, TimeUnit.MINUTES)) {
+ LOG.error("Time out occured while waiting for IO request to finish in append"
+ + " for blob : {}", key);
+ NativeAzureFileSystemHelper.logAllLiveStackTraces();
+ throw new IOException("Timed out waiting for IO requests to finish");
+ }
+ } catch(InterruptedException intrEx) {
+
+ // Restore the interrupted status
+ Thread.currentThread().interrupt();
+ LOG.error("Upload block operation in append interrupted for blob {}. Failing close", key);
+ throw new IOException("Append Commit interrupted.");
+ }
+
+ // Calling commit after all blocks are successfully uploaded.
+ if (lastError == null) {
+ commitAppendBlocks();
+ }
+
+ // Perform cleanup.
+ cleanup();
+
+ if (lastError != null) {
+ throw lastError;
+ }
+ }
+
+ /**
+ * Helper method that cleans up the append stream.
+ */
+ private synchronized void cleanup() {
+
+ closed = true;
+
+ try {
+ // Set the value of append lease to false if the value is set to true.
+ updateBlobAppendMetadata(false, true);
+ } catch(StorageException ex) {
+ LOG.debug("Append metadata update on the Blob : {} encountered Storage Exception : {} "
+ + "Error Code : {}",
+ key, ex, ex.getErrorCode());
+ lastError = new IOException(ex);
+ }
+
+ leaseFreed = true;
+ }
+
+ /**
+ * Method to commit all the uncommitted blocks to Azure storage.
+ * If the commit fails then blocks are automatically cleaned up
+ * by Azure storage.
+ * @throws IOException
+ */
+ private synchronized void commitAppendBlocks() throws IOException {
+
+ SelfRenewingLease lease = null;
+
+ try {
+ if (uncommittedBlockEntries.size() > 0) {
+
+ //Acquiring lease on the blob.
+ lease = new SelfRenewingLease(blob);
+
+ // Downloading existing blocks
+ List<BlockEntry> blockEntries = blob.downloadBlockList(BlockListingFilter.COMMITTED,
+ new BlobRequestOptions(), opContext);
+
+ // Adding uncommitted blocks.
+ blockEntries.addAll(uncommittedBlockEntries);
+
+ AccessCondition accessCondition = new AccessCondition();
+ accessCondition.setLeaseID(lease.getLeaseID());
+ blob.commitBlockList(blockEntries, accessCondition, new BlobRequestOptions(), opContext);
+ uncommittedBlockEntries.clear();
+ }
+ } catch(StorageException ex) {
+ LOG.error("Storage exception encountered during block commit phase of append for blob"
+ + " : {} Storage Exception : {} Error Code: {}", key, ex, ex.getErrorCode());
+ throw new IOException("Encountered Exception while committing append blocks", ex);
+ } finally {
+ if (lease != null) {
+ try {
+ lease.free();
+ } catch(StorageException ex) {
+ LOG.debug("Exception encountered while releasing lease for "
+ + "blob : {} StorageException : {} ErrorCode : {}", key, ex, ex.getErrorCode());
+ // Swallowing exception here as the lease is cleaned up by the SelfRenewingLease object.
+ }
+ }
+ }
+ }
+
+ /**
+ * Helper method used to seed the sequence for generating block IDs. The
+ * algorithm used is similar to the one in the Azure storage SDK.
+ */
+ private void setBlocksCount() throws IOException {
+ try {
+
+ if (nextBlockCount == UNSET_BLOCKS_COUNT) {
+
+ nextBlockCount = (long) (sequenceGenerator.nextInt(Integer.MAX_VALUE))
+ + sequenceGenerator.nextInt(Integer.MAX_VALUE - MAX_BLOCK_COUNT);
+
+ List<BlockEntry> blockEntries =
+ blob.downloadBlockList(BlockListingFilter.COMMITTED, new BlobRequestOptions(), opContext);
+
+ nextBlockCount += blockEntries.size();
+
+ }
+ } catch (StorageException ex) {
+ LOG.debug("Encountered storage exception during setting next Block Count."
+ + " StorageException : {} ErrorCode : {}", ex, ex.getErrorCode());
+ throw new IOException(ex);
+ }
+ }
+
+ /**
+ * Helper method that generates the next block id for uploading a block to azure storage.
+ * @return String representing the block ID generated.
+ * @throws IOException
+ */
+ private String generateBlockId() throws IOException {
+
+ if (nextBlockCount == UNSET_BLOCKS_COUNT) {
+ throw new IOException("Append Stream in invalid state. nextBlockCount not set correctly");
+ }
+
+ byte[] blockIdInBytes = getBytesFromLong(nextBlockCount);
+ return new String(Base64.encodeBase64(blockIdInBytes), StandardCharsets.UTF_8);
+ }
+
+ /**
+ * Returns a byte array that represents the data of a <code>long</code> value. This
+ * utility method is copied from the com.microsoft.azure.storage.core.Utility class.
+ * That class is marked as internal, hence we clone the method here rather than
+ * take a dependency on it.
+ *
+ * @param value
+ * The value from which the byte array will be returned.
+ *
+ * @return A byte array that represents the data of the specified <code>long</code> value.
+ */
+ private static byte[] getBytesFromLong(final long value) {
+ final byte[] tempArray = new byte[8];
+
+ for (int m = 0; m < 8; m++) {
+ tempArray[7 - m] = (byte) ((value >> (8 * m)) & 0xFF);
+ }
+
+ return tempArray;
+ }
+
+ /**
+ * Helper method that creates a thread to upload a block to azure storage.
+ * @param payload
+ * @throws IOException
+ */
+ private synchronized void uploadBlockToStorage(byte[] payload) throws IOException {
+
+ // upload payload to azure storage
+ nextBlockCount++;
+ String blockId = generateBlockId();
+ // Since uploads to Azure storage are done in parallel threads, we go ahead
+ // and add the blockId to the uncommitted list. If the upload of the block fails,
+ // we don't commit the blockIds.
+ uncommittedBlockEntries.add(new BlockEntry(blockId));
+ ioThreadPool.execute(new WriteRequest(payload, blockId));
+ }
+
+
+ /**
+ * Helper method to update the Blob metadata during Append lease operations.
+ * Blob metadata is updated to the holdLease value only if the current lease
+ * status is equal to testCondition and the last update on the blob metadata
+ * is less than 30 seconds old.
+ * @param holdLease
+ * @param testCondition
+ * @return true if the updated lease operation was successful or false otherwise
+ * @throws StorageException
+ */
+ private boolean updateBlobAppendMetadata(boolean holdLease, boolean testCondition)
+ throws StorageException {
+
+ SelfRenewingLease lease = null;
+ StorageException lastStorageException = null;
+ int leaseRenewalRetryCount = 0;
+
+ /*
+ * Updating the Blob metadata honours the following algorithm, based on:
+ * 1) whether the append lease metadata is present,
+ * 2) the last updated time of the append lease, and
+ * 3) the previous value of the append lease metadata.
+ *
+ * The algorithm:
+ * 1) If the append lease metadata is not part of the Blob, this is the
+ * first client to append, so we update the metadata.
+ * 2) If the append lease metadata is present and the timeout has occurred,
+ * we update the metadata irrespective of the value of the append lease.
+ * 3) If the append lease metadata is present, is equal to the testCondition
+ * value (passed as a parameter), and the timeout has not occurred, we update the metadata.
+ * 4) If the append lease metadata is present, is not equal to the testCondition
+ * value, and the timeout has not occurred, we do not update the metadata and return false.
+ */
+ while (leaseRenewalRetryCount < MAX_LEASE_RENEWAL_RETRY_COUNT) {
+
+ lastStorageException = null;
+
+ synchronized(this) {
+ try {
+
+ final Calendar currentCalendar = Calendar
+ .getInstance(Locale.US);
+ currentCalendar.setTimeZone(TimeZone.getTimeZone(UTC_STR));
+ long currentTime = currentCalendar.getTime().getTime();
+
+ // Acquire lease on the blob.
+ lease = new SelfRenewingLease(blob);
+
+ blob.downloadAttributes(opContext);
+ HashMap<String, String> metadata = blob.getMetadata();
+
+ if (metadata.containsKey(APPEND_LEASE)
+ && currentTime - Long.parseLong(
+ metadata.get(APPEND_LEASE_LAST_MODIFIED)) <= BlockBlobAppendStream.APPEND_LEASE_TIMEOUT
+ && !metadata.get(APPEND_LEASE).equals(Boolean.toString(testCondition))) {
+ return false;
+ }
+
+ metadata.put(APPEND_LEASE, Boolean.toString(holdLease));
+ metadata.put(APPEND_LEASE_LAST_MODIFIED, Long.toString(currentTime));
+ blob.setMetadata(metadata);
+ AccessCondition accessCondition = new AccessCondition();
+ accessCondition.setLeaseID(lease.getLeaseID());
+ blob.uploadMetadata(accessCondition, null, opContext);
+ return true;
+
+ } catch (StorageException ex) {
+
+ lastStorageException = ex;
+ LOG.debug("Lease renewal for Blob : {} encountered Storage Exception : {} "
+ + "Error Code : {}",
+ key, ex, ex.getErrorCode());
+ leaseRenewalRetryCount++;
+
+ } finally {
+
+ if (lease != null) {
+ try {
+ lease.free();
+ } catch(StorageException ex) {
+ LOG.debug("Encountered Storage exception while releasing lease for Blob {} "
+ + "during Append metadata operation. Storage Exception {} "
+ + "Error Code : {} ", key, ex, ex.getErrorCode());
+ } finally {
+ lease = null;
+ }
+ }
+ }
+ }
+
+ if (leaseRenewalRetryCount == MAX_LEASE_RENEWAL_RETRY_COUNT) {
+ throw lastStorageException;
+ } else {
+ try {
+ Thread.sleep(LEASE_RENEWAL_RETRY_SLEEP_PERIOD);
+ } catch(InterruptedException ex) {
+ LOG.debug("Blob append metadata updated method interrupted");
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ // This should be unreachable: every iteration of the while loop above
+ // either returns or, once the retry count reaches its maximum, throws.
+ return false;
+ }
+
+ /**
+ * To keep outBuffer consistent, this should be the only method that writes to it.
+ * @param data
+ * @param offset
+ * @param length
+ * @throws IOException
+ */
+ private synchronized void writeInternal(final byte[] data, final int offset, final int length)
+ throws IOException {
+
+ if (!initialized) {
+ throw new IOException("Trying to write to an un-initialized Append stream");
+ }
+
+ if (closed) {
+ throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
+ }
+
+ if (leaseFreed) {
+ throw new IOException(String.format("Write called on a append stream not holding lease. Failing Write"));
+ }
+
+ byte[] currentData = new byte[length];
+ System.arraycopy(data, offset, currentData, 0, length);
+
+ // check to see if the data to be appended exceeds the
+ // buffer size. If so we upload a block to azure storage.
+ while ((outBuffer.size() + currentData.length) > bufferSize) {
+
+ byte[] payload = new byte[bufferSize];
+
+ // Add data from the existing buffer
+ System.arraycopy(outBuffer.toByteArray(), 0, payload, 0, outBuffer.size());
+
+ // Compute the available space left in the payload
+ int availableSpaceInPayload = bufferSize - outBuffer.size();
+
+ // Adding data from the current call
+ System.arraycopy(currentData, 0, payload, outBuffer.size(), availableSpaceInPayload);
+
+ uploadBlockToStorage(payload);
+
+ // updating the currentData buffer
+ byte[] tempBuffer = new byte[currentData.length - availableSpaceInPayload];
+ System.arraycopy(currentData, availableSpaceInPayload,
+ tempBuffer, 0, currentData.length - availableSpaceInPayload);
+ currentData = tempBuffer;
+ outBuffer = new ByteArrayOutputStream(bufferSize);
+ }
+
+ outBuffer.write(currentData);
+ }
+
+ /**
+ * Runnable instance that uploads a block of data to Azure storage.
+ */
+ private class WriteRequest implements Runnable {
+ private final byte[] dataPayload;
+ private final String blockId;
+
+ public WriteRequest(byte[] dataPayload, String blockId) {
+ this.dataPayload = dataPayload;
+ this.blockId = blockId;
+ }
+
+ @Override
+ public void run() {
+
+ int uploadRetryAttempts = 0;
+ IOException lastLocalException = null;
+ while (uploadRetryAttempts < MAX_BLOCK_UPLOAD_RETRIES) {
+ try {
+
+ blob.uploadBlock(blockId, new ByteArrayInputStream(dataPayload),
+ dataPayload.length, new BlobRequestOptions(), opContext);
+ break;
+ } catch(Exception ioe) {
+ Log.debug("Encountered exception during uploading block for Blob : {} Exception : {}", key, ioe);
+ uploadRetryAttempts++;
+ lastLocalException = new IOException("Encountered Exception while uploading block", ioe);
+ try {
+ Thread.sleep(BLOCK_UPLOAD_RETRY_INTERVAL);
+ } catch(InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ break;
+ }
+ }
+ }
+
+ if (uploadRetryAttempts == MAX_BLOCK_UPLOAD_RETRIES) {
+ lastError = lastLocalException;
+ }
+ }
+ }
+
+ /**
+ * A ThreadFactory that creates uploader threads with
+ * meaningful names, which is helpful for debugging.
+ */
+ class UploaderThreadFactory implements ThreadFactory {
+
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread t = new Thread(r);
+ t.setName(String.format("%s-%s-%d", THREAD_ID_PREFIX, key,
+ threadSequenceNumber.getAndIncrement()));
+ return t;
+ }
+ }
+
+ /**
+ * A daemon thread that renews the Append lease on the blob.
+ * The thread sleeps for LEASE_RENEWAL_PERIOD before renewing
+ * the lease. If an error is encountered while renewing the lease,
+ * the lease is released by this thread, which fails all subsequent
+ * operations.
+ */
+ private class AppendRenewer implements Runnable {
+
+ @Override
+ public void run() {
+
+ while (!leaseFreed) {
+
+ try {
+ Thread.sleep(LEASE_RENEWAL_PERIOD);
+ } catch (InterruptedException ie) {
+ LOG.debug("Appender Renewer thread interrupted");
+ Thread.currentThread().interrupt();
+ }
+
+ Log.debug("Attempting to renew append lease on {}", key);
+
+ try {
+ if (!leaseFreed) {
+ // Update the blob metadata to renew the append lease
+ if (!updateBlobAppendMetadata(true, true)) {
+ LOG.error("Unable to re-acquire append lease on the Blob {} ", key);
+ leaseFreed = true;
+ }
+ }
+ } catch (StorageException ex) {
+
+ LOG.debug("Lease renewal for Blob : {} encountered "
+ + "Storage Exception : {} Error Code : {}", key, ex, ex.getErrorCode());
+
+ // We swallow the exception here because if the blob metadata is not updated for
+ // APPEND_LEASE_TIMEOUT period, another thread would be able to detect this and
+ // continue forward if it needs to append.
+ leaseFreed = true;
+ }
+ }
+ }
+ }
+}
\ No newline at end of file
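The lease-arbitration rules spelled out in the comment inside
updateBlobAppendMetadata() above condense into a small decision function. A
hedged sketch: it uses the APPEND_LEASE / APPEND_LEASE_LAST_MODIFIED metadata
keys and the 30-second APPEND_LEASE_TIMEOUT from the patch, but the method
name and the Map standing in for the blob metadata are illustrative.

    import java.util.Map;

    final class AppendLeaseSketch {
      static final String APPEND_LEASE = "append_lease";
      static final String APPEND_LEASE_LAST_MODIFIED = "append_lease_last_modified";
      static final int APPEND_LEASE_TIMEOUT = 30000; // milliseconds

      // Returns true when the caller may take (or renew) the append lease.
      static boolean mayUpdateAppendLease(Map<String, String> metadata,
          boolean testCondition, long nowMillis) {
        // Case 1: no append lease recorded yet -- the first appender wins.
        if (!metadata.containsKey(APPEND_LEASE)) {
          return true;
        }
        long lastModified =
            Long.parseLong(metadata.get(APPEND_LEASE_LAST_MODIFIED));
        // Case 2: the previous holder timed out -- take over regardless of value.
        if (nowMillis - lastModified > APPEND_LEASE_TIMEOUT) {
          return true;
        }
        // Cases 3 and 4: within the timeout window, the stored value must match
        // the caller's expected state; otherwise another writer holds the lease.
        return metadata.get(APPEND_LEASE).equals(Boolean.toString(testCondition));
      }
    }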
http://git-wip-us.apache.org/repos/asf/hadoop/blob/62d61662/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
index d2ff705..ed65184 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.fs.azure;
import java.io.DataInputStream;
+import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
@@ -31,7 +32,6 @@ import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.EnumSet;
-import java.util.Iterator;
import java.util.Set;
import java.util.TimeZone;
import java.util.TreeSet;
@@ -41,7 +41,6 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.lang.StringUtils;
-import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
@@ -60,8 +59,6 @@ import org.apache.hadoop.fs.azure.metrics.AzureFileSystemMetricsSystem;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.fs.azure.AzureException;
-import org.apache.hadoop.fs.azure.StorageInterface.CloudBlobWrapper;
-import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Progressable;
import org.slf4j.Logger;
@@ -73,12 +70,8 @@ import org.codehaus.jackson.map.JsonMappingException;
import org.codehaus.jackson.map.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
-import com.microsoft.azure.storage.AccessCondition;
-import com.microsoft.azure.storage.OperationContext;
-import com.microsoft.azure.storage.StorageErrorCode;
import com.microsoft.azure.storage.StorageException;
-import com.microsoft.azure.storage.blob.CloudBlob;
-import com.microsoft.azure.storage.StorageErrorCodeStrings;
+
import org.apache.hadoop.io.IOUtils;
@@ -288,7 +281,7 @@ public class NativeAzureFileSystem extends FileSystem {
throw new IOException("Unable to write RenamePending file for folder rename from "
+ srcKey + " to " + dstKey, e);
} finally {
- NativeAzureFileSystem.cleanup(LOG, output);
+ NativeAzureFileSystemHelper.cleanup(LOG, output);
}
}
@@ -663,6 +656,11 @@ public class NativeAzureFileSystem extends FileSystem {
public static final String SKIP_AZURE_METRICS_PROPERTY_NAME = "fs.azure.skip.metrics";
+ /*
+ * Property to enable Append API.
+ */
+ public static final String APPEND_SUPPORT_ENABLE_PROPERTY_NAME = "fs.azure.enable.append.support";
+
private class NativeAzureFsInputStream extends FSInputStream {
private InputStream in;
private final String key;
@@ -728,7 +726,7 @@ public class NativeAzureFileSystem extends FileSystem {
return result;
} catch(IOException e) {
- Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(e);
+ Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
if (innerException instanceof StorageException) {
@@ -736,7 +734,7 @@ public class NativeAzureFileSystem extends FileSystem {
+ " Exception details: {} Error Code : {}",
key, e, ((StorageException) innerException).getErrorCode());
- if (NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
+ if (NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
throw new FileNotFoundException(String.format("%s is not found", key));
}
}
@@ -782,7 +780,7 @@ public class NativeAzureFileSystem extends FileSystem {
return result;
} catch(IOException e) {
- Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(e);
+ Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
if (innerException instanceof StorageException) {
@@ -790,7 +788,7 @@ public class NativeAzureFileSystem extends FileSystem {
+ " Exception details: {} Error Code : {}",
key, e, ((StorageException) innerException).getErrorCode());
- if (NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
+ if (NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
throw new FileNotFoundException(String.format("%s is not found", key));
}
}
@@ -822,10 +820,10 @@ public class NativeAzureFileSystem extends FileSystem {
this.pos);
} catch(IOException e) {
- Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(e);
+ Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
if (innerException instanceof StorageException
- && NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
+ && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
throw new FileNotFoundException(String.format("%s is not found", key));
}
@@ -1041,7 +1039,7 @@ public class NativeAzureFileSystem extends FileSystem {
private static boolean suppressRetryPolicy = false;
// A counter to create unique (within-process) names for my metrics sources.
private static AtomicInteger metricsSourceNameCounter = new AtomicInteger();
-
+ private boolean appendSupportEnabled = false;
public NativeAzureFileSystem() {
// set store in initialize()
@@ -1164,7 +1162,7 @@ public class NativeAzureFileSystem extends FileSystem {
this.blockSize = conf.getLong(AZURE_BLOCK_SIZE_PROPERTY_NAME,
MAX_AZURE_BLOCK_SIZE);
-
+ this.appendSupportEnabled = conf.getBoolean(APPEND_SUPPORT_ENABLE_PROPERTY_NAME, false);
LOG.debug("NativeAzureFileSystem. Initializing.");
LOG.debug(" blockSize = {}",
conf.getLong(AZURE_BLOCK_SIZE_PROPERTY_NAME, MAX_AZURE_BLOCK_SIZE));
@@ -1294,7 +1292,61 @@ public class NativeAzureFileSystem extends FileSystem {
@Override
public FSDataOutputStream append(Path f, int bufferSize, Progressable progress)
throws IOException {
- throw new IOException("Not supported");
+
+ if (!appendSupportEnabled) {
+ throw new UnsupportedOperationException("Append Support not enabled");
+ }
+
+ LOG.debug("Opening file: {} for append", f);
+
+ Path absolutePath = makeAbsolute(f);
+ String key = pathToKey(absolutePath);
+ FileMetadata meta = null;
+ try {
+ meta = store.retrieveMetadata(key);
+ } catch(Exception ex) {
+
+ Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
+
+ if (innerException instanceof StorageException
+ && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
+
+ throw new FileNotFoundException(String.format("%s is not found", key));
+ } else {
+ throw ex;
+ }
+ }
+
+ if (meta == null) {
+ throw new FileNotFoundException(f.toString());
+ }
+
+ if (meta.isDir()) {
+ throw new FileNotFoundException(f.toString()
+ + " is a directory not a file.");
+ }
+
+ if (store.isPageBlobKey(key)) {
+ throw new IOException("Append not supported for Page Blobs");
+ }
+
+ DataOutputStream appendStream = null;
+
+ try {
+ appendStream = store.retrieveAppendStream(key, bufferSize);
+ } catch (Exception ex) {
+
+ Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
+
+ if (innerException instanceof StorageException
+ && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
+ throw new FileNotFoundException(String.format("%s is not found", key));
+ } else {
+ throw ex;
+ }
+ }
+
+ return new FSDataOutputStream(appendStream, statistics);
}
@Override
@@ -1379,7 +1431,7 @@ public class NativeAzureFileSystem extends FileSystem {
lease.free();
}
} catch (Exception e) {
- NativeAzureFileSystem.cleanup(LOG, out);
+ NativeAzureFileSystemHelper.cleanup(LOG, out);
String msg = "Unable to free lease on " + parent.toUri();
LOG.error(msg);
throw new IOException(msg, e);
@@ -1577,10 +1629,10 @@ public class NativeAzureFileSystem extends FileSystem {
metaFile = store.retrieveMetadata(key);
} catch (IOException e) {
- Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(e);
+ Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
if (innerException instanceof StorageException
- && NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
+ && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
return false;
}
@@ -1611,7 +1663,7 @@ public class NativeAzureFileSystem extends FileSystem {
parentMetadata = store.retrieveMetadata(parentKey);
} catch (IOException e) {
- Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(e);
+ Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
if (innerException instanceof StorageException) {
// Invalid State.
@@ -1619,7 +1671,7 @@ public class NativeAzureFileSystem extends FileSystem {
// if the file not present. But not retrieving metadata here is an
// unrecoverable state and can only happen if there is a race condition
// hence throwing a IOException
- if (NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
+ if (NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
throw new IOException("File " + f + " has a parent directory "
+ parentPath + " whose metadata cannot be retrieved. Can't resolve");
}
@@ -1662,10 +1714,10 @@ public class NativeAzureFileSystem extends FileSystem {
instrumentation.fileDeleted();
} catch(IOException e) {
- Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(e);
+ Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
if (innerException instanceof StorageException
- && NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
+ && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
return false;
}
@@ -1684,7 +1736,7 @@ public class NativeAzureFileSystem extends FileSystem {
parentMetadata = store.retrieveMetadata(parentKey);
} catch (IOException e) {
- Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(e);
+ Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
if (innerException instanceof StorageException) {
// Invalid State.
@@ -1692,7 +1744,7 @@ public class NativeAzureFileSystem extends FileSystem {
// if the file not present. But not retrieving metadata here is an
// unrecoverable state and can only happen if there is a race condition
// hence throwing a IOException
- if (NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
+ if (NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
throw new IOException("File " + f + " has a parent directory "
+ parentPath + " whose metadata cannot be retrieved. Can't resolve");
}
@@ -1728,10 +1780,10 @@ public class NativeAzureFileSystem extends FileSystem {
priorLastKey);
} catch(IOException e) {
- Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(e);
+ Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
if (innerException instanceof StorageException
- && NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
+ && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
return false;
}
@@ -1763,10 +1815,10 @@ public class NativeAzureFileSystem extends FileSystem {
instrumentation.fileDeleted();
} catch(IOException e) {
- Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(e);
+ Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
if (innerException instanceof StorageException
- && NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
+ && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
return false;
}
@@ -1785,10 +1837,10 @@ public class NativeAzureFileSystem extends FileSystem {
store.delete(key);
} catch(IOException e) {
- Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(e);
+ Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
if (innerException instanceof StorageException
- && NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
+ && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
return false;
}
@@ -1829,10 +1881,10 @@ public class NativeAzureFileSystem extends FileSystem {
meta = store.retrieveMetadata(key);
} catch(Exception ex) {
- Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(ex);
+ Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
if (innerException instanceof StorageException
- && NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
+ && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
throw new FileNotFoundException(String.format("%s is not found", key));
}
@@ -1922,10 +1974,10 @@ public class NativeAzureFileSystem extends FileSystem {
meta = store.retrieveMetadata(key);
} catch (IOException ex) {
- Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(ex);
+ Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
if (innerException instanceof StorageException
- && NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
+ && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
throw new FileNotFoundException(String.format("%s is not found", f));
}
@@ -1948,10 +2000,10 @@ public class NativeAzureFileSystem extends FileSystem {
listing = store.list(key, AZURE_LIST_ALL, 1, partialKey);
} catch (IOException ex) {
- Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(ex);
+ Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
if (innerException instanceof StorageException
- && NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
+ && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
throw new FileNotFoundException(String.format("%s is not found", key));
}
@@ -1972,10 +2024,10 @@ public class NativeAzureFileSystem extends FileSystem {
try {
listing = store.list(key, AZURE_LIST_ALL, 1, partialKey);
} catch (IOException ex) {
- Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(ex);
+ Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
if (innerException instanceof StorageException
- && NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
+ && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
throw new FileNotFoundException(String.format("%s is not found", key));
}
@@ -2196,10 +2248,10 @@ public class NativeAzureFileSystem extends FileSystem {
meta = store.retrieveMetadata(key);
} catch(Exception ex) {
- Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(ex);
+ Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
if (innerException instanceof StorageException
- && NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
+ && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
throw new FileNotFoundException(String.format("%s is not found", key));
}
@@ -2219,10 +2271,10 @@ public class NativeAzureFileSystem extends FileSystem {
try {
inputStream = store.retrieve(key);
} catch(Exception ex) {
- Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(ex);
+ Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
if (innerException instanceof StorageException
- && NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
+ && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
throw new FileNotFoundException(String.format("%s is not found", key));
}
@@ -2261,14 +2313,14 @@ public class NativeAzureFileSystem extends FileSystem {
dstMetadata = store.retrieveMetadata(dstKey);
} catch (IOException ex) {
- Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(ex);
+ Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
// A BlobNotFound storage exception is only thrown from the retrieveMetadata API when
// there is a race condition. If there is another thread which deletes the destination
// file or folder, then this thread calling rename should be able to continue with
// rename gracefully. Hence the StorageException is swallowed here.
if (innerException instanceof StorageException) {
- if (NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
+ if (NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
LOG.debug("BlobNotFound exception encountered for Destination key : {}. "
+ "Swallowin the exception to handle race condition gracefully", dstKey);
}
@@ -2294,10 +2346,10 @@ public class NativeAzureFileSystem extends FileSystem {
parentOfDestMetadata = store.retrieveMetadata(pathToKey(absoluteDst.getParent()));
} catch (IOException ex) {
- Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(ex);
+ Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
if (innerException instanceof StorageException
- && NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
+ && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
LOG.debug("Parent of destination {} doesn't exists. Failing rename", dst);
return false;
@@ -2320,10 +2372,10 @@ public class NativeAzureFileSystem extends FileSystem {
try {
srcMetadata = store.retrieveMetadata(srcKey);
} catch (IOException ex) {
- Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(ex);
+ Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
if (innerException instanceof StorageException
- && NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
+ && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
LOG.debug("Source {} doesn't exists. Failing rename", src);
return false;
@@ -2342,10 +2394,10 @@ public class NativeAzureFileSystem extends FileSystem {
store.rename(srcKey, dstKey);
} catch(IOException ex) {
- Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(ex);
+ Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
if (innerException instanceof StorageException
- && NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
+ && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
LOG.debug("BlobNotFoundException encountered. Failing rename", src);
return false;
@@ -2552,10 +2604,10 @@ public class NativeAzureFileSystem extends FileSystem {
try {
metadata = store.retrieveMetadata(key);
} catch (IOException ex) {
- Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(ex);
+ Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
if (innerException instanceof StorageException
- && NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
+ && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
throw new FileNotFoundException(String.format("File %s doesn't exists.", p));
}
@@ -2591,10 +2643,10 @@ public class NativeAzureFileSystem extends FileSystem {
try {
metadata = store.retrieveMetadata(key);
} catch (IOException ex) {
- Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(ex);
+ Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
if (innerException instanceof StorageException
- && NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
+ && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
throw new FileNotFoundException(String.format("File %s doesn't exists.", p));
}
@@ -2817,52 +2869,4 @@ public class NativeAzureFileSystem extends FileSystem {
// Return to the caller with the randomized key.
return randomizedKey;
}
-
- private static void cleanup(Logger log, java.io.Closeable closeable) {
- if (closeable != null) {
- try {
- closeable.close();
- } catch(IOException e) {
- if (log != null) {
- log.debug("Exception in closing {}", closeable, e);
- }
- }
- }
- }
-
- /*
- * Helper method to recursively check if the cause of the exception is
- * a Azure storage exception.
- */
- private static Throwable checkForAzureStorageException(Exception e) {
-
- Throwable innerException = e.getCause();
-
- while (innerException != null
- && !(innerException instanceof StorageException)) {
- innerException = innerException.getCause();
- }
-
- return innerException;
- }
-
- /*
- * Helper method to check if the AzureStorageException is
- * because backing blob was not found.
- */
- private static boolean isFileNotFoundException(StorageException e) {
-
- String errorCode = ((StorageException) e).getErrorCode();
- if (errorCode != null
- && (errorCode.equals(StorageErrorCodeStrings.BLOB_NOT_FOUND)
- || errorCode.equals(StorageErrorCodeStrings.RESOURCE_NOT_FOUND)
- || errorCode.equals(StorageErrorCode.BLOB_NOT_FOUND.toString())
- || errorCode.equals(StorageErrorCode.RESOURCE_NOT_FOUND.toString()))) {
-
- return true;
- }
-
- return false;
- }
-
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/62d61662/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemHelper.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemHelper.java
new file mode 100644
index 0000000..40efdc6
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemHelper.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.microsoft.azure.storage.StorageErrorCode;
+import com.microsoft.azure.storage.StorageErrorCodeStrings;
+import com.microsoft.azure.storage.StorageException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+/**
+ * Utility class that provides helper methods shared by the WASB file system classes.
+ */
+@InterfaceAudience.Private
+final class NativeAzureFileSystemHelper {
+
+ private NativeAzureFileSystemHelper() {
+ // Hiding the constructor as this is a utility class.
+ }
+
+ private static final Logger LOG = LoggerFactory.getLogger(NativeAzureFileSystemHelper.class);
+
+ public static void cleanup(Logger log, java.io.Closeable closeable) {
+ if (closeable != null) {
+ try {
+ closeable.close();
+ } catch(IOException e) {
+ if (log != null) {
+ log.debug("Exception in closing {}", closeable, e);
+ }
+ }
+ }
+ }
+
+ /*
+ * Helper method to recursively check if the cause of the exception is
+ * an Azure storage exception.
+ */
+ public static Throwable checkForAzureStorageException(Exception e) {
+
+ Throwable innerException = e.getCause();
+
+ while (innerException != null
+ && !(innerException instanceof StorageException)) {
+
+ innerException = innerException.getCause();
+ }
+
+ return innerException;
+ }
+
+ /*
+ * Helper method to check if the StorageException was caused by
+ * the backing blob not being found.
+ */
+ public static boolean isFileNotFoundException(StorageException e) {
+
+ String errorCode = e.getErrorCode();
+ if (errorCode != null
+ && (errorCode.equals(StorageErrorCodeStrings.BLOB_NOT_FOUND)
+ || errorCode.equals(StorageErrorCodeStrings.RESOURCE_NOT_FOUND)
+ || errorCode.equals(StorageErrorCode.BLOB_NOT_FOUND.toString())
+ || errorCode.equals(StorageErrorCode.RESOURCE_NOT_FOUND.toString()))) {
+
+ return true;
+ }
+
+ return false;
+ }
+
+ /*
+ * Helper method that logs stack traces from all live threads.
+ */
+ public static void logAllLiveStackTraces() {
+
+ for (Map.Entry<Thread, StackTraceElement[]> entry : Thread.getAllStackTraces().entrySet()) {
+ LOG.debug("Thread " + entry.getKey().getName());
+ StackTraceElement[] trace = entry.getValue();
+ for (int j = 0; j < trace.length; j++) {
+ LOG.debug("\tat " + trace[j]);
+ }
+ }
+ }
+}
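For illustration, a minimal sketch (not part of the patch) of how the extracted helper classifies a nested failure. It assumes code living in the same org.apache.hadoop.fs.azure package, since the class is package-private:

    import java.io.IOException;

    import com.microsoft.azure.storage.StorageException;

    // Build a cause chain like the ones the WASB client surfaces.
    IOException wrapped = new IOException("outer",
        new RuntimeException("middle",
            new StorageException("BlobNotFound",
                "The specified blob does not exist.", 404, null, null)));

    // Walk the cause chain until a StorageException (or null) is found.
    Throwable inner = NativeAzureFileSystemHelper.checkForAzureStorageException(wrapped);

    // BlobNotFound maps to FileNotFoundException at the call sites above.
    boolean notFound = inner instanceof StorageException
        && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) inner);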
http://git-wip-us.apache.org/repos/asf/hadoop/blob/62d61662/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java
index 0229cb7..f052b7f 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java
@@ -107,4 +107,6 @@ interface NativeFileSystemStore {
void delete(String key, SelfRenewingLease lease) throws IOException;
SelfRenewingLease acquireLease(String key) throws AzureException;
+
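+ /**
+ * Returns a stream that appends to the existing blob stored under the
+ * given key. Only block blobs support append; the implementation backs
+ * the stream with a BlockBlobAppendStream.
+ *
+ * @param key blob key to append to
+ * @param bufferSize size of the internal buffer used while appending
+ * @throws IOException if the blob is a page blob or the stream cannot be created
+ */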
+ DataOutputStream retrieveAppendStream(String key, int bufferSize) throws IOException;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/62d61662/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobOutputStream.java
index 8689375..b2b34f8 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobOutputStream.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobOutputStream.java
@@ -29,8 +29,6 @@ import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Arrays;
-import java.util.Iterator;
-import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
@@ -216,7 +214,7 @@ final class PageBlobOutputStream extends OutputStream implements Syncable {
LOG.debug(ioThreadPool.toString());
if (!ioThreadPool.awaitTermination(10, TimeUnit.MINUTES)) {
LOG.debug("Timed out after 10 minutes waiting for IO requests to finish");
- logAllStackTraces();
+ NativeAzureFileSystemHelper.logAllLiveStackTraces();
LOG.debug(ioThreadPool.toString());
throw new IOException("Timed out waiting for IO requests to finish");
}
@@ -230,18 +228,7 @@ final class PageBlobOutputStream extends OutputStream implements Syncable {
closed = true;
}
- // Log the stacks of all threads.
- private void logAllStackTraces() {
- Map liveThreads = Thread.getAllStackTraces();
- for (Iterator i = liveThreads.keySet().iterator(); i.hasNext(); ) {
- Thread key = (Thread) i.next();
- LOG.debug("Thread " + key.getName());
- StackTraceElement[] trace = (StackTraceElement[]) liveThreads.get(key);
- for (int j = 0; j < trace.length; j++) {
- LOG.debug("\tat " + trace[j]);
- }
- }
- }
+
/**
* A single write request for data to write to Azure storage.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/62d61662/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterface.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterface.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterface.java
index ce5f749..c2169a4 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterface.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterface.java
@@ -24,11 +24,13 @@ import java.io.OutputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
+import java.util.List;
import java.util.EnumSet;
import java.util.HashMap;
import org.apache.hadoop.classification.InterfaceAudience;
+import com.microsoft.azure.storage.AccessCondition;
import com.microsoft.azure.storage.CloudStorageAccount;
import com.microsoft.azure.storage.OperationContext;
import com.microsoft.azure.storage.RetryPolicyFactory;
@@ -36,6 +38,8 @@ import com.microsoft.azure.storage.StorageCredentials;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.BlobListingDetails;
import com.microsoft.azure.storage.blob.BlobProperties;
+import com.microsoft.azure.storage.blob.BlockEntry;
+import com.microsoft.azure.storage.blob.BlockListingFilter;
import com.microsoft.azure.storage.blob.BlobRequestOptions;
import com.microsoft.azure.storage.blob.CloudBlob;
import com.microsoft.azure.storage.blob.CopyState;
@@ -269,13 +273,13 @@ abstract class StorageInterface {
/**
* Uploads the container's metadata using the specified operation context.
- *
+ *
* @param opContext
* An {@link OperationContext} object that represents the context
* for the current operation. This object is used to track requests
* to the storage service, and to provide additional runtime
* information about the operation.
- *
+ *
* @throws StorageException
* If a storage service error occurred.
*/
@@ -545,6 +549,30 @@ abstract class StorageInterface {
void uploadMetadata(OperationContext opContext)
throws StorageException;
+ /**
+ * Uploads the blob's metadata to the storage service using the specified
+ * access condition, request options, and operation context.
+ *
+ * @param accessCondition
+ * An {@link AccessCondition} object that represents the access conditions for the blob.
+ *
+ * @param options
+ * A {@link BlobRequestOptions} object that specifies any additional options for the request. Specifying
+ * <code>null</code> will use the default request options from the associated service client (
+ * {@link CloudBlobClient}).
+ *
+ * @param opContext
+ * An {@link OperationContext} object that represents the context
+ * for the current operation. This object is used to track requests
+ * to the storage service, and to provide additional runtime
+ * information about the operation.
+ *
+ * @throws StorageException
+ * If a storage service error occurred.
+ */
+ void uploadMetadata(AccessCondition accessCondition, BlobRequestOptions options,
+ OperationContext opContext) throws StorageException;
+
void uploadProperties(OperationContext opContext,
SelfRenewingLease lease)
throws StorageException;
@@ -602,6 +630,63 @@ abstract class StorageInterface {
OutputStream openOutputStream(
BlobRequestOptions options,
OperationContext opContext) throws StorageException;
+
+ /**
+ * Downloads the list of blocks for this block blob.
+ * @param filter A {@link BlockListingFilter} value that specifies whether to download
+ * committed blocks, uncommitted blocks, or all blocks.
+ * @param options A {@link BlobRequestOptions} object that specifies any additional options for
+ * the request. Specifying null will use the default request options from
+ * the associated service client ( CloudBlobClient).
+ * @param opContext An {@link OperationContext} object that represents the context for the current
+ * operation. This object is used to track requests to the storage service,
+ * and to provide additional runtime information about the operation.
+ * @return An ArrayList object of {@link BlockEntry} objects that represent the list
+ * block items downloaded from the block blob.
+ * @throws IOException If an I/O error occurred.
+ * @throws StorageException If a storage service error occurred.
+ */
+ List<BlockEntry> downloadBlockList(BlockListingFilter filter, BlobRequestOptions options,
+ OperationContext opContext) throws IOException, StorageException;
+
+ /**
+ * Uploads a single block of data to this block blob.
+ * @param blockId A String that represents the Base-64 encoded block ID. Note for a given blob
+ * the length of all Block IDs must be identical.
+ * @param sourceStream An {@link InputStream} object that represents the input stream to write to the
+ * block blob.
+ * @param length A long which represents the length, in bytes, of the stream data,
+ * or -1 if unknown.
+ * @param options A {@link BlobRequestOptions} object that specifies any additional options for the
+ * request. Specifying null will use the default request options from the
+ * associated service client ( CloudBlobClient).
+ * @param opContext An {@link OperationContext} object that represents the context for the current operation.
+ * This object is used to track requests to the storage service, and to provide
+ * additional runtime information about the operation.
+ * @throws IOException If an I/O error occurred.
+ * @throws StorageException If a storage service error occurred.
+ */
+ void uploadBlock(String blockId, InputStream sourceStream,
+ long length, BlobRequestOptions options,
+ OperationContext opContext) throws IOException, StorageException;
+
+ /**
+ * Commits the given block list, making the blocks part of the blob's contents.
+ * @param blockList An enumerable collection of {@link BlockEntry} objects that represents the list
+ * block items being committed. The size field is ignored.
+ * @param accessCondition An {@link AccessCondition} object that represents the access conditions for the blob.
+ * @param options A {@link BlobRequestOptions} object that specifies any additional options for the
+ * request. Specifying null will use the default request options from the associated
+ * service client ( CloudBlobClient).
+ * @param opContext An {@link OperationContext} object that represents the context for the current operation.
+ * This object is used to track requests to the storage service, and to provide additional
+ * runtime information about the operation.
+ * @throws IOException If an I/O error occurred.
+ * @throws StorageException If a storage service error occurred.
+ */
+ void commitBlockList(List<BlockEntry> blockList, AccessCondition accessCondition, BlobRequestOptions options,
+ OperationContext opContext) throws IOException, StorageException;
+
}
/**
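Taken together, the three new methods are the primitives a block-level append needs. A condensed sketch (same-package code, not part of the patch) of how they compose, mirroring what BlockBlobAppendStream does later in this series; the lease handling of the real stream is omitted:

    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.util.List;

    import com.microsoft.azure.storage.OperationContext;
    import com.microsoft.azure.storage.StorageException;
    import com.microsoft.azure.storage.blob.BlobRequestOptions;
    import com.microsoft.azure.storage.blob.BlockEntry;
    import com.microsoft.azure.storage.blob.BlockListingFilter;

    static void appendOnce(StorageInterface.CloudBlockBlobWrapper blob,
        String blockId, byte[] payload, OperationContext opContext)
        throws IOException, StorageException {
      // 1. Upload the data as an uncommitted block.
      blob.uploadBlock(blockId, new ByteArrayInputStream(payload),
          payload.length, new BlobRequestOptions(), opContext);
      // 2. Read back the blocks already committed to the blob.
      List<BlockEntry> blocks = blob.downloadBlockList(
          BlockListingFilter.COMMITTED, new BlobRequestOptions(), opContext);
      // 3. Re-commit the old list plus the new block; the list order
      //    defines the blob contents.
      blocks.add(new BlockEntry(blockId));
      blob.commitBlockList(blocks, null, new BlobRequestOptions(), opContext);
    }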
http://git-wip-us.apache.org/repos/asf/hadoop/blob/62d61662/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java
index 382ff66..298f3aa 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java
@@ -27,7 +27,7 @@ import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
-
+import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import com.microsoft.azure.storage.AccessCondition;
@@ -40,6 +40,8 @@ import com.microsoft.azure.storage.StorageUri;
import com.microsoft.azure.storage.blob.BlobListingDetails;
import com.microsoft.azure.storage.blob.BlobProperties;
import com.microsoft.azure.storage.blob.BlobRequestOptions;
+import com.microsoft.azure.storage.blob.BlockEntry;
+import com.microsoft.azure.storage.blob.BlockListingFilter;
import com.microsoft.azure.storage.blob.CloudBlob;
import com.microsoft.azure.storage.blob.CloudBlobClient;
import com.microsoft.azure.storage.blob.CloudBlobContainer;
@@ -362,7 +364,13 @@ class StorageInterfaceImpl extends StorageInterface {
@Override
public void uploadMetadata(OperationContext opContext)
throws StorageException {
- getBlob().uploadMetadata(null, null, opContext);
+ uploadMetadata(null, null, opContext);
+ }
+
+ @Override
+ public void uploadMetadata(AccessCondition accessCondition, BlobRequestOptions options,
+ OperationContext opContext) throws StorageException {
+ getBlob().uploadMetadata(accessCondition, options, opContext);
}
public void uploadProperties(OperationContext opContext, SelfRenewingLease lease)
@@ -396,7 +404,7 @@ class StorageInterfaceImpl extends StorageInterface {
public void startCopyFromBlob(CloudBlobWrapper sourceBlob, BlobRequestOptions options,
OperationContext opContext)
throws StorageException, URISyntaxException {
- getBlob().startCopyFromBlob(((CloudBlobWrapperImpl)sourceBlob).blob,
+ getBlob().startCopyFromBlob(((CloudBlobWrapperImpl) sourceBlob).blob,
null, null, options, opContext);
}
@@ -440,6 +448,25 @@ class StorageInterfaceImpl extends StorageInterface {
getBlob().uploadProperties(null, null, opContext);
}
+ @Override
+ public List<BlockEntry> downloadBlockList(BlockListingFilter filter, BlobRequestOptions options,
+ OperationContext opContext) throws IOException, StorageException {
+ return ((CloudBlockBlob) getBlob()).downloadBlockList(filter, null, options, opContext);
+
+ }
+
+ @Override
+ public void uploadBlock(String blockId, InputStream sourceStream,
+ long length, BlobRequestOptions options,
+ OperationContext opContext) throws IOException, StorageException {
+ ((CloudBlockBlob) getBlob()).uploadBlock(blockId, sourceStream, length, null, options, opContext);
+ }
+
+ @Override
+ public void commitBlockList(List<BlockEntry> blockList, AccessCondition accessCondition, BlobRequestOptions options,
+ OperationContext opContext) throws IOException, StorageException {
+ ((CloudBlockBlob) getBlob()).commitBlockList(blockList, accessCondition, options, opContext);
+ }
}
static class CloudPageBlobWrapperImpl extends CloudBlobWrapperImpl implements CloudPageBlobWrapper {
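The new uploadMetadata overload exists so that a metadata write can be made conditional on holding a lease. A sketch (not part of the patch; the method name is hypothetical) of the pattern it enables, which BlockBlobAppendStream uses below for its append-lease bookkeeping:

    import java.util.HashMap;

    import com.microsoft.azure.storage.AccessCondition;
    import com.microsoft.azure.storage.OperationContext;
    import com.microsoft.azure.storage.StorageException;

    static void setMetadataUnderLease(StorageInterface.CloudBlobWrapper blob,
        String name, String value, OperationContext opContext)
        throws AzureException, StorageException {
      SelfRenewingLease lease = new SelfRenewingLease(blob); // acquire the blob lease
      try {
        blob.downloadAttributes(opContext);                  // refresh current metadata
        HashMap<String, String> metadata = blob.getMetadata();
        metadata.put(name, value);
        blob.setMetadata(metadata);
        AccessCondition accessCondition = new AccessCondition();
        accessCondition.setLeaseID(lease.getLeaseID());      // write only while leased
        blob.uploadMetadata(accessCondition, null, opContext);
      } finally {
        lease.free();                                        // always release the lease
      }
    }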
http://git-wip-us.apache.org/repos/asf/hadoop/blob/62d61662/hadoop-tools/hadoop-azure/src/site/markdown/index.md
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/site/markdown/index.md b/hadoop-tools/hadoop-azure/src/site/markdown/index.md
index 9d0115a..4402467 100644
--- a/hadoop-tools/hadoop-azure/src/site/markdown/index.md
+++ b/hadoop-tools/hadoop-azure/src/site/markdown/index.md
@@ -23,6 +23,7 @@
* [Page Blob Support and Configuration](#Page_Blob_Support_and_Configuration)
* [Atomic Folder Rename](#Atomic_Folder_Rename)
* [Accessing wasb URLs](#Accessing_wasb_URLs)
+ * [Append API Support and Configuration](#Append_API_Support_and_Configuration)
* [Testing the hadoop-azure Module](#Testing_the_hadoop-azure_Module)
## <a name="Introduction" />Introduction
@@ -51,7 +52,6 @@ on the additional artifacts it requires, notably the
## <a name="Limitations" />Limitations
-* The append operation is not implemented.
* File owner and group are persisted, but the permissions model is not enforced.
Authorization occurs at the level of the entire Azure Blob Storage account.
* File last access time is not tracked.
@@ -199,6 +199,24 @@ It's also possible to configure `fs.defaultFS` to use a `wasb` or `wasbs` URL.
This causes all bare paths, such as `/testDir/testFile` to resolve automatically
to that file system.
+### <a name="Append_API_Support_and_Configuration" />Append API Support and Configuration
+
+The Azure Blob Storage interface for Hadoop has optional support for the Append API for
+a single writer, enabled by setting the configuration `fs.azure.enable.append.support` to true.
+
+For example:
+
+ <property>
+ <name>fs.azure.enable.append.support</name>
+ <value>true</value>
+ </property>
+
+It must be noted that append support in the Azure Blob Storage interface DIFFERS FROM HDFS
+SEMANTICS: it does not enforce a single writer internally, but requires applications to
+guarantee this semantic. It is the responsibility of the application either to ensure
+single-threaded handling for a particular file path, or to rely on an external locking
+mechanism of its own. Failure to do so will result in unexpected behavior.
+
## <a name="Testing_the_hadoop-azure_Module" />Testing the hadoop-azure Module
The hadoop-azure module includes a full suite of unit tests. Most of the tests
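To accompany the new documentation section, a short usage sketch (account and container names are placeholders): once the property is set, appends go through the standard FileSystem API, and closing the stream commits the appended blocks.

    import java.nio.charset.StandardCharsets;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    Configuration conf = new Configuration();
    conf.setBoolean("fs.azure.enable.append.support", true);

    Path file = new Path(
        "wasb://mycontainer@youraccount.blob.core.windows.net/testDir/testFile");
    FileSystem fs = file.getFileSystem(conf);

    try (FSDataOutputStream out = fs.append(file)) {
      out.write("more data".getBytes(StandardCharsets.UTF_8));
    }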
http://git-wip-us.apache.org/repos/asf/hadoop/blob/62d61662/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockStorageInterface.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockStorageInterface.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockStorageInterface.java
index 9f84f4b..2bb2a9a 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockStorageInterface.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockStorageInterface.java
@@ -32,11 +32,12 @@ import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.TimeZone;
-
+import java.util.List;
import org.apache.commons.httpclient.URIException;
import org.apache.commons.httpclient.util.URIUtil;
import org.apache.commons.lang.NotImplementedException;
+import com.microsoft.azure.storage.AccessCondition;
import com.microsoft.azure.storage.CloudStorageAccount;
import com.microsoft.azure.storage.OperationContext;
import com.microsoft.azure.storage.RetryPolicyFactory;
@@ -46,6 +47,8 @@ import com.microsoft.azure.storage.StorageUri;
import com.microsoft.azure.storage.blob.BlobListingDetails;
import com.microsoft.azure.storage.blob.BlobProperties;
import com.microsoft.azure.storage.blob.BlobRequestOptions;
+import com.microsoft.azure.storage.blob.BlockEntry;
+import com.microsoft.azure.storage.blob.BlockListingFilter;
import com.microsoft.azure.storage.blob.CloudBlob;
import com.microsoft.azure.storage.blob.CloudBlobContainer;
import com.microsoft.azure.storage.blob.CloudBlobDirectory;
@@ -524,6 +527,30 @@ public class MockStorageInterface extends StorageInterface {
public CloudBlob getBlob() {
return null;
}
+
+ @Override
+ public List<BlockEntry> downloadBlockList(BlockListingFilter filter, BlobRequestOptions options,
+ OperationContext opContext) throws IOException, StorageException {
+
+ throw new UnsupportedOperationException("downloadBlockList not used in Mock Tests");
+ }
+ @Override
+ public void uploadBlock(String blockId, InputStream sourceStream,
+ long length, BlobRequestOptions options,
+ OperationContext opContext) throws IOException, StorageException {
+ throw new UnsupportedOperationException("uploadBlock not used in Mock Tests");
+ }
+
+ @Override
+ public void commitBlockList(List<BlockEntry> blockList, AccessCondition accessCondition,
+ BlobRequestOptions options, OperationContext opContext) throws IOException, StorageException {
+ throw new UnsupportedOperationException("commitBlockList not used in Mock Tests");
+ }
+
+ @Override
+ public void uploadMetadata(AccessCondition accessCondition, BlobRequestOptions options,
+ OperationContext opContext) throws StorageException {
+ throw new UnsupportedOperationException("uploadMetadata not used in Mock Tests");
+ }
}
class MockCloudPageBlobWrapper extends MockCloudBlobWrapper
@@ -580,5 +607,10 @@ public class MockStorageInterface extends StorageInterface {
public CloudBlob getBlob() {
return null;
}
+
+ @Override
+ public void uploadMetadata(AccessCondition accessCondition, BlobRequestOptions options,
+ OperationContext opContext) throws StorageException {
+ throw new UnsupportedOperationException("uploadMetadata not used in Mock Tests");
+ }
}
}
[6/6] hadoop git commit: HADOOP-12635. Adding Append API support for
WASB. Contributed by Dushyanth.
Posted by cn...@apache.org.
HADOOP-12635. Adding Append API support for WASB. Contributed by Dushyanth.
(cherry picked from commit 8bc93db2e7c64830b6a662f28c8917a9eef4e7c9)
(cherry picked from commit 62d616621134cc5e68f4d5fd32f49ea4d731417c)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ffc0d988
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ffc0d988
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ffc0d988
Branch: refs/heads/branch-2.8
Commit: ffc0d988869fb964fb052e8d109821807ca75ee1
Parents: 23d729f
Author: cnauroth <cn...@apache.org>
Authored: Mon Jan 18 09:08:53 2016 -0800
Committer: cnauroth <cn...@apache.org>
Committed: Mon Jan 18 09:13:31 2016 -0800
----------------------------------------------------------------------
hadoop-common-project/hadoop-common/CHANGES.txt | 2 +
.../fs/azure/AzureNativeFileSystemStore.java | 23 +-
.../hadoop/fs/azure/BlockBlobAppendStream.java | 775 +++++++++++++++++++
.../hadoop/fs/azure/NativeAzureFileSystem.java | 216 +++---
.../fs/azure/NativeAzureFileSystemHelper.java | 107 +++
.../hadoop/fs/azure/NativeFileSystemStore.java | 2 +
.../hadoop/fs/azure/PageBlobOutputStream.java | 17 +-
.../hadoop/fs/azure/StorageInterface.java | 89 ++-
.../hadoop/fs/azure/StorageInterfaceImpl.java | 33 +-
.../hadoop-azure/src/site/markdown/index.md | 20 +-
.../hadoop/fs/azure/MockStorageInterface.java | 34 +-
.../azure/TestNativeAzureFileSystemAppend.java | 362 +++++++++
12 files changed, 1550 insertions(+), 130 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ffc0d988/hadoop-common-project/hadoop-common/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index 1eba762..3711b43 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -46,6 +46,8 @@ Release 2.8.0 - UNRELEASED
HADOOP-12691. Add CSRF Filter for REST APIs to Hadoop Common.
(Larry McCay via cnauroth)
+ HADOOP-12635. Adding Append API support for WASB. (Dushyanth via cnauroth)
+
IMPROVEMENTS
HADOOP-12458. Retries is typoed to spell Retires in parts of
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ffc0d988/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
index a936cd6..0097912 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
@@ -33,13 +33,11 @@ import java.net.URLEncoder;
import java.security.InvalidKeyException;
import java.util.ArrayList;
import java.util.Calendar;
-import java.util.Collections;
import java.util.Date;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
-import java.util.LinkedList;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
@@ -64,6 +62,7 @@ import org.apache.hadoop.io.IOUtils;
import org.mortbay.util.ajax.JSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import com.google.common.annotations.VisibleForTesting;
import com.microsoft.azure.storage.CloudStorageAccount;
import com.microsoft.azure.storage.OperationContext;
@@ -2681,4 +2680,24 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
close();
super.finalize();
}
+
+ @Override
+ public DataOutputStream retrieveAppendStream(String key, int bufferSize) throws IOException {
+
+ try {
+
+ if (isPageBlobKey(key)) {
+ throw new UnsupportedOperationException("Append not supported for Page Blobs");
+ }
+
+ CloudBlobWrapper blob = this.container.getBlockBlobReference(key);
+
+ BlockBlobAppendStream appendStream = new BlockBlobAppendStream((CloudBlockBlobWrapper) blob, key, bufferSize, getInstrumentedContext());
+ appendStream.initialize();
+
+ return new DataOutputStream(appendStream);
+ } catch(Exception ex) {
+ throw new AzureException(ex);
+ }
+ }
}
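The FileSystem-level append that consumes retrieveAppendStream lives in the NativeAzureFileSystem diff, which this excerpt truncates. A hedged sketch of its minimal shape; everything except retrieveAppendStream, APPEND_SUPPORT_ENABLE_PROPERTY_NAME, pathToKey, and makeAbsolute is an assumption about the elided code:

    // Inside NativeAzureFileSystem; `store` and `statistics` are existing fields.
    @Override
    public FSDataOutputStream append(Path f, int bufferSize, Progressable progress)
        throws IOException {
      // Guard shape assumed; the flag itself is defined in this patch.
      if (!getConf().getBoolean(APPEND_SUPPORT_ENABLE_PROPERTY_NAME, false)) {
        throw new UnsupportedOperationException(
            "Append is only supported when " + APPEND_SUPPORT_ENABLE_PROPERTY_NAME
            + " is set to true");
      }
      String key = pathToKey(makeAbsolute(f));
      // Page blobs are rejected inside retrieveAppendStream; block blobs get a
      // BlockBlobAppendStream wrapped in a DataOutputStream.
      return new FSDataOutputStream(store.retrieveAppendStream(key, bufferSize),
          statistics);
    }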
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ffc0d988/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobAppendStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobAppendStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobAppendStream.java
new file mode 100644
index 0000000..d1ec8df
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobAppendStream.java
@@ -0,0 +1,775 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.List;
+import java.util.Random;
+import java.util.TimeZone;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.fs.azure.StorageInterface.CloudBlockBlobWrapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.microsoft.azure.storage.AccessCondition;
+import com.microsoft.azure.storage.OperationContext;
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.BlobRequestOptions;
+import com.microsoft.azure.storage.blob.BlockEntry;
+import com.microsoft.azure.storage.blob.BlockListingFilter;
+
+/**
+ * Stream object that implements append for block blobs in WASB.
+ */
+public class BlockBlobAppendStream extends OutputStream {
+
+ private final String key;
+ private final int bufferSize;
+ private ByteArrayOutputStream outBuffer;
+ private final CloudBlockBlobWrapper blob;
+ private final OperationContext opContext;
+
+ /**
+ * Variable to track if the stream has been closed.
+ */
+ private boolean closed = false;
+
+ /**
+ * Variable to track if the append lease is released.
+ */
+
+ private volatile boolean leaseFreed;
+
+ /**
+ * Variable to track if the append stream has been
+ * initialized.
+ */
+
+ private boolean initialized = false;
+
+ /**
+ * Last IOException encountered
+ */
+ private volatile IOException lastError = null;
+
+ /**
+ * List to keep track of the uncommitted Azure storage
+ * block ids.
+ */
+ private final List<BlockEntry> uncommittedBlockEntries;
+
+ private static final int UNSET_BLOCKS_COUNT = -1;
+
+ /**
+ * Counter used to derive the next block id for Azure
+ * storage blocks.
+ */
+ private long nextBlockCount = UNSET_BLOCKS_COUNT;
+
+ private final Random sequenceGenerator = new Random();
+
+ /**
+ * Time to wait to renew lease in milliseconds
+ */
+ private static final int LEASE_RENEWAL_PERIOD = 10000;
+
+ /**
+ * Number of times to retry for lease renewal
+ */
+ private static final int MAX_LEASE_RENEWAL_RETRY_COUNT = 3;
+
+ /**
+ * Time to wait before retrying to set the lease
+ */
+ private static final int LEASE_RENEWAL_RETRY_SLEEP_PERIOD = 500;
+
+ /**
+ * Metadata key used on the blob to indicate append lease is active
+ */
+ public static final String APPEND_LEASE = "append_lease";
+
+ /**
+ * Timeout value for the append lease in milliseconds. If the lease is not
+ * renewed within 30 seconds, another thread can acquire the append lease
+ * on the blob.
+ */
+ public static final int APPEND_LEASE_TIMEOUT = 30000;
+
+ /**
+ * Metadata key used on the blob to indicate the last modified time of the append lease.
+ */
+ public static final String APPEND_LEASE_LAST_MODIFIED = "append_lease_last_modified";
+
+ /**
+ * Number of times a block upload is retried.
+ */
+ private static final int MAX_BLOCK_UPLOAD_RETRIES = 3;
+
+ /**
+ * Wait time between block upload retries in milliseconds.
+ */
+ private static final int BLOCK_UPLOAD_RETRY_INTERVAL = 1000;
+
+ private static final Logger LOG = LoggerFactory.getLogger(BlockBlobAppendStream.class);
+
+ private static final int MAX_BLOCK_COUNT = 100000;
+
+ private ThreadPoolExecutor ioThreadPool;
+
+ /**
+ * Atomic integer to provide thread id for thread names for uploader threads.
+ */
+ private final AtomicInteger threadSequenceNumber;
+
+ /**
+ * Prefix to be used for thread names for uploader threads.
+ */
+ private static final String THREAD_ID_PREFIX = "BlockBlobAppendStream";
+
+ private static final String UTC_STR = "UTC";
+
+ public BlockBlobAppendStream(final CloudBlockBlobWrapper blob,
+ final String aKey, final int bufferSize, final OperationContext opContext)
+ throws IOException {
+
+ if (null == aKey || 0 == aKey.length()) {
+ throw new IllegalArgumentException(
+ "Illegal argument: The key string is null or empty");
+ }
+
+ if (0 >= bufferSize) {
+ throw new IllegalArgumentException(
+ "Illegal argument bufferSize cannot be zero or negative");
+ }
+
+
+ this.blob = blob;
+ this.opContext = opContext;
+ this.key = aKey;
+ this.bufferSize = bufferSize;
+ this.threadSequenceNumber = new AtomicInteger(0);
+ setBlocksCount();
+
+ this.outBuffer = new ByteArrayOutputStream(bufferSize);
+ this.uncommittedBlockEntries = new ArrayList<BlockEntry>();
+
+ // Acquire append lease on the blob.
+ try {
+ //Set the append lease if the value of the append lease is false
+ if (!updateBlobAppendMetadata(true, false)) {
+ LOG.error("Unable to set Append Lease on the Blob : {} "
+ + "Possibly because another client already has a create or append stream open on the Blob", key);
+ throw new IOException("Unable to set Append lease on the Blob. "
+ + "Possibly because another client already had an append stream open on the Blob.");
+ }
+ } catch (StorageException ex) {
+ LOG.error("Encountered Storage exception while acquiring append "
+ + "lease on blob : {}. Storage Exception : {} ErrorCode : {}",
+ key, ex, ex.getErrorCode());
+
+ throw new IOException(ex);
+ }
+
+ leaseFreed = false;
+ }
+
+ /**
+ * Helper method that starts an Append Lease renewer thread and the
+ * thread pool.
+ */
+ public synchronized void initialize() {
+
+ if (initialized) {
+ return;
+ }
+ /*
+ * Start the thread for Append lease renewer.
+ */
+ Thread appendLeaseRenewer = new Thread(new AppendRenewer());
+ appendLeaseRenewer.setDaemon(true);
+ appendLeaseRenewer.setName(String.format("%s-AppendLeaseRenewer", key));
+ appendLeaseRenewer.start();
+
+ /*
+ * Parameters to ThreadPoolExecutor:
+ * corePoolSize : the number of threads to keep in the pool, even if they are idle,
+ * unless allowCoreThreadTimeOut is set
+ * maximumPoolSize : the maximum number of threads to allow in the pool
+ * keepAliveTime - when the number of threads is greater than the core,
+ * this is the maximum time that excess idle threads will
+ * wait for new tasks before terminating.
+ * unit - the time unit for the keepAliveTime argument
+ * workQueue - the queue to use for holding tasks before they are executed
+ * This queue will hold only the Runnable tasks submitted by the execute method.
+ */
+ this.ioThreadPool = new ThreadPoolExecutor(4, 4, 2, TimeUnit.SECONDS,
+ new LinkedBlockingQueue<Runnable>(), new UploaderThreadFactory());
+
+ initialized = true;
+ }
+
+ /**
+ * Get the blob name.
+ *
+ * @return String Blob name.
+ */
+ public String getKey() {
+ return key;
+ }
+
+ /**
+ * Get the buffer size of the stream.
+ * @return buffer size of the stream.
+ */
+ public int getBufferSize() {
+ return bufferSize;
+ }
+
+ /**
+ * Writes the specified byte to this output stream. The general contract for
+ * write is that one byte is written to the output stream. The byte to be
+ * written is the eight low-order bits of the argument b. The 24 high-order
+ * bits of b are ignored.
+ *
+ * @param byteVal
+ * the byteValue to write.
+ * @throws IOException
+ * if an I/O error occurs. In particular, an IOException may be
+ * thrown if the output stream has been closed.
+ */
+ @Override
+ public void write(final int byteVal) throws IOException {
+ write(new byte[] { (byte) (byteVal & 0xFF) });
+ }
+
+ /**
+ * Writes b.length bytes from the specified byte array to this output stream.
+ *
+ * @param data
+ * the byte array to write.
+ *
+ * @throws IOException
+ * if an I/O error occurs. In particular, an IOException may be
+ * thrown if the output stream has been closed.
+ */
+ @Override
+ public void write(final byte[] data) throws IOException {
+ write(data, 0, data.length);
+ }
+
+ /**
+ * Writes length bytes from the specified byte array starting at offset to
+ * this output stream.
+ *
+ * @param data
+ * the byte array to write.
+ * @param offset
+ * the start offset in the data.
+ * @param length
+ * the number of bytes to write.
+ * @throws IOException
+ * if an I/O error occurs. In particular, an IOException may be
+ * thrown if the output stream has been closed.
+ */
+ @Override
+ public void write(final byte[] data, final int offset, final int length)
+ throws IOException {
+
+ if (offset < 0 || length < 0 || length > data.length - offset) {
+ throw new IndexOutOfBoundsException("write API in append stream called with invalid arguments");
+ }
+
+ writeInternal(data, offset, length);
+ }
+
+ @Override
+ public synchronized void close() throws IOException {
+
+ if (!initialized) {
+ throw new IOException("Trying to close an uninitialized Append stream");
+ }
+
+ if (closed) {
+ return;
+ }
+
+ if (leaseFreed) {
+ throw new IOException(String.format("Attempting to close an append stream on blob : %s "
+ + " that does not have lease on the Blob. Failing close", key));
+ }
+
+ if (outBuffer.size() > 0) {
+ uploadBlockToStorage(outBuffer.toByteArray());
+ }
+
+ ioThreadPool.shutdown();
+
+ try {
+ if (!ioThreadPool.awaitTermination(10, TimeUnit.MINUTES)) {
+ LOG.error("Time out occured while waiting for IO request to finish in append"
+ + " for blob : {}", key);
+ NativeAzureFileSystemHelper.logAllLiveStackTraces();
+ throw new IOException("Timed out waiting for IO requests to finish");
+ }
+ } catch(InterruptedException intrEx) {
+
+ // Restore the interrupted status
+ Thread.currentThread().interrupt();
+ LOG.error("Upload block operation in append interrupted for blob {}. Failing close", key);
+ throw new IOException("Append Commit interrupted.");
+ }
+
+ // Calling commit after all blocks are successfully uploaded.
+ if (lastError == null) {
+ commitAppendBlocks();
+ }
+
+ // Perform cleanup.
+ cleanup();
+
+ if (lastError != null) {
+ throw lastError;
+ }
+ }
+
+ /**
+ * Helper method that cleans up the append stream.
+ */
+ private synchronized void cleanup() {
+
+ closed = true;
+
+ try {
+ // Set the value of append lease to false if the value is set to true.
+ updateBlobAppendMetadata(false, true);
+ } catch(StorageException ex) {
+ LOG.debug("Append metadata update on the Blob : {} encountered Storage Exception : {} "
+ + "Error Code : {}",
+ key, ex, ex.getErrorCode());
+ lastError = new IOException(ex);
+ }
+
+ leaseFreed = true;
+ }
+
+ /**
+ * Method to commit all the uncommitted blocks to Azure storage.
+ * If the commit fails then blocks are automatically cleaned up
+ * by Azure storage.
+ * @throws IOException
+ */
+ private synchronized void commitAppendBlocks() throws IOException {
+
+ SelfRenewingLease lease = null;
+
+ try {
+ if (uncommittedBlockEntries.size() > 0) {
+
+ //Acquiring lease on the blob.
+ lease = new SelfRenewingLease(blob);
+
+ // Downloading existing blocks
+ List<BlockEntry> blockEntries = blob.downloadBlockList(BlockListingFilter.COMMITTED,
+ new BlobRequestOptions(), opContext);
+
+ // Adding uncommitted blocks.
+ blockEntries.addAll(uncommittedBlockEntries);
+
+ AccessCondition accessCondition = new AccessCondition();
+ accessCondition.setLeaseID(lease.getLeaseID());
+ blob.commitBlockList(blockEntries, accessCondition, new BlobRequestOptions(), opContext);
+ uncommittedBlockEntries.clear();
+ }
+ } catch(StorageException ex) {
+ LOG.error("Storage exception encountered during block commit phase of append for blob"
+ + " : {} Storage Exception : {} Error Code: {}", key, ex, ex.getErrorCode());
+ throw new IOException("Encountered Exception while committing append blocks", ex);
+ } finally {
+ if (lease != null) {
+ try {
+ lease.free();
+ } catch(StorageException ex) {
+ LOG.debug("Exception encountered while releasing lease for "
+ + "blob : {} StorageException : {} ErrorCode : {}", key, ex, ex.getErrorCode());
+ // Swallowing exception here as the lease is cleaned up by the SelfRenewingLease object.
+ }
+ }
+ }
+ }
+
+ /**
+ * Helper method used to initialize the counter from which block ids are
+ * generated. The algorithm used is similar to the Azure storage SDK.
+ */
+ private void setBlocksCount() throws IOException {
+ try {
+
+ if (nextBlockCount == UNSET_BLOCKS_COUNT) {
+
+ nextBlockCount = (long) (sequenceGenerator.nextInt(Integer.MAX_VALUE))
+ + sequenceGenerator.nextInt(Integer.MAX_VALUE - MAX_BLOCK_COUNT);
+
+ List<BlockEntry> blockEntries =
+ blob.downloadBlockList(BlockListingFilter.COMMITTED, new BlobRequestOptions(), opContext);
+
+ nextBlockCount += blockEntries.size();
+
+ }
+ } catch (StorageException ex) {
+ LOG.debug("Encountered storage exception during setting next Block Count."
+ + " StorageException : {} ErrorCode : {}", ex, ex.getErrorCode());
+ throw new IOException(ex);
+ }
+ }
+
+ /**
+ * Helper method that generates the next block id for uploading a block to azure storage.
+ * @return String representing the block ID generated.
+ * @throws IOException
+ */
+ private String generateBlockId() throws IOException {
+
+ if (nextBlockCount == UNSET_BLOCKS_COUNT) {
+ throw new IOException("Append Stream in invalid state. nextBlockCount not set correctly");
+ }
+
+ byte[] blockIdInBytes = getBytesFromLong(nextBlockCount);
+ return new String(Base64.encodeBase64(blockIdInBytes), StandardCharsets.UTF_8);
+ }
+
+ /**
+ * Returns a byte array that represents the data of a <code>long</code> value. This
+ * utility method is copied from com.microsoft.azure.storage.core.Utility class.
+ * That class is marked as internal, hence we clone the method here rather
+ * than expressing a dependency on the Utility class.
+ *
+ * @param value
+ * The value from which the byte array will be returned.
+ *
+ * @return A byte array that represents the data of the specified <code>long</code> value.
+ */
+ private static byte[] getBytesFromLong(final long value) {
+ final byte[] tempArray = new byte[8];
+
+ for (int m = 0; m < 8; m++) {
+ tempArray[7 - m] = (byte) ((value >> (8 * m)) & 0xFF);
+ }
+
+ return tempArray;
+ }
+
+ /**
+ * Helper method that creates a thread to upload a block to azure storage.
+ * @param payload
+ * @throws IOException
+ */
+ private synchronized void uploadBlockToStorage(byte[] payload) throws IOException {
+
+ // upload payload to azure storage
+ nextBlockCount++;
+ String blockId = generateBlockId();
+ // Since uploads to Azure storage are done in parallel threads, we go ahead
+ // and add the blockId to the uncommitted list. If the upload of the block
+ // fails, we don't commit the blockIds.
+ uncommittedBlockEntries.add(new BlockEntry(blockId));
+ ioThreadPool.execute(new WriteRequest(payload, blockId));
+ }
+
+
+ /**
+ * Helper method to update the blob metadata during append lease operations.
+ * The metadata is updated to the holdLease value if the current lease status
+ * equals testCondition, if the append lease has timed out, or if no append
+ * lease metadata is present on the blob.
+ * @param holdLease
+ * @param testCondition
+ * @return true if the lease update operation was successful, false otherwise
+ * @throws StorageException
+ */
+ private boolean updateBlobAppendMetadata(boolean holdLease, boolean testCondition)
+ throws StorageException {
+
+ SelfRenewingLease lease = null;
+ StorageException lastStorageException = null;
+ int leaseRenewalRetryCount = 0;
+
+ /*
+ * Updating the blob metadata honours the following algorithm, based on
+ * 1) whether the append lease metadata is present,
+ * 2) the last updated time of the append lease, and
+ * 3) the previous value of the append lease metadata.
+ *
+ * The algorithm:
+ * 1) If append lease metadata is not part of the blob, this is the first
+ * client to append, so we update the metadata.
+ * 2) If append lease metadata is present and the timeout has occurred,
+ * we update the metadata irrespective of the value of the append lease.
+ * 3) If append lease metadata is present, is equal to the testCondition value
+ * (passed as a parameter), and the timeout has not occurred, we update the metadata.
+ * 4) If append lease metadata is present, is not equal to the testCondition value,
+ * and the timeout has not occurred, we do not update the metadata and return false.
+ *
+ */
+ while (leaseRenewalRetryCount < MAX_LEASE_RENEWAL_RETRY_COUNT) {
+
+ lastStorageException = null;
+
+ synchronized(this) {
+ try {
+
+ final Calendar currentCalendar = Calendar
+ .getInstance(Locale.US);
+ currentCalendar.setTimeZone(TimeZone.getTimeZone(UTC_STR));
+ long currentTime = currentCalendar.getTime().getTime();
+
+ // Acquire lease on the blob.
+ lease = new SelfRenewingLease(blob);
+
+ blob.downloadAttributes(opContext);
+ HashMap<String, String> metadata = blob.getMetadata();
+
+ if (metadata.containsKey(APPEND_LEASE)
+ && currentTime - Long.parseLong(
+ metadata.get(APPEND_LEASE_LAST_MODIFIED)) <= BlockBlobAppendStream.APPEND_LEASE_TIMEOUT
+ && !metadata.get(APPEND_LEASE).equals(Boolean.toString(testCondition))) {
+ return false;
+ }
+
+ metadata.put(APPEND_LEASE, Boolean.toString(holdLease));
+ metadata.put(APPEND_LEASE_LAST_MODIFIED, Long.toString(currentTime));
+ blob.setMetadata(metadata);
+ AccessCondition accessCondition = new AccessCondition();
+ accessCondition.setLeaseID(lease.getLeaseID());
+ blob.uploadMetadata(accessCondition, null, opContext);
+ return true;
+
+ } catch (StorageException ex) {
+
+ lastStorageException = ex;
+ LOG.debug("Lease renewal for Blob : {} encountered Storage Exception : {} "
+ + "Error Code : {}",
+ key, ex, ex.getErrorCode());
+ leaseRenewalRetryCount++;
+
+ } finally {
+
+ if (lease != null) {
+ try {
+ lease.free();
+ } catch(StorageException ex) {
+ LOG.debug("Encountered Storage exception while releasing lease for Blob {} "
+ + "during Append metadata operation. Storage Exception {} "
+ + "Error Code : {} ", key, ex, ex.getErrorCode());
+ } finally {
+ lease = null;
+ }
+ }
+ }
+ }
+
+ if (leaseRenewalRetryCount == MAX_LEASE_RENEWAL_RETRY_COUNT) {
+ throw lastStorageException;
+ } else {
+ try {
+ Thread.sleep(LEASE_RENEWAL_RETRY_SLEEP_PERIOD);
+ } catch(InterruptedException ex) {
+ LOG.debug("Blob append metadata updated method interrupted");
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ // The code should never reach this point, because the while loop above
+ // either returns or throws before falling through.
+ return false;
+ }
+
+ /**
+ * To preserve the consistency of outBuffer, this should be the only method that writes to it.
+ * @param data
+ * @param offset
+ * @param length
+ * @throws IOException
+ */
+ private synchronized void writeInternal(final byte[] data, final int offset, final int length)
+ throws IOException {
+
+ if (!initialized) {
+ throw new IOException("Trying to write to an un-initialized Append stream");
+ }
+
+ if (closed) {
+ throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
+ }
+
+ if (leaseFreed) {
+ throw new IOException(String.format("Write called on a append stream not holding lease. Failing Write"));
+ }
+
+ byte[] currentData = new byte[length];
+ System.arraycopy(data, offset, currentData, 0, length);
+
+ // check to see if the data to be appended exceeds the
+ // buffer size. If so we upload a block to azure storage.
+ while ((outBuffer.size() + currentData.length) > bufferSize) {
+
+ byte[] payload = new byte[bufferSize];
+
+ // Add data from the existing buffer
+ System.arraycopy(outBuffer.toByteArray(), 0, payload, 0, outBuffer.size());
+
+ // Updating the available size in the payload
+ int availableSpaceInPayload = bufferSize - outBuffer.size();
+
+ // Adding data from the current call
+ System.arraycopy(currentData, 0, payload, outBuffer.size(), availableSpaceInPayload);
+
+ uploadBlockToStorage(payload);
+
+ // updating the currentData buffer
+ byte[] tempBuffer = new byte[currentData.length - availableSpaceInPayload];
+ System.arraycopy(currentData, availableSpaceInPayload,
+ tempBuffer, 0, currentData.length - availableSpaceInPayload);
+ currentData = tempBuffer;
+ outBuffer = new ByteArrayOutputStream(bufferSize);
+ }
+
+ outBuffer.write(currentData);
+ }
+
+ /**
+ * Runnable instance that uploads the block of data to azure storage.
+ *
+ *
+ */
+ private class WriteRequest implements Runnable {
+ private final byte[] dataPayload;
+ private final String blockId;
+
+ public WriteRequest(byte[] dataPayload, String blockId) {
+ this.dataPayload = dataPayload;
+ this.blockId = blockId;
+ }
+
+ @Override
+ public void run() {
+
+ int uploadRetryAttempts = 0;
+ IOException lastLocalException = null;
+ while (uploadRetryAttempts < MAX_BLOCK_UPLOAD_RETRIES) {
+ try {
+
+ blob.uploadBlock(blockId, new ByteArrayInputStream(dataPayload),
+ dataPayload.length, new BlobRequestOptions(), opContext);
+ break;
+ } catch(Exception ioe) {
+ Log.debug("Encountered exception during uploading block for Blob : {} Exception : {}", key, ioe);
+ uploadRetryAttempts++;
+ lastLocalException = new IOException("Encountered Exception while uploading block", ioe);
+ try {
+ Thread.sleep(BLOCK_UPLOAD_RETRY_INTERVAL);
+ } catch(InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ break;
+ }
+ }
+ }
+
+ if (uploadRetryAttempts == MAX_BLOCK_UPLOAD_RETRIES) {
+ lastError = lastLocalException;
+ }
+ }
+ }
+
+ /**
+ * A ThreadFactory that creates uploader threads with
+ * meaningful names, helpful for debugging purposes.
+ */
+ class UploaderThreadFactory implements ThreadFactory {
+
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread t = new Thread(r);
+ t.setName(String.format("%s-%s-%d", THREAD_ID_PREFIX, key,
+ threadSequenceNumber.getAndIncrement()));
+ return t;
+ }
+ }
+
+ /**
+ * A daemon thread that renews the append lease on the blob.
+ * The thread sleeps for LEASE_RENEWAL_PERIOD before renewing
+ * the lease. If an error is encountered while renewing the lease,
+ * the lease is released by this thread, which fails all subsequent
+ * operations.
+ */
+ private class AppendRenewer implements Runnable {
+
+ @Override
+ public void run() {
+
+ while (!leaseFreed) {
+
+ try {
+ Thread.sleep(LEASE_RENEWAL_PERIOD);
+ } catch (InterruptedException ie) {
+ LOG.debug("Appender Renewer thread interrupted");
+ Thread.currentThread().interrupt();
+ }
+
+ Log.debug("Attempting to renew append lease on {}", key);
+
+ try {
+ if (!leaseFreed) {
+ // Update the blob metadata to renew the append lease
+ if (!updateBlobAppendMetadata(true, true)) {
+ LOG.error("Unable to re-acquire append lease on the Blob {} ", key);
+ leaseFreed = true;
+ }
+ }
+ } catch (StorageException ex) {
+
+ LOG.debug("Lease renewal for Blob : {} encountered "
+ + "Storage Exception : {} Error Code : {}", key, ex, ex.getErrorCode());
+
+ // We swallow the exception here because if the blob metadata is not updated for
+ // APPEND_LEASE_TIMEOUT period, another thread would be able to detect this and
+ // continue forward if it needs to append.
+ leaseFreed = true;
+ }
+ }
+ }
+ }
+}
\ No newline at end of file
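A small self-contained illustration (not part of the patch) of the block-id scheme above: every id is the Base-64 encoding of an 8-byte big-endian counter, so all ids on a blob have the identical length that uploadBlock requires.

    import java.nio.charset.StandardCharsets;

    import org.apache.commons.codec.binary.Base64;

    public class BlockIdDemo {
      /** Big-endian encoding of a long, mirroring getBytesFromLong above. */
      static byte[] getBytesFromLong(final long value) {
        final byte[] tempArray = new byte[8];
        for (int m = 0; m < 8; m++) {
          tempArray[7 - m] = (byte) ((value >> (8 * m)) & 0xFF);
        }
        return tempArray;
      }

      public static void main(String[] args) {
        // Consecutive counters yield distinct ids of identical (12-char) length.
        for (long count = 1234567890L; count <= 1234567892L; count++) {
          String blockId = new String(
              Base64.encodeBase64(getBytesFromLong(count)), StandardCharsets.UTF_8);
          System.out.println(count + " -> " + blockId
              + " (length " + blockId.length() + ")");
        }
      }
    }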
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ffc0d988/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
index d2ff705..ed65184 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.fs.azure;
import java.io.DataInputStream;
+import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
@@ -31,7 +32,6 @@ import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.EnumSet;
-import java.util.Iterator;
import java.util.Set;
import java.util.TimeZone;
import java.util.TreeSet;
@@ -41,7 +41,6 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.lang.StringUtils;
-import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
@@ -60,8 +59,6 @@ import org.apache.hadoop.fs.azure.metrics.AzureFileSystemMetricsSystem;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.fs.azure.AzureException;
-import org.apache.hadoop.fs.azure.StorageInterface.CloudBlobWrapper;
-import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Progressable;
import org.slf4j.Logger;
@@ -73,12 +70,8 @@ import org.codehaus.jackson.map.JsonMappingException;
import org.codehaus.jackson.map.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
-import com.microsoft.azure.storage.AccessCondition;
-import com.microsoft.azure.storage.OperationContext;
-import com.microsoft.azure.storage.StorageErrorCode;
import com.microsoft.azure.storage.StorageException;
-import com.microsoft.azure.storage.blob.CloudBlob;
-import com.microsoft.azure.storage.StorageErrorCodeStrings;
+
import org.apache.hadoop.io.IOUtils;
@@ -288,7 +281,7 @@ public class NativeAzureFileSystem extends FileSystem {
throw new IOException("Unable to write RenamePending file for folder rename from "
+ srcKey + " to " + dstKey, e);
} finally {
- NativeAzureFileSystem.cleanup(LOG, output);
+ NativeAzureFileSystemHelper.cleanup(LOG, output);
}
}
@@ -663,6 +656,11 @@ public class NativeAzureFileSystem extends FileSystem {
public static final String SKIP_AZURE_METRICS_PROPERTY_NAME = "fs.azure.skip.metrics";
+ /*
+ * Property to enable Append API.
+ */
+ public static final String APPEND_SUPPORT_ENABLE_PROPERTY_NAME = "fs.azure.enable.append.support";
+
private class NativeAzureFsInputStream extends FSInputStream {
private InputStream in;
private final String key;
@@ -728,7 +726,7 @@ public class NativeAzureFileSystem extends FileSystem {
return result;
} catch(IOException e) {
- Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(e);
+ Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
if (innerException instanceof StorageException) {
@@ -736,7 +734,7 @@ public class NativeAzureFileSystem extends FileSystem {
+ " Exception details: {} Error Code : {}",
key, e, ((StorageException) innerException).getErrorCode());
- if (NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
+ if (NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
throw new FileNotFoundException(String.format("%s is not found", key));
}
}
@@ -782,7 +780,7 @@ public class NativeAzureFileSystem extends FileSystem {
return result;
} catch(IOException e) {
- Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(e);
+ Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
if (innerException instanceof StorageException) {
@@ -790,7 +788,7 @@ public class NativeAzureFileSystem extends FileSystem {
+ " Exception details: {} Error Code : {}",
key, e, ((StorageException) innerException).getErrorCode());
- if (NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
+ if (NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
throw new FileNotFoundException(String.format("%s is not found", key));
}
}
@@ -822,10 +820,10 @@ public class NativeAzureFileSystem extends FileSystem {
this.pos);
} catch(IOException e) {
- Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(e);
+ Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
if (innerException instanceof StorageException
- && NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
+ && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
throw new FileNotFoundException(String.format("%s is not found", key));
}
@@ -1041,7 +1039,7 @@ public class NativeAzureFileSystem extends FileSystem {
private static boolean suppressRetryPolicy = false;
// A counter to create unique (within-process) names for my metrics sources.
private static AtomicInteger metricsSourceNameCounter = new AtomicInteger();
-
+ private boolean appendSupportEnabled = false;
public NativeAzureFileSystem() {
// set store in initialize()
@@ -1164,7 +1162,7 @@ public class NativeAzureFileSystem extends FileSystem {
this.blockSize = conf.getLong(AZURE_BLOCK_SIZE_PROPERTY_NAME,
MAX_AZURE_BLOCK_SIZE);
-
+ this.appendSupportEnabled = conf.getBoolean(APPEND_SUPPORT_ENABLE_PROPERTY_NAME, false);
LOG.debug("NativeAzureFileSystem. Initializing.");
LOG.debug(" blockSize = {}",
conf.getLong(AZURE_BLOCK_SIZE_PROPERTY_NAME, MAX_AZURE_BLOCK_SIZE));
@@ -1294,7 +1292,61 @@ public class NativeAzureFileSystem extends FileSystem {
@Override
public FSDataOutputStream append(Path f, int bufferSize, Progressable progress)
throws IOException {
- throw new IOException("Not supported");
+
+ if (!appendSupportEnabled) {
+ throw new UnsupportedOperationException("Append Support not enabled");
+ }
+
+ LOG.debug("Opening file: {} for append", f);
+
+ Path absolutePath = makeAbsolute(f);
+ String key = pathToKey(absolutePath);
+ FileMetadata meta = null;
+ try {
+ meta = store.retrieveMetadata(key);
+ } catch(Exception ex) {
+
+ Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
+
+ if (innerException instanceof StorageException
+ && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
+
+ throw new FileNotFoundException(String.format("%s is not found", key));
+ } else {
+ throw ex;
+ }
+ }
+
+ if (meta == null) {
+ throw new FileNotFoundException(f.toString());
+ }
+
+ if (meta.isDir()) {
+ throw new FileNotFoundException(f.toString()
+ + " is a directory not a file.");
+ }
+
+ if (store.isPageBlobKey(key)) {
+ throw new IOException("Append not supported for Page Blobs");
+ }
+
+ DataOutputStream appendStream = null;
+
+ try {
+ appendStream = store.retrieveAppendStream(key, bufferSize);
+ } catch (Exception ex) {
+
+ Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
+
+ if (innerException instanceof StorageException
+ && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
+ throw new FileNotFoundException(String.format("%s is not found", key));
+ } else {
+ throw ex;
+ }
+ }
+
+ return new FSDataOutputStream(appendStream, statistics);
}
@Override
@@ -1379,7 +1431,7 @@ public class NativeAzureFileSystem extends FileSystem {
lease.free();
}
} catch (Exception e) {
- NativeAzureFileSystem.cleanup(LOG, out);
+ NativeAzureFileSystemHelper.cleanup(LOG, out);
String msg = "Unable to free lease on " + parent.toUri();
LOG.error(msg);
throw new IOException(msg, e);
@@ -1577,10 +1629,10 @@ public class NativeAzureFileSystem extends FileSystem {
metaFile = store.retrieveMetadata(key);
} catch (IOException e) {
- Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(e);
+ Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
if (innerException instanceof StorageException
- && NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
+ && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
return false;
}
@@ -1611,7 +1663,7 @@ public class NativeAzureFileSystem extends FileSystem {
parentMetadata = store.retrieveMetadata(parentKey);
} catch (IOException e) {
- Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(e);
+ Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
if (innerException instanceof StorageException) {
// Invalid State.
@@ -1619,7 +1671,7 @@ public class NativeAzureFileSystem extends FileSystem {
// if the file not present. But not retrieving metadata here is an
// unrecoverable state and can only happen if there is a race condition
// hence throwing a IOException
- if (NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
+ if (NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
throw new IOException("File " + f + " has a parent directory "
+ parentPath + " whose metadata cannot be retrieved. Can't resolve");
}
@@ -1662,10 +1714,10 @@ public class NativeAzureFileSystem extends FileSystem {
instrumentation.fileDeleted();
} catch(IOException e) {
- Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(e);
+ Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
if (innerException instanceof StorageException
- && NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
+ && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
return false;
}
@@ -1684,7 +1736,7 @@ public class NativeAzureFileSystem extends FileSystem {
parentMetadata = store.retrieveMetadata(parentKey);
} catch (IOException e) {
- Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(e);
+ Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
if (innerException instanceof StorageException) {
// Invalid State.
@@ -1692,7 +1744,7 @@ public class NativeAzureFileSystem extends FileSystem {
// if the file not present. But not retrieving metadata here is an
// unrecoverable state and can only happen if there is a race condition
// hence throwing a IOException
- if (NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
+ if (NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
throw new IOException("File " + f + " has a parent directory "
+ parentPath + " whose metadata cannot be retrieved. Can't resolve");
}
@@ -1728,10 +1780,10 @@ public class NativeAzureFileSystem extends FileSystem {
priorLastKey);
} catch(IOException e) {
- Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(e);
+ Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
if (innerException instanceof StorageException
- && NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
+ && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
return false;
}
@@ -1763,10 +1815,10 @@ public class NativeAzureFileSystem extends FileSystem {
instrumentation.fileDeleted();
} catch(IOException e) {
- Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(e);
+ Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
if (innerException instanceof StorageException
- && NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
+ && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
return false;
}
@@ -1785,10 +1837,10 @@ public class NativeAzureFileSystem extends FileSystem {
store.delete(key);
} catch(IOException e) {
- Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(e);
+ Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
if (innerException instanceof StorageException
- && NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
+ && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
return false;
}
@@ -1829,10 +1881,10 @@ public class NativeAzureFileSystem extends FileSystem {
meta = store.retrieveMetadata(key);
} catch(Exception ex) {
- Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(ex);
+ Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
if (innerException instanceof StorageException
- && NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
+ && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
throw new FileNotFoundException(String.format("%s is not found", key));
}
@@ -1922,10 +1974,10 @@ public class NativeAzureFileSystem extends FileSystem {
meta = store.retrieveMetadata(key);
} catch (IOException ex) {
- Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(ex);
+ Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
if (innerException instanceof StorageException
- && NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
+ && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
throw new FileNotFoundException(String.format("%s is not found", f));
}
@@ -1948,10 +2000,10 @@ public class NativeAzureFileSystem extends FileSystem {
listing = store.list(key, AZURE_LIST_ALL, 1, partialKey);
} catch (IOException ex) {
- Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(ex);
+ Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
if (innerException instanceof StorageException
- && NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
+ && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
throw new FileNotFoundException(String.format("%s is not found", key));
}
@@ -1972,10 +2024,10 @@ public class NativeAzureFileSystem extends FileSystem {
try {
listing = store.list(key, AZURE_LIST_ALL, 1, partialKey);
} catch (IOException ex) {
- Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(ex);
+ Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
if (innerException instanceof StorageException
- && NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
+ && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
throw new FileNotFoundException(String.format("%s is not found", key));
}
@@ -2196,10 +2248,10 @@ public class NativeAzureFileSystem extends FileSystem {
meta = store.retrieveMetadata(key);
} catch(Exception ex) {
- Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(ex);
+ Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
if (innerException instanceof StorageException
- && NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
+ && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
throw new FileNotFoundException(String.format("%s is not found", key));
}
@@ -2219,10 +2271,10 @@ public class NativeAzureFileSystem extends FileSystem {
try {
inputStream = store.retrieve(key);
} catch(Exception ex) {
- Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(ex);
+ Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
if (innerException instanceof StorageException
- && NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
+ && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
throw new FileNotFoundException(String.format("%s is not found", key));
}
@@ -2261,14 +2313,14 @@ public class NativeAzureFileSystem extends FileSystem {
dstMetadata = store.retrieveMetadata(dstKey);
} catch (IOException ex) {
- Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(ex);
+ Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
// A BlobNotFound storage exception is only thrown from the retrieveMetadata API when
// there is a race condition. If there is another thread which deletes the destination
// file or folder, then this thread calling rename should be able to continue with
// rename gracefully. Hence the StorageException is swallowed here.
if (innerException instanceof StorageException) {
- if (NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
+ if (NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
LOG.debug("BlobNotFound exception encountered for Destination key : {}. "
+ "Swallowin the exception to handle race condition gracefully", dstKey);
}
@@ -2294,10 +2346,10 @@ public class NativeAzureFileSystem extends FileSystem {
parentOfDestMetadata = store.retrieveMetadata(pathToKey(absoluteDst.getParent()));
} catch (IOException ex) {
- Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(ex);
+ Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
if (innerException instanceof StorageException
- && NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
+ && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
LOG.debug("Parent of destination {} doesn't exists. Failing rename", dst);
return false;
@@ -2320,10 +2372,10 @@ public class NativeAzureFileSystem extends FileSystem {
try {
srcMetadata = store.retrieveMetadata(srcKey);
} catch (IOException ex) {
- Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(ex);
+ Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
if (innerException instanceof StorageException
- && NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
+ && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
LOG.debug("Source {} doesn't exists. Failing rename", src);
return false;
@@ -2342,10 +2394,10 @@ public class NativeAzureFileSystem extends FileSystem {
store.rename(srcKey, dstKey);
} catch(IOException ex) {
- Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(ex);
+ Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
if (innerException instanceof StorageException
- && NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
+ && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
LOG.debug("BlobNotFoundException encountered. Failing rename", src);
return false;
@@ -2552,10 +2604,10 @@ public class NativeAzureFileSystem extends FileSystem {
try {
metadata = store.retrieveMetadata(key);
} catch (IOException ex) {
- Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(ex);
+ Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
if (innerException instanceof StorageException
- && NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
+ && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
throw new FileNotFoundException(String.format("File %s doesn't exists.", p));
}
@@ -2591,10 +2643,10 @@ public class NativeAzureFileSystem extends FileSystem {
try {
metadata = store.retrieveMetadata(key);
} catch (IOException ex) {
- Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(ex);
+ Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
if (innerException instanceof StorageException
- && NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
+ && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
throw new FileNotFoundException(String.format("File %s doesn't exists.", p));
}
@@ -2817,52 +2869,4 @@ public class NativeAzureFileSystem extends FileSystem {
// Return to the caller with the randomized key.
return randomizedKey;
}
-
- private static void cleanup(Logger log, java.io.Closeable closeable) {
- if (closeable != null) {
- try {
- closeable.close();
- } catch(IOException e) {
- if (log != null) {
- log.debug("Exception in closing {}", closeable, e);
- }
- }
- }
- }
-
- /*
- * Helper method to recursively check if the cause of the exception is
- * a Azure storage exception.
- */
- private static Throwable checkForAzureStorageException(Exception e) {
-
- Throwable innerException = e.getCause();
-
- while (innerException != null
- && !(innerException instanceof StorageException)) {
- innerException = innerException.getCause();
- }
-
- return innerException;
- }
-
- /*
- * Helper method to check if the AzureStorageException is
- * because backing blob was not found.
- */
- private static boolean isFileNotFoundException(StorageException e) {
-
- String errorCode = ((StorageException) e).getErrorCode();
- if (errorCode != null
- && (errorCode.equals(StorageErrorCodeStrings.BLOB_NOT_FOUND)
- || errorCode.equals(StorageErrorCodeStrings.RESOURCE_NOT_FOUND)
- || errorCode.equals(StorageErrorCode.BLOB_NOT_FOUND.toString())
- || errorCode.equals(StorageErrorCode.RESOURCE_NOT_FOUND.toString()))) {
-
- return true;
- }
-
- return false;
- }
-
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ffc0d988/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemHelper.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemHelper.java
new file mode 100644
index 0000000..40efdc6
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemHelper.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.microsoft.azure.storage.StorageErrorCode;
+import com.microsoft.azure.storage.StorageErrorCodeStrings;
+import com.microsoft.azure.storage.StorageException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+/**
+ * Utility class containing helper methods shared by the WASB
+ * file system classes.
+ */
+
+@InterfaceAudience.Private
+final class NativeAzureFileSystemHelper {
+
+ private NativeAzureFileSystemHelper() {
+ // Hide the constructor because this is a utility class.
+ }
+
+ private static final Logger LOG = LoggerFactory.getLogger(NativeAzureFileSystemHelper.class);
+
+ public static void cleanup(Logger log, java.io.Closeable closeable) {
+ if (closeable != null) {
+ try {
+ closeable.close();
+ } catch(IOException e) {
+ if (log != null) {
+ log.debug("Exception in closing {}", closeable, e);
+ }
+ }
+ }
+ }
+
+ /*
+ * Helper method to recursively check if the cause of the exception is
+ * an Azure storage exception.
+ */
+ public static Throwable checkForAzureStorageException(Exception e) {
+
+ Throwable innerException = e.getCause();
+
+ while (innerException != null
+ && !(innerException instanceof StorageException)) {
+
+ innerException = innerException.getCause();
+ }
+
+ return innerException;
+ }
+
+ /*
+ * Helper method to check if the StorageException is due to
+ * the backing blob not being found.
+ */
+ public static boolean isFileNotFoundException(StorageException e) {
+
+ String errorCode = e.getErrorCode();
+ if (errorCode != null
+ && (errorCode.equals(StorageErrorCodeStrings.BLOB_NOT_FOUND)
+ || errorCode.equals(StorageErrorCodeStrings.RESOURCE_NOT_FOUND)
+ || errorCode.equals(StorageErrorCode.BLOB_NOT_FOUND.toString())
+ || errorCode.equals(StorageErrorCode.RESOURCE_NOT_FOUND.toString()))) {
+
+ return true;
+ }
+
+ return false;
+ }
+
+ /*
+ * Helper method that logs stack traces from all live threads.
+ */
+ public static void logAllLiveStackTraces() {
+
+ for (Map.Entry<Thread, StackTraceElement[]> entry : Thread.getAllStackTraces().entrySet()) {
+ LOG.debug("Thread " + entry.getKey().getName());
+ StackTraceElement[] trace = entry.getValue();
+ for (int j = 0; j < trace.length; j++) {
+ LOG.debug("\tat " + trace[j]);
+ }
+ }
+ }
+}
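
Every call site in NativeAzureFileSystem now follows the same unwrap-and-translate
pattern with these helpers: walk the cause chain looking for a StorageException,
and if its error code means the backing blob is gone, surface a standard
FileNotFoundException. A minimal sketch of the pattern (store and key stand in
for the surrounding code):

    FileMetadata meta;
    try {
      meta = store.retrieveMetadata(key);
    } catch (IOException e) {
      Throwable inner = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
      if (inner instanceof StorageException
          && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) inner)) {
        throw new FileNotFoundException(String.format("%s is not found", key));
      }
      throw e;
    }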
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ffc0d988/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java
index 0229cb7..f052b7f 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java
@@ -107,4 +107,6 @@ interface NativeFileSystemStore {
void delete(String key, SelfRenewingLease lease) throws IOException;
SelfRenewingLease acquireLease(String key) throws AzureException;
+
+ DataOutputStream retrieveAppendStream(String key, int bufferSize) throws IOException;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ffc0d988/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobOutputStream.java
index 8689375..b2b34f8 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobOutputStream.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobOutputStream.java
@@ -29,8 +29,6 @@ import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Arrays;
-import java.util.Iterator;
-import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
@@ -216,7 +214,7 @@ final class PageBlobOutputStream extends OutputStream implements Syncable {
LOG.debug(ioThreadPool.toString());
if (!ioThreadPool.awaitTermination(10, TimeUnit.MINUTES)) {
LOG.debug("Timed out after 10 minutes waiting for IO requests to finish");
- logAllStackTraces();
+ NativeAzureFileSystemHelper.logAllLiveStackTraces();
LOG.debug(ioThreadPool.toString());
throw new IOException("Timed out waiting for IO requests to finish");
}
@@ -230,18 +228,7 @@ final class PageBlobOutputStream extends OutputStream implements Syncable {
closed = true;
}
- // Log the stacks of all threads.
- private void logAllStackTraces() {
- Map liveThreads = Thread.getAllStackTraces();
- for (Iterator i = liveThreads.keySet().iterator(); i.hasNext(); ) {
- Thread key = (Thread) i.next();
- LOG.debug("Thread " + key.getName());
- StackTraceElement[] trace = (StackTraceElement[]) liveThreads.get(key);
- for (int j = 0; j < trace.length; j++) {
- LOG.debug("\tat " + trace[j]);
- }
- }
- }
+
/**
* A single write request for data to write to Azure storage.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ffc0d988/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterface.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterface.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterface.java
index ce5f749..c2169a4 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterface.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterface.java
@@ -24,11 +24,13 @@ import java.io.OutputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
+import java.util.List;
import java.util.EnumSet;
import java.util.HashMap;
import org.apache.hadoop.classification.InterfaceAudience;
+import com.microsoft.azure.storage.AccessCondition;
import com.microsoft.azure.storage.CloudStorageAccount;
import com.microsoft.azure.storage.OperationContext;
import com.microsoft.azure.storage.RetryPolicyFactory;
@@ -36,6 +38,8 @@ import com.microsoft.azure.storage.StorageCredentials;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.BlobListingDetails;
import com.microsoft.azure.storage.blob.BlobProperties;
+import com.microsoft.azure.storage.blob.BlockEntry;
+import com.microsoft.azure.storage.blob.BlockListingFilter;
import com.microsoft.azure.storage.blob.BlobRequestOptions;
import com.microsoft.azure.storage.blob.CloudBlob;
import com.microsoft.azure.storage.blob.CopyState;
@@ -269,13 +273,13 @@ abstract class StorageInterface {
/**
* Uploads the container's metadata using the specified operation context.
- *
+ *
* @param opContext
* An {@link OperationContext} object that represents the context
* for the current operation. This object is used to track requests
* to the storage service, and to provide additional runtime
* information about the operation.
- *
+ *
* @throws StorageException
* If a storage service error occurred.
*/
@@ -545,6 +549,30 @@ abstract class StorageInterface {
void uploadMetadata(OperationContext opContext)
throws StorageException;
+ /**
+ * Uploads the blob's metadata to the storage service using the specified
+ * access condition, request options, and operation context.
+ *
+ * @param accessCondition
+ * An {@link AccessCondition} object that represents the access conditions for the blob.
+ *
+ * @param options
+ * A {@link BlobRequestOptions} object that specifies any additional options for the request. Specifying
+ * <code>null</code> will use the default request options from the associated service client (
+ * {@link CloudBlobClient}).
+ *
+ * @param opContext
+ * An {@link OperationContext} object that represents the context
+ * for the current operation. This object is used to track requests
+ * to the storage service, and to provide additional runtime
+ * information about the operation.
+ *
+ * @throws StorageException
+ * If a storage service error occurred.
+ */
+ void uploadMetadata(AccessCondition accessCondition, BlobRequestOptions options,
+ OperationContext opContext) throws StorageException;
+
void uploadProperties(OperationContext opContext,
SelfRenewingLease lease)
throws StorageException;
@@ -602,6 +630,63 @@ abstract class StorageInterface {
OutputStream openOutputStream(
BlobRequestOptions options,
OperationContext opContext) throws StorageException;
+
+ /**
+ * Downloads the list of blocks of this block blob, using the specified
+ * block listing filter, request options, and operation context.
+ *
+ * @param filter A {@link BlockListingFilter} value that specifies whether to download
+ * committed blocks, uncommitted blocks, or all blocks.
+ * @param options A {@link BlobRequestOptions} object that specifies any additional options for
+ * the request. Specifying null will use the default request options from
+ * the associated service client ( CloudBlobClient).
+ * @param opContext An {@link OperationContext} object that represents the context for the current
+ * operation. This object is used to track requests to the storage service,
+ * and to provide additional runtime information about the operation.
+ * @return An ArrayList object of {@link BlockEntry} objects that represent the list
+ * block items downloaded from the block blob.
+ * @throws IOException If an I/O error occurred.
+ * @throws StorageException If a storage service error occurred.
+ */
+ List<BlockEntry> downloadBlockList(BlockListingFilter filter, BlobRequestOptions options,
+ OperationContext opContext) throws IOException, StorageException;
+
+ /**
+ * Uploads a single block of this block blob from the given input stream.
+ *
+ * @param blockId A String that represents the Base-64 encoded block ID. Note for a given blob
+ * the length of all Block IDs must be identical.
+ * @param sourceStream An {@link InputStream} object that represents the input stream to write to the
+ * block blob.
+ * @param length A long which represents the length, in bytes, of the stream data,
+ * or -1 if unknown.
+ * @param options A {@link BlobRequestOptions} object that specifies any additional options for the
+ * request. Specifying null will use the default request options from the
+ * associated service client ( CloudBlobClient).
+ * @param opContext An {@link OperationContext} object that represents the context for the current operation.
+ * This object is used to track requests to the storage service, and to provide
+ * additional runtime information about the operation.
+ * @throws IOException If an I/O error occurred.
+ * @throws StorageException If a storage service error occurred.
+ */
+ void uploadBlock(String blockId, InputStream sourceStream,
+ long length, BlobRequestOptions options,
+ OperationContext opContext) throws IOException, StorageException;
+
+ /**
+ * Commits the given block list, making the listed blocks the new
+ * committed contents of this block blob.
+ *
+ * @param blockList An enumerable collection of {@link BlockEntry} objects that represents the list
+ * block items being committed. The size field is ignored.
+ * @param accessCondition An {@link AccessCondition} object that represents the access conditions for the blob.
+ * @param options A {@link BlobRequestOptions} object that specifies any additional options for the
+ * request. Specifying null will use the default request options from the associated
+ * service client ( CloudBlobClient).
+ * @param opContext An {@link OperationContext} object that represents the context for the current operation.
+ * This object is used to track requests to the storage service, and to provide additional
+ * runtime information about the operation.
+ * @throws IOException If an I/O error occurred.
+ * @throws StorageException If a storage service error occurred.
+ */
+ void commitBlockList(List<BlockEntry> blockList, AccessCondition accessCondition, BlobRequestOptions options,
+ OperationContext opContext) throws IOException, StorageException;
+
}
/**
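
Together, the three new operations cover the standard block blob write protocol:
download the committed block list, upload one or more new blocks, then commit the
extended list so the new blocks become part of the blob. A hedged sketch of one
append round against this wrapper (blob, data, accessCondition, and opContext
stand in for the surrounding code; the block-ID scheme is illustrative only,
since real block IDs must be Base-64 encoded and of identical length, and Base64
here is assumed to come from commons-codec):

    List<BlockEntry> blockList = blob.downloadBlockList(
        BlockListingFilter.COMMITTED, new BlobRequestOptions(), opContext);

    // Derive a fixed-width, Base-64 encoded ID for the new block.
    String blockId = Base64.encodeBase64String(
        String.format("%010d", blockList.size()).getBytes(StandardCharsets.UTF_8));
    blob.uploadBlock(blockId, new ByteArrayInputStream(data), data.length,
        new BlobRequestOptions(), opContext);

    // Committing the extended list publishes the uploaded block.
    blockList.add(new BlockEntry(blockId));
    blob.commitBlockList(blockList, accessCondition,
        new BlobRequestOptions(), opContext);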
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ffc0d988/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java
index 382ff66..298f3aa 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java
@@ -27,7 +27,7 @@ import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
-
+import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import com.microsoft.azure.storage.AccessCondition;
@@ -40,6 +40,8 @@ import com.microsoft.azure.storage.StorageUri;
import com.microsoft.azure.storage.blob.BlobListingDetails;
import com.microsoft.azure.storage.blob.BlobProperties;
import com.microsoft.azure.storage.blob.BlobRequestOptions;
+import com.microsoft.azure.storage.blob.BlockEntry;
+import com.microsoft.azure.storage.blob.BlockListingFilter;
import com.microsoft.azure.storage.blob.CloudBlob;
import com.microsoft.azure.storage.blob.CloudBlobClient;
import com.microsoft.azure.storage.blob.CloudBlobContainer;
@@ -362,7 +364,13 @@ class StorageInterfaceImpl extends StorageInterface {
@Override
public void uploadMetadata(OperationContext opContext)
throws StorageException {
- getBlob().uploadMetadata(null, null, opContext);
+ uploadMetadata(null, null, opContext);
+ }
+
+ @Override
+ public void uploadMetadata(AccessCondition accessConditions, BlobRequestOptions options,
+ OperationContext opContext) throws StorageException {
+ getBlob().uploadMetadata(accessConditions, options, opContext);
}
public void uploadProperties(OperationContext opContext, SelfRenewingLease lease)
@@ -396,7 +404,7 @@ class StorageInterfaceImpl extends StorageInterface {
public void startCopyFromBlob(CloudBlobWrapper sourceBlob, BlobRequestOptions options,
OperationContext opContext)
throws StorageException, URISyntaxException {
- getBlob().startCopyFromBlob(((CloudBlobWrapperImpl)sourceBlob).blob,
+ getBlob().startCopyFromBlob(((CloudBlobWrapperImpl) sourceBlob).blob,
null, null, options, opContext);
}
@@ -440,6 +448,25 @@ class StorageInterfaceImpl extends StorageInterface {
getBlob().uploadProperties(null, null, opContext);
}
+ @Override
+ public List<BlockEntry> downloadBlockList(BlockListingFilter filter, BlobRequestOptions options,
+ OperationContext opContext) throws IOException, StorageException {
+ return ((CloudBlockBlob) getBlob()).downloadBlockList(filter, null, options, opContext);
+ }
+
+ @Override
+ public void uploadBlock(String blockId, InputStream sourceStream,
+ long length, BlobRequestOptions options,
+ OperationContext opContext) throws IOException, StorageException {
+ ((CloudBlockBlob) getBlob()).uploadBlock(blockId, sourceStream, length, null, options, opContext);
+ }
+
+ @Override
+ public void commitBlockList(List<BlockEntry> blockList, AccessCondition accessCondition, BlobRequestOptions options,
+ OperationContext opContext) throws IOException, StorageException {
+ ((CloudBlockBlob) getBlob()).commitBlockList(blockList, accessCondition, options, opContext);
+ }
}
static class CloudPageBlobWrapperImpl extends CloudBlobWrapperImpl implements CloudPageBlobWrapper {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ffc0d988/hadoop-tools/hadoop-azure/src/site/markdown/index.md
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/site/markdown/index.md b/hadoop-tools/hadoop-azure/src/site/markdown/index.md
index 9d0115a..4402467 100644
--- a/hadoop-tools/hadoop-azure/src/site/markdown/index.md
+++ b/hadoop-tools/hadoop-azure/src/site/markdown/index.md
@@ -23,6 +23,7 @@
* [Page Blob Support and Configuration](#Page_Blob_Support_and_Configuration)
* [Atomic Folder Rename](#Atomic_Folder_Rename)
* [Accessing wasb URLs](#Accessing_wasb_URLs)
+ * [Append API Support and Configuration](#Append_API_Support_and_Configuration)
* [Testing the hadoop-azure Module](#Testing_the_hadoop-azure_Module)
## <a name="Introduction" />Introduction
@@ -51,7 +52,6 @@ on the additional artifacts it requires, notably the
## <a name="Limitations" />Limitations
-* The append operation is not implemented.
* File owner and group are persisted, but the permissions model is not enforced.
Authorization occurs at the level of the entire Azure Blob Storage account.
* File last access time is not tracked.
@@ -199,6 +199,24 @@ It's also possible to configure `fs.defaultFS` to use a `wasb` or `wasbs` URL.
This causes all bare paths, such as `/testDir/testFile` to resolve automatically
to that file system.
+### <a name="Append_API_Support_and_Configuration" />Append API Support and Configuration
+
+The Azure Blob Storage interface for Hadoop includes optional support for the Append API
+for a single writer. To enable it, set the configuration `fs.azure.enable.append.support` to true.
+
+For example:
+
+ <property>
+ <name>fs.azure.enable.append.support</name>
+ <value>true</value>
+ </property>
+
+Note that append support in the Azure Blob Storage interface DIFFERS FROM HDFS SEMANTICS. Append
+support does not enforce single-writer semantics internally; applications must guarantee them.
+It is the application's responsibility either to ensure single-threaded handling for a particular
+file path, or to rely on an external locking mechanism of its own. Failure to do so will result in
+unexpected behavior.
+
## <a name="Testing_the_hadoop-azure_Module" />Testing the hadoop-azure Module
The hadoop-azure module includes a full suite of unit tests. Most of the tests
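
With the flag enabled, appends go through the standard Hadoop FileSystem API. A
minimal usage sketch (the account, container, and path below are placeholders):

    Configuration conf = new Configuration();
    conf.setBoolean("fs.azure.enable.append.support", true);
    FileSystem fs = FileSystem.get(
        URI.create("wasb://mycontainer@myaccount.blob.core.windows.net"), conf);

    FSDataOutputStream out = fs.append(new Path("/testDir/testFile"), 4096);
    try {
      out.write("more data".getBytes(StandardCharsets.UTF_8));
    } finally {
      out.close();
    }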
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ffc0d988/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockStorageInterface.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockStorageInterface.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockStorageInterface.java
index 9f84f4b..2bb2a9a 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockStorageInterface.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockStorageInterface.java
@@ -32,11 +32,12 @@ import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.TimeZone;
-
+import java.util.List;
import org.apache.commons.httpclient.URIException;
import org.apache.commons.httpclient.util.URIUtil;
import org.apache.commons.lang.NotImplementedException;
+import com.microsoft.azure.storage.AccessCondition;
import com.microsoft.azure.storage.CloudStorageAccount;
import com.microsoft.azure.storage.OperationContext;
import com.microsoft.azure.storage.RetryPolicyFactory;
@@ -46,6 +47,8 @@ import com.microsoft.azure.storage.StorageUri;
import com.microsoft.azure.storage.blob.BlobListingDetails;
import com.microsoft.azure.storage.blob.BlobProperties;
import com.microsoft.azure.storage.blob.BlobRequestOptions;
+import com.microsoft.azure.storage.blob.BlockEntry;
+import com.microsoft.azure.storage.blob.BlockListingFilter;
import com.microsoft.azure.storage.blob.CloudBlob;
import com.microsoft.azure.storage.blob.CloudBlobContainer;
import com.microsoft.azure.storage.blob.CloudBlobDirectory;
@@ -524,6 +527,30 @@ public class MockStorageInterface extends StorageInterface {
public CloudBlob getBlob() {
return null;
}
+
+ @Override
+ public List<BlockEntry> downloadBlockList(BlockListingFilter filter, BlobRequestOptions options,
+ OperationContext opContext) throws IOException, StorageException {
+
+ throw new UnsupportedOperationException("downloadBlockList not used in Mock Tests");
+ }
+ @Override
+ public void uploadBlock(String blockId, InputStream sourceStream,
+ long length, BlobRequestOptions options,
+ OperationContext opContext) throws IOException, StorageException {
+ throw new UnsupportedOperationException("uploadBlock not used in Mock Tests");
+ }
+
+ @Override
+ public void commitBlockList(List<BlockEntry> blockList, AccessCondition accessCondition,
+ BlobRequestOptions options, OperationContext opContext) throws IOException, StorageException {
+ throw new UnsupportedOperationException("commitBlockList not used in Mock Tests");
+ }
+
+ public void uploadMetadata(AccessCondition accessCondition, BlobRequestOptions options,
+ OperationContext opContext) throws StorageException {
+ throw new UnsupportedOperationException("uploadMetadata not used in Mock Tests");
+ }
}
class MockCloudPageBlobWrapper extends MockCloudBlobWrapper
@@ -580,5 +607,10 @@ public class MockStorageInterface extends StorageInterface {
public CloudBlob getBlob() {
return null;
}
+
+ public void uploadMetadata(AccessCondition accessCondition, BlobRequestOptions options,
+ OperationContext opContext) throws StorageException {
+ throw new UnsupportedOperationException("uploadMetadata not used in Mock Tests");
+ }
}
}
[5/6] hadoop git commit: HADOOP-12635. Adding Append API support for
WASB. Contributed by Dushyanth.
Posted by cn...@apache.org.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ffc0d988/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestNativeAzureFileSystemAppend.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestNativeAzureFileSystemAppend.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestNativeAzureFileSystemAppend.java
new file mode 100644
index 0000000..de51990
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestNativeAzureFileSystemAppend.java
@@ -0,0 +1,362 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Arrays;
+
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestNativeAzureFileSystemAppend extends NativeAzureFileSystemBaseTest {
+
+ private static final String TEST_FILE = "test.dat";
+ private static final Path TEST_PATH = new Path(TEST_FILE);
+
+ private AzureBlobStorageTestAccount testAccount = null;
+
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ testAccount = createTestAccount();
+ fs = testAccount.getFileSystem();
+ Configuration conf = fs.getConf();
+ conf.setBoolean(NativeAzureFileSystem.APPEND_SUPPORT_ENABLE_PROPERTY_NAME, true);
+ URI uri = fs.getUri();
+ fs.initialize(uri, conf);
+ }
+
+ /*
+ * Helper method that creates test data of size provided by the
+ * "size" parameter.
+ */
+ private static byte[] getTestData(int size) {
+ byte[] testData = new byte[size];
+ System.arraycopy(RandomStringUtils.randomAlphabetic(size).getBytes(), 0, testData, 0, size);
+ return testData;
+ }
+
+ // Helper method to create file and write fileSize bytes of data on it.
+ private byte[] createBaseFileWithData(int fileSize, Path testPath) throws Throwable {
+
+ FSDataOutputStream createStream = null;
+ try {
+ createStream = fs.create(testPath);
+ byte[] fileData = null;
+
+ if (fileSize != 0) {
+ fileData = getTestData(fileSize);
+ createStream.write(fileData);
+ }
+ return fileData;
+ } finally {
+ if (createStream != null) {
+ createStream.close();
+ }
+ }
+ }
+
+ /*
+ * Helper method to verify that "dataLength" bytes of the file match the expected test data.
+ */
+ private boolean verifyFileData(int dataLength, byte[] testData, int testDataIndex,
+ FSDataInputStream srcStream) {
+
+ try {
+
+ byte[] fileBuffer = new byte[dataLength];
+ byte[] testDataBuffer = new byte[dataLength];
+
+ int fileBytesRead = srcStream.read(fileBuffer);
+
+ if (fileBytesRead < dataLength) {
+ return false;
+ }
+
+ System.arraycopy(testData, testDataIndex, testDataBuffer, 0, dataLength);
+
+ if (!Arrays.equals(fileBuffer, testDataBuffer)) {
+ return false;
+ }
+
+ return true;
+
+ } catch (Exception ex) {
+ return false;
+ }
+
+ }
+
+ /*
+ * Helper method to verify Append on a testFile.
+ */
+ private boolean verifyAppend(byte[] testData, Path testFile) {
+
+ FSDataInputStream srcStream = null;
+ try {
+
+ srcStream = fs.open(testFile);
+ int baseBufferSize = 2048;
+ int testDataSize = testData.length;
+ int testDataIndex = 0;
+
+ while (testDataSize > baseBufferSize) {
+
+ if (!verifyFileData(baseBufferSize, testData, testDataIndex, srcStream)) {
+ return false;
+ }
+ testDataIndex += baseBufferSize;
+ testDataSize -= baseBufferSize;
+ }
+
+ if (!verifyFileData(testDataSize, testData, testDataIndex, srcStream)) {
+ return false;
+ }
+
+ return true;
+ } catch(Exception ex) {
+ return false;
+ } finally {
+ if (srcStream != null) {
+ try {
+ srcStream.close();
+ } catch(IOException ioe) {
+ // Swallowing
+ }
+ }
+ }
+ }
+
+ /*
+ * Test case to verify that appending a small amount of data works. This tests
+ * append end to end.
+ */
+ @Test
+ public void testSingleAppend() throws Throwable {
+
+ FSDataOutputStream appendStream = null;
+ try {
+ int baseDataSize = 50;
+ byte[] baseDataBuffer = createBaseFileWithData(baseDataSize, TEST_PATH);
+
+ int appendDataSize = 20;
+ byte[] appendDataBuffer = getTestData(appendDataSize);
+ appendStream = fs.append(TEST_PATH, 10);
+ appendStream.write(appendDataBuffer);
+ appendStream.close();
+ byte[] testData = new byte[baseDataSize + appendDataSize];
+ System.arraycopy(baseDataBuffer, 0, testData, 0, baseDataSize);
+ System.arraycopy(appendDataBuffer, 0, testData, baseDataSize, appendDataSize);
+
+ Assert.assertTrue(verifyAppend(testData, TEST_PATH));
+ } finally {
+ if (appendStream != null) {
+ appendStream.close();
+ }
+ }
+ }
+
+ /*
+ * Test case to verify append to an empty file.
+ */
+ @Test
+ public void testSingleAppendOnEmptyFile() throws Throwable {
+
+ FSDataOutputStream appendStream = null;
+
+ try {
+ createBaseFileWithData(0, TEST_PATH);
+
+ int appendDataSize = 20;
+ byte[] appendDataBuffer = getTestData(appendDataSize);
+ appendStream = fs.append(TEST_PATH, 10);
+ appendStream.write(appendDataBuffer);
+ appendStream.close();
+
+ Assert.assertTrue(verifyAppend(appendDataBuffer, TEST_PATH));
+ } finally {
+ if (appendStream != null) {
+ appendStream.close();
+ }
+ }
+ }
+
+ /*
+ * Test to verify that we can open only one Append stream on a File.
+ */
+ @Test
+ public void testSingleAppenderScenario() throws Throwable {
+
+ FSDataOutputStream appendStream1 = null;
+ FSDataOutputStream appendStream2 = null;
+ IOException ioe = null;
+ try {
+ createBaseFileWithData(0, TEST_PATH);
+ appendStream1 = fs.append(TEST_PATH, 10);
+ boolean encounteredException = false;
+ try {
+ appendStream2 = fs.append(TEST_PATH, 10);
+ } catch(IOException ex) {
+ encounteredException = true;
+ ioe = ex;
+ }
+
+ appendStream1.close();
+
+ Assert.assertTrue(encounteredException);
+ GenericTestUtils.assertExceptionContains("Unable to set Append lease on the Blob", ioe);
+ } finally {
+ if (appendStream1 != null) {
+ appendStream1.close();
+ }
+
+ if (appendStream2 != null) {
+ appendStream2.close();
+ }
+ }
+ }
+
+ /*
+ * Tests to verify multiple appends on a Blob.
+ */
+ @Test
+ public void testMultipleAppends() throws Throwable {
+
+ int baseDataSize = 50;
+ byte[] baseDataBuffer = createBaseFileWithData(baseDataSize, TEST_PATH);
+
+ int appendDataSize = 100;
+ int targetAppendCount = 50;
+ byte[] testData = new byte[baseDataSize + (appendDataSize*targetAppendCount)];
+ int testDataIndex = 0;
+ System.arraycopy(baseDataBuffer, 0, testData, testDataIndex, baseDataSize);
+ testDataIndex += baseDataSize;
+
+ int appendCount = 0;
+
+ FSDataOutputStream appendStream = null;
+
+ try {
+ while (appendCount < targetAppendCount) {
+
+ byte[] appendDataBuffer = getTestData(appendDataSize);
+ appendStream = fs.append(TEST_PATH, 30);
+ appendStream.write(appendDataBuffer);
+ appendStream.close();
+
+ System.arraycopy(appendDataBuffer, 0, testData, testDataIndex, appendDataSize);
+ testDataIndex += appendDataSize;
+ appendCount++;
+ }
+
+ Assert.assertTrue(verifyAppend(testData, TEST_PATH));
+
+ } finally {
+ if (appendStream != null) {
+ appendStream.close();
+ }
+ }
+ }
+
+ /*
+ * Test to verify multiple appends on the same stream.
+ */
+ @Test
+ public void testMultipleAppendsOnSameStream() throws Throwable {
+
+ int baseDataSize = 50;
+ byte[] baseDataBuffer = createBaseFileWithData(baseDataSize, TEST_PATH);
+ int appendDataSize = 100;
+ int targetAppendCount = 50;
+ byte[] testData = new byte[baseDataSize + (appendDataSize*targetAppendCount)];
+ int testDataIndex = 0;
+ System.arraycopy(baseDataBuffer, 0, testData, testDataIndex, baseDataSize);
+ testDataIndex += baseDataSize;
+ int appendCount = 0;
+
+ FSDataOutputStream appendStream = null;
+
+ try {
+
+ while (appendCount < targetAppendCount) {
+
+ appendStream = fs.append(TEST_PATH, 50);
+
+ int singleAppendChunkSize = 20;
+ int appendRunSize = 0;
+ while (appendRunSize < appendDataSize) {
+
+ byte[] appendDataBuffer = getTestData(singleAppendChunkSize);
+ appendStream.write(appendDataBuffer);
+ System.arraycopy(appendDataBuffer, 0, testData,
+ testDataIndex + appendRunSize, singleAppendChunkSize);
+
+ appendRunSize += singleAppendChunkSize;
+ }
+
+ appendStream.close();
+ testDataIndex += appendDataSize;
+ appendCount++;
+ }
+
+ Assert.assertTrue(verifyAppend(testData, TEST_PATH));
+ } finally {
+ if (appendStream != null) {
+ appendStream.close();
+ }
+ }
+ }
+
+ @Test(expected=UnsupportedOperationException.class)
+ /*
+ * Test to verify the behavior when Append Support configuration flag is set to false
+ */
+ public void testFalseConfigurationFlagBehavior() throws Throwable {
+
+ fs = testAccount.getFileSystem();
+ Configuration conf = fs.getConf();
+ conf.setBoolean(NativeAzureFileSystem.APPEND_SUPPORT_ENABLE_PROPERTY_NAME, false);
+ URI uri = fs.getUri();
+ fs.initialize(uri, conf);
+
+ FSDataOutputStream appendStream = null;
+
+ try {
+ createBaseFileWithData(0, TEST_PATH);
+ appendStream = fs.append(TEST_PATH, 10);
+ } finally {
+ if (appendStream != null) {
+ appendStream.close();
+ }
+ }
+ }
+
+ @Override
+ protected AzureBlobStorageTestAccount createTestAccount() throws Exception {
+ return AzureBlobStorageTestAccount.create();
+ }
+}
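
Like the rest of the hadoop-azure suite, this class needs a live storage account
configured as described in the Testing section of the module documentation. With
that in place, one plausible way to run just this class through Maven's surefire
plugin:

    mvn -pl hadoop-tools/hadoop-azure test -Dtest=TestNativeAzureFileSystemAppend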
[3/6] hadoop git commit: HADOOP-12635. Adding Append API support for
WASB. Contributed by Dushyanth.
Posted by cn...@apache.org.
[2/6] hadoop git commit: HADOOP-12635. Adding Append API support for
WASB. Contributed by Dushyanth.
Posted by cn...@apache.org.
HADOOP-12635. Adding Append API support for WASB. Contributed by Dushyanth.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/8bc93db2
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/8bc93db2
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/8bc93db2
Branch: refs/heads/trunk
Commit: 8bc93db2e7c64830b6a662f28c8917a9eef4e7c9
Parents: d40859f
Author: cnauroth <cn...@apache.org>
Authored: Mon Jan 18 09:08:53 2016 -0800
Committer: cnauroth <cn...@apache.org>
Committed: Mon Jan 18 09:08:58 2016 -0800
----------------------------------------------------------------------
hadoop-common-project/hadoop-common/CHANGES.txt | 2 +
.../fs/azure/AzureNativeFileSystemStore.java | 23 +-
.../hadoop/fs/azure/BlockBlobAppendStream.java | 775 +++++++++++++++++++
.../hadoop/fs/azure/NativeAzureFileSystem.java | 216 +++---
.../fs/azure/NativeAzureFileSystemHelper.java | 107 +++
.../hadoop/fs/azure/NativeFileSystemStore.java | 2 +
.../hadoop/fs/azure/PageBlobOutputStream.java | 17 +-
.../hadoop/fs/azure/StorageInterface.java | 89 ++-
.../hadoop/fs/azure/StorageInterfaceImpl.java | 33 +-
.../hadoop-azure/src/site/markdown/index.md | 20 +-
.../hadoop/fs/azure/MockStorageInterface.java | 34 +-
.../azure/TestNativeAzureFileSystemAppend.java | 362 +++++++++
12 files changed, 1550 insertions(+), 130 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8bc93db2/hadoop-common-project/hadoop-common/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index 4b2c9e7..b13876a 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -741,6 +741,8 @@ Release 2.8.0 - UNRELEASED
HADOOP-12691. Add CSRF Filter for REST APIs to Hadoop Common.
(Larry McCay via cnauroth)
+ HADOOP-12635. Adding Append API support for WASB. (Dushyanth via cnauroth)
+
IMPROVEMENTS
HADOOP-12458. Retries is typoed to spell Retires in parts of
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8bc93db2/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
index 69ece4a..c8deb46 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
@@ -33,13 +33,11 @@ import java.net.URLEncoder;
import java.security.InvalidKeyException;
import java.util.ArrayList;
import java.util.Calendar;
-import java.util.Collections;
import java.util.Date;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
-import java.util.LinkedList;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
@@ -64,6 +62,7 @@ import org.apache.hadoop.io.IOUtils;
import org.mortbay.util.ajax.JSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import com.google.common.annotations.VisibleForTesting;
import com.microsoft.azure.storage.CloudStorageAccount;
import com.microsoft.azure.storage.OperationContext;
@@ -2680,4 +2679,24 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
close();
super.finalize();
}
+
+ @Override
+ public DataOutputStream retrieveAppendStream(String key, int bufferSize) throws IOException {
+
+ try {
+
+ if (isPageBlobKey(key)) {
+ throw new UnsupportedOperationException("Append not supported for Page Blobs");
+ }
+
+ CloudBlobWrapper blob = this.container.getBlockBlobReference(key);
+
+ BlockBlobAppendStream appendStream = new BlockBlobAppendStream((CloudBlockBlobWrapper) blob, key, bufferSize, getInstrumentedContext());
+ appendStream.initialize();
+
+ return new DataOutputStream(appendStream);
+ } catch(Exception ex) {
+ throw new AzureException(ex);
+ }
+ }
}
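A minimal usage sketch for this new store-level entry point, assuming an already-initialized AzureNativeFileSystemStore; the helper name, key, and buffer size below are illustrative only, not part of the patch:

import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class AppendStreamSketch {
  // Hypothetical helper: appends one line to an existing block blob through the store.
  // Page-blob keys are rejected inside retrieveAppendStream with an
  // UnsupportedOperationException, so only block-blob keys should reach this point.
  static void appendLine(AzureNativeFileSystemStore store, String key, String line)
      throws IOException {
    DataOutputStream out = store.retrieveAppendStream(key, 4 * 1024 * 1024);
    try {
      out.write((line + "\n").getBytes(StandardCharsets.UTF_8));
    } finally {
      out.close(); // commits the uploaded blocks and releases the append lease
    }
  }
}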
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8bc93db2/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobAppendStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobAppendStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobAppendStream.java
new file mode 100644
index 0000000..d1ec8df
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobAppendStream.java
@@ -0,0 +1,775 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.List;
+import java.util.Random;
+import java.util.TimeZone;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.fs.azure.StorageInterface.CloudBlockBlobWrapper;
+import org.mortbay.log.Log;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.microsoft.azure.storage.AccessCondition;
+import com.microsoft.azure.storage.OperationContext;
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.BlobRequestOptions;
+import com.microsoft.azure.storage.blob.BlockEntry;
+import com.microsoft.azure.storage.blob.BlockListingFilter;
+
+/**
+ * Stream object that implements append for Block Blobs in WASB.
+ */
+public class BlockBlobAppendStream extends OutputStream {
+
+ private final String key;
+ private final int bufferSize;
+ private ByteArrayOutputStream outBuffer;
+ private final CloudBlockBlobWrapper blob;
+ private final OperationContext opContext;
+
+ /**
+ * Variable to track if the stream has been closed.
+ */
+ private boolean closed = false;
+
+ /**
+ * Variable to track if the append lease is released.
+ */
+
+ private volatile boolean leaseFreed;
+
+ /**
+ * Variable to track if the append stream has been
+ * initialized.
+ */
+
+ private boolean initialized = false;
+
+ /**
+ * Last IOException encountered
+ */
+ private volatile IOException lastError = null;
+
+ /**
+ * List to keep track of the uncommitted azure storage
+ * block ids
+ */
+ private final List<BlockEntry> uncommittedBlockEntries;
+
+ private static final int UNSET_BLOCKS_COUNT = -1;
+
+ /**
+ * Variable to hold the next block id to be used for azure
+ * storage blocks.
+ */
+ private long nextBlockCount = UNSET_BLOCKS_COUNT;
+
+ private final Random sequenceGenerator = new Random();
+
+ /**
+ * Time to wait to renew lease in milliseconds
+ */
+ private static final int LEASE_RENEWAL_PERIOD = 10000;
+
+ /**
+ * Number of times to retry for lease renewal
+ */
+ private static final int MAX_LEASE_RENEWAL_RETRY_COUNT = 3;
+
+ /**
+ * Time to wait before retrying to set the lease
+ */
+ private static final int LEASE_RENEWAL_RETRY_SLEEP_PERIOD = 500;
+
+ /**
+ * Metadata key used on the blob to indicate append lease is active
+ */
+ public static final String APPEND_LEASE = "append_lease";
+
+ /**
+ * Timeout value for the append lease in milliseconds. If the lease is not
+ * renewed within 30 seconds then another thread can acquire the append lease
+ * on the blob
+ */
+ public static final int APPEND_LEASE_TIMEOUT = 30000;
+
+ /**
+ * Metadata key used on the blob to indicate the last modified time of the append lease
+ */
+ public static final String APPEND_LEASE_LAST_MODIFIED = "append_lease_last_modified";
+
+ /**
+ * Number of times a block upload is retried.
+ */
+ private static final int MAX_BLOCK_UPLOAD_RETRIES = 3;
+
+ /**
+ * Wait time between block upload retries in milliseconds.
+ */
+ private static final int BLOCK_UPLOAD_RETRY_INTERVAL = 1000;
+
+ private static final Logger LOG = LoggerFactory.getLogger(BlockBlobAppendStream.class);
+
+ private static final int MAX_BLOCK_COUNT = 100000;
+
+ private ThreadPoolExecutor ioThreadPool;
+
+ /**
+ * Atomic integer to provide thread id for thread names for uploader threads.
+ */
+ private final AtomicInteger threadSequenceNumber;
+
+ /**
+ * Prefix to be used for thread names for uploader threads.
+ */
+ private static final String THREAD_ID_PREFIX = "BlockBlobAppendStream";
+
+ private static final String UTC_STR = "UTC";
+
+ public BlockBlobAppendStream(final CloudBlockBlobWrapper blob,
+ final String aKey, final int bufferSize, final OperationContext opContext)
+ throws IOException {
+
+ if (null == aKey || 0 == aKey.length()) {
+ throw new IllegalArgumentException(
+ "Illegal argument: The key string is null or empty");
+ }
+
+ if (0 >= bufferSize) {
+ throw new IllegalArgumentException(
+ "Illegal argument bufferSize cannot be zero or negative");
+ }
+
+
+ this.blob = blob;
+ this.opContext = opContext;
+ this.key = aKey;
+ this.bufferSize = bufferSize;
+ this.threadSequenceNumber = new AtomicInteger(0);
+ setBlocksCount();
+
+ this.outBuffer = new ByteArrayOutputStream(bufferSize);
+ this.uncommittedBlockEntries = new ArrayList<BlockEntry>();
+
+ // Acquire append lease on the blob.
+ try {
+ //Set the append lease if the value of the append lease is false
+ if (!updateBlobAppendMetadata(true, false)) {
+ LOG.error("Unable to set Append Lease on the Blob : {} "
+ + "Possibly because another client already has a create or append stream open on the Blob", key);
+ throw new IOException("Unable to set Append lease on the Blob. "
+ + "Possibly because another client already had an append stream open on the Blob.");
+ }
+ } catch (StorageException ex) {
+ LOG.error("Encountered Storage exception while acquiring append "
+ + "lease on blob : {}. Storage Exception : {} ErrorCode : {}",
+ key, ex, ex.getErrorCode());
+
+ throw new IOException(ex);
+ }
+
+ leaseFreed = false;
+ }
+
+ /**
+ * Helper method that starts an Append Lease renewer thread and the
+ * thread pool.
+ */
+ public synchronized void initialize() {
+
+ if (initialized) {
+ return;
+ }
+ /*
+ * Start the thread for Append lease renewer.
+ */
+ Thread appendLeaseRenewer = new Thread(new AppendRenewer());
+ appendLeaseRenewer.setDaemon(true);
+ appendLeaseRenewer.setName(String.format("%s-AppendLeaseRenewer", key));
+ appendLeaseRenewer.start();
+
+ /*
+ * Parameters to ThreadPoolExecutor:
+ * corePoolSize : the number of threads to keep in the pool, even if they are idle,
+ * unless allowCoreThreadTimeOut is set
+ * maximumPoolSize : the maximum number of threads to allow in the pool
+ * keepAliveTime - when the number of threads is greater than the core,
+ * this is the maximum time that excess idle threads will
+ * wait for new tasks before terminating.
+ * unit - the time unit for the keepAliveTime argument
+ * workQueue - the queue to use for holding tasks before they are executed
+ * This queue will hold only the Runnable tasks submitted by the execute method.
+ */
+ this.ioThreadPool = new ThreadPoolExecutor(4, 4, 2, TimeUnit.SECONDS,
+ new LinkedBlockingQueue<Runnable>(), new UploaderThreadFactory());
+
+ initialized = true;
+ }
+
+ /**
+ * Get the blob name.
+ *
+ * @return String Blob name.
+ */
+ public String getKey() {
+ return key;
+ }
+
+ /**
+ * Get the buffer size of the stream.
+ * @return buffer size of the stream.
+ */
+ public int getBufferSize() {
+ return bufferSize;
+ }
+
+ /**
+ * Writes the specified byte to this output stream. The general contract for
+ * write is that one byte is written to the output stream. The byte to be
+ * written is the eight low-order bits of the argument b. The 24 high-order
+ * bits of b are ignored.
+ *
+ * @param byteVal
+ * the byteValue to write.
+ * @throws IOException
+ * if an I/O error occurs. In particular, an IOException may be
+ * thrown if the output stream has been closed.
+ */
+ @Override
+ public void write(final int byteVal) throws IOException {
+ write(new byte[] { (byte) (byteVal & 0xFF) });
+ }
+
+ /**
+ * Writes b.length bytes from the specified byte array to this output stream.
+ *
+ * @param data
+ * the byte array to write.
+ *
+ * @throws IOException
+ * if an I/O error occurs. In particular, an IOException may be
+ * thrown if the output stream has been closed.
+ */
+ @Override
+ public void write(final byte[] data) throws IOException {
+ write(data, 0, data.length);
+ }
+
+ /**
+ * Writes length bytes from the specified byte array starting at offset to
+ * this output stream.
+ *
+ * @param data
+ * the byte array to write.
+ * @param offset
+ * the start offset in the data.
+ * @param length
+ * the number of bytes to write.
+ * @throws IOException
+ * if an I/O error occurs. In particular, an IOException may be
+ * thrown if the output stream has been closed.
+ */
+ @Override
+ public void write(final byte[] data, final int offset, final int length)
+ throws IOException {
+
+ if (offset < 0 || length < 0 || length > data.length - offset) {
+ throw new IndexOutOfBoundsException("write API in append stream called with invalid arguments");
+ }
+
+ writeInternal(data, offset, length);
+ }
+
+ @Override
+ public synchronized void close() throws IOException {
+
+ if (!initialized) {
+ throw new IOException("Trying to close an uninitialized Append stream");
+ }
+
+ if (closed) {
+ return;
+ }
+
+ if (leaseFreed) {
+ throw new IOException(String.format("Attempting to close an append stream on blob : %s "
+ + " that does not have lease on the Blob. Failing close", key));
+ }
+
+ if (outBuffer.size() > 0) {
+ uploadBlockToStorage(outBuffer.toByteArray());
+ }
+
+ ioThreadPool.shutdown();
+
+ try {
+ if (!ioThreadPool.awaitTermination(10, TimeUnit.MINUTES)) {
+ LOG.error("Time out occured while waiting for IO request to finish in append"
+ + " for blob : {}", key);
+ NativeAzureFileSystemHelper.logAllLiveStackTraces();
+ throw new IOException("Timed out waiting for IO requests to finish");
+ }
+ } catch(InterruptedException intrEx) {
+
+ // Restore the interrupted status
+ Thread.currentThread().interrupt();
+ LOG.error("Upload block operation in append interrupted for blob {}. Failing close", key);
+ throw new IOException("Append Commit interrupted.");
+ }
+
+ // Calling commit after all blocks are successfully uploaded.
+ if (lastError == null) {
+ commitAppendBlocks();
+ }
+
+ // Perform cleanup.
+ cleanup();
+
+ if (lastError != null) {
+ throw lastError;
+ }
+ }
+
+ /**
+ * Helper method that cleans up the append stream.
+ */
+ private synchronized void cleanup() {
+
+ closed = true;
+
+ try {
+ // Set the value of append lease to false if the value is set to true.
+ updateBlobAppendMetadata(false, true);
+ } catch(StorageException ex) {
+ LOG.debug("Append metadata update on the Blob : {} encountered Storage Exception : {} "
+ + "Error Code : {}",
+ key, ex, ex.getErrorCode());
+ lastError = new IOException(ex);
+ }
+
+ leaseFreed = true;
+ }
+
+ /**
+ * Method to commit all the uncommitted blocks to Azure storage.
+ * If the commit fails then blocks are automatically cleaned up
+ * by Azure storage.
+ * @throws IOException
+ */
+ private synchronized void commitAppendBlocks() throws IOException {
+
+ SelfRenewingLease lease = null;
+
+ try {
+ if (uncommittedBlockEntries.size() > 0) {
+
+ //Acquiring lease on the blob.
+ lease = new SelfRenewingLease(blob);
+
+ // Downloading existing blocks
+ List<BlockEntry> blockEntries = blob.downloadBlockList(BlockListingFilter.COMMITTED,
+ new BlobRequestOptions(), opContext);
+
+ // Adding uncommitted blocks.
+ blockEntries.addAll(uncommittedBlockEntries);
+
+ AccessCondition accessCondition = new AccessCondition();
+ accessCondition.setLeaseID(lease.getLeaseID());
+ blob.commitBlockList(blockEntries, accessCondition, new BlobRequestOptions(), opContext);
+ uncommittedBlockEntries.clear();
+ }
+ } catch(StorageException ex) {
+ LOG.error("Storage exception encountered during block commit phase of append for blob"
+ + " : {} Storage Exception : {} Error Code: {}", key, ex, ex.getErrorCode());
+ throw new IOException("Encountered Exception while committing append blocks", ex);
+ } finally {
+ if (lease != null) {
+ try {
+ lease.free();
+ } catch(StorageException ex) {
+ LOG.debug("Exception encountered while releasing lease for "
+ + "blob : {} StorageException : {} ErrorCode : {}", key, ex, ex.getErrorCode());
+ // Swallowing exception here as the lease is cleaned up by the SelfRenewingLease object.
+ }
+ }
+ }
+ }
+
+ /**
+ * Helper method that sets the starting block count used to generate block IDs. The
+ * algorithm is similar to the one used by the Azure storage SDK.
+ */
+ private void setBlocksCount() throws IOException {
+ try {
+
+ if (nextBlockCount == UNSET_BLOCKS_COUNT) {
+
+ nextBlockCount = (long) (sequenceGenerator.nextInt(Integer.MAX_VALUE))
+ + sequenceGenerator.nextInt(Integer.MAX_VALUE - MAX_BLOCK_COUNT);
+
+ List<BlockEntry> blockEntries =
+ blob.downloadBlockList(BlockListingFilter.COMMITTED, new BlobRequestOptions(), opContext);
+
+ nextBlockCount += blockEntries.size();
+
+ }
+ } catch (StorageException ex) {
+ LOG.debug("Encountered storage exception during setting next Block Count."
+ + " StorageException : {} ErrorCode : {}", ex, ex.getErrorCode());
+ throw new IOException(ex);
+ }
+ }
+
+ /**
+ * Helper method that generates the next block id for uploading a block to azure storage.
+ * @return String representing the block ID generated.
+ * @throws IOException
+ */
+ private String generateBlockId() throws IOException {
+
+ if (nextBlockCount == UNSET_BLOCKS_COUNT) {
+ throw new IOException("Append Stream in invalid state. nextBlockCount not set correctly");
+ }
+
+ byte[] blockIdInBytes = getBytesFromLong(nextBlockCount);
+ return new String(Base64.encodeBase64(blockIdInBytes), StandardCharsets.UTF_8);
+ }
+
+ /**
+ * Returns a byte array that represents the data of a <code>long</code> value. This
+ * utility method is copied from com.microsoft.azure.storage.core.Utility class.
+ * That class is marked as internal, hence we clone the method here rather than
+ * taking a dependency on the Utility class.
+ *
+ * @param value
+ * The value from which the byte array will be returned.
+ *
+ * @return A byte array that represents the data of the specified <code>long</code> value.
+ */
+ private static byte[] getBytesFromLong(final long value) {
+ final byte[] tempArray = new byte[8];
+
+ for (int m = 0; m < 8; m++) {
+ tempArray[7 - m] = (byte) ((value >> (8 * m)) & 0xFF);
+ }
+
+ return tempArray;
+ }
+ /**
+ * Helper method that creates a thread to upload a block to azure storage.
+ * @param payload
+ * @throws IOException
+ */
+ private synchronized void uploadBlockToStorage(byte[] payload) throws IOException {
+
+ // upload payload to azure storage
+ nextBlockCount++;
+ String blockId = generateBlockId();
+ // Since uploads to Azure storage are done in parallel threads, we go ahead and
+ // add the blockId to the uncommitted list. If the upload of a block fails,
+ // we don't commit the blockIds.
+ uncommittedBlockEntries.add(new BlockEntry(blockId));
+ ioThreadPool.execute(new WriteRequest(payload, blockId));
+ }
+
+
+ /**
+ * Helper method to update the Blob metadata during Append lease operations.
+ * Blob metadata is updated to the holdLease value only if the current lease
+ * status is equal to testCondition and the last update on the blob metadata
+ * is less than 30 seconds old.
+ * @param holdLease
+ * @param testCondition
+ * @return true if the updated lease operation was successful or false otherwise
+ * @throws StorageException
+ */
+ private boolean updateBlobAppendMetadata(boolean holdLease, boolean testCondition)
+ throws StorageException {
+
+ SelfRenewingLease lease = null;
+ StorageException lastStorageException = null;
+ int leaseRenewalRetryCount = 0;
+
+ /*
+ * Updating the Blob metadata honours the following algorithm, based on:
+ * 1) If the append lease metadata is present
+ * 2) Last updated time of the append lease
+ * 3) Previous value of the Append lease metadata.
+ *
+ * The algorithm:
+ * 1) If append lease metadata is not part of the Blob. In this case
+ * this is the first client to Append so we update the metadata.
+ * 2) If append lease metadata is present and timeout has occurred.
+ * In this case irrespective of what the value of the append lease is we update the metadata.
+ * 3) If append lease metadata is present and is equal to testCondition value (passed as parameter)
+ * and timeout has not occurred, we update the metadata.
+ * 4) If append lease metadata is present and is not equal to testCondition value (passed as parameter)
+ * and timeout has not occurred, we do not update metadata and return false.
+ *
+ */
+ while (leaseRenewalRetryCount < MAX_LEASE_RENEWAL_RETRY_COUNT) {
+
+ lastStorageException = null;
+
+ synchronized(this) {
+ try {
+
+ final Calendar currentCalendar = Calendar
+ .getInstance(Locale.US);
+ currentCalendar.setTimeZone(TimeZone.getTimeZone(UTC_STR));
+ long currentTime = currentCalendar.getTime().getTime();
+
+ // Acquire lease on the blob.
+ lease = new SelfRenewingLease(blob);
+
+ blob.downloadAttributes(opContext);
+ HashMap<String, String> metadata = blob.getMetadata();
+
+ if (metadata.containsKey(APPEND_LEASE)
+ && currentTime - Long.parseLong(
+ metadata.get(APPEND_LEASE_LAST_MODIFIED)) <= BlockBlobAppendStream.APPEND_LEASE_TIMEOUT
+ && !metadata.get(APPEND_LEASE).equals(Boolean.toString(testCondition))) {
+ return false;
+ }
+
+ metadata.put(APPEND_LEASE, Boolean.toString(holdLease));
+ metadata.put(APPEND_LEASE_LAST_MODIFIED, Long.toString(currentTime));
+ blob.setMetadata(metadata);
+ AccessCondition accessCondition = new AccessCondition();
+ accessCondition.setLeaseID(lease.getLeaseID());
+ blob.uploadMetadata(accessCondition, null, opContext);
+ return true;
+
+ } catch (StorageException ex) {
+
+ lastStorageException = ex;
+ LOG.debug("Lease renewal for Blob : {} encountered Storage Exception : {} "
+ + "Error Code : {}",
+ key, ex, ex.getErrorCode());
+ leaseRenewalRetryCount++;
+
+ } finally {
+
+ if (lease != null) {
+ try {
+ lease.free();
+ } catch(StorageException ex) {
+ LOG.debug("Encountered Storage exception while releasing lease for Blob {} "
+ + "during Append metadata operation. Storage Exception {} "
+ + "Error Code : {} ", key, ex, ex.getErrorCode());
+ } finally {
+ lease = null;
+ }
+ }
+ }
+ }
+
+ if (leaseRenewalRetryCount == MAX_LEASE_RENEWAL_RETRY_COUNT) {
+ throw lastStorageException;
+ } else {
+ try {
+ Thread.sleep(LEASE_RENEWAL_RETRY_SLEEP_PERIOD);
+ } catch(InterruptedException ex) {
+ LOG.debug("Blob append metadata updated method interrupted");
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ // This point should be unreachable: the while loop always executes
+ // and either returns or throws from within.
+ return false;
+ }
+
+ /**
+ * This is the only method that should be writing to outBuffer to maintain consistency of the outBuffer.
+ * @param data
+ * @param offset
+ * @param length
+ * @throws IOException
+ */
+ private synchronized void writeInternal(final byte[] data, final int offset, final int length)
+ throws IOException {
+
+ if (!initialized) {
+ throw new IOException("Trying to write to an un-initialized Append stream");
+ }
+
+ if (closed) {
+ throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
+ }
+
+ if (leaseFreed) {
+ throw new IOException(String.format("Write called on a append stream not holding lease. Failing Write"));
+ }
+
+ byte[] currentData = new byte[length];
+ System.arraycopy(data, offset, currentData, 0, length);
+
+ // check to see if the data to be appended exceeds the
+ // buffer size. If so we upload a block to azure storage.
+ while ((outBuffer.size() + currentData.length) > bufferSize) {
+
+ byte[] payload = new byte[bufferSize];
+
+ // Add data from the existing buffer
+ System.arraycopy(outBuffer.toByteArray(), 0, payload, 0, outBuffer.size());
+
+ // Updating the available size in the payload
+ int availableSpaceInPayload = bufferSize - outBuffer.size();
+
+ // Adding data from the current call
+ System.arraycopy(currentData, 0, payload, outBuffer.size(), availableSpaceInPayload);
+
+ uploadBlockToStorage(payload);
+
+ // updating the currentData buffer
+ byte[] tempBuffer = new byte[currentData.length - availableSpaceInPayload];
+ System.arraycopy(currentData, availableSpaceInPayload,
+ tempBuffer, 0, currentData.length - availableSpaceInPayload);
+ currentData = tempBuffer;
+ outBuffer = new ByteArrayOutputStream(bufferSize);
+ }
+
+ outBuffer.write(currentData);
+ }
+
+ /**
+ * Runnable instance that uploads a block of data to Azure storage.
+ */
+ private class WriteRequest implements Runnable {
+ private final byte[] dataPayload;
+ private final String blockId;
+
+ public WriteRequest(byte[] dataPayload, String blockId) {
+ this.dataPayload = dataPayload;
+ this.blockId = blockId;
+ }
+
+ @Override
+ public void run() {
+
+ int uploadRetryAttempts = 0;
+ IOException lastLocalException = null;
+ while (uploadRetryAttempts < MAX_BLOCK_UPLOAD_RETRIES) {
+ try {
+
+ blob.uploadBlock(blockId, new ByteArrayInputStream(dataPayload),
+ dataPayload.length, new BlobRequestOptions(), opContext);
+ break;
+ } catch(Exception ioe) {
+ Log.debug("Encountered exception during uploading block for Blob : {} Exception : {}", key, ioe);
+ uploadRetryAttempts++;
+ lastLocalException = new IOException("Encountered Exception while uploading block", ioe);
+ try {
+ Thread.sleep(BLOCK_UPLOAD_RETRY_INTERVAL);
+ } catch(InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ break;
+ }
+ }
+ }
+
+ if (uploadRetryAttempts == MAX_BLOCK_UPLOAD_RETRIES) {
+ lastError = lastLocalException;
+ }
+ }
+ }
+
+ /**
+ * A ThreadFactory that creates uploader threads with
+ * meaningful names, helpful for debugging purposes.
+ */
+ class UploaderThreadFactory implements ThreadFactory {
+
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread t = new Thread(r);
+ t.setName(String.format("%s-%s-%d", THREAD_ID_PREFIX, key,
+ threadSequenceNumber.getAndIncrement()));
+ return t;
+ }
+ }
+
+ /**
+ * A daemon thread that renews the Append lease on the blob.
+ * The thread sleeps for LEASE_RENEWAL_PERIOD before renewing
+ * the lease. If an error is encountered while renewing the lease,
+ * the lease is released by this thread, which fails all further
+ * operations.
+ */
+ private class AppendRenewer implements Runnable {
+
+ @Override
+ public void run() {
+
+ while (!leaseFreed) {
+
+ try {
+ Thread.sleep(LEASE_RENEWAL_PERIOD);
+ } catch (InterruptedException ie) {
+ LOG.debug("Appender Renewer thread interrupted");
+ Thread.currentThread().interrupt();
+ }
+
+ Log.debug("Attempting to renew append lease on {}", key);
+
+ try {
+ if (!leaseFreed) {
+ // Update the blob metadata to renew the append lease
+ if (!updateBlobAppendMetadata(true, true)) {
+ LOG.error("Unable to re-acquire append lease on the Blob {} ", key);
+ leaseFreed = true;
+ }
+ }
+ } catch (StorageException ex) {
+
+ LOG.debug("Lease renewal for Blob : {} encountered "
+ + "Storage Exception : {} Error Code : {}", key, ex, ex.getErrorCode());
+
+ // We swallow the exception here because if the blob metadata is not updated for
+ // APPEND_LEASE_TIMEOUT period, another thread would be able to detect this and
+ // continue forward if it needs to append.
+ leaseFreed = true;
+ }
+ }
+ }
+ }
+}
\ No newline at end of file
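For intuition on the block-id scheme above, a small self-contained sketch (class and method names are illustrative): the counter is serialized as an 8-byte big-endian value and Base64-encoded, so every id within a blob has the same 12-character length, which Azure block lists require.

import java.nio.charset.StandardCharsets;
import org.apache.commons.codec.binary.Base64;

public class BlockIdSketch {
  // Mirrors getBytesFromLong + generateBlockId: fixed-width big-endian bytes, Base64-encoded.
  static String blockId(long counter) {
    byte[] raw = new byte[8];
    for (int m = 0; m < 8; m++) {
      raw[7 - m] = (byte) ((counter >> (8 * m)) & 0xFF);
    }
    return new String(Base64.encodeBase64(raw), StandardCharsets.UTF_8);
  }

  public static void main(String[] args) {
    System.out.println(blockId(42L)); // prints AAAAAAAAACo=
    System.out.println(blockId(43L)); // prints AAAAAAAAACs=
  }
}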
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8bc93db2/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
index d2ff705..ed65184 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.fs.azure;
import java.io.DataInputStream;
+import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
@@ -31,7 +32,6 @@ import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.EnumSet;
-import java.util.Iterator;
import java.util.Set;
import java.util.TimeZone;
import java.util.TreeSet;
@@ -41,7 +41,6 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.lang.StringUtils;
-import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
@@ -60,8 +59,6 @@ import org.apache.hadoop.fs.azure.metrics.AzureFileSystemMetricsSystem;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.fs.azure.AzureException;
-import org.apache.hadoop.fs.azure.StorageInterface.CloudBlobWrapper;
-import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Progressable;
import org.slf4j.Logger;
@@ -73,12 +70,8 @@ import org.codehaus.jackson.map.JsonMappingException;
import org.codehaus.jackson.map.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
-import com.microsoft.azure.storage.AccessCondition;
-import com.microsoft.azure.storage.OperationContext;
-import com.microsoft.azure.storage.StorageErrorCode;
import com.microsoft.azure.storage.StorageException;
-import com.microsoft.azure.storage.blob.CloudBlob;
-import com.microsoft.azure.storage.StorageErrorCodeStrings;
+
import org.apache.hadoop.io.IOUtils;
@@ -288,7 +281,7 @@ public class NativeAzureFileSystem extends FileSystem {
throw new IOException("Unable to write RenamePending file for folder rename from "
+ srcKey + " to " + dstKey, e);
} finally {
- NativeAzureFileSystem.cleanup(LOG, output);
+ NativeAzureFileSystemHelper.cleanup(LOG, output);
}
}
@@ -663,6 +656,11 @@ public class NativeAzureFileSystem extends FileSystem {
public static final String SKIP_AZURE_METRICS_PROPERTY_NAME = "fs.azure.skip.metrics";
+ /*
+ * Property to enable Append API.
+ */
+ public static final String APPEND_SUPPORT_ENABLE_PROPERTY_NAME = "fs.azure.enable.append.support";
+
private class NativeAzureFsInputStream extends FSInputStream {
private InputStream in;
private final String key;
@@ -728,7 +726,7 @@ public class NativeAzureFileSystem extends FileSystem {
return result;
} catch(IOException e) {
- Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(e);
+ Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
if (innerException instanceof StorageException) {
@@ -736,7 +734,7 @@ public class NativeAzureFileSystem extends FileSystem {
+ " Exception details: {} Error Code : {}",
key, e, ((StorageException) innerException).getErrorCode());
- if (NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
+ if (NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
throw new FileNotFoundException(String.format("%s is not found", key));
}
}
@@ -782,7 +780,7 @@ public class NativeAzureFileSystem extends FileSystem {
return result;
} catch(IOException e) {
- Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(e);
+ Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
if (innerException instanceof StorageException) {
@@ -790,7 +788,7 @@ public class NativeAzureFileSystem extends FileSystem {
+ " Exception details: {} Error Code : {}",
key, e, ((StorageException) innerException).getErrorCode());
- if (NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
+ if (NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
throw new FileNotFoundException(String.format("%s is not found", key));
}
}
@@ -822,10 +820,10 @@ public class NativeAzureFileSystem extends FileSystem {
this.pos);
} catch(IOException e) {
- Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(e);
+ Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
if (innerException instanceof StorageException
- && NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
+ && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
throw new FileNotFoundException(String.format("%s is not found", key));
}
@@ -1041,7 +1039,7 @@ public class NativeAzureFileSystem extends FileSystem {
private static boolean suppressRetryPolicy = false;
// A counter to create unique (within-process) names for my metrics sources.
private static AtomicInteger metricsSourceNameCounter = new AtomicInteger();
-
+ private boolean appendSupportEnabled = false;
public NativeAzureFileSystem() {
// set store in initialize()
@@ -1164,7 +1162,7 @@ public class NativeAzureFileSystem extends FileSystem {
this.blockSize = conf.getLong(AZURE_BLOCK_SIZE_PROPERTY_NAME,
MAX_AZURE_BLOCK_SIZE);
-
+ this.appendSupportEnabled = conf.getBoolean(APPEND_SUPPORT_ENABLE_PROPERTY_NAME, false);
LOG.debug("NativeAzureFileSystem. Initializing.");
LOG.debug(" blockSize = {}",
conf.getLong(AZURE_BLOCK_SIZE_PROPERTY_NAME, MAX_AZURE_BLOCK_SIZE));
@@ -1294,7 +1292,61 @@ public class NativeAzureFileSystem extends FileSystem {
@Override
public FSDataOutputStream append(Path f, int bufferSize, Progressable progress)
throws IOException {
- throw new IOException("Not supported");
+
+ if (!appendSupportEnabled) {
+ throw new UnsupportedOperationException("Append Support not enabled");
+ }
+
+ LOG.debug("Opening file: {} for append", f);
+
+ Path absolutePath = makeAbsolute(f);
+ String key = pathToKey(absolutePath);
+ FileMetadata meta = null;
+ try {
+ meta = store.retrieveMetadata(key);
+ } catch(Exception ex) {
+
+ Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
+
+ if (innerException instanceof StorageException
+ && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
+
+ throw new FileNotFoundException(String.format("%s is not found", key));
+ } else {
+ throw ex;
+ }
+ }
+
+ if (meta == null) {
+ throw new FileNotFoundException(f.toString());
+ }
+
+ if (meta.isDir()) {
+ throw new FileNotFoundException(f.toString()
+ + " is a directory not a file.");
+ }
+
+ if (store.isPageBlobKey(key)) {
+ throw new IOException("Append not supported for Page Blobs");
+ }
+
+ DataOutputStream appendStream = null;
+
+ try {
+ appendStream = store.retrieveAppendStream(key, bufferSize);
+ } catch (Exception ex) {
+
+ Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
+
+ if (innerException instanceof StorageException
+ && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
+ throw new FileNotFoundException(String.format("%s is not found", key));
+ } else {
+ throw ex;
+ }
+ }
+
+ return new FSDataOutputStream(appendStream, statistics);
}
@Override
@@ -1379,7 +1431,7 @@ public class NativeAzureFileSystem extends FileSystem {
lease.free();
}
} catch (Exception e) {
- NativeAzureFileSystem.cleanup(LOG, out);
+ NativeAzureFileSystemHelper.cleanup(LOG, out);
String msg = "Unable to free lease on " + parent.toUri();
LOG.error(msg);
throw new IOException(msg, e);
@@ -1577,10 +1629,10 @@ public class NativeAzureFileSystem extends FileSystem {
metaFile = store.retrieveMetadata(key);
} catch (IOException e) {
- Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(e);
+ Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
if (innerException instanceof StorageException
- && NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
+ && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
return false;
}
@@ -1611,7 +1663,7 @@ public class NativeAzureFileSystem extends FileSystem {
parentMetadata = store.retrieveMetadata(parentKey);
} catch (IOException e) {
- Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(e);
+ Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
if (innerException instanceof StorageException) {
// Invalid State.
@@ -1619,7 +1671,7 @@ public class NativeAzureFileSystem extends FileSystem {
// if the file is not present. But not retrieving metadata here is an
// unrecoverable state and can only happen if there is a race condition,
// hence throwing an IOException
- if (NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
+ if (NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
throw new IOException("File " + f + " has a parent directory "
+ parentPath + " whose metadata cannot be retrieved. Can't resolve");
}
@@ -1662,10 +1714,10 @@ public class NativeAzureFileSystem extends FileSystem {
instrumentation.fileDeleted();
} catch(IOException e) {
- Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(e);
+ Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
if (innerException instanceof StorageException
- && NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
+ && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
return false;
}
@@ -1684,7 +1736,7 @@ public class NativeAzureFileSystem extends FileSystem {
parentMetadata = store.retrieveMetadata(parentKey);
} catch (IOException e) {
- Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(e);
+ Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
if (innerException instanceof StorageException) {
// Invalid State.
@@ -1692,7 +1744,7 @@ public class NativeAzureFileSystem extends FileSystem {
// if the file is not present. But not retrieving metadata here is an
// unrecoverable state and can only happen if there is a race condition,
// hence throwing an IOException
- if (NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
+ if (NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
throw new IOException("File " + f + " has a parent directory "
+ parentPath + " whose metadata cannot be retrieved. Can't resolve");
}
@@ -1728,10 +1780,10 @@ public class NativeAzureFileSystem extends FileSystem {
priorLastKey);
} catch(IOException e) {
- Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(e);
+ Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
if (innerException instanceof StorageException
- && NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
+ && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
return false;
}
@@ -1763,10 +1815,10 @@ public class NativeAzureFileSystem extends FileSystem {
instrumentation.fileDeleted();
} catch(IOException e) {
- Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(e);
+ Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
if (innerException instanceof StorageException
- && NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
+ && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
return false;
}
@@ -1785,10 +1837,10 @@ public class NativeAzureFileSystem extends FileSystem {
store.delete(key);
} catch(IOException e) {
- Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(e);
+ Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
if (innerException instanceof StorageException
- && NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
+ && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
return false;
}
@@ -1829,10 +1881,10 @@ public class NativeAzureFileSystem extends FileSystem {
meta = store.retrieveMetadata(key);
} catch(Exception ex) {
- Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(ex);
+ Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
if (innerException instanceof StorageException
- && NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
+ && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
throw new FileNotFoundException(String.format("%s is not found", key));
}
@@ -1922,10 +1974,10 @@ public class NativeAzureFileSystem extends FileSystem {
meta = store.retrieveMetadata(key);
} catch (IOException ex) {
- Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(ex);
+ Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
if (innerException instanceof StorageException
- && NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
+ && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
throw new FileNotFoundException(String.format("%s is not found", f));
}
@@ -1948,10 +2000,10 @@ public class NativeAzureFileSystem extends FileSystem {
listing = store.list(key, AZURE_LIST_ALL, 1, partialKey);
} catch (IOException ex) {
- Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(ex);
+ Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
if (innerException instanceof StorageException
- && NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
+ && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
throw new FileNotFoundException(String.format("%s is not found", key));
}
@@ -1972,10 +2024,10 @@ public class NativeAzureFileSystem extends FileSystem {
try {
listing = store.list(key, AZURE_LIST_ALL, 1, partialKey);
} catch (IOException ex) {
- Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(ex);
+ Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
if (innerException instanceof StorageException
- && NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
+ && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
throw new FileNotFoundException(String.format("%s is not found", key));
}
@@ -2196,10 +2248,10 @@ public class NativeAzureFileSystem extends FileSystem {
meta = store.retrieveMetadata(key);
} catch(Exception ex) {
- Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(ex);
+ Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
if (innerException instanceof StorageException
- && NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
+ && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
throw new FileNotFoundException(String.format("%s is not found", key));
}
@@ -2219,10 +2271,10 @@ public class NativeAzureFileSystem extends FileSystem {
try {
inputStream = store.retrieve(key);
} catch(Exception ex) {
- Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(ex);
+ Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
if (innerException instanceof StorageException
- && NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
+ && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
throw new FileNotFoundException(String.format("%s is not found", key));
}
@@ -2261,14 +2313,14 @@ public class NativeAzureFileSystem extends FileSystem {
dstMetadata = store.retrieveMetadata(dstKey);
} catch (IOException ex) {
- Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(ex);
+ Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
// A BlobNotFound storage exception is only thrown from the retrieveMetadata API when
// there is a race condition. If there is another thread which deletes the destination
// file or folder, then this thread calling rename should be able to continue with
// rename gracefully. Hence the StorageException is swallowed here.
if (innerException instanceof StorageException) {
- if (NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
+ if (NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
LOG.debug("BlobNotFound exception encountered for Destination key : {}. "
+ "Swallowin the exception to handle race condition gracefully", dstKey);
}
@@ -2294,10 +2346,10 @@ public class NativeAzureFileSystem extends FileSystem {
parentOfDestMetadata = store.retrieveMetadata(pathToKey(absoluteDst.getParent()));
} catch (IOException ex) {
- Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(ex);
+ Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
if (innerException instanceof StorageException
- && NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
+ && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
LOG.debug("Parent of destination {} doesn't exists. Failing rename", dst);
return false;
@@ -2320,10 +2372,10 @@ public class NativeAzureFileSystem extends FileSystem {
try {
srcMetadata = store.retrieveMetadata(srcKey);
} catch (IOException ex) {
- Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(ex);
+ Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
if (innerException instanceof StorageException
- && NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
+ && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
LOG.debug("Source {} doesn't exists. Failing rename", src);
return false;
@@ -2342,10 +2394,10 @@ public class NativeAzureFileSystem extends FileSystem {
store.rename(srcKey, dstKey);
} catch(IOException ex) {
- Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(ex);
+ Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
if (innerException instanceof StorageException
- && NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
+ && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
LOG.debug("BlobNotFoundException encountered. Failing rename", src);
return false;
@@ -2552,10 +2604,10 @@ public class NativeAzureFileSystem extends FileSystem {
try {
metadata = store.retrieveMetadata(key);
} catch (IOException ex) {
- Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(ex);
+ Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
if (innerException instanceof StorageException
- && NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
+ && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
throw new FileNotFoundException(String.format("File %s doesn't exists.", p));
}
@@ -2591,10 +2643,10 @@ public class NativeAzureFileSystem extends FileSystem {
try {
metadata = store.retrieveMetadata(key);
} catch (IOException ex) {
- Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(ex);
+ Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
if (innerException instanceof StorageException
- && NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
+ && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
throw new FileNotFoundException(String.format("File %s doesn't exists.", p));
}
@@ -2817,52 +2869,4 @@ public class NativeAzureFileSystem extends FileSystem {
// Return to the caller with the randomized key.
return randomizedKey;
}
-
- private static void cleanup(Logger log, java.io.Closeable closeable) {
- if (closeable != null) {
- try {
- closeable.close();
- } catch(IOException e) {
- if (log != null) {
- log.debug("Exception in closing {}", closeable, e);
- }
- }
- }
- }
-
- /*
- * Helper method to recursively check if the cause of the exception is
- * a Azure storage exception.
- */
- private static Throwable checkForAzureStorageException(Exception e) {
-
- Throwable innerException = e.getCause();
-
- while (innerException != null
- && !(innerException instanceof StorageException)) {
- innerException = innerException.getCause();
- }
-
- return innerException;
- }
-
- /*
- * Helper method to check if the AzureStorageException is
- * because backing blob was not found.
- */
- private static boolean isFileNotFoundException(StorageException e) {
-
- String errorCode = ((StorageException) e).getErrorCode();
- if (errorCode != null
- && (errorCode.equals(StorageErrorCodeStrings.BLOB_NOT_FOUND)
- || errorCode.equals(StorageErrorCodeStrings.RESOURCE_NOT_FOUND)
- || errorCode.equals(StorageErrorCode.BLOB_NOT_FOUND.toString())
- || errorCode.equals(StorageErrorCode.RESOURCE_NOT_FOUND.toString()))) {
-
- return true;
- }
-
- return false;
- }
-
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8bc93db2/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemHelper.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemHelper.java
new file mode 100644
index 0000000..40efdc6
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemHelper.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.microsoft.azure.storage.StorageErrorCode;
+import com.microsoft.azure.storage.StorageErrorCodeStrings;
+import com.microsoft.azure.storage.StorageException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+/**
+ * Utility class with helper methods used by the WASB file system and its streams.
+ *
+ */
+
+@InterfaceAudience.Private
+final class NativeAzureFileSystemHelper {
+
+ private NativeAzureFileSystemHelper() {
+ // Hiding the constructor as this is a utility class.
+ }
+
+ private static final Logger LOG = LoggerFactory.getLogger(NativeAzureFileSystemHelper.class);
+
+ public static void cleanup(Logger log, java.io.Closeable closeable) {
+ if (closeable != null) {
+ try {
+ closeable.close();
+ } catch(IOException e) {
+ if (log != null) {
+ log.debug("Exception in closing {}", closeable, e);
+ }
+ }
+ }
+ }
+
+ /*
+ * Helper method to recursively check if the cause of the exception is
+ * an Azure storage exception.
+ */
+ public static Throwable checkForAzureStorageException(Exception e) {
+
+ Throwable innerException = e.getCause();
+
+ while (innerException != null
+ && !(innerException instanceof StorageException)) {
+
+ innerException = innerException.getCause();
+ }
+
+ return innerException;
+ }
+
+ /*
+ * Helper method to check if the StorageException is
+ * because the backing blob was not found.
+ */
+ public static boolean isFileNotFoundException(StorageException e) {
+
+ String errorCode = e.getErrorCode();
+ if (errorCode != null
+ && (errorCode.equals(StorageErrorCodeStrings.BLOB_NOT_FOUND)
+ || errorCode.equals(StorageErrorCodeStrings.RESOURCE_NOT_FOUND)
+ || errorCode.equals(StorageErrorCode.BLOB_NOT_FOUND.toString())
+ || errorCode.equals(StorageErrorCode.RESOURCE_NOT_FOUND.toString()))) {
+
+ return true;
+ }
+
+ return false;
+ }
+
+ /*
+ * Helper method that logs stack traces from all live threads.
+ */
+ public static void logAllLiveStackTraces() {
+
+ for (Map.Entry<Thread, StackTraceElement[]> entry : Thread.getAllStackTraces().entrySet()) {
+ LOG.debug("Thread " + entry.getKey().getName());
+ StackTraceElement[] trace = entry.getValue();
+ for (int j = 0; j < trace.length; j++) {
+ LOG.debug("\tat " + trace[j]);
+ }
+ }
+ }
+}
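Editorial note: to make the refactoring above concrete, here is a minimal sketch (not part of the patch) of the catch-block pattern the new helper enables. It assumes it lives in the org.apache.hadoop.fs.azure package, since NativeAzureFileSystemHelper is package-private; the class, method, and variable names are illustrative.

    package org.apache.hadoop.fs.azure;

    import java.io.FileNotFoundException;
    import java.io.IOException;

    import com.microsoft.azure.storage.StorageException;

    final class HelperUsageSketch {
      // Unwraps the cause chain of an I/O failure and converts an Azure
      // "blob not found" error into the HDFS-style FileNotFoundException,
      // mirroring the call sites updated throughout this patch.
      static IOException translate(IOException ex, String key) {
        Throwable inner = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
        if (inner instanceof StorageException
            && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) inner)) {
          return new FileNotFoundException(String.format("%s is not found", key));
        }
        return ex; // anything else propagates unchanged
      }
    }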
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8bc93db2/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java
index 0229cb7..f052b7f 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java
@@ -107,4 +107,6 @@ interface NativeFileSystemStore {
void delete(String key, SelfRenewingLease lease) throws IOException;
SelfRenewingLease acquireLease(String key) throws AzureException;
+
+ DataOutputStream retrieveAppendStream(String key, int bufferSize) throws IOException;
}
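Editorial note: a hedged sketch of the intended call shape of the new store method. The surrounding class, the 4 MB buffer size, and the variable names are assumptions, not part of the patch; NativeFileSystemStore is package-private, so the sketch assumes the same package.

    package org.apache.hadoop.fs.azure;

    import java.io.DataOutputStream;
    import java.io.IOException;

    final class AppendStreamSketch {
      // Opens an append stream for an existing blob key and writes bytes
      // after the current end of the blob.
      static void appendBytes(NativeFileSystemStore store, String key, byte[] data)
          throws IOException {
        DataOutputStream out = store.retrieveAppendStream(key, 4 * 1024 * 1024);
        try {
          out.write(data);
        } finally {
          out.close();
        }
      }
    }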
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8bc93db2/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobOutputStream.java
index 8689375..b2b34f8 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobOutputStream.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobOutputStream.java
@@ -29,8 +29,6 @@ import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Arrays;
-import java.util.Iterator;
-import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
@@ -216,7 +214,7 @@ final class PageBlobOutputStream extends OutputStream implements Syncable {
LOG.debug(ioThreadPool.toString());
if (!ioThreadPool.awaitTermination(10, TimeUnit.MINUTES)) {
LOG.debug("Timed out after 10 minutes waiting for IO requests to finish");
- logAllStackTraces();
+ NativeAzureFileSystemHelper.logAllLiveStackTraces();
LOG.debug(ioThreadPool.toString());
throw new IOException("Timed out waiting for IO requests to finish");
}
@@ -230,18 +228,7 @@ final class PageBlobOutputStream extends OutputStream implements Syncable {
closed = true;
}
- // Log the stacks of all threads.
- private void logAllStackTraces() {
- Map liveThreads = Thread.getAllStackTraces();
- for (Iterator i = liveThreads.keySet().iterator(); i.hasNext(); ) {
- Thread key = (Thread) i.next();
- LOG.debug("Thread " + key.getName());
- StackTraceElement[] trace = (StackTraceElement[]) liveThreads.get(key);
- for (int j = 0; j < trace.length; j++) {
- LOG.debug("\tat " + trace[j]);
- }
- }
- }
+
/**
* A single write request for data to write to Azure storage.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8bc93db2/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterface.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterface.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterface.java
index ce5f749..c2169a4 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterface.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterface.java
@@ -24,11 +24,13 @@ import java.io.OutputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
+import java.util.List;
import java.util.EnumSet;
import java.util.HashMap;
import org.apache.hadoop.classification.InterfaceAudience;
+import com.microsoft.azure.storage.AccessCondition;
import com.microsoft.azure.storage.CloudStorageAccount;
import com.microsoft.azure.storage.OperationContext;
import com.microsoft.azure.storage.RetryPolicyFactory;
@@ -36,6 +38,8 @@ import com.microsoft.azure.storage.StorageCredentials;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.BlobListingDetails;
import com.microsoft.azure.storage.blob.BlobProperties;
+import com.microsoft.azure.storage.blob.BlockEntry;
+import com.microsoft.azure.storage.blob.BlockListingFilter;
import com.microsoft.azure.storage.blob.BlobRequestOptions;
import com.microsoft.azure.storage.blob.CloudBlob;
import com.microsoft.azure.storage.blob.CopyState;
@@ -269,13 +273,13 @@ abstract class StorageInterface {
/**
* Uploads the container's metadata using the specified operation context.
- *
+ *
* @param opContext
* An {@link OperationContext} object that represents the context
* for the current operation. This object is used to track requests
* to the storage service, and to provide additional runtime
* information about the operation.
- *
+ *
* @throws StorageException
* If a storage service error occurred.
*/
@@ -545,6 +549,30 @@ abstract class StorageInterface {
void uploadMetadata(OperationContext opContext)
throws StorageException;
+ /**
+ * Uploads the blob's metadata to the storage service using the specified
+ * lease ID, request options, and operation context.
+ *
+ * @param accessCondition
+ * A {@link AccessCondition} object that represents the access conditions for the blob.
+ *
+ * @param options
+ * A {@link BlobRequestOptions} object that specifies any additional options for the request. Specifying
+ * <code>null</code> will use the default request options from the associated service client (
+ * {@link CloudBlobClient}).
+ *
+ * @param opContext
+ * An {@link OperationContext} object that represents the context
+ * for the current operation. This object is used to track requests
+ * to the storage service, and to provide additional runtime
+ * information about the operation.
+ *
+ * @throws StorageException
+ * If a storage service error occurred.
+ */
+ void uploadMetadata(AccessCondition accessCondition, BlobRequestOptions options,
+ OperationContext opContext) throws StorageException;
+
void uploadProperties(OperationContext opContext,
SelfRenewingLease lease)
throws StorageException;
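Editorial note: a brief sketch of what the new access-condition overload allows, e.g. a lease-scoped metadata update. AccessCondition.generateLeaseCondition is the Azure SDK factory; the class and parameter names here are illustrative, and the sketch assumes the org.apache.hadoop.fs.azure package since StorageInterface is package-private.

    package org.apache.hadoop.fs.azure;

    import com.microsoft.azure.storage.AccessCondition;
    import com.microsoft.azure.storage.OperationContext;
    import com.microsoft.azure.storage.StorageException;

    final class LeasedMetadataUploadSketch {
      // Uploads blob metadata only while "leaseId" is the active lease,
      // so a writer that has lost its lease cannot clobber newer metadata.
      static void uploadUnderLease(StorageInterface.CloudBlobWrapper blob,
          String leaseId) throws StorageException {
        AccessCondition lease = AccessCondition.generateLeaseCondition(leaseId);
        blob.uploadMetadata(lease, null, new OperationContext());
      }
    }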
@@ -602,6 +630,63 @@ abstract class StorageInterface {
OutputStream openOutputStream(
BlobRequestOptions options,
OperationContext opContext) throws StorageException;
+
+ /**
+ * Downloads the block list for this block blob.
+ * @param filter A {@link BlockListingFilter} value that specifies whether to download
+ * committed blocks, uncommitted blocks, or all blocks.
+ * @param options A {@link BlobRequestOptions} object that specifies any additional options for
+ * the request. Specifying null will use the default request options from
+ * the associated service client ( CloudBlobClient).
+ * @param opContext An {@link OperationContext} object that represents the context for the current
+ * operation. This object is used to track requests to the storage service,
+ * and to provide additional runtime information about the operation.
+ * @return An ArrayList object of {@link BlockEntry} objects that represent the list
+ * block items downloaded from the block blob.
+ * @throws IOException If an I/O error occurred.
+ * @throws StorageException If a storage service error occurred.
+ */
+ List<BlockEntry> downloadBlockList(BlockListingFilter filter, BlobRequestOptions options,
+ OperationContext opContext) throws IOException, StorageException;
+
+ /**
+ * Uploads a single block of data to be committed later via commitBlockList.
+ * @param blockId A String that represents the Base-64 encoded block ID. Note for a given blob
+ * the length of all Block IDs must be identical.
+ * @param sourceStream An {@link InputStream} object that represents the input stream to write to the
+ * block blob.
+ * @param length A long which represents the length, in bytes, of the stream data,
+ * or -1 if unknown.
+ * @param options A {@link BlobRequestOptions} object that specifies any additional options for the
+ * request. Specifying null will use the default request options from the
+ * associated service client ( CloudBlobClient).
+ * @param opContext An {@link OperationContext} object that represents the context for the current operation.
+ * This object is used to track requests to the storage service, and to provide
+ * additional runtime information about the operation.
+ * @throws IOException If an I/O error occurred.
+ * @throws StorageException If a storage service error occurred.
+ */
+ void uploadBlock(String blockId, InputStream sourceStream,
+ long length, BlobRequestOptions options,
+ OperationContext opContext) throws IOException, StorageException;
+
+ /**
+ * Commits a block list, making the uploaded blocks part of the blob.
+ * @param blockList An enumerable collection of {@link BlockEntry} objects that represents the list
+ * block items being committed. The size field is ignored.
+ * @param accessCondition An {@link AccessCondition} object that represents the access conditions for the blob.
+ * @param options A {@link BlobRequestOptions} object that specifies any additional options for the
+ * request. Specifying null will use the default request options from the associated
+ * service client ( CloudBlobClient).
+ * @param opContext An {@link OperationContext} object that represents the context for the current operation.
+ * This object is used to track requests to the storage service, and to provide additional
+ * runtime information about the operation.
+ * @throws IOException If an I/O error occurred.
+ * @throws StorageException If a storage service error occurred.
+ */
+ void commitBlockList(List<BlockEntry> blockList, AccessCondition accessCondition, BlobRequestOptions options,
+ OperationContext opContext) throws IOException, StorageException;
+
}
/**
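Editorial note: the three methods above are exactly what a block-blob append needs: read the committed block list, upload the new bytes as an uncommitted block, and commit the extended list. Below is a minimal sketch against the Azure SDK directly; this is not the patch's output-stream implementation, and the block-ID scheme and names are assumptions.

    import java.io.ByteArrayInputStream;
    import java.nio.charset.StandardCharsets;
    import java.util.Base64;
    import java.util.List;

    import com.microsoft.azure.storage.blob.BlockEntry;
    import com.microsoft.azure.storage.blob.BlockListingFilter;
    import com.microsoft.azure.storage.blob.BlockSearchMode;
    import com.microsoft.azure.storage.blob.CloudBlockBlob;

    final class BlockBlobAppendSketch {
      // For a given blob, every block ID must be Base64-encoded and all IDs
      // must have the same length; a zero-padded counter satisfies both.
      static void append(CloudBlockBlob blob, byte[] data) throws Exception {
        // 1. Fetch the blocks already committed to the blob.
        List<BlockEntry> blocks = blob.downloadBlockList(
            BlockListingFilter.COMMITTED, null, null, null);
        // 2. Upload the new bytes as an uncommitted block under a fresh ID.
        String blockId = Base64.getEncoder().encodeToString(
            String.format("%06d", blocks.size()).getBytes(StandardCharsets.UTF_8));
        blob.uploadBlock(blockId, new ByteArrayInputStream(data), data.length,
            null, null, null);
        // 3. Commit the old list plus the new block; the blob now ends with "data".
        blocks.add(new BlockEntry(blockId, BlockSearchMode.LATEST));
        blob.commitBlockList(blocks);
      }
    }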
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8bc93db2/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java
index 382ff66..298f3aa 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java
@@ -27,7 +27,7 @@ import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
-
+import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import com.microsoft.azure.storage.AccessCondition;
@@ -40,6 +40,8 @@ import com.microsoft.azure.storage.StorageUri;
import com.microsoft.azure.storage.blob.BlobListingDetails;
import com.microsoft.azure.storage.blob.BlobProperties;
import com.microsoft.azure.storage.blob.BlobRequestOptions;
+import com.microsoft.azure.storage.blob.BlockEntry;
+import com.microsoft.azure.storage.blob.BlockListingFilter;
import com.microsoft.azure.storage.blob.CloudBlob;
import com.microsoft.azure.storage.blob.CloudBlobClient;
import com.microsoft.azure.storage.blob.CloudBlobContainer;
@@ -362,7 +364,13 @@ class StorageInterfaceImpl extends StorageInterface {
@Override
public void uploadMetadata(OperationContext opContext)
throws StorageException {
- getBlob().uploadMetadata(null, null, opContext);
+ uploadMetadata(null, null, opContext);
+ }
+
+ @Override
+ public void uploadMetadata(AccessCondition accessConditions, BlobRequestOptions options,
+ OperationContext opContext) throws StorageException {
+ getBlob().uploadMetadata(accessConditions, options, opContext);
}
public void uploadProperties(OperationContext opContext, SelfRenewingLease lease)
@@ -396,7 +404,7 @@ class StorageInterfaceImpl extends StorageInterface {
public void startCopyFromBlob(CloudBlobWrapper sourceBlob, BlobRequestOptions options,
OperationContext opContext)
throws StorageException, URISyntaxException {
- getBlob().startCopyFromBlob(((CloudBlobWrapperImpl)sourceBlob).blob,
+ getBlob().startCopyFromBlob(((CloudBlobWrapperImpl) sourceBlob).blob,
null, null, options, opContext);
}
@@ -440,6 +448,25 @@ class StorageInterfaceImpl extends StorageInterface {
getBlob().uploadProperties(null, null, opContext);
}
+ @Override
+ public List<BlockEntry> downloadBlockList(BlockListingFilter filter, BlobRequestOptions options,
+ OperationContext opContext) throws IOException, StorageException {
+ return ((CloudBlockBlob) getBlob()).downloadBlockList(filter, null, options, opContext);
+
+ }
+
+ @Override
+ public void uploadBlock(String blockId, InputStream sourceStream,
+ long length, BlobRequestOptions options,
+ OperationContext opContext) throws IOException, StorageException {
+ ((CloudBlockBlob) getBlob()).uploadBlock(blockId, sourceStream, length, null, options, opContext);
+ }
+
+ @Override
+ public void commitBlockList(List<BlockEntry> blockList, AccessCondition accessCondition, BlobRequestOptions options,
+ OperationContext opContext) throws IOException, StorageException {
+ ((CloudBlockBlob) getBlob()).commitBlockList(blockList, accessCondition, options, opContext);
+ }
}
static class CloudPageBlobWrapperImpl extends CloudBlobWrapperImpl implements CloudPageBlobWrapper {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8bc93db2/hadoop-tools/hadoop-azure/src/site/markdown/index.md
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/site/markdown/index.md b/hadoop-tools/hadoop-azure/src/site/markdown/index.md
index 9d0115a..4402467 100644
--- a/hadoop-tools/hadoop-azure/src/site/markdown/index.md
+++ b/hadoop-tools/hadoop-azure/src/site/markdown/index.md
@@ -23,6 +23,7 @@
* [Page Blob Support and Configuration](#Page_Blob_Support_and_Configuration)
* [Atomic Folder Rename](#Atomic_Folder_Rename)
* [Accessing wasb URLs](#Accessing_wasb_URLs)
+ * [Append API Support and Configuration](#Append_API_Support_and_Configuration)
* [Testing the hadoop-azure Module](#Testing_the_hadoop-azure_Module)
## <a name="Introduction" />Introduction
@@ -51,7 +52,6 @@ on the additional artifacts it requires, notably the
## <a name="Limitations" />Limitations
-* The append operation is not implemented.
* File owner and group are persisted, but the permissions model is not enforced.
Authorization occurs at the level of the entire Azure Blob Storage account.
* File last access time is not tracked.
@@ -199,6 +199,24 @@ It's also possible to configure `fs.defaultFS` to use a `wasb` or `wasbs` URL.
This causes all bare paths, such as `/testDir/testFile` to resolve automatically
to that file system.
+### <a name="Append_API_Support_and_Configuration" />Append API Support and Configuration
+
+The Azure Blob Storage interface for Hadoop has optional support for the Append API for
+a single writer, enabled by setting the configuration `fs.azure.enable.append.support` to true.
+
+For example:
+
+ <property>
+ <name>fs.azure.enable.append.support</name>
+ <value>true</value>
+ </property>
+
+It must be noted that append support in the Azure Blob Storage interface DIFFERS FROM HDFS SEMANTICS. Append
+support does not internally enforce a single writer; it requires applications to guarantee that semantic.
+It is the responsibility of the application either to ensure single-threaded handling for a particular
+file path, or to rely on an external locking mechanism of its own. Failure to do so will result in
+unexpected behavior.
+
## <a name="Testing_the_hadoop-azure_Module" />Testing the hadoop-azure Module
The hadoop-azure module includes a full suite of unit tests. Most of the tests
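Editorial note: putting the documentation above together, a hedged end-to-end sketch of a single-writer append through the FileSystem API. The account and container names are placeholders, and the file is assumed to already exist.

    import java.net.URI;
    import java.nio.charset.StandardCharsets;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class WasbAppendExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setBoolean("fs.azure.enable.append.support", true);
        FileSystem fs = FileSystem.get(
            URI.create("wasb://mycontainer@myaccount.blob.core.windows.net/"), conf);
        Path file = new Path("/testDir/testFile");
        // The caller, not the file system, must guarantee single-writer access.
        FSDataOutputStream out = fs.append(file);
        try {
          out.write("appended line\n".getBytes(StandardCharsets.UTF_8));
        } finally {
          out.close();
        }
      }
    }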
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8bc93db2/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockStorageInterface.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockStorageInterface.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockStorageInterface.java
index 9f84f4b..2bb2a9a 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockStorageInterface.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockStorageInterface.java
@@ -32,11 +32,12 @@ import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.TimeZone;
-
+import java.util.List;
import org.apache.commons.httpclient.URIException;
import org.apache.commons.httpclient.util.URIUtil;
import org.apache.commons.lang.NotImplementedException;
+import com.microsoft.azure.storage.AccessCondition;
import com.microsoft.azure.storage.CloudStorageAccount;
import com.microsoft.azure.storage.OperationContext;
import com.microsoft.azure.storage.RetryPolicyFactory;
@@ -46,6 +47,8 @@ import com.microsoft.azure.storage.StorageUri;
import com.microsoft.azure.storage.blob.BlobListingDetails;
import com.microsoft.azure.storage.blob.BlobProperties;
import com.microsoft.azure.storage.blob.BlobRequestOptions;
+import com.microsoft.azure.storage.blob.BlockEntry;
+import com.microsoft.azure.storage.blob.BlockListingFilter;
import com.microsoft.azure.storage.blob.CloudBlob;
import com.microsoft.azure.storage.blob.CloudBlobContainer;
import com.microsoft.azure.storage.blob.CloudBlobDirectory;
@@ -524,6 +527,30 @@ public class MockStorageInterface extends StorageInterface {
public CloudBlob getBlob() {
return null;
}
+
+ @Override
+ public List<BlockEntry> downloadBlockList(BlockListingFilter filter, BlobRequestOptions options,
+ OperationContext opContext) throws IOException, StorageException {
+
+ throw new UnsupportedOperationException("downloadBlockList not used in Mock Tests");
+ }
+ @Override
+ public void uploadBlock(String blockId, InputStream sourceStream,
+ long length, BlobRequestOptions options,
+ OperationContext opContext) throws IOException, StorageException {
+ throw new UnsupportedOperationException("uploadBlock not used in Mock Tests");
+ }
+
+ @Override
+ public void commitBlockList(List<BlockEntry> blockList, AccessCondition accessCondition,
+ BlobRequestOptions options, OperationContext opContext) throws IOException, StorageException {
+ throw new UnsupportedOperationException("commitBlockList not used in Mock Tests");
+ }
+
+ public void uploadMetadata(AccessCondition accessCondition, BlobRequestOptions options,
+ OperationContext opContext) throws StorageException {
+ throw new UnsupportedOperationException("uploadMetadata not used in Mock Tests");
+ }
}
class MockCloudPageBlobWrapper extends MockCloudBlobWrapper
@@ -580,5 +607,10 @@ public class MockStorageInterface extends StorageInterface {
public CloudBlob getBlob() {
return null;
}
+
+ public void uploadMetadata(AccessCondition accessCondition, BlobRequestOptions options,
+ OperationContext opContext) throws StorageException {
+ throw new UnsupportedOperationException("uploadMetadata not used in Mock Tests");
+ }
}
}