You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cn...@apache.org on 2014/12/18 00:00:02 UTC
[16/24] hadoop git commit: HADOOP-10809. hadoop-azure: page blob
support. Contributed by Dexter Bradshaw, Mostafa Elhemali, Eric Hanson,
and Mike Liddell.
HADOOP-10809. hadoop-azure: page blob support. Contributed by Dexter Bradshaw, Mostafa Elhemali, Eric Hanson, and Mike Liddell.
(cherry picked from commit 2217e2f8ff418b88eac6ad36cafe3a9795a11f40)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/5a737026
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/5a737026
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/5a737026
Branch: refs/heads/branch-2
Commit: 5a737026cc365eb86c1d72c660fb13d8540a03b4
Parents: 9a2e4f4
Author: cnauroth <cn...@apache.org>
Authored: Wed Oct 8 14:20:23 2014 -0700
Committer: cnauroth <cn...@apache.org>
Committed: Wed Dec 17 14:57:13 2014 -0800
----------------------------------------------------------------------
hadoop-common-project/hadoop-common/CHANGES.txt | 3 +
hadoop-tools/hadoop-azure/README.txt | 48 +
.../dev-support/findbugs-exclude.xml | 31 +
.../hadoop-azure/src/config/checkstyle.xml | 7 +-
.../hadoop/fs/azure/AzureLinkedStack.java | 217 +++
.../fs/azure/AzureNativeFileSystemStore.java | 740 +++++++---
.../hadoop/fs/azure/NativeAzureFileSystem.java | 1337 ++++++++++++++----
.../hadoop/fs/azure/NativeFileSystemStore.java | 22 +-
.../hadoop/fs/azure/PageBlobFormatHelpers.java | 58 +
.../hadoop/fs/azure/PageBlobInputStream.java | 455 ++++++
.../hadoop/fs/azure/PageBlobOutputStream.java | 497 +++++++
.../apache/hadoop/fs/azure/PartialListing.java | 2 +-
.../hadoop/fs/azure/SelfRenewingLease.java | 202 +++
.../fs/azure/SelfThrottlingIntercept.java | 4 +-
.../fs/azure/ShellDecryptionKeyProvider.java | 3 +-
.../hadoop/fs/azure/SimpleKeyProvider.java | 3 +-
.../hadoop/fs/azure/StorageInterface.java | 280 +++-
.../hadoop/fs/azure/StorageInterfaceImpl.java | 184 ++-
.../fs/azure/SyncableDataOutputStream.java | 56 +
.../java/org/apache/hadoop/fs/azure/Wasb.java | 1 -
.../metrics/AzureFileSystemInstrumentation.java | 5 +-
.../metrics/ResponseReceivedMetricUpdater.java | 7 +-
.../services/org.apache.hadoop.fs.FileSystem | 17 +
.../fs/azure/AzureBlobStorageTestAccount.java | 155 +-
.../hadoop/fs/azure/InMemoryBlockBlobStore.java | 70 +-
.../hadoop/fs/azure/MockStorageInterface.java | 233 ++-
.../fs/azure/NativeAzureFileSystemBaseTest.java | 1016 ++++++++++++-
.../hadoop/fs/azure/RunningLiveWasbTests.txt | 22 +
.../azure/TestAzureConcurrentOutOfBandIo.java | 75 +-
.../TestAzureFileSystemErrorConditions.java | 57 +-
.../hadoop/fs/azure/TestBlobDataValidation.java | 12 +-
.../hadoop/fs/azure/TestBlobMetadata.java | 35 +-
.../fs/azure/TestBlobTypeSpeedDifference.java | 160 +++
.../fs/azure/TestNativeAzureFSPageBlobLive.java | 43 +
.../TestNativeAzureFileSystemConcurrency.java | 5 +-
.../TestNativeAzureFileSystemContractLive.java | 26 +
...TestNativeAzureFileSystemContractMocked.java | 25 +
.../TestNativeAzureFileSystemFileNameCheck.java | 11 +-
.../fs/azure/TestNativeAzureFileSystemLive.java | 75 +
.../azure/TestNativeAzureFileSystemMocked.java | 35 +
...stNativeAzureFileSystemOperationsMocked.java | 37 +-
.../TestNativeAzureFileSystemUploadLogic.java | 186 +++
.../azure/TestOutOfBandAzureBlobOperations.java | 17 +-
.../TestOutOfBandAzureBlobOperationsLive.java | 21 +
.../TestReadAndSeekPageBlobAfterWrite.java | 333 +++++
.../apache/hadoop/fs/azure/TestWasbFsck.java | 33 +
.../fs/azure/TestWasbUriAndConfiguration.java | 9 +-
.../fs/azure/metrics/AzureMetricsTestUtil.java | 1 +
.../TestAzureFileSystemInstrumentation.java | 53 +-
.../metrics/TestBandwidthGaugeUpdater.java | 47 -
.../src/test/resources/azure-test.xml | 12 -
51 files changed, 6046 insertions(+), 937 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a737026/hadoop-common-project/hadoop-common/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index 7450513..6509bcb 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -79,6 +79,9 @@ Release 2.7.0 - UNRELEASED
HADOOP-11238. Update the NameNode's Group Cache in the background when
possible (Chris Li via Colin P. McCabe)
+ HADOOP-10809. hadoop-azure: page blob support. (Dexter Bradshaw,
+ Mostafa Elhemali, Eric Hanson, and Mike Liddell via cnauroth)
+
BUG FIXES
HADOOP-11236. NFS: Fix javadoc warning in RpcProgram.java (Abhiraj Butala via harsh)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a737026/hadoop-tools/hadoop-azure/README.txt
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/README.txt b/hadoop-tools/hadoop-azure/README.txt
index 73306d3..a1d1a65 100644
--- a/hadoop-tools/hadoop-azure/README.txt
+++ b/hadoop-tools/hadoop-azure/README.txt
@@ -77,6 +77,54 @@ src\test\resources\azure-test.xml. These settings augment the hadoop configurati
For live tests, set the following in azure-test.xml:
1. "fs.azure.test.account.name -> {azureStorageAccountName}
2. "fs.azure.account.key.{AccountName} -> {fullStorageKey}"
+
+===================================
+Page Blob Support and Configuration
+===================================
+
+The Azure Blob Storage interface for Hadoop supports two kinds of blobs, block blobs
+and page blobs. Block blobs are the default kind of blob and are good for most
+big-data use cases, like input data for Hive, Pig, analytical map-reduce jobs etc.
+Page blob handling in hadoop-azure was introduced to support HBase log files.
+Page blobs can be written any number of times, whereas block blobs can only be
+appended to 50,000 times before you run out of blocks and your writes will fail.
+That won't work for HBase logs, so page blob support was introduced to overcome
+this limitation.
+
+Page blobs can be used for other purposes beyond just HBase log files though.
+They support the Hadoop FileSystem interface. Page blobs can be up to 1TB in
+size, larger than the maximum 200GB size for block blobs.
+
+In order to have the files you create be page blobs, you must set the configuration
+variable fs.azure.page.blob.dir to a comma-separated list of folder names.
+E.g.
+
+ /hbase/WALs,/hbase/oldWALs,/data/mypageblobfiles
+
+You can set this to simply / to make all files page blobs.
+
+The configuration option fs.azure.page.blob.size is the default initial
+size for a page blob. It must be 128MB or greater, and no more than 1TB,
+specified as an integer number of bytes.
+
+====================
+Atomic Folder Rename
+====================
+
+Azure storage stores files as a flat key/value store without formal support
+for folders. The hadoop-azure file system layer simulates folders on top
+of Azure storage. By default, folder rename in the hadoop-azure file system
+layer is not atomic. That means that a failure during a folder rename
+could, for example, leave some folders in the original directory and
+some in the new one.
+
+HBase depends on atomic folder rename. Hence, a configuration setting was
+introduced called fs.azure.atomic.rename.dir that allows you to specify a
+comma-separated list of directories to receive special treatment so that
+folder rename is made atomic. The default value of this setting is just /hbase.
+Redo will be applied to finish a folder rename that fails. A file
+<folderName>-renamePending.json may appear temporarily and is the record of
+the intention of the rename operation, to allow redo in event of a failure.
=============
Findbugs
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a737026/hadoop-tools/hadoop-azure/dev-support/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/dev-support/findbugs-exclude.xml b/hadoop-tools/hadoop-azure/dev-support/findbugs-exclude.xml
index cc63141..cde1734 100644
--- a/hadoop-tools/hadoop-azure/dev-support/findbugs-exclude.xml
+++ b/hadoop-tools/hadoop-azure/dev-support/findbugs-exclude.xml
@@ -15,5 +15,36 @@
limitations under the License.
-->
<FindBugsFilter>
+ <!-- It is okay to skip up to end of file. No need to check return value. -->
+ <Match>
+ <Class name="org.apache.hadoop.fs.azure.AzureNativeFileSystemStore" />
+ <Method name="retrieve" />
+ <Bug pattern="SR_NOT_CHECKED" />
+ <Priority value="2" />
+ </Match>
+
+ <!-- Returning fully loaded array to iterate through is a convenience
+ and helps performance. -->
+ <Match>
+ <Class name="org.apache.hadoop.fs.azure.NativeAzureFileSystem$FolderRenamePending" />
+ <Method name="getFiles" />
+ <Bug pattern="EI_EXPOSE_REP" />
+ <Priority value="2" />
+ </Match>
+
+ <!-- Need to start keep-alive thread for SelfRenewingLease in constructor. -->
+ <Match>
+ <Class name="org.apache.hadoop.fs.azure.SelfRenewingLease" />
+ <Bug pattern="SC_START_IN_CTOR" />
+ <Priority value="2" />
+ </Match>
+ <!-- Using a key set iterator is fine because this is not a performance-critical
+ method. -->
+ <Match>
+ <Class name="org.apache.hadoop.fs.azure.PageBlobOutputStream" />
+ <Method name="logAllStackTraces" />
+ <Bug pattern="WMI_WRONG_MAP_ITERATOR" />
+ <Priority value="2" />
+ </Match>
</FindBugsFilter>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a737026/hadoop-tools/hadoop-azure/src/config/checkstyle.xml
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/config/checkstyle.xml b/hadoop-tools/hadoop-azure/src/config/checkstyle.xml
index 3bfc23d..9df4f78 100644
--- a/hadoop-tools/hadoop-azure/src/config/checkstyle.xml
+++ b/hadoop-tools/hadoop-azure/src/config/checkstyle.xml
@@ -108,7 +108,10 @@
<property name="max" value="3000"/>
</module>
- <module name="ParameterNumber"/>
+ <module name="ParameterNumber">
+ <property name="max" value="8"/>
+ </module>
+
<!-- Checks for whitespace -->
@@ -152,7 +155,7 @@
<module name="IllegalInstantiation"/>
<module name="InnerAssignment"/>
<module name="MagicNumber">
- <property name="ignoreNumbers" value="-1, 0, 1, 2, 3, 4, 5, 6, 7, 8, 10, 1000"/>
+ <property name="ignoreNumbers" value="-1, 0, 1, 2, 3, 4, 5, 6, 7, 8, 10, 255, 1000, 1024"/>
</module>
<module name="MissingSwitchDefault"/>
<module name="RedundantThrows"/>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a737026/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureLinkedStack.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureLinkedStack.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureLinkedStack.java
new file mode 100644
index 0000000..4c52ef0
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureLinkedStack.java
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+/**
+ * A simple generic stack implementation using linked lists. The stack
+ * implementation has five main operations:
+ * <ul>
+ * <li>push -- adds an element to the top of the stack</li>
+ * <li>pop -- removes an element from the top of the stack and returns a
+ * reference to it</li>
+ * <li>peek -- peek returns an element from the top of the stack without
+ * removing it</li>
+ * <li>isEmpty -- tests whether the stack is empty</li>
+ * <li>size -- returns the size of the stack</li>
+ * <li>toString -- returns a string representation of the stack.</li>
+ * </ul>
+ */
+
/**
 * A minimal generic LIFO stack backed by a singly linked list.
 *
 * Supported operations:
 * <ul>
 * <li>push -- adds an element to the top of the stack</li>
 * <li>pop -- removes the top element and returns it</li>
 * <li>peek -- returns the top element without removing it</li>
 * <li>isEmpty -- tests whether the stack is empty</li>
 * <li>size -- returns the number of stored elements</li>
 * <li>toString -- renders the stack top-to-bottom as a comma-separated
 * list</li>
 * </ul>
 */
