You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ma...@apache.org on 2018/09/23 03:24:12 UTC
[15/45] hadoop git commit: HADOOP-15560. ABFS: removed dependency
injection and unnecessary dependencies. Contributed by Da Zhou.
HADOOP-15560. ABFS: removed dependency injection and unnecessary dependencies.
Contributed by Da Zhou.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a271fd0e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a271fd0e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a271fd0e
Branch: refs/heads/trunk
Commit: a271fd0eca75cef8b8ba940cdac8ad4fd21b4462
Parents: f044dee
Author: Steve Loughran <st...@apache.org>
Authored: Tue Jul 3 18:55:10 2018 +0200
Committer: Thomas Marquardt <tm...@microsoft.com>
Committed: Mon Sep 17 19:54:01 2018 +0000
----------------------------------------------------------------------
hadoop-tools/hadoop-azure/pom.xml | 18 -
.../src/config/checkstyle-suppressions.xml | 2 +-
.../hadoop/fs/azurebfs/AzureBlobFileSystem.java | 88 ++-
.../fs/azurebfs/AzureBlobFileSystemStore.java | 701 +++++++++++++++++++
.../exceptions/ServiceResolutionException.java | 36 -
.../services/AbfsHttpClientFactory.java | 39 --
.../contracts/services/AbfsHttpService.java | 162 -----
.../contracts/services/AbfsServiceProvider.java | 40 --
.../services/ConfigurationService.java | 143 ----
.../contracts/services/InjectableService.java | 30 -
.../contracts/services/TracingService.java | 66 --
.../hadoop/fs/azurebfs/services/AbfsClient.java | 7 +-
.../fs/azurebfs/services/AbfsConfiguration.java | 297 ++++++++
.../services/AbfsHttpClientFactoryImpl.java | 116 ---
.../azurebfs/services/AbfsHttpServiceImpl.java | 693 ------------------
.../services/AbfsServiceInjectorImpl.java | 81 ---
.../services/AbfsServiceProviderImpl.java | 96 ---
.../services/ConfigurationServiceImpl.java | 317 ---------
.../services/ExponentialRetryPolicy.java | 9 +-
.../azurebfs/services/LoggerSpanReceiver.java | 74 --
.../azurebfs/services/TracingServiceImpl.java | 134 ----
.../fs/azurebfs/DependencyInjectedTest.java | 59 +-
.../azurebfs/ITestAzureBlobFileSystemE2E.java | 7 +-
.../ITestAzureBlobFileSystemRandomRead.java | 7 +-
.../azurebfs/ITestFileSystemInitialization.java | 23 +-
.../fs/azurebfs/ITestFileSystemProperties.java | 126 ++++
.../azurebfs/ITestFileSystemRegistration.java | 23 +-
.../ITestAzureBlobFileSystemBasics.java | 11 +-
.../services/ITestAbfsHttpServiceImpl.java | 122 ----
.../services/ITestReadWriteAndSeek.java | 8 +-
.../services/ITestTracingServiceImpl.java | 79 ---
.../services/MockAbfsHttpClientFactoryImpl.java | 69 --
.../services/MockAbfsServiceInjectorImpl.java | 50 --
.../services/MockServiceProviderImpl.java | 36 -
.../TestAbfsConfigurationFieldsValidation.java | 149 ++++
...estConfigurationServiceFieldsValidation.java | 149 ----
.../utils/CleanUpAbfsTestContainer.java | 68 ++
37 files changed, 1432 insertions(+), 2703 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a271fd0e/hadoop-tools/hadoop-azure/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/pom.xml b/hadoop-tools/hadoop-azure/pom.xml
index d4046ef..cbd4dfb 100644
--- a/hadoop-tools/hadoop-azure/pom.xml
+++ b/hadoop-tools/hadoop-azure/pom.xml
@@ -150,12 +150,6 @@
</dependency>
<dependency>
- <groupId>org.threadly</groupId>
- <artifactId>threadly</artifactId>
- <scope>compile</scope>
- </dependency>
-
- <dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<scope>compile</scope>
@@ -186,18 +180,6 @@
</dependency>
<dependency>
- <groupId>org.apache.htrace</groupId>
- <artifactId>htrace-core</artifactId>
- <scope>compile</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.htrace</groupId>
- <artifactId>htrace-core4</artifactId>
- <scope>compile</scope>
- </dependency>
-
- <dependency>
<groupId>com.google.inject</groupId>
<artifactId>guice</artifactId>
<scope>compile</scope>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a271fd0e/hadoop-tools/hadoop-azure/src/config/checkstyle-suppressions.xml
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/config/checkstyle-suppressions.xml b/hadoop-tools/hadoop-azure/src/config/checkstyle-suppressions.xml
index 0204355..751a227 100644
--- a/hadoop-tools/hadoop-azure/src/config/checkstyle-suppressions.xml
+++ b/hadoop-tools/hadoop-azure/src/config/checkstyle-suppressions.xml
@@ -43,5 +43,5 @@
<suppressions>
<suppress checks="ParameterNumber|MagicNumber"
- files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]services[\\/]AbfsHttpServiceImpl.java"/>
+ files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]AzureBlobFileSystemStore.java"/>
</suppressions>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a271fd0e/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
index 707c81e..cf5acbb 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
@@ -39,10 +39,8 @@ import com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.threadly.util.ExceptionUtils;
import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
-import org.apache.hadoop.fs.azurebfs.services.AbfsServiceProviderImpl;
import org.apache.commons.lang.ArrayUtils;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -58,10 +56,6 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations;
import org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes;
-import org.apache.hadoop.fs.azurebfs.contracts.services.TracingService;
-import org.apache.hadoop.fs.azurebfs.contracts.services.ConfigurationService;
-import org.apache.hadoop.fs.azurebfs.contracts.services.AbfsHttpService;
-import org.apache.hadoop.fs.azurebfs.contracts.services.AbfsServiceProvider;
import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.FileSystemOperationUnhandledException;
@@ -70,7 +64,6 @@ import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriException;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Progressable;
-import org.apache.htrace.core.TraceScope;
/**
* A {@link org.apache.hadoop.fs.FileSystem} for reading and writing files stored on <a
@@ -85,10 +78,7 @@ public class AzureBlobFileSystem extends FileSystem {
private UserGroupInformation userGroupInformation;
private String user;
private String primaryUserGroup;
- private AbfsServiceProvider abfsServiceProvider;
- private TracingService tracingService;
- private AbfsHttpService abfsHttpService;
- private ConfigurationService configurationService;
+ private AzureBlobFileSystemStore abfsStore;
private boolean isClosed;
@Override
@@ -96,18 +86,8 @@ public class AzureBlobFileSystem extends FileSystem {
throws IOException {
uri = ensureAuthority(uri, configuration);
super.initialize(uri, configuration);
-
setConf(configuration);
- try {
- this.abfsServiceProvider = AbfsServiceProviderImpl.create(configuration);
- this.tracingService = abfsServiceProvider.get(TracingService.class);
- this.abfsHttpService = abfsServiceProvider.get(AbfsHttpService.class);
- this.configurationService = abfsServiceProvider.get(ConfigurationService.class);
- } catch (AzureBlobFileSystemException exception) {
- throw new IOException(exception);
- }
-
this.LOG.debug(
"Initializing AzureBlobFileSystem for {}", uri);
@@ -115,13 +95,14 @@ public class AzureBlobFileSystem extends FileSystem {
this.userGroupInformation = UserGroupInformation.getCurrentUser();
this.user = userGroupInformation.getUserName();
this.primaryUserGroup = userGroupInformation.getPrimaryGroupName();
+ this.abfsStore = new AzureBlobFileSystemStore(uri, this.isSecure(), configuration, userGroupInformation);
this.LOG.debug(
"Initializing NativeAzureFileSystem for {}", uri);
this.setWorkingDirectory(this.getHomeDirectory());
- if (this.configurationService.getCreateRemoteFileSystemDuringInitialization()) {
+ if (abfsStore.getAbfsConfiguration().getCreateRemoteFileSystemDuringInitialization()) {
this.createFileSystem();
}
@@ -143,7 +124,7 @@ public class AzureBlobFileSystem extends FileSystem {
"AzureBlobFileSystem.open path: {} bufferSize: {}", path.toString(), bufferSize);
try {
- InputStream inputStream = abfsHttpService.openFileForRead(this, makeQualified(path), statistics);
+ InputStream inputStream = abfsStore.openFileForRead(makeQualified(path), statistics);
return new FSDataInputStream(inputStream);
} catch(AzureBlobFileSystemException ex) {
checkException(path, ex);
@@ -162,7 +143,7 @@ public class AzureBlobFileSystem extends FileSystem {
blockSize);
try {
- OutputStream outputStream = abfsHttpService.createFile(this, makeQualified(f), overwrite);
+ OutputStream outputStream = abfsStore.createFile(makeQualified(f), overwrite);
return new FSDataOutputStream(outputStream, statistics);
} catch(AzureBlobFileSystemException ex) {
checkException(f, ex);
@@ -221,7 +202,7 @@ public class AzureBlobFileSystem extends FileSystem {
bufferSize);
try {
- OutputStream outputStream = abfsHttpService.openFileForWrite(this, makeQualified(f), false);
+ OutputStream outputStream = abfsStore.openFileForWrite(makeQualified(f), false);
return new FSDataOutputStream(outputStream, statistics);
} catch(AzureBlobFileSystemException ex) {
checkException(f, ex);
@@ -251,7 +232,7 @@ public class AzureBlobFileSystem extends FileSystem {
adjustedDst = new Path(dst, sourceFileName);
}
- abfsHttpService.rename(this, makeQualified(src), makeQualified(adjustedDst));
+ abfsStore.rename(makeQualified(src), makeQualified(adjustedDst));
return true;
} catch(AzureBlobFileSystemException ex) {
checkException(
@@ -281,7 +262,7 @@ public class AzureBlobFileSystem extends FileSystem {
}
try {
- abfsHttpService.delete(this, makeQualified(f), recursive);
+ abfsStore.delete(makeQualified(f), recursive);
return true;
} catch (AzureBlobFileSystemException ex) {
checkException(f, ex, AzureServiceErrorCode.PATH_NOT_FOUND);
@@ -296,7 +277,7 @@ public class AzureBlobFileSystem extends FileSystem {
"AzureBlobFileSystem.listStatus path: {}", f.toString());
try {
- FileStatus[] result = abfsHttpService.listStatus(this, makeQualified(f));
+ FileStatus[] result = abfsStore.listStatus(makeQualified(f));
return result;
} catch (AzureBlobFileSystemException ex) {
checkException(f, ex);
@@ -316,7 +297,7 @@ public class AzureBlobFileSystem extends FileSystem {
}
try {
- abfsHttpService.createDirectory(this, makeQualified(f));
+ abfsStore.createDirectory(makeQualified(f));
return true;
} catch (AzureBlobFileSystemException ex) {
checkException(f, ex, AzureServiceErrorCode.PATH_ALREADY_EXISTS);
@@ -332,13 +313,7 @@ public class AzureBlobFileSystem extends FileSystem {
super.close();
this.LOG.debug("AzureBlobFileSystem.close");
-
- try {
- abfsHttpService.closeFileSystem(this);
- } catch (AzureBlobFileSystemException ex) {
- checkException(null, ex);
- this.isClosed = true;
- }
+ this.isClosed = true;
}
@Override
@@ -346,7 +321,7 @@ public class AzureBlobFileSystem extends FileSystem {
this.LOG.debug("AzureBlobFileSystem.getFileStatus path: {}", f.toString());
try {
- return abfsHttpService.getFileStatus(this, makeQualified(f));
+ return abfsStore.getFileStatus(makeQualified(f));
} catch(AzureBlobFileSystemException ex) {
checkException(f, ex);
return null;
@@ -397,7 +372,7 @@ public class AzureBlobFileSystem extends FileSystem {
if (file.getLen() < start) {
return new BlockLocation[0];
}
- final String blobLocationHost = this.configurationService.getAzureBlockLocationHost();
+ final String blobLocationHost = this.abfsStore.getAbfsConfiguration().getAzureBlockLocationHost();
final String[] name = { blobLocationHost };
final String[] host = { blobLocationHost };
@@ -477,12 +452,10 @@ public class AzureBlobFileSystem extends FileSystem {
this.LOG.debug(
"AzureBlobFileSystem.createFileSystem uri: {}", uri);
try {
- abfsHttpService.createFilesystem(this);
+ this.abfsStore.createFilesystem();
} catch (AzureBlobFileSystemException ex) {
checkException(null, ex, AzureServiceErrorCode.FILE_SYSTEM_ALREADY_EXISTS);
}
-
-
}
private URI ensureAuthority(URI uri, final Configuration conf) {
@@ -540,25 +513,19 @@ public class AzureBlobFileSystem extends FileSystem {
final Callable<T> callableFileOperation,
T defaultResultValue) throws IOException {
- final TraceScope traceScope = tracingService.traceBegin(scopeDescription);
try {
final T executionResult = callableFileOperation.call();
return new FileSystemOperation(executionResult, null);
} catch (AbfsRestOperationException abfsRestOperationException) {
return new FileSystemOperation(defaultResultValue, abfsRestOperationException);
} catch (AzureBlobFileSystemException azureBlobFileSystemException) {
- tracingService.traceException(traceScope, azureBlobFileSystemException);
throw new IOException(azureBlobFileSystemException);
} catch (Exception exception) {
if (exception instanceof ExecutionException) {
- exception = (Exception) ExceptionUtils.getRootCause(exception);
+ exception = (Exception) getRootCause(exception);
}
-
final FileSystemOperationUnhandledException fileSystemOperationUnhandledException = new FileSystemOperationUnhandledException(exception);
- tracingService.traceException(traceScope, fileSystemOperationUnhandledException);
throw new IOException(fileSystemOperationUnhandledException);
- } finally {
- tracingService.traceEnd(traceScope);
}
}
@@ -590,6 +557,26 @@ public class AzureBlobFileSystem extends FileSystem {
}
}
+ /**
+ * Gets the root cause of a provided {@link Throwable}. If there is no cause for the
+ * {@link Throwable} provided into this function, the original {@link Throwable} is returned.
+ *
+ * @param throwable starting {@link Throwable}
+ * @return root cause {@link Throwable}
+ */
+ private Throwable getRootCause(Throwable throwable) {
+ if (throwable == null) {
+ throw new IllegalArgumentException("throwable can not be null");
+ }
+
+ Throwable result = throwable;
+ while (result.getCause() != null) {
+ result = result.getCause();
+ }
+
+ return result;
+ }
+
@VisibleForTesting
FileSystem.Statistics getFsStatistics() {
return this.statistics;
@@ -609,4 +596,9 @@ public class AzureBlobFileSystem extends FileSystem {
return this.exception != null;
}
}
+
+ @VisibleForTesting
+ AzureBlobFileSystemStore getAbfsStore() {
+ return this.abfsStore;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a271fd0e/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
new file mode 100644
index 0000000..134277f
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
@@ -0,0 +1,701 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.azurebfs;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.Charset;
+import java.nio.charset.CharsetDecoder;
+import java.nio.charset.CharsetEncoder;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Hashtable;
+import java.util.Map;
+import java.util.Set;
+import javax.xml.bind.DatatypeConverter;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
+import org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes;
+import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.FileSystemOperationUnhandledException;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidAbfsRestOperationException;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidFileSystemPropertyException;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriAuthorityException;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriException;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.TimeoutException;
+import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
+import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultEntrySchema;
+import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultSchema;
+import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
+import org.apache.hadoop.fs.azurebfs.services.AbfsConfiguration;
+import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream;
+import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
+import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
+import org.apache.hadoop.fs.azurebfs.services.ExponentialRetryPolicy;
+import org.apache.hadoop.fs.azurebfs.services.SharedKeyCredentials;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.http.client.utils.URIBuilder;
+import org.joda.time.DateTime;
+import org.joda.time.format.DateTimeFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.hadoop.util.Time.now;
+
+/**
+ * Provides the bridging logic between Hadoop's abstract filesystem and Azure Storage
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class AzureBlobFileSystemStore {
+ private static final Logger LOG = LoggerFactory.getLogger(AzureBlobFileSystemStore.class);
+
+ private AbfsClient client;
+ private URI uri;
+ private final UserGroupInformation userGroupInformation;
+ private static final String DATE_TIME_PATTERN = "E, dd MMM yyyy HH:mm:ss 'GMT'";
+ private static final String XMS_PROPERTIES_ENCODING = "ISO-8859-1";
+ private static final int LIST_MAX_RESULTS = 5000;
+ private static final int DELETE_DIRECTORY_TIMEOUT_MILISECONDS = 180000;
+ private static final int RENAME_TIMEOUT_MILISECONDS = 180000;
+
+ private final AbfsConfiguration abfsConfiguration;
+ private final Set<String> azureAtomicRenameDirSet;
+
+
+ public AzureBlobFileSystemStore(URI uri, boolean isSeure, Configuration configuration, UserGroupInformation userGroupInformation)
+ throws AzureBlobFileSystemException {
+ this.uri = uri;
+ try {
+ this.abfsConfiguration = new AbfsConfiguration(configuration);
+ } catch (IllegalAccessException exception) {
+ throw new FileSystemOperationUnhandledException(exception);
+ }
+
+ this.userGroupInformation = userGroupInformation;
+ this.azureAtomicRenameDirSet = new HashSet<>(Arrays.asList(abfsConfiguration.getAzureAtomicRenameDirs().split(AbfsHttpConstants.COMMA)));
+
+ initializeClient(uri, isSeure);
+ }
+
+ @VisibleForTesting
+ URIBuilder getURIBuilder(final String hostName, boolean isSecure) {
+ String scheme = isSecure ? FileSystemUriSchemes.HTTPS_SCHEME : FileSystemUriSchemes.HTTP_SCHEME;
+
+ final URIBuilder uriBuilder = new URIBuilder();
+ uriBuilder.setScheme(scheme);
+ uriBuilder.setHost(hostName);
+
+ return uriBuilder;
+ }
+
+ public AbfsConfiguration getAbfsConfiguration() {
+ return this.abfsConfiguration;
+ }
+
+ public Hashtable<String, String> getFilesystemProperties() throws AzureBlobFileSystemException {
+ this.LOG.debug(
+ "getFilesystemProperties for filesystem: {}",
+ client.getFileSystem());
+
+ final Hashtable<String, String> parsedXmsProperties;
+
+ final AbfsRestOperation op = client.getFilesystemProperties();
+ final String xMsProperties = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_PROPERTIES);
+
+ parsedXmsProperties = parseCommaSeparatedXmsProperties(xMsProperties);
+
+ return parsedXmsProperties;
+ }
+
+ public void setFilesystemProperties(final Hashtable<String, String> properties) throws AzureBlobFileSystemException {
+ if (properties == null || properties.size() == 0) {
+ return;
+ }
+
+ this.LOG.debug(
+ "setFilesystemProperties for filesystem: {} with properties: {}",
+ client.getFileSystem(),
+ properties);
+
+ final String commaSeparatedProperties;
+ try {
+ commaSeparatedProperties = convertXmsPropertiesToCommaSeparatedString(properties);
+ } catch (CharacterCodingException ex) {
+ throw new InvalidAbfsRestOperationException(ex);
+ }
+
+ client.setFilesystemProperties(commaSeparatedProperties);
+ }
+
+ public Hashtable<String, String> getPathProperties(final Path path) throws AzureBlobFileSystemException {
+ this.LOG.debug(
+ "getPathProperties for filesystem: {} path: {}",
+ client.getFileSystem(),
+ path.toString());
+
+ final Hashtable<String, String> parsedXmsProperties;
+ final AbfsRestOperation op = client.getPathProperties(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path));
+
+ final String xMsProperties = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_PROPERTIES);
+
+ parsedXmsProperties = parseCommaSeparatedXmsProperties(xMsProperties);
+
+ return parsedXmsProperties;
+ }
+
+ public void setPathProperties(final Path path, final Hashtable<String, String> properties) throws AzureBlobFileSystemException {
+ this.LOG.debug(
+ "setFilesystemProperties for filesystem: {} path: {} with properties: {}",
+ client.getFileSystem(),
+ path.toString(),
+ properties);
+
+ final String commaSeparatedProperties;
+ try {
+ commaSeparatedProperties = convertXmsPropertiesToCommaSeparatedString(properties);
+ } catch (CharacterCodingException ex) {
+ throw new InvalidAbfsRestOperationException(ex);
+ }
+ client.setPathProperties("/" + getRelativePath(path), commaSeparatedProperties);
+ }
+
+ public void createFilesystem() throws AzureBlobFileSystemException {
+ this.LOG.debug(
+ "createFilesystem for filesystem: {}",
+ client.getFileSystem());
+
+ client.createFilesystem();
+ }
+
+ public void deleteFilesystem() throws AzureBlobFileSystemException {
+ this.LOG.debug(
+ "deleteFilesystem for filesystem: {}",
+ client.getFileSystem());
+
+ client.deleteFilesystem();
+ }
+
+ public OutputStream createFile(final Path path, final boolean overwrite) throws AzureBlobFileSystemException {
+ this.LOG.debug(
+ "createFile filesystem: {} path: {} overwrite: {}",
+ client.getFileSystem(),
+ path.toString(),
+ overwrite);
+
+ client.createPath(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), true, overwrite);
+
+ final OutputStream outputStream;
+ outputStream = new FSDataOutputStream(
+ new AbfsOutputStream(client, AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), 0,
+ abfsConfiguration.getWriteBufferSize()), null);
+ return outputStream;
+ }
+
+ public Void createDirectory(final Path path) throws AzureBlobFileSystemException {
+ this.LOG.debug(
+ "createDirectory filesystem: {} path: {} overwrite: {}",
+ client.getFileSystem(),
+ path.toString());
+
+ client.createPath("/" + getRelativePath(path), false, true);
+
+ return null;
+ }
+
+ public InputStream openFileForRead(final Path path, final FileSystem.Statistics statistics) throws AzureBlobFileSystemException {
+
+ this.LOG.debug(
+ "openFileForRead filesystem: {} path: {}",
+ client.getFileSystem(),
+ path.toString());
+
+ final AbfsRestOperation op = client.getPathProperties(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path));
+
+ final String resourceType = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_RESOURCE_TYPE);
+ final long contentLength = Long.parseLong(op.getResult().getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH));
+ final String eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG);
+
+ if (parseIsDirectory(resourceType)) {
+ throw new AbfsRestOperationException(
+ AzureServiceErrorCode.PATH_NOT_FOUND.getStatusCode(),
+ AzureServiceErrorCode.PATH_NOT_FOUND.getErrorCode(),
+ "openFileForRead must be used with files and not directories",
+ null);
+ }
+
+ // Add statistics for InputStream
+ return new FSDataInputStream(
+ new AbfsInputStream(client, statistics, AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), contentLength,
+ abfsConfiguration.getReadBufferSize(), abfsConfiguration.getReadAheadQueueDepth(), eTag));
+ }
+
+ public OutputStream openFileForWrite(final Path path, final boolean overwrite) throws
+ AzureBlobFileSystemException {
+ this.LOG.debug(
+ "openFileForWrite filesystem: {} path: {} overwrite: {}",
+ client.getFileSystem(),
+ path.toString(),
+ overwrite);
+
+ final AbfsRestOperation op = client.getPathProperties(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path));
+
+ final String resourceType = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_RESOURCE_TYPE);
+ final Long contentLength = Long.valueOf(op.getResult().getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH));
+
+ if (parseIsDirectory(resourceType)) {
+ throw new AbfsRestOperationException(
+ AzureServiceErrorCode.PATH_NOT_FOUND.getStatusCode(),
+ AzureServiceErrorCode.PATH_NOT_FOUND.getErrorCode(),
+ "openFileForRead must be used with files and not directories",
+ null);
+ }
+
+ final long offset = overwrite ? 0 : contentLength;
+
+ final OutputStream outputStream;
+ outputStream = new FSDataOutputStream(
+ new AbfsOutputStream(client, AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path),
+ offset, abfsConfiguration.getWriteBufferSize()), null);
+ return outputStream;
+ }
+
+ public void rename(final Path source, final Path destination) throws
+ AzureBlobFileSystemException {
+
+ if (isAtomicRenameKey(source.getName())) {
+ this.LOG.warn("The atomic rename feature is not supported by the ABFS scheme; however rename,"
+ +" create and delete operations are atomic if Namespace is enabled for your Azure Storage account.");
+ }
+
+ this.LOG.debug(
+ "renameAsync filesystem: {} source: {} destination: {}",
+ client.getFileSystem(),
+ source.toString(),
+ destination.toString());
+
+ String continuation = null;
+ long deadline = now() + RENAME_TIMEOUT_MILISECONDS;
+
+ do {
+ if (now() > deadline) {
+ LOG.debug(
+ "Rename {} to {} timed out.",
+ source,
+ destination);
+
+ throw new TimeoutException("Rename timed out.");
+ }
+
+ AbfsRestOperation op = client.renamePath(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(source),
+ AbfsHttpConstants.FORWARD_SLASH + getRelativePath(destination), continuation);
+ continuation = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_CONTINUATION);
+
+ } while (continuation != null && !continuation.isEmpty());
+ }
+
+ public void delete(final Path path, final boolean recursive) throws
+ AzureBlobFileSystemException {
+
+ this.LOG.debug(
+ "delete filesystem: {} path: {} recursive: {}",
+ client.getFileSystem(),
+ path.toString(),
+ String.valueOf(recursive));
+
+ String continuation = null;
+ long deadline = now() + DELETE_DIRECTORY_TIMEOUT_MILISECONDS;
+
+ do {
+ if (now() > deadline) {
+ this.LOG.debug(
+ "Delete directory {} timed out.", path);
+
+ throw new TimeoutException("Delete directory timed out.");
+ }
+
+ AbfsRestOperation op = client.deletePath(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), recursive, continuation);
+ continuation = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_CONTINUATION);
+
+ } while (continuation != null && !continuation.isEmpty());
+ }
+
+ public FileStatus getFileStatus(final Path path) throws IOException {
+
+ this.LOG.debug(
+ "getFileStatus filesystem: {} path: {}",
+ client.getFileSystem(),
+ path.toString());
+
+ if (path.isRoot()) {
+ AbfsRestOperation op = client.getFilesystemProperties();
+ final long blockSize = abfsConfiguration.getAzureBlockSize();
+ final String eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG);
+ final String lastModified = op.getResult().getResponseHeader(HttpHeaderConfigurations.LAST_MODIFIED);
+ return new VersionedFileStatus(
+ userGroupInformation.getUserName(),
+ userGroupInformation.getPrimaryGroupName(),
+ 0,
+ true,
+ 1,
+ blockSize,
+ parseLastModifiedTime(lastModified).getMillis(),
+ path,
+ eTag);
+ } else {
+ AbfsRestOperation op = client.getPathProperties(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path));
+
+ final long blockSize = abfsConfiguration.getAzureBlockSize();
+ final String eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG);
+ final String lastModified = op.getResult().getResponseHeader(HttpHeaderConfigurations.LAST_MODIFIED);
+ final String contentLength = op.getResult().getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH);
+ final String resourceType = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_RESOURCE_TYPE);
+
+ return new VersionedFileStatus(
+ userGroupInformation.getUserName(),
+ userGroupInformation.getPrimaryGroupName(),
+ parseContentLength(contentLength),
+ parseIsDirectory(resourceType),
+ 1,
+ blockSize,
+ parseLastModifiedTime(lastModified).getMillis(),
+ path,
+ eTag);
+ }
+ }
+
+ public FileStatus[] listStatus(final Path path) throws IOException {
+ this.LOG.debug(
+ "listStatus filesystem: {} path: {}",
+ client.getFileSystem(),
+ path.toString());
+
+ String relativePath = path.isRoot() ? AbfsHttpConstants.EMPTY_STRING : getRelativePath(path);
+ String continuation = null;
+ ArrayList<FileStatus> fileStatuses = new ArrayList<>();
+
+ do {
+ AbfsRestOperation op = client.listPath(relativePath, false, LIST_MAX_RESULTS, continuation);
+ continuation = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_CONTINUATION);
+ ListResultSchema retrievedSchema = op.getResult().getListResultSchema();
+ if (retrievedSchema == null) {
+ throw new AbfsRestOperationException(
+ AzureServiceErrorCode.PATH_NOT_FOUND.getStatusCode(),
+ AzureServiceErrorCode.PATH_NOT_FOUND.getErrorCode(),
+ "listStatusAsync path not found",
+ null, op.getResult());
+ }
+
+ long blockSize = abfsConfiguration.getAzureBlockSize();
+
+ for (ListResultEntrySchema entry : retrievedSchema.paths()) {
+ long lastModifiedMillis = 0;
+ long contentLength = entry.contentLength() == null ? 0 : entry.contentLength();
+ boolean isDirectory = entry.isDirectory() == null ? false : entry.isDirectory();
+ if (entry.lastModified() != null && !entry.lastModified().isEmpty()) {
+ final DateTime dateTime = DateTime.parse(
+ entry.lastModified(),
+ DateTimeFormat.forPattern(DATE_TIME_PATTERN).withZoneUTC());
+ lastModifiedMillis = dateTime.getMillis();
+ }
+
+ Path entryPath = new Path(File.separator + entry.name());
+ entryPath = entryPath.makeQualified(this.uri, entryPath);
+
+ fileStatuses.add(
+ new VersionedFileStatus(
+ userGroupInformation.getUserName(),
+ userGroupInformation.getPrimaryGroupName(),
+ contentLength,
+ isDirectory,
+ 1,
+ blockSize,
+ lastModifiedMillis,
+ entryPath,
+ entry.eTag()));
+ }
+
+ } while (continuation != null && !continuation.isEmpty());
+
+ return fileStatuses.toArray(new FileStatus[0]);
+ }
+
+ public boolean isAtomicRenameKey(String key) {
+ return isKeyForDirectorySet(key, azureAtomicRenameDirSet);
+ }
+
+ private void initializeClient(URI uri, boolean isSeure) throws AzureBlobFileSystemException {
+ if (this.client != null) {
+ return;
+ }
+
+ final String authority = uri.getRawAuthority();
+ if (null == authority) {
+ throw new InvalidUriAuthorityException(uri.toString());
+ }
+
+ if (!authority.contains(AbfsHttpConstants.AZURE_DISTRIBUTED_FILE_SYSTEM_AUTHORITY_DELIMITER)) {
+ throw new InvalidUriAuthorityException(uri.toString());
+ }
+
+ final String[] authorityParts = authority.split(AbfsHttpConstants.AZURE_DISTRIBUTED_FILE_SYSTEM_AUTHORITY_DELIMITER, 2);
+
+ if (authorityParts.length < 2 || "".equals(authorityParts[0])) {
+ final String errMsg = String
+ .format("URI '%s' has a malformed authority, expected container name. "
+ + "Authority takes the form "+ FileSystemUriSchemes.ABFS_SCHEME + "://[<container name>@]<account name>",
+ uri.toString());
+ throw new InvalidUriException(errMsg);
+ }
+
+ final String fileSystemName = authorityParts[0];
+ final String accountName = authorityParts[1];
+
+ final URIBuilder uriBuilder = getURIBuilder(accountName, isSeure);
+
+ final String url = uriBuilder.toString() + AbfsHttpConstants.FORWARD_SLASH + fileSystemName;
+
+ URL baseUrl;
+ try {
+ baseUrl = new URL(url);
+ } catch (MalformedURLException e) {
+ throw new InvalidUriException(String.format("URI '%s' is malformed", uri.toString()));
+ }
+
+ SharedKeyCredentials creds =
+ new SharedKeyCredentials(accountName.substring(0, accountName.indexOf(AbfsHttpConstants.DOT)),
+ this.abfsConfiguration.getStorageAccountKey(accountName));
+
+ this.client = new AbfsClient(baseUrl, creds, abfsConfiguration, new ExponentialRetryPolicy());
+ }
+
+ private String getRelativePath(final Path path) {
+ Preconditions.checkNotNull(path, "path");
+ final String relativePath = path.toUri().getPath();
+
+ if (relativePath.length() == 0) {
+ return relativePath;
+ }
+
+ if (relativePath.charAt(0) == Path.SEPARATOR_CHAR) {
+ if (relativePath.length() == 1) {
+ return AbfsHttpConstants.EMPTY_STRING;
+ }
+
+ return relativePath.substring(1);
+ }
+
+ return relativePath;
+ }
+
+ private long parseContentLength(final String contentLength) {
+ if (contentLength == null) {
+ return -1;
+ }
+
+ return Long.parseLong(contentLength);
+ }
+
+ private boolean parseIsDirectory(final String resourceType) {
+ return resourceType == null ? false : resourceType.equalsIgnoreCase(AbfsHttpConstants.DIRECTORY);
+ }
+
+ private DateTime parseLastModifiedTime(final String lastModifiedTime) {
+ return DateTime.parse(
+ lastModifiedTime,
+ DateTimeFormat.forPattern(DATE_TIME_PATTERN).withZoneUTC());
+ }
+
+ private String convertXmsPropertiesToCommaSeparatedString(final Hashtable<String, String> properties) throws
+ CharacterCodingException {
+ StringBuilder commaSeparatedProperties = new StringBuilder();
+
+ final CharsetEncoder encoder = Charset.forName(XMS_PROPERTIES_ENCODING).newEncoder();
+
+ for (Map.Entry<String, String> propertyEntry : properties.entrySet()) {
+ String key = propertyEntry.getKey();
+ String value = propertyEntry.getValue();
+
+ Boolean canEncodeValue = encoder.canEncode(value);
+ if (!canEncodeValue) {
+ throw new CharacterCodingException();
+ }
+
+ String encodedPropertyValue = DatatypeConverter.printBase64Binary(encoder.encode(CharBuffer.wrap(value)).array());
+ commaSeparatedProperties.append(key)
+ .append(AbfsHttpConstants.EQUAL)
+ .append(encodedPropertyValue);
+
+ commaSeparatedProperties.append(AbfsHttpConstants.COMMA);
+ }
+
+ if (commaSeparatedProperties.length() != 0) {
+ commaSeparatedProperties.deleteCharAt(commaSeparatedProperties.length() - 1);
+ }
+
+ return commaSeparatedProperties.toString();
+ }
+
+ private Hashtable<String, String> parseCommaSeparatedXmsProperties(String xMsProperties) throws
+ InvalidFileSystemPropertyException, InvalidAbfsRestOperationException {
+ Hashtable<String, String> properties = new Hashtable<>();
+
+ final CharsetDecoder decoder = Charset.forName(XMS_PROPERTIES_ENCODING).newDecoder();
+
+ if (xMsProperties != null && !xMsProperties.isEmpty()) {
+ String[] userProperties = xMsProperties.split(AbfsHttpConstants.COMMA);
+
+ if (userProperties.length == 0) {
+ return properties;
+ }
+
+ for (String property : userProperties) {
+ if (property.isEmpty()) {
+ throw new InvalidFileSystemPropertyException(xMsProperties);
+ }
+
+ String[] nameValue = property.split(AbfsHttpConstants.EQUAL, 2);
+ if (nameValue.length != 2) {
+ throw new InvalidFileSystemPropertyException(xMsProperties);
+ }
+
+ byte[] decodedValue = DatatypeConverter.parseBase64Binary(nameValue[1]);
+
+ final String value;
+ try {
+ value = decoder.decode(ByteBuffer.wrap(decodedValue)).toString();
+ } catch (CharacterCodingException ex) {
+ throw new InvalidAbfsRestOperationException(ex);
+ }
+ properties.put(nameValue[0], value);
+ }
+ }
+
+ return properties;
+ }
+
+ private boolean isKeyForDirectorySet(String key, Set<String> dirSet) {
+ for (String dir : dirSet) {
+ if (dir.isEmpty() || key.startsWith(dir + AbfsHttpConstants.FORWARD_SLASH)) {
+ return true;
+ }
+
+ try {
+ URI uri = new URI(dir);
+ if (null == uri.getAuthority()) {
+ if (key.startsWith(dir + "/")){
+ return true;
+ }
+ }
+ } catch (URISyntaxException e) {
+ this.LOG.info("URI syntax error creating URI for {}", dir);
+ }
+ }
+
+ return false;
+ }
+
+ private static class VersionedFileStatus extends FileStatus {
+ private final String version;
+
+ VersionedFileStatus(
+ final String owner, final String group,
+ final long length, final boolean isdir, final int blockReplication,
+ final long blocksize, final long modificationTime, final Path path,
+ String version) {
+ super(length, isdir, blockReplication, blocksize, modificationTime, 0,
+ new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL),
+ owner,
+ group,
+ path);
+
+ this.version = version;
+ }
+
+ /** Compare if this object is equal to another object.
+ * @param obj the object to be compared.
+ * @return true if two file status has the same path name; false if not.
+ */
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == this) {
+ return true;
+ }
+
+ if (obj == null) {
+ return false;
+ }
+
+ if (this.getClass() == obj.getClass()) {
+ VersionedFileStatus other = (VersionedFileStatus) obj;
+ return this.getPath().equals(other.getPath()) && this.version.equals(other.version);
+ }
+
+ return false;
+ }
+
+ /**
+ * Returns a hash code value for the object, which is defined as
+ * the hash code of the path name.
+ *
+ * @return a hash code value for the path name and version
+ */
+ @Override
+ public int hashCode() {
+ int hash = getPath().hashCode();
+ hash = 89 * hash + (this.version != null ? this.version.hashCode() : 0);
+ return hash;
+ }
+
+ /**
+ * Returns the version of this FileStatus
+ *
+ * @return a string value for the FileStatus version
+ */
+ public String getVersion() {
+ return this.version;
+ }
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a271fd0e/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/ServiceResolutionException.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/ServiceResolutionException.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/ServiceResolutionException.java
deleted file mode 100644
index 694d902..0000000
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/ServiceResolutionException.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.fs.azurebfs.contracts.exceptions;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.fs.azurebfs.services.AbfsServiceProviderImpl;
-
-/**
- * Thrown a service is either not configured to be injected or the service is not existing.
- * For service registration
- * @see AbfsServiceProviderImpl
- */
-@InterfaceAudience.Public
-@InterfaceStability.Evolving
-public final class ServiceResolutionException extends AzureBlobFileSystemException {
- public ServiceResolutionException(String serviceName, Exception innerException) {
- super(String.format("%s cannot be resolved.", serviceName), innerException);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a271fd0e/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AbfsHttpClientFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AbfsHttpClientFactory.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AbfsHttpClientFactory.java
deleted file mode 100644
index c433f9a..0000000
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AbfsHttpClientFactory.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.hadoop.fs.azurebfs.contracts.services;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
-import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
-import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
-
-/**
- * AbfsClient factory.
- */
-@InterfaceAudience.Public
-@InterfaceStability.Evolving
-public interface AbfsHttpClientFactory extends InjectableService {
- /**
- * Creates and configures an instance of new AbfsClient
- * @return AbfsClient instance
- */
- AbfsClient create(AzureBlobFileSystem fs) throws AzureBlobFileSystemException;
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a271fd0e/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AbfsHttpService.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AbfsHttpService.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AbfsHttpService.java
deleted file mode 100644
index 3107fa3..0000000
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AbfsHttpService.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.fs.azurebfs.contracts.services;
-
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Hashtable;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
-import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
-
-/**
- * File System http service to provide network calls for file system operations.
- */
-@InterfaceAudience.Public
-@InterfaceStability.Evolving
-public interface AbfsHttpService extends InjectableService {
- /**
- * Gets filesystem properties on the Azure service.
- * @param azureBlobFileSystem filesystem to get the properties.
- * @return Hashtable<String, String> hash table containing all the filesystem properties.
- */
- Hashtable<String, String> getFilesystemProperties(AzureBlobFileSystem azureBlobFileSystem) throws AzureBlobFileSystemException;
-
-
- /**
- * Sets filesystem properties on the Azure service.
- * @param azureBlobFileSystem filesystem to get the properties.
- * @param properties file system properties to set.
- */
- void setFilesystemProperties(AzureBlobFileSystem azureBlobFileSystem, Hashtable<String, String> properties) throws
- AzureBlobFileSystemException;
-
- /**
- * Gets path properties on the Azure service.
- * @param azureBlobFileSystem filesystem to get the properties of the path.
- * @param path path to get properties.
- * @return Hashtable<String, String> hash table containing all the path properties.
- */
- Hashtable<String, String> getPathProperties(AzureBlobFileSystem azureBlobFileSystem, Path path) throws AzureBlobFileSystemException;
-
- /**
- * Sets path properties on the Azure service.
- * @param azureBlobFileSystem filesystem to get the properties of the path.
- * @param path path to set properties.
- * @param properties hash table containing all the path properties.
- */
- void setPathProperties(AzureBlobFileSystem azureBlobFileSystem, Path path, Hashtable<String, String> properties) throws
- AzureBlobFileSystemException;
-
- /**
- * Creates filesystem on the Azure service.
- * @param azureBlobFileSystem filesystem to be created.
- */
- void createFilesystem(AzureBlobFileSystem azureBlobFileSystem) throws AzureBlobFileSystemException;
-
- /**
- * Deletes filesystem on the Azure service.
- * @param azureBlobFileSystem filesystem to be deleted.
- */
- void deleteFilesystem(AzureBlobFileSystem azureBlobFileSystem) throws AzureBlobFileSystemException;
-
- /**
- * Creates a file on the Azure service.
- * @param azureBlobFileSystem filesystem to create file or directory.
- * @param path path of the file to be created.
- * @param overwrite should overwrite.
- * @return OutputStream stream to the file.
- */
- OutputStream createFile(AzureBlobFileSystem azureBlobFileSystem, Path path, boolean overwrite) throws AzureBlobFileSystemException;
-
- /**
- * Creates a directory on the Azure service.
- * @param azureBlobFileSystem filesystem to create file or directory.
- * @param path path of the directory to be created.
- * @return OutputStream stream to the file.
- */
- Void createDirectory(AzureBlobFileSystem azureBlobFileSystem, Path path) throws AzureBlobFileSystemException;
-
- /**
- * Opens a file to read and returns the stream.
- * @param azureBlobFileSystem filesystem to read a file from.
- * @param path file path to read.
- * @return InputStream a stream to the file to read.
- */
- InputStream openFileForRead(AzureBlobFileSystem azureBlobFileSystem, Path path, FileSystem.Statistics statistics) throws AzureBlobFileSystemException;
-
- /**
- * Opens a file to write and returns the stream.
- * @param azureBlobFileSystem filesystem to write a file to.
- * @param path file path to write.
- * @param overwrite should overwrite.
- * @return OutputStream a stream to the file to write.
- */
- OutputStream openFileForWrite(AzureBlobFileSystem azureBlobFileSystem, Path path, boolean overwrite) throws AzureBlobFileSystemException;
-
- /**
- * Renames a file or directory from source to destination.
- * @param azureBlobFileSystem filesystem to rename a path.
- * @param source source path.
- * @param destination destination path.
- */
- void rename(AzureBlobFileSystem azureBlobFileSystem, Path source, Path destination) throws AzureBlobFileSystemException;
-
- /**
- * Deletes a file or directory.
- * @param azureBlobFileSystem filesystem to delete the path.
- * @param path file path to be deleted.
- * @param recursive true if path is a directory and recursive deletion is desired.
- */
- void delete(AzureBlobFileSystem azureBlobFileSystem, Path path, boolean recursive) throws AzureBlobFileSystemException;
-
- /**
- * Gets path's status under the provided path on the Azure service.
- * @param azureBlobFileSystem filesystem to perform the get file status operation.
- * @param path path delimiter.
- * @return FileStatus FileStatus of the path in the file system.
- */
- FileStatus getFileStatus(AzureBlobFileSystem azureBlobFileSystem, Path path) throws AzureBlobFileSystemException;
-
- /**
- * Lists all the paths under the provided path on the Azure service.
- * @param azureBlobFileSystem filesystem to perform the list operation.
- * @param path path delimiter.
- * @return FileStatus[] list of all paths in the file system.
- */
- FileStatus[] listStatus(AzureBlobFileSystem azureBlobFileSystem, Path path) throws AzureBlobFileSystemException;
-
- /**
- * Closes the client to filesystem to Azure service.
- * @param azureBlobFileSystem filesystem to perform the list operation.
- */
- void closeFileSystem(AzureBlobFileSystem azureBlobFileSystem) throws AzureBlobFileSystemException;
-
- /**
- * Checks for the given path if it is marked as atomic rename directory or not.
- * @param key
- * @return True if the given path is listed under atomic rename property otherwise False.
- */
- boolean isAtomicRenameKey(String key);
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a271fd0e/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AbfsServiceProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AbfsServiceProvider.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AbfsServiceProvider.java
deleted file mode 100644
index bd98bae..0000000
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AbfsServiceProvider.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.fs.azurebfs.contracts.services;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.fs.azurebfs.contracts.exceptions.ServiceResolutionException;
-
-/**
- * Dependency injected Azure Storage services provider interface.
- */
-@InterfaceAudience.Public
-@InterfaceStability.Evolving
-public interface AbfsServiceProvider {
- /**
- * Returns an instance of resolved injectable service by class name.
- * The injectable service must be configured first to be resolvable.
- * @param clazz the injectable service which is expected to be returned.
- * @param <T> The type of injectable service.
- * @return T instance
- * @throws ServiceResolutionException if the service is not resolvable.
- */
- <T extends InjectableService> T get(Class<T> clazz) throws ServiceResolutionException;
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a271fd0e/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/ConfigurationService.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/ConfigurationService.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/ConfigurationService.java
deleted file mode 100644
index ee40c9d..0000000
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/ConfigurationService.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.fs.azurebfs.contracts.services;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.azurebfs.contracts.exceptions.ConfigurationPropertyNotFoundException;
-
-/**
- * Configuration service collects required Azure Hadoop configurations and provides it to the consumers.
- */
-@InterfaceAudience.Public
-@InterfaceStability.Evolving
-public interface ConfigurationService extends InjectableService {
- /**
- * Checks if ABFS is running from Emulator;
- * @return is emulator mode.
- */
- boolean isEmulator();
-
- /**
- * Retrieves storage secure mode from Hadoop configuration;
- * @return storage secure mode;
- */
- boolean isSecureMode();
-
- /**
- * Retrieves storage account key for provided account name from Hadoop configuration.
- * @param accountName the account name to retrieve the key.
- * @return storage account key;
- */
- String getStorageAccountKey(String accountName) throws ConfigurationPropertyNotFoundException;
-
- /**
- * Returns Hadoop configuration.
- * @return Hadoop configuration.
- */
- Configuration getConfiguration();
-
- /**
- * Retrieves configured write buffer size
- * @return the size of the write buffer
- */
- int getWriteBufferSize();
-
- /**
- * Retrieves configured read buffer size
- * @return the size of the read buffer
- */
- int getReadBufferSize();
-
- /**
- * Retrieves configured min backoff interval
- * @return min backoff interval
- */
- int getMinBackoffIntervalMilliseconds();
-
- /**
- * Retrieves configured max backoff interval
- * @return max backoff interval
- */
- int getMaxBackoffIntervalMilliseconds();
-
- /**
- * Retrieves configured backoff interval
- * @return backoff interval
- */
- int getBackoffIntervalMilliseconds();
-
- /**
- * Retrieves configured num of retries
- * @return num of retries
- */
- int getMaxIoRetries();
-
- /**
- * Retrieves configured azure block size
- * @return azure block size
- */
- long getAzureBlockSize();
-
- /**
- * Retrieves configured azure block location host
- * @return azure block location host
- */
- String getAzureBlockLocationHost();
-
- /**
- * Retrieves configured number of concurrent threads
- * @return number of concurrent write threads
- */
- int getMaxConcurrentWriteThreads();
-
- /**
- * Retrieves configured number of concurrent threads
- * @return number of concurrent read threads
- */
- int getMaxConcurrentReadThreads();
-
- /**
- * Retrieves configured boolean for tolerating out of band writes to files
- * @return configured boolean for tolerating out of band writes to files
- */
- boolean getTolerateOobAppends();
-
- /**
- * Retrieves the comma-separated list of directories to receive special treatment so that folder
- * rename is made atomic. The default value for this setting is just '/hbase'.
- * Example directories list : <value>/hbase,/data</value>
- * @see <a href="https://hadoop.apache.org/docs/stable/hadoop-azure/index.html#Configuring_Credentials">AtomicRenameProperty</a>
- * @return atomic rename directories
- */
- String getAzureAtomicRenameDirs();
-
- /**
- * Retrieves configured boolean for creating remote file system during initialization
- * @return configured boolean for creating remote file system during initialization
- */
- boolean getCreateRemoteFileSystemDuringInitialization();
-
- /**
- * Retrieves configured value of read ahead queue
- * @return depth of read ahead
- */
- int getReadAheadQueueDepth();
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a271fd0e/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/InjectableService.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/InjectableService.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/InjectableService.java
deleted file mode 100644
index 8b3801f..0000000
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/InjectableService.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.fs.azurebfs.contracts.services;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-
-/**
- * Marker interface for all the injectable services.
- */
-@InterfaceAudience.Public
-@InterfaceStability.Evolving
-public interface InjectableService {
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a271fd0e/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/TracingService.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/TracingService.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/TracingService.java
deleted file mode 100644
index 267d11f..0000000
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/TracingService.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.fs.azurebfs.contracts.services;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
-import org.apache.htrace.core.SpanId;
-import org.apache.htrace.core.TraceScope;
-
-/**
- * Azure Blob File System tracing service.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Evolving
-public interface TracingService extends InjectableService {
- /**
- * Creates a {@link TraceScope} object with the provided description.
- * @param description the trace description.
- * @return created traceScope.
- */
- TraceScope traceBegin(String description);
-
- /**
- * Creates a {@link TraceScope} object with the provided description.
- * @param description the trace description.
- * @param parentSpanId the span id of the parent trace scope.
- * @return create traceScope
- */
- TraceScope traceBegin(String description, SpanId parentSpanId);
-
- /**
- * Gets current thread latest generated traceScope id.
- * @return current thread latest generated traceScope id.
- */
- SpanId getCurrentTraceScopeSpanId();
-
- /**
- * Appends the provided exception to the trace scope.
- * @param traceScope the scope which exception needs to be attached to.
- * @param azureBlobFileSystemException the exception to be attached to the scope.
- */
- void traceException(TraceScope traceScope, AzureBlobFileSystemException azureBlobFileSystemException);
-
- /**
- * Ends the provided traceScope.
- * @param traceScope the scope that needs to be ended.
- */
- void traceEnd(TraceScope traceScope);
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a271fd0e/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
index c17a5c1..a78e7af 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
@@ -28,7 +28,6 @@ import java.util.Locale;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriException;
-import org.apache.hadoop.fs.azurebfs.contracts.services.ConfigurationService;
import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
import org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams;
@@ -45,17 +44,17 @@ public class AbfsClient {
private final String xMsVersion = "2018-03-28";
private final ExponentialRetryPolicy retryPolicy;
private final String filesystem;
- private final ConfigurationService configurationService;
+ private final AbfsConfiguration abfsConfiguration;
private final String userAgent;
public AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials,
- final ConfigurationService configurationService,
+ final AbfsConfiguration abfsConfiguration,
final ExponentialRetryPolicy exponentialRetryPolicy) {
this.baseUrl = baseUrl;
this.sharedKeyCredentials = sharedKeyCredentials;
String baseUrlString = baseUrl.toString();
this.filesystem = baseUrlString.substring(baseUrlString.lastIndexOf(AbfsHttpConstants.FORWARD_SLASH) + 1);
- this.configurationService = configurationService;
+ this.abfsConfiguration = abfsConfiguration;
this.retryPolicy = exponentialRetryPolicy;
this.userAgent = initializeUserAgent();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a271fd0e/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsConfiguration.java
new file mode 100644
index 0000000..8def1bb
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsConfiguration.java
@@ -0,0 +1,297 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.lang.reflect.Field;
+import java.util.Map;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys;
+import org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations;
+import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.IntegerConfigurationValidatorAnnotation;
+import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.LongConfigurationValidatorAnnotation;
+import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.StringConfigurationValidatorAnnotation;
+import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.Base64StringConfigurationValidatorAnnotation;
+import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.BooleanConfigurationValidatorAnnotation;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.ConfigurationPropertyNotFoundException;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidConfigurationValueException;
+import org.apache.hadoop.fs.azurebfs.diagnostics.Base64StringConfigurationBasicValidator;
+import org.apache.hadoop.fs.azurebfs.diagnostics.BooleanConfigurationBasicValidator;
+import org.apache.hadoop.fs.azurebfs.diagnostics.IntegerConfigurationBasicValidator;
+import org.apache.hadoop.fs.azurebfs.diagnostics.LongConfigurationBasicValidator;
+import org.apache.hadoop.fs.azurebfs.diagnostics.StringConfigurationBasicValidator;
+
+/**
+ * Configuration for Azure Blob FileSystem.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class AbfsConfiguration{
+ private final Configuration configuration;
+ private final boolean isSecure;
+
+ @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_WRITE_BUFFER_SIZE,
+ MinValue = FileSystemConfigurations.MIN_BUFFER_SIZE,
+ MaxValue = FileSystemConfigurations.MAX_BUFFER_SIZE,
+ DefaultValue = FileSystemConfigurations.DEFAULT_WRITE_BUFFER_SIZE)
+ private int writeBufferSize;
+
+ @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_READ_BUFFER_SIZE,
+ MinValue = FileSystemConfigurations.MIN_BUFFER_SIZE,
+ MaxValue = FileSystemConfigurations.MAX_BUFFER_SIZE,
+ DefaultValue = FileSystemConfigurations.DEFAULT_READ_BUFFER_SIZE)
+ private int readBufferSize;
+
+ @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_MIN_BACKOFF_INTERVAL,
+ DefaultValue = FileSystemConfigurations.DEFAULT_MIN_BACKOFF_INTERVAL)
+ private int minBackoffInterval;
+
+ @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_MAX_BACKOFF_INTERVAL,
+ DefaultValue = FileSystemConfigurations.DEFAULT_MAX_BACKOFF_INTERVAL)
+ private int maxBackoffInterval;
+
+ @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_BACKOFF_INTERVAL,
+ DefaultValue = FileSystemConfigurations.DEFAULT_BACKOFF_INTERVAL)
+ private int backoffInterval;
+
+ @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_MAX_IO_RETRIES,
+ MinValue = 0,
+ DefaultValue = FileSystemConfigurations.DEFAULT_MAX_RETRY_ATTEMPTS)
+ private int maxIoRetries;
+
+ @LongConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_BLOCK_SIZE_PROPERTY_NAME,
+ MinValue = 0,
+ MaxValue = FileSystemConfigurations.MAX_AZURE_BLOCK_SIZE,
+ DefaultValue = FileSystemConfigurations.MAX_AZURE_BLOCK_SIZE)
+ private long azureBlockSize;
+
+ @StringConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_BLOCK_LOCATION_HOST_PROPERTY_NAME,
+ DefaultValue = FileSystemConfigurations.AZURE_BLOCK_LOCATION_HOST_DEFAULT)
+ private String azureBlockLocationHost;
+
+ @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_CONCURRENT_CONNECTION_VALUE_OUT,
+ MinValue = 1,
+ DefaultValue = FileSystemConfigurations.MAX_CONCURRENT_WRITE_THREADS)
+ private int maxConcurrentWriteThreads;
+
+ @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_CONCURRENT_CONNECTION_VALUE_IN,
+ MinValue = 1,
+ DefaultValue = FileSystemConfigurations.MAX_CONCURRENT_READ_THREADS)
+ private int maxConcurrentReadThreads;
+
+ @BooleanConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_TOLERATE_CONCURRENT_APPEND,
+ DefaultValue = FileSystemConfigurations.DEFAULT_READ_TOLERATE_CONCURRENT_APPEND)
+ private boolean tolerateOobAppends;
+
+ @StringConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.FS_AZURE_ATOMIC_RENAME_KEY,
+ DefaultValue = FileSystemConfigurations.DEFAULT_FS_AZURE_ATOMIC_RENAME_DIRECTORIES)
+ private String azureAtomicDirs;
+
+ @BooleanConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION,
+ DefaultValue = FileSystemConfigurations.DEFAULT_AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION)
+ private boolean createRemoteFileSystemDuringInitialization;
+
+ @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.FS_AZURE_READ_AHEAD_QUEUE_DEPTH,
+ DefaultValue = FileSystemConfigurations.DEFAULT_READ_AHEAD_QUEUE_DEPTH)
+ private int readAheadQueueDepth;
+
+ private Map<String, String> storageAccountKeys;
+
+ public AbfsConfiguration(final Configuration configuration) throws IllegalAccessException, InvalidConfigurationValueException {
+ this.configuration = configuration;
+ this.isSecure = this.configuration.getBoolean(ConfigurationKeys.FS_AZURE_SECURE_MODE, false);
+
+ validateStorageAccountKeys();
+ Field[] fields = this.getClass().getDeclaredFields();
+ for (Field field : fields) {
+ field.setAccessible(true);
+ if (field.isAnnotationPresent(IntegerConfigurationValidatorAnnotation.class)) {
+ field.set(this, validateInt(field));
+ } else if (field.isAnnotationPresent(LongConfigurationValidatorAnnotation.class)) {
+ field.set(this, validateLong(field));
+ } else if (field.isAnnotationPresent(StringConfigurationValidatorAnnotation.class)) {
+ field.set(this, validateString(field));
+ } else if (field.isAnnotationPresent(Base64StringConfigurationValidatorAnnotation.class)) {
+ field.set(this, validateBase64String(field));
+ } else if (field.isAnnotationPresent(BooleanConfigurationValidatorAnnotation.class)) {
+ field.set(this, validateBoolean(field));
+ }
+ }
+ }
+
+ public boolean isEmulator() {
+ return this.getConfiguration().getBoolean(ConfigurationKeys.FS_AZURE_EMULATOR_ENABLED, false);
+ }
+
+ public boolean isSecureMode() {
+ return this.isSecure;
+ }
+
+ public String getStorageAccountKey(final String accountName) throws ConfigurationPropertyNotFoundException {
+ String accountKey = this.storageAccountKeys.get(ConfigurationKeys.FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME + accountName);
+ if (accountKey == null) {
+ throw new ConfigurationPropertyNotFoundException(accountName);
+ }
+
+ return accountKey;
+ }
+
+ public Configuration getConfiguration() {
+ return this.configuration;
+ }
+
+ public int getWriteBufferSize() {
+ return this.writeBufferSize;
+ }
+
+ public int getReadBufferSize() {
+ return this.readBufferSize;
+ }
+
+ public int getMinBackoffIntervalMilliseconds() {
+ return this.minBackoffInterval;
+ }
+
+ public int getMaxBackoffIntervalMilliseconds() {
+ return this.maxBackoffInterval;
+ }
+
+ public int getBackoffIntervalMilliseconds() {
+ return this.backoffInterval;
+ }
+
+ public int getMaxIoRetries() {
+ return this.maxIoRetries;
+ }
+
+ public long getAzureBlockSize() {
+ return this.azureBlockSize;
+ }
+
+ public String getAzureBlockLocationHost() {
+ return this.azureBlockLocationHost;
+ }
+
+ public int getMaxConcurrentWriteThreads() {
+ return this.maxConcurrentWriteThreads;
+ }
+
+ public int getMaxConcurrentReadThreads() {
+ return this.maxConcurrentReadThreads;
+ }
+
+ public boolean getTolerateOobAppends() {
+ return this.tolerateOobAppends;
+ }
+
+ public String getAzureAtomicRenameDirs() {
+ return this.azureAtomicDirs;
+ }
+
+ public boolean getCreateRemoteFileSystemDuringInitialization() {
+ return this.createRemoteFileSystemDuringInitialization;
+ }
+
+ public int getReadAheadQueueDepth() {
+ return this.readAheadQueueDepth;
+ }
+
+ void validateStorageAccountKeys() throws InvalidConfigurationValueException {
+ Base64StringConfigurationBasicValidator validator = new Base64StringConfigurationBasicValidator(
+ ConfigurationKeys.FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME, "", true);
+ this.storageAccountKeys = this.configuration.getValByRegex(ConfigurationKeys.FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME_REGX);
+
+ for (Map.Entry<String, String> account : this.storageAccountKeys.entrySet()) {
+ validator.validate(account.getValue());
+ }
+ }
+
+ int validateInt(Field field) throws IllegalAccessException, InvalidConfigurationValueException {
+ IntegerConfigurationValidatorAnnotation validator = field.getAnnotation(IntegerConfigurationValidatorAnnotation.class);
+ String value = this.configuration.get(validator.ConfigurationKey());
+
+ // validate
+ return new IntegerConfigurationBasicValidator(
+ validator.MinValue(),
+ validator.MaxValue(),
+ validator.DefaultValue(),
+ validator.ConfigurationKey(),
+ validator.ThrowIfInvalid()).validate(value);
+ }
+
+ long validateLong(Field field) throws IllegalAccessException, InvalidConfigurationValueException {
+ LongConfigurationValidatorAnnotation validator = field.getAnnotation(LongConfigurationValidatorAnnotation.class);
+ String value = this.configuration.get(validator.ConfigurationKey());
+
+ // validate
+ return new LongConfigurationBasicValidator(
+ validator.MinValue(),
+ validator.MaxValue(),
+ validator.DefaultValue(),
+ validator.ConfigurationKey(),
+ validator.ThrowIfInvalid()).validate(value);
+ }
+
+ String validateString(Field field) throws IllegalAccessException, InvalidConfigurationValueException {
+ StringConfigurationValidatorAnnotation validator = field.getAnnotation(StringConfigurationValidatorAnnotation.class);
+ String value = this.configuration.get(validator.ConfigurationKey());
+
+ // validate
+ return new StringConfigurationBasicValidator(
+ validator.ConfigurationKey(),
+ validator.DefaultValue(),
+ validator.ThrowIfInvalid()).validate(value);
+ }
+
+ String validateBase64String(Field field) throws IllegalAccessException, InvalidConfigurationValueException {
+ Base64StringConfigurationValidatorAnnotation validator = field.getAnnotation((Base64StringConfigurationValidatorAnnotation.class));
+ String value = this.configuration.get(validator.ConfigurationKey());
+
+ // validate
+ return new Base64StringConfigurationBasicValidator(
+ validator.ConfigurationKey(),
+ validator.DefaultValue(),
+ validator.ThrowIfInvalid()).validate(value);
+ }
+
+ boolean validateBoolean(Field field) throws IllegalAccessException, InvalidConfigurationValueException {
+ BooleanConfigurationValidatorAnnotation validator = field.getAnnotation(BooleanConfigurationValidatorAnnotation.class);
+ String value = this.configuration.get(validator.ConfigurationKey());
+
+ // validate
+ return new BooleanConfigurationBasicValidator(
+ validator.ConfigurationKey(),
+ validator.DefaultValue(),
+ validator.ThrowIfInvalid()).validate(value);
+ }
+
+ @VisibleForTesting
+ void setReadBufferSize(int bufferSize) {
+ this.readBufferSize = bufferSize;
+ }
+
+ @VisibleForTesting
+ void setWriteBufferSize(int bufferSize) {
+ this.writeBufferSize = bufferSize;
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org