public class AzureLinkedStack<E> {

  /**
   * One link in the chain: an element plus a reference to the node
   * directly beneath it on the stack.
   */
  private static class Node<E> {
    private final E element;
    private final Node<E> below;

    /**
     * Builds a link holding {@code element} whose successor is
     * {@code below} (may be null for the bottom of the stack).
     */
    private Node(E element, Node<E> below) {
      this.element = element;
      this.below = below;
    }
  }

  // Topmost node, or null when the stack is empty.
  private Node<E> top;

  // Number of elements currently stored.
  private int count;

  /**
   * Creates an empty stack.
   */
  public AzureLinkedStack() {
    top = null;
    count = 0;
  }

  /**
   * Adds an element to the top of the stack.
   *
   * @param element element to push.
   */
  public void push(E element) {
    // The new node's successor is the old top; the new node becomes top.
    top = new Node<E>(element, top);
    count++;
  }

  /**
   * Removes the element at the top of the stack and returns it.
   *
   * @return the element removed from the top of the stack.
   * @throws Exception on pop from an empty stack.
   */
  public E pop() throws Exception {
    if (isEmpty()) {
      throw new Exception("AzureStackEmpty");
    }
    E element = top.element;
    top = top.below;
    count--;
    return element;
  }

  /**
   * Returns the top element of the stack without removing it.
   *
   * @return the element at the top of the stack.
   * @throws Exception on peek into an empty stack.
   */
  public E peek() throws Exception {
    if (isEmpty()) {
      throw new Exception("AzureStackEmpty");
    }
    return top.element;
  }

  /**
   * Determines whether the stack is empty.
   *
   * @return true if the stack holds no elements, false otherwise.
   */
  public boolean isEmpty() {
    return size() == 0;
  }

  /**
   * Determines the size of the stack.
   *
   * @return the count of elements in the stack.
   */
  public int size() {
    return count;
  }

  /**
   * Returns a string representation of the stack: the elements'
   * string forms from top to bottom, separated by ", ".
   */
  @Override
  public String toString() {
    StringBuilder sb = new StringBuilder();
    String separator = "";
    for (Node<E> node = top; node != null; node = node.below) {
      sb.append(separator).append(node.element.toString());
      separator = ", ";
    }
    return sb.toString();
  }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a737026/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
index 5afbbbe..c091767 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
@@ -20,31 +20,42 @@ package org.apache.hadoop.fs.azure;
import static org.apache.hadoop.fs.azure.NativeAzureFileSystem.PATH_DELIMITER;
import java.io.BufferedInputStream;
-import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.net.URISyntaxException;
+import java.net.URLDecoder;
+import java.net.URLEncoder;
import java.security.InvalidKeyException;
import java.util.ArrayList;
import java.util.Calendar;
+import java.util.Collections;
import java.util.Date;
import java.util.EnumSet;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
+import java.util.Locale;
import java.util.Map;
+import java.util.Set;
+import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.azure.StorageInterface.CloudBlobContainerWrapper;
import org.apache.hadoop.fs.azure.StorageInterface.CloudBlobDirectoryWrapper;
+import org.apache.hadoop.fs.azure.StorageInterface.CloudBlobWrapper;
import org.apache.hadoop.fs.azure.StorageInterface.CloudBlockBlobWrapper;
+import org.apache.hadoop.fs.azure.StorageInterface.CloudPageBlobWrapper;
+import org.apache.hadoop.fs.azure.StorageInterfaceImpl.CloudPageBlobWrapperImpl;
import org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation;
import org.apache.hadoop.fs.azure.metrics.BandwidthGaugeUpdater;
import org.apache.hadoop.fs.azure.metrics.ErrorMetricUpdater;
@@ -72,7 +83,6 @@ import com.microsoft.windowsazure.storage.blob.DeleteSnapshotsOption;
import com.microsoft.windowsazure.storage.blob.ListBlobItem;
import com.microsoft.windowsazure.storage.core.Utility;
-
/**
* Core implementation of Windows Azure Filesystem for Hadoop.
* Provides the bridging logic between Hadoop's abstract filesystem and Azure Storage
@@ -140,6 +150,33 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
static final String LINK_BACK_TO_UPLOAD_IN_PROGRESS_METADATA_KEY = "hdi_tmpupload";
static final String OLD_LINK_BACK_TO_UPLOAD_IN_PROGRESS_METADATA_KEY = "asv_tmpupload";
+ /**
+ * Configuration key to indicate the set of directories in WASB where we
+ * should store files as page blobs instead of block blobs.
+ *
+ * Entries should be plain directory names (i.e. not URIs) with no leading or
+ * trailing slashes. Delimit the entries with commas.
+ */
+ public static final String KEY_PAGE_BLOB_DIRECTORIES =
+ "fs.azure.page.blob.dir";
+ /**
+ * The set of directories where we should store files as page blobs.
+ */
+ private Set<String> pageBlobDirs;
+
+ /**
+ * Configuration key to indicate the set of directories in WASB where
+ * we should do atomic folder rename synchronized with createNonRecursive.
+ */
+ public static final String KEY_ATOMIC_RENAME_DIRECTORIES =
+ "fs.azure.atomic.rename.dir";
+
+ /**
+ * The set of directories where we should apply atomic folder rename
+ * synchronized with createNonRecursive.
+ */
+ private Set<String> atomicRenameDirs;
+
private static final String HTTP_SCHEME = "http";
private static final String HTTPS_SCHEME = "https";
private static final String WASB_AUTHORITY_DELIMITER = "@";
@@ -148,6 +185,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
private static final int DEFAULT_CONCURRENT_WRITES = 8;
// Concurrent reads reads of data written out of band are disable by default.
+ //
private static final boolean DEFAULT_READ_TOLERATE_CONCURRENT_APPEND = false;
// Default block sizes
@@ -155,6 +193,8 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
public static final int DEFAULT_UPLOAD_BLOCK_SIZE = 4 * 1024 * 1024;
// Retry parameter defaults.
+ //
+
private static final int DEFAULT_MIN_BACKOFF_INTERVAL = 1 * 1000; // 1s
private static final int DEFAULT_MAX_BACKOFF_INTERVAL = 30 * 1000; // 30s
private static final int DEFAULT_BACKOFF_INTERVAL = 1 * 1000; // 1s
@@ -169,6 +209,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
private static final int STORAGE_CONNECTION_TIMEOUT_DEFAULT = 90;
+
/**
* MEMBER VARIABLES
*/
@@ -181,7 +222,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
private boolean connectingUsingSAS = false;
private AzureFileSystemInstrumentation instrumentation;
private BandwidthGaugeUpdater bandwidthGaugeUpdater;
- private static final JSON PERMISSION_JSON_SERIALIZER = createPermissionJsonSerializer();
+ private final static JSON PERMISSION_JSON_SERIALIZER = createPermissionJsonSerializer();
private boolean suppressRetryPolicy = false;
private boolean canCreateOrModifyContainer = false;
@@ -317,7 +358,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
public BandwidthGaugeUpdater getBandwidthGaugeUpdater() {
return bandwidthGaugeUpdater;
}
-
+
/**
* Check if concurrent reads and writes on the same blob are allowed.
*
@@ -333,19 +374,18 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
* session with an Azure session. It parses the scheme to ensure it matches
* the storage protocol supported by this file system.
*
- * @param uri
- * - URI for target storage blob.
- * @param conf
- * - reference to configuration object.
+ * @param uri - URI for target storage blob.
+ * @param conf - reference to configuration object.
+ * @param instrumentation - the metrics source that will keep track of operations here.
*
- * @throws IllegalArgumentException
- * if URI or job object is null, or invalid scheme.
+ * @throws IllegalArgumentException if URI or job object is null, or invalid scheme.
*/
@Override
- public void initialize(URI uri, Configuration conf, AzureFileSystemInstrumentation instrumentation) throws AzureException {
-
- if (null == this.storageInteractionLayer) {
- this.storageInteractionLayer = new StorageInterfaceImpl();
+ public void initialize(URI uri, Configuration conf, AzureFileSystemInstrumentation instrumentation)
+ throws IllegalArgumentException, AzureException, IOException {
+
+ if (null == instrumentation) {
+ throw new IllegalArgumentException("Null instrumentation");
}
this.instrumentation = instrumentation;
@@ -377,6 +417,40 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
// Start an Azure storage session.
//
createAzureStorageSession();
+
+ // Extract the directories that should contain page blobs
+ pageBlobDirs = getDirectorySet(KEY_PAGE_BLOB_DIRECTORIES);
+ LOG.debug("Page blob directories: " + setToString(pageBlobDirs));
+
+ // Extract directories that should have atomic rename applied.
+ atomicRenameDirs = getDirectorySet(KEY_ATOMIC_RENAME_DIRECTORIES);
+ String hbaseRoot;
+ try {
+
+      // Add to this the hbase root directory, or /hbase if that is not set.
+ hbaseRoot = verifyAndConvertToStandardFormat(
+ sessionConfiguration.get("hbase.rootdir", "hbase"));
+ atomicRenameDirs.add(hbaseRoot);
+ } catch (URISyntaxException e) {
+ LOG.warn("Unable to initialize HBase root as an atomic rename directory.");
+ }
+ LOG.debug("Atomic rename directories: " + setToString(atomicRenameDirs));
+ }
+
+ /**
+ * Helper to format a string for log output from Set<String>
+ */
+ private String setToString(Set<String> set) {
+ StringBuilder sb = new StringBuilder();
+ int i = 1;
+ for (String s : set) {
+ sb.append("/" + s);
+ if (i != set.size()) {
+ sb.append(", ");
+ }
+ i++;
+ }
+ return sb.toString();
}
/**
@@ -400,8 +474,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
"Expected URI with a valid authority");
}
- // Check if authority container the delimiter separating the account name
- // from the
+    // Check if the authority contains the delimiter separating the account name from the
// the container.
//
if (!authority.contains(WASB_AUTHORITY_DELIMITER)) {
@@ -455,8 +528,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
// The URI has a valid authority. Extract the container name. It is the
// second component of the WASB URI authority.
if (!authority.contains(WASB_AUTHORITY_DELIMITER)) {
- // The authority does not have a container name. Use the default container
- // by
+ // The authority does not have a container name. Use the default container by
// setting the container name to the default Azure root container.
//
return AZURE_ROOT_CONTAINER;
@@ -491,9 +563,9 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
private String getHTTPScheme() {
String sessionScheme = sessionUri.getScheme();
// Check if we're on a secure URI scheme: wasbs or the legacy asvs scheme.
- if (sessionScheme != null
- && (sessionScheme.equalsIgnoreCase("asvs") || sessionScheme
- .equalsIgnoreCase("wasbs"))) {
+ if (sessionScheme != null &&
+ (sessionScheme.equalsIgnoreCase("asvs") ||
+ sessionScheme.equalsIgnoreCase("wasbs"))) {
return HTTPS_SCHEME;
} else {
// At this point the scheme should be either null or asv or wasb.
@@ -565,20 +637,22 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
Math.min(cpuCores, DEFAULT_CONCURRENT_WRITES));
// Set up the exponential retry policy.
- minBackoff = sessionConfiguration.getInt(KEY_MIN_BACKOFF_INTERVAL,
- DEFAULT_MIN_BACKOFF_INTERVAL);
+ //
+ minBackoff = sessionConfiguration.getInt(
+ KEY_MIN_BACKOFF_INTERVAL, DEFAULT_MIN_BACKOFF_INTERVAL);
+
+ maxBackoff = sessionConfiguration.getInt(
+ KEY_MAX_BACKOFF_INTERVAL, DEFAULT_MAX_BACKOFF_INTERVAL);
- maxBackoff = sessionConfiguration.getInt(KEY_MAX_BACKOFF_INTERVAL,
- DEFAULT_MAX_BACKOFF_INTERVAL);
+ deltaBackoff = sessionConfiguration.getInt(
+ KEY_BACKOFF_INTERVAL, DEFAULT_BACKOFF_INTERVAL);
- deltaBackoff = sessionConfiguration.getInt(KEY_BACKOFF_INTERVAL,
- DEFAULT_BACKOFF_INTERVAL);
+ maxRetries = sessionConfiguration.getInt(
+ KEY_MAX_IO_RETRIES, DEFAULT_MAX_RETRY_ATTEMPTS);
- maxRetries = sessionConfiguration.getInt(KEY_MAX_IO_RETRIES,
- DEFAULT_MAX_RETRY_ATTEMPTS);
+ storageInteractionLayer.setRetryPolicyFactory(
+ new RetryExponentialRetry(minBackoff, deltaBackoff, maxBackoff, maxRetries));
- storageInteractionLayer.setRetryPolicyFactory(new RetryExponentialRetry(
- minBackoff, deltaBackoff, maxBackoff, maxRetries));
// read the self-throttling config.
selfThrottlingEnabled = sessionConfiguration.getBoolean(
@@ -659,13 +733,15 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
StorageCredentials credentials, String containerName)
throws URISyntaxException, StorageException, AzureException {
+ URI blobEndPoint;
if (isStorageEmulatorAccount(accountName)) {
isStorageEmulator = true;
- CloudStorageAccount account = CloudStorageAccount
- .getDevelopmentStorageAccount();
+ CloudStorageAccount account =
+ CloudStorageAccount.getDevelopmentStorageAccount();
storageInteractionLayer.createBlobClient(account);
} else {
- URI blobEndPoint = new URI(getHTTPScheme() + "://" + accountName);
+ blobEndPoint = new URI(getHTTPScheme() + "://" +
+ accountName);
storageInteractionLayer.createBlobClient(blobEndPoint, credentials);
}
suppressRetryPolicyInClientIfNeeded();
@@ -753,7 +829,8 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
* @throws AzureException
* @throws IOException
*/
- private void createAzureStorageSession() throws AzureException {
+ private void createAzureStorageSession ()
+ throws AzureException, IOException {
// Make sure this object was properly initialized with references to
// the sessionUri and sessionConfiguration.
@@ -886,6 +963,106 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
}
/**
+ * Trims a suffix/prefix from the given string. For example if
+ * s is given as "/xy" and toTrim is "/", this method returns "xy"
+ */
+ private static String trim(String s, String toTrim) {
+ return StringUtils.removeEnd(StringUtils.removeStart(s, toTrim),
+ toTrim);
+ }
+
+ /**
+ * Checks if the given rawDir belongs to this account/container, and
+ * if so returns the canonicalized path for it. Otherwise return null.
+ */
+ private String verifyAndConvertToStandardFormat(String rawDir) throws URISyntaxException {
+ URI asUri = new URI(rawDir);
+ if (asUri.getAuthority() == null
+ || asUri.getAuthority().toLowerCase(Locale.US).equalsIgnoreCase(
+ sessionUri.getAuthority().toLowerCase(Locale.US))) {
+ // Applies to me.
+ return trim(asUri.getPath(), "/");
+ } else {
+ // Doen't apply to me.
+ return null;
+ }
+ }
+
+ /**
+ * Take a comma-separated list of directories from a configuration variable
+ * and transform it to a set of directories.
+ */
+ private Set<String> getDirectorySet(final String configVar)
+ throws AzureException {
+ String[] rawDirs = sessionConfiguration.getStrings(configVar, new String[0]);
+ Set<String> directorySet = new HashSet<String>();
+ for (String currentDir : rawDirs) {
+ String myDir;
+ try {
+ myDir = verifyAndConvertToStandardFormat(currentDir);
+ } catch (URISyntaxException ex) {
+ throw new AzureException(String.format(
+ "The directory %s specified in the configuration entry %s is not" +
+ " a valid URI.",
+ currentDir, configVar));
+ }
+ if (myDir != null) {
+ directorySet.add(myDir);
+ }
+ }
+ return directorySet;
+ }
+
+ /**
+ * Checks if the given key in Azure Storage should be stored as a page
+ * blob instead of block blob.
+ * @throws URISyntaxException
+ */
+ public boolean isPageBlobKey(String key) {
+ return isKeyForDirectorySet(key, pageBlobDirs);
+ }
+
+ /**
+ * Checks if the given key in Azure storage should have synchronized
+ * atomic folder rename createNonRecursive implemented.
+ */
+ @Override
+ public boolean isAtomicRenameKey(String key) {
+ return isKeyForDirectorySet(key, atomicRenameDirs);
+ }
+
+ public boolean isKeyForDirectorySet(String key, Set<String> dirSet) {
+ String defaultFS = FileSystem.getDefaultUri(sessionConfiguration).toString();
+ for (String dir : dirSet) {
+ if (dir.isEmpty() ||
+ key.startsWith(dir + "/")) {
+ return true;
+ }
+
+ // Allow for blob directories with paths relative to the default file
+ // system.
+ //
+ try {
+ URI uriPageBlobDir = new URI (dir);
+ if (null == uriPageBlobDir.getAuthority()) {
+ // Concatenate the default file system prefix with the relative
+ // page blob directory path.
+ //
+ if (key.startsWith(trim(defaultFS, "/") + "/" + dir + "/")){
+ return true;
+ }
+ }
+ } catch (URISyntaxException e) {
+ LOG.info(String.format(
+ "URI syntax error creating URI for %s", dir));
+ }
+ }
+ return false;
+ }
+
+
+
+ /**
* This should be called from any method that does any modifications to the
* underlying container: it makes sure to put the WASB current version in the
* container's metadata if it's not already there.
@@ -1032,15 +1209,15 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
private BlobRequestOptions getDownloadOptions() {
BlobRequestOptions options = new BlobRequestOptions();
- options.setRetryPolicyFactory(new RetryExponentialRetry(minBackoff,
- deltaBackoff, maxBackoff, maxRetries));
+ options.setRetryPolicyFactory(
+ new RetryExponentialRetry(minBackoff, deltaBackoff, maxBackoff, maxRetries));
options.setUseTransactionalContentMD5(getUseTransactionalContentMD5());
return options;
}
@Override
- public DataOutputStream storefile(String key,
- PermissionStatus permissionStatus) throws AzureException {
+ public DataOutputStream storefile(String key, PermissionStatus permissionStatus)
+ throws AzureException {
try {
// Check if a session exists, if not create a session with the
@@ -1066,19 +1243,20 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
checkContainer(ContainerAccessType.PureWrite);
/**
- * Note: Windows Azure Blob Storage does not allow the creation of
- * arbitrary directory paths under the default $root directory. This is by
- * design to eliminate ambiguity in specifying a implicit blob address. A
- * blob in the $root container cannot include a / in its name and must be
- * careful not to include a trailing '/' when referencing blobs in the
- * $root container. A '/; in the $root container permits ambiguous blob
- * names as in the following example involving two containers $root and
- * mycontainer: http://myaccount.blob.core.windows.net/$root
- * http://myaccount.blob.core.windows.net/mycontainer If the URL
- * "mycontainer/somefile.txt were allowed in $root then the URL:
- * http://myaccount.blob.core.windows.net/mycontainer/myblob.txt could
- * mean either: (1) container=mycontainer; blob=myblob.txt (2)
- * container=$root; blob=mycontainer/myblob.txt
+ * Note: Windows Azure Blob Storage does not allow the creation of arbitrary directory
+ * paths under the default $root directory. This is by design to eliminate
+ * ambiguity in specifying an implicit blob address. A blob in the $root container
+ * cannot include a / in its name and must be careful not to include a trailing
+ * '/' when referencing blobs in the $root container.
+ * A '/' in the $root container permits ambiguous blob names as in the following
+ * example involving two containers $root and mycontainer:
+ * http://myaccount.blob.core.windows.net/$root
+ * http://myaccount.blob.core.windows.net/mycontainer
+ * If the URL "mycontainer/somefile.txt" were allowed in $root then the URL:
+ * http://myaccount.blob.core.windows.net/mycontainer/myblob.txt
+ * could mean either:
+ * (1) container=mycontainer; blob=myblob.txt
+ * (2) container=$root; blob=mycontainer/myblob.txt
*
* To avoid this type of ambiguity the Azure blob storage prevents
* arbitrary path under $root. For a simple and more consistent user
@@ -1097,17 +1275,15 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
throw new AzureException(errMsg);
}
- // Get the block blob reference from the store's container and
+ // Get the blob reference from the store's container and
// return it.
- CloudBlockBlobWrapper blob = getBlobReference(key);
+ CloudBlobWrapper blob = getBlobReference(key);
storePermissionStatus(blob, permissionStatus);
// Create the output stream for the Azure blob.
- OutputStream outputStream = blob.openOutputStream(getUploadOptions(),
- getInstrumentedContext());
-
- // Return to caller with DataOutput stream.
- DataOutputStream dataOutStream = new DataOutputStream(outputStream);
+ //
+ OutputStream outputStream = openOutputStream(blob);
+ DataOutputStream dataOutStream = new SyncableDataOutputStream(outputStream);
return dataOutStream;
} catch (Exception e) {
// Caught exception while attempting to open the blob output stream.
@@ -1117,6 +1293,40 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
}
/**
+ * Opens a new output stream to the given blob (page or block blob)
+ * to populate it from scratch with data.
+ */
+ private OutputStream openOutputStream(final CloudBlobWrapper blob)
+ throws StorageException {
+ if (blob instanceof CloudPageBlobWrapperImpl){
+ return new PageBlobOutputStream(
+ (CloudPageBlobWrapper)blob, getInstrumentedContext(), sessionConfiguration);
+ } else {
+
+ // Handle both CloudBlockBlobWrapperImpl and (only for the test code path)
+ // MockCloudBlockBlobWrapper.
+ return ((CloudBlockBlobWrapper) blob).openOutputStream(getUploadOptions(),
+ getInstrumentedContext());
+ }
+ }
+
+ /**
+ * Opens a new input stream for the given blob (page or block blob)
+ * to read its data.
+ */
+ private InputStream openInputStream(CloudBlobWrapper blob)
+ throws StorageException, IOException {
+ if (blob instanceof CloudBlockBlobWrapper) {
+ return blob.openInputStream(getDownloadOptions(),
+ getInstrumentedContext(isConcurrentOOBAppendAllowed()));
+ } else {
+ return new PageBlobInputStream(
+ (CloudPageBlobWrapper) blob, getInstrumentedContext(
+ isConcurrentOOBAppendAllowed()));
+ }
+ }
+
+ /**
* Default permission to use when no permission metadata is found.
*
* @return The default permission to use.
@@ -1125,7 +1335,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
return new PermissionStatus("", "", FsPermission.getDefault());
}
- private static void storeMetadataAttribute(CloudBlockBlobWrapper blob,
+ private static void storeMetadataAttribute(CloudBlobWrapper blob,
String key, String value) {
HashMap<String, String> metadata = blob.getMetadata();
if (null == metadata) {
@@ -1135,7 +1345,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
blob.setMetadata(metadata);
}
- private static String getMetadataAttribute(CloudBlockBlobWrapper blob,
+ private static String getMetadataAttribute(CloudBlobWrapper blob,
String... keyAlternatives) {
HashMap<String, String> metadata = blob.getMetadata();
if (null == metadata) {
@@ -1149,7 +1359,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
return null;
}
- private static void removeMetadataAttribute(CloudBlockBlobWrapper blob,
+ private static void removeMetadataAttribute(CloudBlobWrapper blob,
String key) {
HashMap<String, String> metadata = blob.getMetadata();
if (metadata != null) {
@@ -1158,7 +1368,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
}
}
- private void storePermissionStatus(CloudBlockBlobWrapper blob,
+ private static void storePermissionStatus(CloudBlobWrapper blob,
PermissionStatus permissionStatus) {
storeMetadataAttribute(blob, PERMISSION_METADATA_KEY,
PERMISSION_JSON_SERIALIZER.toJSON(permissionStatus));
@@ -1166,39 +1376,55 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
removeMetadataAttribute(blob, OLD_PERMISSION_METADATA_KEY);
}
- private PermissionStatus getPermissionStatus(CloudBlockBlobWrapper blob) {
+ private PermissionStatus getPermissionStatus(CloudBlobWrapper blob) {
String permissionMetadataValue = getMetadataAttribute(blob,
PERMISSION_METADATA_KEY, OLD_PERMISSION_METADATA_KEY);
if (permissionMetadataValue != null) {
- return PermissionStatusJsonSerializer
- .fromJSONString(permissionMetadataValue);
+ return PermissionStatusJsonSerializer.fromJSONString(
+ permissionMetadataValue);
} else {
return defaultPermissionNoBlobMetadata();
}
}
- private static void storeFolderAttribute(CloudBlockBlobWrapper blob) {
+ private static void storeFolderAttribute(CloudBlobWrapper blob) {
storeMetadataAttribute(blob, IS_FOLDER_METADATA_KEY, "true");
// Remove the old metadata key if present
removeMetadataAttribute(blob, OLD_IS_FOLDER_METADATA_KEY);
}
- private static void storeLinkAttribute(CloudBlockBlobWrapper blob,
- String linkTarget) {
- storeMetadataAttribute(blob, LINK_BACK_TO_UPLOAD_IN_PROGRESS_METADATA_KEY,
- linkTarget);
+ private static void storeLinkAttribute(CloudBlobWrapper blob,
+ String linkTarget) throws UnsupportedEncodingException {
+ // We have to URL encode the link attribute as the link URI could
+ // have URI special characters which unless encoded will result
+ // in 403 errors from the server. This is due to metadata properties
+ // being sent in the HTTP header of the request which is in turn used
+ // on the server side to authorize the request.
+ String encodedLinkTarget = null;
+ if (linkTarget != null) {
+ encodedLinkTarget = URLEncoder.encode(linkTarget, "UTF-8");
+ }
+ storeMetadataAttribute(blob,
+ LINK_BACK_TO_UPLOAD_IN_PROGRESS_METADATA_KEY,
+ encodedLinkTarget);
// Remove the old metadata key if present
removeMetadataAttribute(blob,
OLD_LINK_BACK_TO_UPLOAD_IN_PROGRESS_METADATA_KEY);
}
- private static String getLinkAttributeValue(CloudBlockBlobWrapper blob) {
- return getMetadataAttribute(blob,
+ private static String getLinkAttributeValue(CloudBlobWrapper blob)
+ throws UnsupportedEncodingException {
+ String encodedLinkTarget = getMetadataAttribute(blob,
LINK_BACK_TO_UPLOAD_IN_PROGRESS_METADATA_KEY,
OLD_LINK_BACK_TO_UPLOAD_IN_PROGRESS_METADATA_KEY);
+ String linkTarget = null;
+ if (encodedLinkTarget != null) {
+ linkTarget = URLDecoder.decode(encodedLinkTarget, "UTF-8");
+ }
+ return linkTarget;
}
- private static boolean retrieveFolderAttribute(CloudBlockBlobWrapper blob) {
+ private static boolean retrieveFolderAttribute(CloudBlobWrapper blob) {
HashMap<String, String> metadata = blob.getMetadata();
return null != metadata
&& (metadata.containsKey(IS_FOLDER_METADATA_KEY) || metadata
@@ -1255,11 +1481,10 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
try {
checkContainer(ContainerAccessType.PureWrite);
- CloudBlockBlobWrapper blob = getBlobReference(key);
+ CloudBlobWrapper blob = getBlobReference(key);
storePermissionStatus(blob, permissionStatus);
storeFolderAttribute(blob);
- blob.upload(new ByteArrayInputStream(new byte[0]),
- getInstrumentedContext());
+ openOutputStream(blob).close();
} catch (Exception e) {
// Caught exception while attempting upload. Re-throw as an Azure
// storage exception.
@@ -1293,11 +1518,10 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
try {
checkContainer(ContainerAccessType.PureWrite);
- CloudBlockBlobWrapper blob = getBlobReference(key);
+ CloudBlobWrapper blob = getBlobReference(key);
storePermissionStatus(blob, permissionStatus);
storeLinkAttribute(blob, tempBlobKey);
- blob.upload(new ByteArrayInputStream(new byte[0]),
- getInstrumentedContext());
+ openOutputStream(blob).close();
} catch (Exception e) {
// Caught exception while attempting upload. Re-throw as an Azure
// storage exception.
@@ -1322,7 +1546,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
try {
checkContainer(ContainerAccessType.PureRead);
- CloudBlockBlobWrapper blob = getBlobReference(key);
+ CloudBlobWrapper blob = getBlobReference(key);
blob.downloadAttributes(getInstrumentedContext());
return getLinkAttributeValue(blob);
} catch (Exception e) {
@@ -1366,10 +1590,12 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
private Iterable<ListBlobItem> listRootBlobs(boolean includeMetadata)
throws StorageException, URISyntaxException {
return rootDirectory.listBlobs(
+ null, false,
+ includeMetadata ?
+ EnumSet.of(BlobListingDetails.METADATA) :
+ EnumSet.noneOf(BlobListingDetails.class),
null,
- false,
- includeMetadata ? EnumSet.of(BlobListingDetails.METADATA) : EnumSet
- .noneOf(BlobListingDetails.class), null, getInstrumentedContext());
+ getInstrumentedContext());
}
/**
@@ -1392,11 +1618,14 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
private Iterable<ListBlobItem> listRootBlobs(String aPrefix,
boolean includeMetadata) throws StorageException, URISyntaxException {
- return rootDirectory.listBlobs(
- aPrefix,
+ Iterable<ListBlobItem> list = rootDirectory.listBlobs(aPrefix,
false,
- includeMetadata ? EnumSet.of(BlobListingDetails.METADATA) : EnumSet
- .noneOf(BlobListingDetails.class), null, getInstrumentedContext());
+ includeMetadata ?
+ EnumSet.of(BlobListingDetails.METADATA) :
+ EnumSet.noneOf(BlobListingDetails.class),
+ null,
+ getInstrumentedContext());
+ return list;
}
/**
@@ -1423,15 +1652,17 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
* @throws URISyntaxException
*
*/
- private Iterable<ListBlobItem> listRootBlobs(String aPrefix,
- boolean useFlatBlobListing, EnumSet<BlobListingDetails> listingDetails,
- BlobRequestOptions options, OperationContext opContext)
- throws StorageException, URISyntaxException {
+ private Iterable<ListBlobItem> listRootBlobs(String aPrefix, boolean useFlatBlobListing,
+ EnumSet<BlobListingDetails> listingDetails, BlobRequestOptions options,
+ OperationContext opContext) throws StorageException, URISyntaxException {
- CloudBlobDirectoryWrapper directory = this.container
- .getDirectoryReference(aPrefix);
- return directory.listBlobs(null, useFlatBlobListing, listingDetails,
- options, opContext);
+ CloudBlobDirectoryWrapper directory = this.container.getDirectoryReference(aPrefix);
+ return directory.listBlobs(
+ null,
+ useFlatBlobListing,
+ listingDetails,
+ options,
+ opContext);
}
/**
@@ -1447,15 +1678,18 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
* @throws URISyntaxException
*
*/
- private CloudBlockBlobWrapper getBlobReference(String aKey)
+ private CloudBlobWrapper getBlobReference(String aKey)
throws StorageException, URISyntaxException {
- CloudBlockBlobWrapper blob = this.container.getBlockBlobReference(aKey);
-
+ CloudBlobWrapper blob = null;
+ if (isPageBlobKey(aKey)) {
+ blob = this.container.getPageBlobReference(aKey);
+ } else {
+ blob = this.container.getBlockBlobReference(aKey);
blob.setStreamMinimumReadSizeInBytes(downloadBlockSizeBytes);
blob.setWriteBlockSizeInBytes(uploadBlockSizeBytes);
+ }
- // Return with block blob.
return blob;
}
@@ -1492,7 +1726,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
*
* @returns normKey
*/
- private String normalizeKey(CloudBlockBlobWrapper blob) {
+ private String normalizeKey(CloudBlobWrapper blob) {
return normalizeKey(blob.getUri());
}
@@ -1552,20 +1786,19 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
instrumentation,
bandwidthGaugeUpdater);
- // Bind operation context to receive send request callbacks on this
- // operation.
- // If reads concurrent to OOB writes are allowed, the interception will
- // reset the conditional header on all Azure blob storage read requests.
+ // Bind operation context to receive send request callbacks on this operation.
+ // If reads concurrent to OOB writes are allowed, the interception will reset
+ // the conditional header on all Azure blob storage read requests.
if (bindConcurrentOOBIo) {
SendRequestIntercept.bind(storageInteractionLayer.getCredentials(),
operationContext, true);
}
if (testHookOperationContext != null) {
- operationContext = testHookOperationContext
- .modifyOperationContext(operationContext);
+ operationContext =
+ testHookOperationContext.modifyOperationContext(operationContext);
}
-
+
ErrorMetricUpdater.hook(operationContext, instrumentation);
// Return the operation context.
@@ -1605,7 +1838,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
BlobMaterialization.Implicit);
}
- CloudBlockBlobWrapper blob = getBlobReference(key);
+ CloudBlobWrapper blob = getBlobReference(key);
// Download attributes and return file metadata only if the blob
// exists.
@@ -1634,7 +1867,8 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
return new FileMetadata(
key, // Always return denormalized key with metadata.
- properties.getLength(), properties.getLastModified().getTime(),
+ getDataLength(blob, properties),
+ properties.getLastModified().getTime(),
getPermissionStatus(blob));
}
}
@@ -1642,17 +1876,23 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
// There is no file with that key name, but maybe it is a folder.
// Query the underlying folder/container to list the blobs stored
// there under that key.
- Iterable<ListBlobItem> objects = listRootBlobs(key, true,
- EnumSet.of(BlobListingDetails.METADATA), null,
+ //
+ Iterable<ListBlobItem> objects =
+ listRootBlobs(
+ key,
+ true,
+ EnumSet.of(BlobListingDetails.METADATA),
+ null,
getInstrumentedContext());
// Check if the directory/container has the blob items.
for (ListBlobItem blobItem : objects) {
- if (blobItem instanceof CloudBlockBlobWrapper) {
+ if (blobItem instanceof CloudBlockBlobWrapper
+ || blobItem instanceof CloudPageBlobWrapper) {
LOG.debug("Found blob as a directory-using this file under it to infer its properties "
+ blobItem.getUri());
- blob = (CloudBlockBlobWrapper) blobItem;
+ blob = (CloudBlobWrapper) blobItem;
// The key specifies a directory. Create a FileMetadata object which
// specifies as such.
BlobProperties properties = blob.getProperties();
@@ -1672,10 +1912,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
}
@Override
- public DataInputStream retrieve(String key) throws AzureException {
- InputStream inStream = null;
- BufferedInputStream inBufStream = null;
- try {
+ public DataInputStream retrieve(String key) throws AzureException, IOException {
try {
// Check if a session exists, if not create a session with the
// Azure storage server.
@@ -1688,27 +1925,13 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
checkContainer(ContainerAccessType.PureRead);
// Get blob reference and open the input buffer stream.
- CloudBlockBlobWrapper blob = getBlobReference(key);
- inStream = blob.openInputStream(getDownloadOptions(),
- getInstrumentedContext(isConcurrentOOBAppendAllowed()));
-
- inBufStream = new BufferedInputStream(inStream);
+ CloudBlobWrapper blob = getBlobReference(key);
+ BufferedInputStream inBufStream = new BufferedInputStream(
+ openInputStream(blob));
// Return a data input stream.
DataInputStream inDataStream = new DataInputStream(inBufStream);
return inDataStream;
- }
- catch (Exception e){
- // close the streams on error.
- // We use nested try-catch as stream.close() can throw IOException.
- if(inBufStream != null){
- inBufStream.close();
- }
- if(inStream != null){
- inStream.close();
- }
- throw e;
- }
} catch (Exception e) {
// Re-throw as an Azure storage exception.
throw new AzureException(e);
@@ -1717,11 +1940,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
@Override
public DataInputStream retrieve(String key, long startByteOffset)
- throws AzureException {
-
- InputStream in = null;
- DataInputStream inDataStream = null;
- try {
+ throws AzureException, IOException {
try {
// Check if a session exists, if not create a session with the
// Azure storage server.
@@ -1734,31 +1953,20 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
checkContainer(ContainerAccessType.PureRead);
// Get blob reference and open the input buffer stream.
- CloudBlockBlobWrapper blob = getBlobReference(key);
+ CloudBlobWrapper blob = getBlobReference(key);
// Open input stream and seek to the start offset.
- in = blob.openInputStream(getDownloadOptions(),
- getInstrumentedContext(isConcurrentOOBAppendAllowed()));
+ InputStream in = blob.openInputStream(
+ getDownloadOptions(), getInstrumentedContext(isConcurrentOOBAppendAllowed()));
// Create a data input stream.
- inDataStream = new DataInputStream(in);
- long skippedBytes = inDataStream.skip(startByteOffset);
- if (skippedBytes != startByteOffset) {
- throw new IOException("Couldn't skip the requested number of bytes");
- }
+ DataInputStream inDataStream = new DataInputStream(in);
+
+ // Skip bytes and ignore return value. This is okay
+ // because if you try to skip too far you will be positioned
+ // at the end and reads will not return data.
+ inDataStream.skip(startByteOffset);
return inDataStream;
- }
- catch (Exception e){
- // close the streams on error.
- // We use nested try-catch as stream.close() can throw IOException.
- if(inDataStream != null){
- inDataStream.close();
- }
- if(in != null){
- in.close();
- }
- throw e;
- }
} catch (Exception e) {
// Re-throw as an Azure storage exception.
throw new AzureException(e);
@@ -1825,13 +2033,14 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
for (ListBlobItem blobItem : objects) {
// Check that the maximum listing count is not exhausted.
//
- if (0 < maxListingCount && fileMetadata.size() >= maxListingCount) {
+ if (0 < maxListingCount
+ && fileMetadata.size() >= maxListingCount) {
break;
}
- if (blobItem instanceof CloudBlockBlobWrapper) {
+ if (blobItem instanceof CloudBlockBlobWrapper || blobItem instanceof CloudPageBlobWrapper) {
String blobKey = null;
- CloudBlockBlobWrapper blob = (CloudBlockBlobWrapper) blobItem;
+ CloudBlobWrapper blob = (CloudBlobWrapper) blobItem;
BlobProperties properties = blob.getProperties();
// Determine format of the blob name depending on whether an absolute
@@ -1840,11 +2049,14 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
FileMetadata metadata;
if (retrieveFolderAttribute(blob)) {
- metadata = new FileMetadata(blobKey, properties.getLastModified()
- .getTime(), getPermissionStatus(blob),
+ metadata = new FileMetadata(blobKey,
+ properties.getLastModified().getTime(),
+ getPermissionStatus(blob),
BlobMaterialization.Explicit);
} else {
- metadata = new FileMetadata(blobKey, properties.getLength(),
+ metadata = new FileMetadata(
+ blobKey,
+ getDataLength(blob, properties),
properties.getLastModified().getTime(),
getPermissionStatus(blob));
}
@@ -1890,9 +2102,11 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
}
// Note: Original code indicated that this may be a hack.
priorLastKey = null;
- return new PartialListing(priorLastKey,
+ PartialListing listing = new PartialListing(priorLastKey,
fileMetadata.toArray(new FileMetadata[] {}),
- 0 == fileMetadata.size() ? new String[] {} : new String[] { prefix });
+ 0 == fileMetadata.size() ? new String[] {}
+ : new String[] { prefix });
+ return listing;
} catch (Exception e) {
// Re-throw as an Azure storage exception.
//
@@ -1919,7 +2133,9 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
final int maxListingDepth) throws Exception {
// Push the blob directory onto the stack.
- LinkedList<Iterator<ListBlobItem>> dirIteratorStack = new LinkedList<Iterator<ListBlobItem>>();
+ //
+ AzureLinkedStack<Iterator<ListBlobItem>> dirIteratorStack =
+ new AzureLinkedStack<Iterator<ListBlobItem>>();
Iterable<ListBlobItem> blobItems = aCloudBlobDirectory.listBlobs(null,
false, EnumSet.of(BlobListingDetails.METADATA), null,
@@ -1958,9 +2174,10 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
// Add the file metadata to the list if this is not a blob
// directory item.
- if (blobItem instanceof CloudBlockBlobWrapper) {
+ //
+ if (blobItem instanceof CloudBlockBlobWrapper || blobItem instanceof CloudPageBlobWrapper) {
String blobKey = null;
- CloudBlockBlobWrapper blob = (CloudBlockBlobWrapper) blobItem;
+ CloudBlobWrapper blob = (CloudBlobWrapper) blobItem;
BlobProperties properties = blob.getProperties();
// Determine format of the blob name depending on whether an absolute
@@ -1969,11 +2186,14 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
FileMetadata metadata;
if (retrieveFolderAttribute(blob)) {
- metadata = new FileMetadata(blobKey, properties.getLastModified()
- .getTime(), getPermissionStatus(blob),
+ metadata = new FileMetadata(blobKey,
+ properties.getLastModified().getTime(),
+ getPermissionStatus(blob),
BlobMaterialization.Explicit);
} else {
- metadata = new FileMetadata(blobKey, properties.getLength(),
+ metadata = new FileMetadata(
+ blobKey,
+ getDataLength(blob, properties),
properties.getLastModified().getTime(),
getPermissionStatus(blob));
}
@@ -2016,7 +2236,9 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
// Note: Something smarter should be done about permissions. Maybe
// inherit the permissions of the first non-directory blob.
// Also, getting a proper value for last-modified is tricky.
- FileMetadata directoryMetadata = new FileMetadata(dirKey, 0,
+ //
+ FileMetadata directoryMetadata = new FileMetadata(dirKey,
+ 0,
defaultPermissionNoBlobMetadata(),
BlobMaterialization.Implicit);
@@ -2050,26 +2272,48 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
}
/**
- * Deletes the given blob, taking special care that if we get a blob-not-found
- * exception upon retrying the operation, we just swallow the error since what
- * most probably happened is that the first operation succeeded on the server.
- *
- * @param blob
- * The blob to delete.
+ * Return the actual data length of the blob with the specified properties.
+ * If it is a page blob, you can't rely on the length from the properties
+ * argument and you must get it from the file. Otherwise, you can.
+ */
+ private long getDataLength(CloudBlobWrapper blob, BlobProperties properties)
+ throws AzureException {
+ if (blob instanceof CloudPageBlobWrapper) {
+ try {
+ return PageBlobInputStream.getPageBlobSize((CloudPageBlobWrapper) blob,
+ getInstrumentedContext(
+ isConcurrentOOBAppendAllowed()));
+ } catch (Exception e) {
+ throw new AzureException(
+ "Unexpected exception getting page blob actual data size.", e);
+ }
+ }
+ return properties.getLength();
+ }
+
+ /**
+ * Deletes the given blob, taking special care that if we get a
+ * blob-not-found exception upon retrying the operation, we just
+ * swallow the error since what most probably happened is that
+ * the first operation succeeded on the server.
+ * @param blob The blob to delete.
+ * @param leaseID A string identifying the lease, or null if no
+ * lease is to be used.
* @throws StorageException
*/
- private void safeDelete(CloudBlockBlobWrapper blob) throws StorageException {
+ private void safeDelete(CloudBlobWrapper blob, SelfRenewingLease lease) throws StorageException {
OperationContext operationContext = getInstrumentedContext();
try {
- blob.delete(operationContext);
+ blob.delete(operationContext, lease);
} catch (StorageException e) {
// On exception, check that if:
// 1. It's a BlobNotFound exception AND
// 2. It got there after one-or-more retries THEN
// we swallow the exception.
- if (e.getErrorCode() != null && e.getErrorCode().equals("BlobNotFound")
- && operationContext.getRequestResults().size() > 1
- && operationContext.getRequestResults().get(0).getException() != null) {
+ if (e.getErrorCode() != null &&
+ e.getErrorCode().equals("BlobNotFound") &&
+ operationContext.getRequestResults().size() > 1 &&
+ operationContext.getRequestResults().get(0).getException() != null) {
if (LOG.isDebugEnabled()) {
LOG.debug("Swallowing delete exception on retry: " + e.getMessage());
}
@@ -2077,21 +2321,25 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
} else {
throw e;
}
+ } finally {
+ if (lease != null) {
+ lease.free();
+ }
}
}
@Override
- public void delete(String key) throws IOException {
+ public void delete(String key, SelfRenewingLease lease) throws IOException {
try {
if (checkContainer(ContainerAccessType.ReadThenWrite) == ContainerState.DoesntExist) {
// Container doesn't exist, no need to do anything
return;
}
- // Get the blob reference an delete it.
- CloudBlockBlobWrapper blob = getBlobReference(key);
+ // Get the blob reference and delete it.
+ CloudBlobWrapper blob = getBlobReference(key);
if (blob.exists(getInstrumentedContext())) {
- safeDelete(blob);
+ safeDelete(blob, lease);
}
} catch (Exception e) {
// Re-throw as an Azure storage exception.
@@ -2100,12 +2348,27 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
}
@Override
+ public void delete(String key) throws IOException {
+ delete(key, null);
+ }
+
+ @Override
public void rename(String srcKey, String dstKey) throws IOException {
+ rename(srcKey, dstKey, false, null);
+ }
+
+ @Override
+ public void rename(String srcKey, String dstKey, boolean acquireLease,
+ SelfRenewingLease existingLease) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Moving " + srcKey + " to " + dstKey);
}
+ if (acquireLease && existingLease != null) {
+ throw new IOException("Cannot acquire new lease if one already exists.");
+ }
+
try {
// Attempts rename may occur before opening any streams so first,
// check if a session exists, if not create a session with the Azure
@@ -2120,52 +2383,76 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
checkContainer(ContainerAccessType.ReadThenWrite);
// Get the source blob and assert its existence. If the source key
// needs to be normalized then normalize it.
- CloudBlockBlobWrapper srcBlob = getBlobReference(srcKey);
+ //
+ CloudBlobWrapper srcBlob = getBlobReference(srcKey);
if (!srcBlob.exists(getInstrumentedContext())) {
- throw new AzureException("Source blob " + srcKey + " does not exist.");
+ throw new AzureException ("Source blob " + srcKey +
+ " does not exist.");
+ }
+
+ /**
+ * Conditionally get a lease on the source blob to prevent other writers
+ * from changing it. This is used for correctness in HBase when log files
+ * are renamed. It generally should do no harm other than take a little
+ * more time for other rename scenarios. When the HBase master renames a
+ * log file folder, the lease locks out other writers. This
+ * prevents a region server that the master thinks is dead, but is still
+ * alive, from committing additional updates. This is different than
+ * when HBase runs on HDFS, where the region server recovers the lease
+ * on a log file, to gain exclusive access to it, before it splits it.
+ */
+ SelfRenewingLease lease = null;
+ if (acquireLease) {
+ lease = srcBlob.acquireLease();
+ } else if (existingLease != null) {
+ lease = existingLease;
}
// Get the destination blob. The destination key always needs to be
// normalized.
- CloudBlockBlobWrapper dstBlob = getBlobReference(dstKey);
+ //
+ CloudBlobWrapper dstBlob = getBlobReference(dstKey);
+
+ // TODO: Remove at the time when we move to Azure Java SDK 1.2+.
+ // This is the workaround provided by Azure Java SDK team to
+ // mitigate the issue with un-encoded x-ms-copy-source HTTP
+ // request header. Azure sdk version before 1.2+ does not encode this
+ // header what causes all URIs that have special (category "other")
+ // characters in the URI not to work with startCopyFromBlob when
+ // specified as source (requests fail with HTTP 403).
+ URI srcUri = new URI(srcBlob.getUri().toASCIIString());
// Rename the source blob to the destination blob by copying it to
// the destination blob then deleting it.
//
- dstBlob.startCopyFromBlob(srcBlob, getInstrumentedContext());
+ dstBlob.startCopyFromBlob(srcUri, getInstrumentedContext());
waitForCopyToComplete(dstBlob, getInstrumentedContext());
- safeDelete(srcBlob);
+ safeDelete(srcBlob, lease);
} catch (Exception e) {
// Re-throw exception as an Azure storage exception.
throw new AzureException(e);
}
}
- private void waitForCopyToComplete(CloudBlockBlobWrapper blob,
- OperationContext opContext) throws AzureException {
+ private void waitForCopyToComplete(CloudBlobWrapper blob, OperationContext opContext){
boolean copyInProgress = true;
- int exceptionCount = 0;
while (copyInProgress) {
try {
blob.downloadAttributes(opContext);
- } catch (StorageException se) {
- exceptionCount++;
- if(exceptionCount > 10){
- throw new AzureException("Too many storage exceptions during waitForCopyToComplete", se);
}
+ catch (StorageException se){
}
- // test for null because mocked filesystem doesn't know about copystates
- // yet.
- copyInProgress = (blob.getCopyState() != null && blob.getCopyState()
- .getStatus() == CopyStatus.PENDING);
+ // test for null because mocked filesystem doesn't know about copystates yet.
+ copyInProgress = (blob.getCopyState() != null && blob.getCopyState().getStatus() == CopyStatus.PENDING);
if (copyInProgress) {
try {
Thread.sleep(1000);
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
+ }
+ catch (InterruptedException ie){
+ //ignore
}
}
}
@@ -2179,7 +2466,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
throws AzureException {
try {
checkContainer(ContainerAccessType.ReadThenWrite);
- CloudBlockBlobWrapper blob = getBlobReference(key);
+ CloudBlobWrapper blob = getBlobReference(key);
blob.downloadAttributes(getInstrumentedContext());
storePermissionStatus(blob, newPermission);
blob.uploadMetadata(getInstrumentedContext());
@@ -2220,28 +2507,51 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
}
}
+ /**
+ * Get a lease on the blob identified by key. This lease will be renewed
+ * indefinitely by a background thread.
+ */
+ @Override
+ public SelfRenewingLease acquireLease(String key) throws AzureException {
+ LOG.debug("acquiring lease on " + key);
+ try {
+ checkContainer(ContainerAccessType.ReadThenWrite);
+ CloudBlobWrapper blob = getBlobReference(key);
+ return blob.acquireLease();
+ }
+ catch (Exception e) {
+
+ // Caught exception while attempting to get lease. Re-throw as an
+ // Azure storage exception.
+ throw new AzureException(e);
+ }
+ }
+
@Override
- public void updateFolderLastModifiedTime(String key, Date lastModified)
+ public void updateFolderLastModifiedTime(String key, Date lastModified,
+ SelfRenewingLease folderLease)
throws AzureException {
try {
checkContainer(ContainerAccessType.ReadThenWrite);
- CloudBlockBlobWrapper blob = getBlobReference(key);
+ CloudBlobWrapper blob = getBlobReference(key);
blob.getProperties().setLastModified(lastModified);
- blob.uploadProperties(getInstrumentedContext());
+ blob.uploadProperties(getInstrumentedContext(), folderLease);
} catch (Exception e) {
- // Caught exception while attempting update the properties. Re-throw as an
+
+ // Caught exception while attempting to update the properties. Re-throw as an
// Azure storage exception.
throw new AzureException(e);
}
}
@Override
- public void updateFolderLastModifiedTime(String key) throws AzureException {
+ public void updateFolderLastModifiedTime(String key,
+ SelfRenewingLease folderLease) throws AzureException {
final Calendar lastModifiedCalendar = Calendar
.getInstance(Utility.LOCALE_US);
lastModifiedCalendar.setTimeZone(Utility.UTC_ZONE);
Date lastModified = lastModifiedCalendar.getTime();
- updateFolderLastModifiedTime(key, lastModified);
+ updateFolderLastModifiedTime(key, lastModified, folderLease);
}
@Override