You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by tm...@apache.org on 2018/08/11 05:37:52 UTC
[48/50] [abbrv] hadoop git commit: HADOOP-15560. ABFS: removed
dependency injection and unnecessary dependencies. Contributed by Da Zhou.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/62ea8ad9/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpClientFactoryImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpClientFactoryImpl.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpClientFactoryImpl.java
deleted file mode 100644
index 9e4c27b..0000000
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpClientFactoryImpl.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.fs.azurebfs.services;
-
-import java.net.MalformedURLException;
-import java.net.URI;
-import java.net.URL;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
-import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
-import org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes;
-import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
-import org.apache.hadoop.fs.azurebfs.contracts.services.AbfsHttpClientFactory;
-import org.apache.hadoop.fs.azurebfs.contracts.services.ConfigurationService;
-import org.apache.http.client.utils.URIBuilder;
-import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriAuthorityException;
-import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriException;
-
-@Singleton
-@InterfaceAudience.Private
-@InterfaceStability.Evolving
-class AbfsHttpClientFactoryImpl implements AbfsHttpClientFactory {
- private final ConfigurationService configurationService;
-
- @Inject
- AbfsHttpClientFactoryImpl(
- final ConfigurationService configurationService) {
-
- Preconditions.checkNotNull(configurationService, "configurationService");
-
- this.configurationService = configurationService;
- }
-
- @VisibleForTesting
- URIBuilder getURIBuilder(final String hostName, final FileSystem fs) {
- final AzureBlobFileSystem abfs = (AzureBlobFileSystem) fs;
-
- String scheme = FileSystemUriSchemes.HTTP_SCHEME;
-
- if (abfs.isSecure()) {
- scheme = FileSystemUriSchemes.HTTPS_SCHEME;
- }
-
- final URIBuilder uriBuilder = new URIBuilder();
- uriBuilder.setScheme(scheme);
- uriBuilder.setHost(hostName);
-
- return uriBuilder;
- }
-
- public AbfsClient create(final AzureBlobFileSystem fs) throws AzureBlobFileSystemException {
- final URI uri = fs.getUri();
- final String authority = uri.getRawAuthority();
- if (null == authority) {
- throw new InvalidUriAuthorityException(uri.toString());
- }
-
- if (!authority.contains(AbfsHttpConstants.AZURE_DISTRIBUTED_FILE_SYSTEM_AUTHORITY_DELIMITER)) {
- throw new InvalidUriAuthorityException(uri.toString());
- }
-
- final String[] authorityParts = authority.split(AbfsHttpConstants.AZURE_DISTRIBUTED_FILE_SYSTEM_AUTHORITY_DELIMITER, 2);
-
- if (authorityParts.length < 2 || "".equals(authorityParts[0])) {
- final String errMsg = String
- .format("URI '%s' has a malformed authority, expected container name. "
- + "Authority takes the form "+ FileSystemUriSchemes.ABFS_SCHEME + "://[<container name>@]<account name>",
- uri.toString());
- throw new InvalidUriException(errMsg);
- }
-
- final String fileSystemName = authorityParts[0];
- final String accountName = authorityParts[1];
-
- final URIBuilder uriBuilder = getURIBuilder(accountName, fs);
-
- final String url = uriBuilder.toString() + AbfsHttpConstants.FORWARD_SLASH + fileSystemName;
-
- URL baseUrl;
- try {
- baseUrl = new URL(url);
- } catch (MalformedURLException e) {
- throw new InvalidUriException(String.format("URI '%s' is malformed", uri.toString()));
- }
-
- SharedKeyCredentials creds =
- new SharedKeyCredentials(accountName.substring(0, accountName.indexOf(AbfsHttpConstants.DOT)),
- this.configurationService.getStorageAccountKey(accountName));
-
- return new AbfsClient(baseUrl, creds, configurationService, new ExponentialRetryPolicy());
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/62ea8ad9/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpServiceImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpServiceImpl.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpServiceImpl.java
deleted file mode 100644
index 06e1a8a..0000000
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpServiceImpl.java
+++ /dev/null
@@ -1,693 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.fs.azurebfs.services;
-
-import javax.xml.bind.DatatypeConverter;
-import java.io.File;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.nio.ByteBuffer;
-import java.nio.CharBuffer;
-import java.nio.charset.CharacterCodingException;
-import java.nio.charset.Charset;
-import java.nio.charset.CharsetDecoder;
-import java.nio.charset.CharsetEncoder;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Set;
-import java.util.HashSet;
-import java.util.Hashtable;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import com.google.common.base.Preconditions;
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
-import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidAbfsRestOperationException;
-import org.joda.time.DateTime;
-import org.joda.time.format.DateTimeFormat;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-
-import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
-import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
-import org.apache.hadoop.fs.azurebfs.contracts.exceptions.TimeoutException;
-import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidFileSystemPropertyException;
-import org.apache.hadoop.fs.azurebfs.contracts.services.AbfsHttpService;
-import org.apache.hadoop.fs.azurebfs.contracts.services.AbfsHttpClientFactory;
-import org.apache.hadoop.fs.azurebfs.contracts.services.ConfigurationService;
-import org.apache.hadoop.fs.azurebfs.contracts.services.TracingService;
-import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultEntrySchema;
-import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultSchema;
-import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
-import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
-import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.fs.permission.FsPermission;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.hadoop.util.Time.now;
-
-@Singleton
-@InterfaceAudience.Private
-@InterfaceStability.Evolving
-final class AbfsHttpServiceImpl implements AbfsHttpService {
- public static final Logger LOG = LoggerFactory.getLogger(AbfsHttpService.class);
- private static final String DATE_TIME_PATTERN = "E, dd MMM yyyy HH:mm:ss 'GMT'";
- private static final String XMS_PROPERTIES_ENCODING = "ISO-8859-1";
- private static final int LIST_MAX_RESULTS = 5000;
- private static final int DELETE_DIRECTORY_TIMEOUT_MILISECONDS = 180000;
- private static final int RENAME_TIMEOUT_MILISECONDS = 180000;
-
- private final AbfsHttpClientFactory abfsHttpClientFactory;
- private final ConcurrentHashMap<AzureBlobFileSystem, AbfsClient> clientCache;
- private final ConfigurationService configurationService;
- private final Set<String> azureAtomicRenameDirSet;
-
- @Inject
- AbfsHttpServiceImpl(
- final ConfigurationService configurationService,
- final AbfsHttpClientFactory abfsHttpClientFactory,
- final TracingService tracingService) {
- Preconditions.checkNotNull(abfsHttpClientFactory, "abfsHttpClientFactory");
- Preconditions.checkNotNull(configurationService, "configurationService");
- Preconditions.checkNotNull(tracingService, "tracingService");
-
- this.configurationService = configurationService;
- this.clientCache = new ConcurrentHashMap<>();
- this.abfsHttpClientFactory = abfsHttpClientFactory;
- this.azureAtomicRenameDirSet = new HashSet<>(Arrays.asList(configurationService.getAzureAtomicRenameDirs().split(AbfsHttpConstants.COMMA)));
- }
-
- @Override
- public Hashtable<String, String> getFilesystemProperties(final AzureBlobFileSystem azureBlobFileSystem)
- throws AzureBlobFileSystemException{
- final AbfsClient client = this.getOrCreateClient(azureBlobFileSystem);
-
- this.LOG.debug(
- "getFilesystemProperties for filesystem: {}",
- client.getFileSystem());
-
- final Hashtable<String, String> parsedXmsProperties;
-
- final AbfsRestOperation op = client.getFilesystemProperties();
- final String xMsProperties = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_PROPERTIES);
-
- parsedXmsProperties = parseCommaSeparatedXmsProperties(xMsProperties);
-
- return parsedXmsProperties;
- }
-
- @Override
- public void setFilesystemProperties(final AzureBlobFileSystem azureBlobFileSystem, final Hashtable<String, String> properties) throws
- AzureBlobFileSystemException {
- if (properties == null || properties.size() == 0) {
- return;
- }
-
- final AbfsClient client = this.getOrCreateClient(azureBlobFileSystem);
-
- this.LOG.debug(
- "setFilesystemProperties for filesystem: {} with properties: {}",
- client.getFileSystem(),
- properties);
-
- final String commaSeparatedProperties;
- try {
- commaSeparatedProperties = convertXmsPropertiesToCommaSeparatedString(properties);
- } catch (CharacterCodingException ex) {
- throw new InvalidAbfsRestOperationException(ex);
- }
- client.setFilesystemProperties(commaSeparatedProperties);
- }
-
- @Override
- public Hashtable<String, String> getPathProperties(final AzureBlobFileSystem azureBlobFileSystem, final Path path) throws
- AzureBlobFileSystemException {
- final AbfsClient client = this.getOrCreateClient(azureBlobFileSystem);
-
- this.LOG.debug(
- "getPathProperties for filesystem: {} path: {}",
- client.getFileSystem(),
- path.toString());
-
- final Hashtable<String, String> parsedXmsProperties;
- final AbfsRestOperation op = client.getPathProperties(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path));
-
- final String xMsProperties = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_PROPERTIES);
-
- parsedXmsProperties = parseCommaSeparatedXmsProperties(xMsProperties);
-
- return parsedXmsProperties;
- }
-
- @Override
- public void setPathProperties(final AzureBlobFileSystem azureBlobFileSystem, final Path path, final Hashtable<String,
- String> properties) throws
- AzureBlobFileSystemException {
- final AbfsClient client = this.getOrCreateClient(azureBlobFileSystem);
-
- this.LOG.debug(
- "setFilesystemProperties for filesystem: {} path: {} with properties: {}",
- client.getFileSystem(),
- path.toString(),
- properties);
-
- final String commaSeparatedProperties;
- try {
- commaSeparatedProperties = convertXmsPropertiesToCommaSeparatedString(properties);
- } catch (CharacterCodingException ex) {
- throw new InvalidAbfsRestOperationException(ex);
- }
- client.setPathProperties("/" + getRelativePath(path), commaSeparatedProperties);
- }
-
- @Override
- public void createFilesystem(final AzureBlobFileSystem azureBlobFileSystem) throws AzureBlobFileSystemException {
- final AbfsClient client = this.getOrCreateClient(azureBlobFileSystem);
-
- this.LOG.debug(
- "createFilesystem for filesystem: {}",
- client.getFileSystem());
-
- client.createFilesystem();
- }
-
- @Override
- public void deleteFilesystem(final AzureBlobFileSystem azureBlobFileSystem) throws AzureBlobFileSystemException {
- final AbfsClient client = this.getOrCreateClient(azureBlobFileSystem);
-
- this.LOG.debug(
- "deleteFilesystem for filesystem: {}",
- client.getFileSystem());
-
- client.deleteFilesystem();
- }
-
- @Override
- public OutputStream createFile(final AzureBlobFileSystem azureBlobFileSystem, final Path path, final boolean overwrite) throws
- AzureBlobFileSystemException {
- final AbfsClient client = this.getOrCreateClient(azureBlobFileSystem);
-
- this.LOG.debug(
- "createFile filesystem: {} path: {} overwrite: {}",
- client.getFileSystem(),
- path.toString(),
- overwrite);
-
- client.createPath(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), true, overwrite);
-
- final OutputStream outputStream;
- outputStream = new FSDataOutputStream(
- new AbfsOutputStream(client, AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), 0,
- configurationService.getWriteBufferSize()), null);
- return outputStream;
- }
-
- @Override
- public Void createDirectory(final AzureBlobFileSystem azureBlobFileSystem, final Path path) throws AzureBlobFileSystemException {
- final AbfsClient client = this.getOrCreateClient(azureBlobFileSystem);
-
- this.LOG.debug(
- "createDirectory filesystem: {} path: {} overwrite: {}",
- client.getFileSystem(),
- path.toString());
-
- client.createPath("/" + getRelativePath(path), false, true);
-
- return null;
- }
-
- @Override
- public InputStream openFileForRead(final AzureBlobFileSystem azureBlobFileSystem, final Path path,
- final FileSystem.Statistics statistics) throws AzureBlobFileSystemException {
- final AbfsClient client = getOrCreateClient(azureBlobFileSystem);
-
- this.LOG.debug(
- "openFileForRead filesystem: {} path: {}",
- client.getFileSystem(),
- path.toString());
-
- final AbfsRestOperation op = client.getPathProperties(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path));
-
- final String resourceType = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_RESOURCE_TYPE);
- final long contentLength = Long.parseLong(op.getResult().getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH));
- final String eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG);
-
- if (parseIsDirectory(resourceType)) {
- throw new AbfsRestOperationException(
- AzureServiceErrorCode.PATH_NOT_FOUND.getStatusCode(),
- AzureServiceErrorCode.PATH_NOT_FOUND.getErrorCode(),
- "openFileForRead must be used with files and not directories",
- null);
- }
-
- // Add statistics for InputStream
- return new FSDataInputStream(
- new AbfsInputStream(client, statistics, AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), contentLength,
- configurationService.getReadBufferSize(), configurationService.getReadAheadQueueDepth(), eTag));
- }
-
- @Override
- public OutputStream openFileForWrite(final AzureBlobFileSystem azureBlobFileSystem, final Path path, final boolean overwrite) throws
- AzureBlobFileSystemException {
- final AbfsClient client = getOrCreateClient(azureBlobFileSystem);
-
- this.LOG.debug(
- "openFileForWrite filesystem: {} path: {} overwrite: {}",
- client.getFileSystem(),
- path.toString(),
- overwrite);
-
- final AbfsRestOperation op = client.getPathProperties(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path));
-
- final String resourceType = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_RESOURCE_TYPE);
- final Long contentLength = Long.valueOf(op.getResult().getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH));
-
- if (parseIsDirectory(resourceType)) {
- throw new AbfsRestOperationException(
- AzureServiceErrorCode.PATH_NOT_FOUND.getStatusCode(),
- AzureServiceErrorCode.PATH_NOT_FOUND.getErrorCode(),
- "openFileForRead must be used with files and not directories",
- null);
- }
-
- final long offset = overwrite ? 0 : contentLength;
-
- final OutputStream outputStream;
- outputStream = new FSDataOutputStream(
- new AbfsOutputStream(client, AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path),
- offset, configurationService.getWriteBufferSize()), null);
- return outputStream;
- }
-
- @Override
- public void rename(final AzureBlobFileSystem azureBlobFileSystem, final Path source, final Path destination) throws
- AzureBlobFileSystemException {
-
- if (isAtomicRenameKey(source.getName())) {
- this.LOG.warn("The atomic rename feature is not supported by the ABFS scheme; however rename,"
- +" create and delete operations are atomic if Namespace is enabled for your Azure Storage account.");
- }
-
- final AbfsClient client = this.getOrCreateClient(azureBlobFileSystem);
-
- this.LOG.debug(
- "renameAsync filesystem: {} source: {} destination: {}",
- client.getFileSystem(),
- source.toString(),
- destination.toString());
-
- String continuation = null;
- long deadline = now() + RENAME_TIMEOUT_MILISECONDS;
-
- do {
- if (now() > deadline) {
- LOG.debug(
- "Rename {} to {} timed out.",
- source,
- destination);
-
- throw new TimeoutException("Rename timed out.");
- }
-
- AbfsRestOperation op = client.renamePath(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(source),
- AbfsHttpConstants.FORWARD_SLASH + getRelativePath(destination), continuation);
- continuation = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_CONTINUATION);
-
- } while (continuation != null && !continuation.isEmpty());
- }
-
- @Override
- public void delete(final AzureBlobFileSystem azureBlobFileSystem, final Path path, final boolean recursive) throws
- AzureBlobFileSystemException {
- final AbfsClient client = this.getOrCreateClient(azureBlobFileSystem);
-
- this.LOG.debug(
- "delete filesystem: {} path: {} recursive: {}",
- client.getFileSystem(),
- path.toString(),
- String.valueOf(recursive));
-
- String continuation = null;
- long deadline = now() + DELETE_DIRECTORY_TIMEOUT_MILISECONDS;
-
- do {
- if (now() > deadline) {
- this.LOG.debug(
- "Delete directory {} timed out.", path);
-
- throw new TimeoutException("Delete directory timed out.");
- }
-
- AbfsRestOperation op = client.deletePath(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), recursive, continuation);
- continuation = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_CONTINUATION);
-
- } while (continuation != null && !continuation.isEmpty());
- }
-
- @Override
- public FileStatus getFileStatus(final AzureBlobFileSystem azureBlobFileSystem, final Path path) throws AzureBlobFileSystemException {
- final AbfsClient client = this.getOrCreateClient(azureBlobFileSystem);
-
- this.LOG.debug(
- "getFileStatus filesystem: {} path: {}",
- client.getFileSystem(),
- path.toString());
-
- if (path.isRoot()) {
- AbfsRestOperation op = client.getFilesystemProperties();
- final long blockSize = configurationService.getAzureBlockSize();
- final String eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG);
- final String lastModified = op.getResult().getResponseHeader(HttpHeaderConfigurations.LAST_MODIFIED);
- return new VersionedFileStatus(
- azureBlobFileSystem.getOwnerUser(),
- azureBlobFileSystem.getOwnerUserPrimaryGroup(),
- 0,
- true,
- 1,
- blockSize,
- parseLastModifiedTime(lastModified).getMillis(),
- path,
- eTag);
- } else {
- AbfsRestOperation op = client.getPathProperties(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path));
-
- final long blockSize = configurationService.getAzureBlockSize();
- final String eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG);
- final String lastModified = op.getResult().getResponseHeader(HttpHeaderConfigurations.LAST_MODIFIED);
- final String contentLength = op.getResult().getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH);
- final String resourceType = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_RESOURCE_TYPE);
-
- return new VersionedFileStatus(
- azureBlobFileSystem.getOwnerUser(),
- azureBlobFileSystem.getOwnerUserPrimaryGroup(),
- parseContentLength(contentLength),
- parseIsDirectory(resourceType),
- 1,
- blockSize,
- parseLastModifiedTime(lastModified).getMillis(),
- path,
- eTag);
- }
- }
-
- @Override
- public FileStatus[] listStatus(final AzureBlobFileSystem azureBlobFileSystem, final Path path) throws AzureBlobFileSystemException {
- final AbfsClient client = this.getOrCreateClient(azureBlobFileSystem);
-
- this.LOG.debug(
- "listStatus filesystem: {} path: {}",
- client.getFileSystem(),
- path.toString());
-
- String relativePath = path.isRoot() ? AbfsHttpConstants.EMPTY_STRING : getRelativePath(path);
- String continuation = null;
- ArrayList<FileStatus> fileStatuses = new ArrayList<>();
-
- do {
- AbfsRestOperation op = client.listPath(relativePath, false, LIST_MAX_RESULTS, continuation);
- continuation = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_CONTINUATION);
- ListResultSchema retrievedSchema = op.getResult().getListResultSchema();
- if (retrievedSchema == null) {
- throw new AbfsRestOperationException(
- AzureServiceErrorCode.PATH_NOT_FOUND.getStatusCode(),
- AzureServiceErrorCode.PATH_NOT_FOUND.getErrorCode(),
- "listStatusAsync path not found",
- null, op.getResult());
- }
-
- long blockSize = configurationService.getAzureBlockSize();
-
- for (ListResultEntrySchema entry : retrievedSchema.paths()) {
- long lastModifiedMillis = 0;
- long contentLength = entry.contentLength() == null ? 0 : entry.contentLength();
- boolean isDirectory = entry.isDirectory() == null ? false : entry.isDirectory();
- if (entry.lastModified() != null && !entry.lastModified().isEmpty()) {
- final DateTime dateTime = DateTime.parse(
- entry.lastModified(),
- DateTimeFormat.forPattern(DATE_TIME_PATTERN).withZoneUTC());
- lastModifiedMillis = dateTime.getMillis();
- }
-
- fileStatuses.add(
- new VersionedFileStatus(
- azureBlobFileSystem.getOwnerUser(),
- azureBlobFileSystem.getOwnerUserPrimaryGroup(),
- contentLength,
- isDirectory,
- 1,
- blockSize,
- lastModifiedMillis,
- azureBlobFileSystem.makeQualified(new Path(File.separator + entry.name())),
- entry.eTag()));
- }
-
- } while (continuation != null && !continuation.isEmpty());
-
- return fileStatuses.toArray(new FileStatus[0]);
- }
-
- @Override
- public synchronized void closeFileSystem(final AzureBlobFileSystem azureBlobFileSystem) throws AzureBlobFileSystemException {
- this.clientCache.remove(azureBlobFileSystem);
- }
-
- @Override
- public boolean isAtomicRenameKey(String key) {
- return isKeyForDirectorySet(key, azureAtomicRenameDirSet);
- }
-
- private String getRelativePath(final Path path) {
- Preconditions.checkNotNull(path, "path");
- final String relativePath = path.toUri().getPath();
-
- if (relativePath.length() == 0) {
- return relativePath;
- }
-
- if (relativePath.charAt(0) == Path.SEPARATOR_CHAR) {
- if (relativePath.length() == 1) {
- return AbfsHttpConstants.EMPTY_STRING;
- }
-
- return relativePath.substring(1);
- }
-
- return relativePath;
- }
-
- private synchronized AbfsClient getOrCreateClient(final AzureBlobFileSystem azureBlobFileSystem) throws
- AzureBlobFileSystemException {
- Preconditions.checkNotNull(azureBlobFileSystem, "azureBlobFileSystem");
-
- AbfsClient client = this.clientCache.get(azureBlobFileSystem);
-
- if (client != null) {
- return client;
- }
-
- client = abfsHttpClientFactory.create(azureBlobFileSystem);
- this.clientCache.put(
- azureBlobFileSystem,
- client);
- return client;
- }
-
- private long parseContentLength(final String contentLength) {
- if (contentLength == null) {
- return -1;
- }
-
- return Long.parseLong(contentLength);
- }
-
- private boolean parseIsDirectory(final String resourceType) {
- return resourceType == null ? false : resourceType.equalsIgnoreCase(AbfsHttpConstants.DIRECTORY);
- }
-
- private DateTime parseLastModifiedTime(final String lastModifiedTime) {
- return DateTime.parse(
- lastModifiedTime,
- DateTimeFormat.forPattern(DATE_TIME_PATTERN).withZoneUTC());
- }
-
- private String convertXmsPropertiesToCommaSeparatedString(final Hashtable<String, String> properties) throws
- CharacterCodingException {
- StringBuilder commaSeparatedProperties = new StringBuilder();
-
- final CharsetEncoder encoder = Charset.forName(XMS_PROPERTIES_ENCODING).newEncoder();
-
- for (Map.Entry<String, String> propertyEntry : properties.entrySet()) {
- String key = propertyEntry.getKey();
- String value = propertyEntry.getValue();
-
- Boolean canEncodeValue = encoder.canEncode(value);
- if (!canEncodeValue) {
- throw new CharacterCodingException();
- }
-
- String encodedPropertyValue = DatatypeConverter.printBase64Binary(encoder.encode(CharBuffer.wrap(value)).array());
- commaSeparatedProperties.append(key)
- .append(AbfsHttpConstants.EQUAL)
- .append(encodedPropertyValue);
-
- commaSeparatedProperties.append(AbfsHttpConstants.COMMA);
- }
-
- if (commaSeparatedProperties.length() != 0) {
- commaSeparatedProperties.deleteCharAt(commaSeparatedProperties.length() - 1);
- }
-
- return commaSeparatedProperties.toString();
- }
-
- private Hashtable<String, String> parseCommaSeparatedXmsProperties(String xMsProperties) throws
- InvalidFileSystemPropertyException, InvalidAbfsRestOperationException {
- Hashtable<String, String> properties = new Hashtable<>();
-
- final CharsetDecoder decoder = Charset.forName(XMS_PROPERTIES_ENCODING).newDecoder();
-
- if (xMsProperties != null && !xMsProperties.isEmpty()) {
- String[] userProperties = xMsProperties.split(AbfsHttpConstants.COMMA);
-
- if (userProperties.length == 0) {
- return properties;
- }
-
- for (String property : userProperties) {
- if (property.isEmpty()) {
- throw new InvalidFileSystemPropertyException(xMsProperties);
- }
-
- String[] nameValue = property.split(AbfsHttpConstants.EQUAL, 2);
- if (nameValue.length != 2) {
- throw new InvalidFileSystemPropertyException(xMsProperties);
- }
-
- byte[] decodedValue = DatatypeConverter.parseBase64Binary(nameValue[1]);
-
- final String value;
- try {
- value = decoder.decode(ByteBuffer.wrap(decodedValue)).toString();
- } catch (CharacterCodingException ex) {
- throw new InvalidAbfsRestOperationException(ex);
- }
- properties.put(nameValue[0], value);
- }
- }
-
- return properties;
- }
-
- private boolean isKeyForDirectorySet(String key, Set<String> dirSet) {
- for (String dir : dirSet) {
- if (dir.isEmpty() || key.startsWith(dir + AbfsHttpConstants.FORWARD_SLASH)) {
- return true;
- }
-
- try {
- URI uri = new URI(dir);
- if (null == uri.getAuthority()) {
- if (key.startsWith(dir + "/")){
- return true;
- }
- }
- } catch (URISyntaxException e) {
- this.LOG.info("URI syntax error creating URI for {}", dir);
- }
- }
-
- return false;
- }
-
- private static class VersionedFileStatus extends FileStatus {
- private final String version;
-
- VersionedFileStatus(
- final String owner, final String group,
- final long length, final boolean isdir, final int blockReplication,
- final long blocksize, final long modificationTime, final Path path,
- String version) {
- super(length, isdir, blockReplication, blocksize, modificationTime, 0,
- new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL),
- owner,
- group,
- path);
-
- this.version = version;
- }
-
- /** Compare if this object is equal to another object.
- * @param obj the object to be compared.
- * @return true if two file status has the same path name; false if not.
- */
- @Override
- public boolean equals(Object obj) {
- if (obj == this) {
- return true;
- }
-
- if (obj == null) {
- return false;
- }
-
- if (this.getClass() == obj.getClass()) {
- VersionedFileStatus other = (VersionedFileStatus) obj;
- return this.getPath().equals(other.getPath()) && this.version.equals(other.version);
- }
-
- return false;
- }
-
- /**
- * Returns a hash code value for the object, which is defined as
- * the hash code of the path name.
- *
- * @return a hash code value for the path name and version
- */
- @Override
- public int hashCode() {
- int hash = getPath().hashCode();
- hash = 89 * hash + (this.version != null ? this.version.hashCode() : 0);
- return hash;
- }
-
- /**
- * Returns the version of this FileStatus
- *
- * @return a string value for the FileStatus version
- */
- public String getVersion() {
- return this.version;
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/62ea8ad9/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsServiceInjectorImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsServiceInjectorImpl.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsServiceInjectorImpl.java
deleted file mode 100644
index 1cbf6b5..0000000
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsServiceInjectorImpl.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.fs.azurebfs.services;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import com.google.inject.AbstractModule;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.azurebfs.contracts.services.AbfsHttpClientFactory;
-import org.apache.hadoop.fs.azurebfs.contracts.services.AbfsHttpService;
-import org.apache.hadoop.fs.azurebfs.contracts.services.ConfigurationService;
-import org.apache.hadoop.fs.azurebfs.contracts.services.TracingService;
-
-/**
- * This class is responsible to configure all the services used by Azure Blob File System.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Evolving
-class AbfsServiceInjectorImpl extends AbstractModule {
- private final Configuration configuration;
- private final Map<Class, Class> providers;
- private final Map<Class, Object> instances;
-
- AbfsServiceInjectorImpl(Configuration configuration) {
- this.providers = new HashMap<>();
- this.instances = new HashMap<>();
- this.configuration = configuration;
-
- this.instances.put(Configuration.class, this.configuration);
-
- this.providers.put(ConfigurationService.class, ConfigurationServiceImpl.class);
-
- this.providers.put(AbfsHttpService.class, AbfsHttpServiceImpl.class);
- this.providers.put(AbfsHttpClientFactory.class, AbfsHttpClientFactoryImpl.class);
-
- this.providers.put(TracingService.class, TracingServiceImpl.class);
- }
-
- @Override
- protected void configure() {
- for (Map.Entry<Class, Object> entrySet : this.instances.entrySet()) {
- bind(entrySet.getKey()).toInstance(entrySet.getValue());
- }
-
- for (Map.Entry<Class, Class> entrySet : this.providers.entrySet()) {
- bind(entrySet.getKey()).to(entrySet.getValue());
- }
- }
-
- protected Configuration getConfiguration() {
- return this.configuration;
- }
-
- protected Map<Class, Class> getProviders() {
- return this.providers;
- }
-
- protected Map<Class, Object> getInstances() {
- return this.instances;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/62ea8ad9/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsServiceProviderImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsServiceProviderImpl.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsServiceProviderImpl.java
deleted file mode 100644
index 8560620..0000000
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsServiceProviderImpl.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.fs.azurebfs.services;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.inject.Guice;
-import com.google.inject.Injector;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.azurebfs.contracts.exceptions.ServiceResolutionException;
-import org.apache.hadoop.fs.azurebfs.contracts.services.AbfsServiceProvider;
-import org.apache.hadoop.fs.azurebfs.contracts.services.InjectableService;
-
-/**
- * Dependency injected Azure Storage services provider.
- */
-@InterfaceAudience.Public
-@InterfaceStability.Evolving
-public final class AbfsServiceProviderImpl implements AbfsServiceProvider {
- private static AbfsServiceProviderImpl abfsServiceProvider;
- private final Injector abfsServiceInjector;
-
- private AbfsServiceProviderImpl(final Configuration configuration) {
- this.abfsServiceInjector = Guice.createInjector(new AbfsServiceInjectorImpl(Preconditions.checkNotNull(configuration, "configuration")));
- }
-
- @VisibleForTesting
- private AbfsServiceProviderImpl(final Injector abfsServiceInjector) {
- Preconditions.checkNotNull(abfsServiceInjector, "abfsServiceInjector");
- this.abfsServiceInjector = abfsServiceInjector;
- }
-
- /**
- * Create an instance or returns existing instance of service provider.
- * This method must be marked as synchronized to ensure thread-safety.
- * @param configuration hadoop configuration.
- * @return AbfsServiceProvider the service provider instance.
- */
- public static synchronized AbfsServiceProvider create(final Configuration configuration) {
- if (abfsServiceProvider == null) {
- abfsServiceProvider = new AbfsServiceProviderImpl(configuration);
- }
-
- return abfsServiceProvider;
- }
-
- /**
- * Returns current instance of service provider.
- * @return AbfsServiceProvider the service provider instance.
- */
- public static AbfsServiceProvider instance() {
- return abfsServiceProvider;
- }
-
- @VisibleForTesting
- static synchronized AbfsServiceProvider create(Injector serviceInjector) {
- abfsServiceProvider = new AbfsServiceProviderImpl(serviceInjector);
- return abfsServiceProvider;
- }
-
- /**
- * Returns an instance of resolved injectable service by class name.
- * The injectable service must be configured first to be resolvable.
- * @param clazz the injectable service which is expected to be returned.
- * @param <T> The type of injectable service.
- * @return T instance
- * @throws ServiceResolutionException if the service is not resolvable.
- */
- @Override
- public <T extends InjectableService> T get(final Class<T> clazz) throws ServiceResolutionException {
- try {
- return this.abfsServiceInjector.getInstance(clazz);
- } catch (Exception ex) {
- throw new ServiceResolutionException(clazz.getSimpleName(), ex);
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/62ea8ad9/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ConfigurationServiceImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ConfigurationServiceImpl.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ConfigurationServiceImpl.java
deleted file mode 100644
index 568ee5d..0000000
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ConfigurationServiceImpl.java
+++ /dev/null
@@ -1,317 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.fs.azurebfs.services;
-
-import java.lang.reflect.Field;
-import java.util.Map;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys;
-import org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations;
-import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.IntegerConfigurationValidatorAnnotation;
-import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.LongConfigurationValidatorAnnotation;
-import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.StringConfigurationValidatorAnnotation;
-import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.Base64StringConfigurationValidatorAnnotation;
-import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.BooleanConfigurationValidatorAnnotation;
-import org.apache.hadoop.fs.azurebfs.contracts.exceptions.ConfigurationPropertyNotFoundException;
-import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidConfigurationValueException;
-import org.apache.hadoop.fs.azurebfs.contracts.services.ConfigurationService;
-import org.apache.hadoop.fs.azurebfs.diagnostics.Base64StringConfigurationBasicValidator;
-import org.apache.hadoop.fs.azurebfs.diagnostics.BooleanConfigurationBasicValidator;
-import org.apache.hadoop.fs.azurebfs.diagnostics.IntegerConfigurationBasicValidator;
-import org.apache.hadoop.fs.azurebfs.diagnostics.LongConfigurationBasicValidator;
-import org.apache.hadoop.fs.azurebfs.diagnostics.StringConfigurationBasicValidator;
-
-@Singleton
-@InterfaceAudience.Private
-@InterfaceStability.Evolving
-class ConfigurationServiceImpl implements ConfigurationService {
- private final Configuration configuration;
- private final boolean isSecure;
-
- @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_WRITE_BUFFER_SIZE,
- MinValue = FileSystemConfigurations.MIN_BUFFER_SIZE,
- MaxValue = FileSystemConfigurations.MAX_BUFFER_SIZE,
- DefaultValue = FileSystemConfigurations.DEFAULT_WRITE_BUFFER_SIZE)
- private int writeBufferSize;
-
- @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_READ_BUFFER_SIZE,
- MinValue = FileSystemConfigurations.MIN_BUFFER_SIZE,
- MaxValue = FileSystemConfigurations.MAX_BUFFER_SIZE,
- DefaultValue = FileSystemConfigurations.DEFAULT_READ_BUFFER_SIZE)
- private int readBufferSize;
-
- @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_MIN_BACKOFF_INTERVAL,
- DefaultValue = FileSystemConfigurations.DEFAULT_MIN_BACKOFF_INTERVAL)
- private int minBackoffInterval;
-
- @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_MAX_BACKOFF_INTERVAL,
- DefaultValue = FileSystemConfigurations.DEFAULT_MAX_BACKOFF_INTERVAL)
- private int maxBackoffInterval;
-
- @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_BACKOFF_INTERVAL,
- DefaultValue = FileSystemConfigurations.DEFAULT_BACKOFF_INTERVAL)
- private int backoffInterval;
-
- @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_MAX_IO_RETRIES,
- MinValue = 0,
- DefaultValue = FileSystemConfigurations.DEFAULT_MAX_RETRY_ATTEMPTS)
- private int maxIoRetries;
-
- @LongConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_BLOCK_SIZE_PROPERTY_NAME,
- MinValue = 0,
- MaxValue = FileSystemConfigurations.MAX_AZURE_BLOCK_SIZE,
- DefaultValue = FileSystemConfigurations.MAX_AZURE_BLOCK_SIZE)
- private long azureBlockSize;
-
- @StringConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_BLOCK_LOCATION_HOST_PROPERTY_NAME,
- DefaultValue = FileSystemConfigurations.AZURE_BLOCK_LOCATION_HOST_DEFAULT)
- private String azureBlockLocationHost;
-
- @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_CONCURRENT_CONNECTION_VALUE_OUT,
- MinValue = 1,
- DefaultValue = FileSystemConfigurations.MAX_CONCURRENT_WRITE_THREADS)
- private int maxConcurrentWriteThreads;
-
- @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_CONCURRENT_CONNECTION_VALUE_IN,
- MinValue = 1,
- DefaultValue = FileSystemConfigurations.MAX_CONCURRENT_READ_THREADS)
- private int maxConcurrentReadThreads;
-
- @BooleanConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_TOLERATE_CONCURRENT_APPEND,
- DefaultValue = FileSystemConfigurations.DEFAULT_READ_TOLERATE_CONCURRENT_APPEND)
- private boolean tolerateOobAppends;
-
- @StringConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.FS_AZURE_ATOMIC_RENAME_KEY,
- DefaultValue = FileSystemConfigurations.DEFAULT_FS_AZURE_ATOMIC_RENAME_DIRECTORIES)
- private String azureAtomicDirs;
-
- @BooleanConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION,
- DefaultValue = FileSystemConfigurations.DEFAULT_AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION)
- private boolean createRemoteFileSystemDuringInitialization;
-
- @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.FS_AZURE_READ_AHEAD_QUEUE_DEPTH,
- DefaultValue = FileSystemConfigurations.DEFAULT_READ_AHEAD_QUEUE_DEPTH)
- private int readAheadQueueDepth;
-
- private Map<String, String> storageAccountKeys;
-
- @Inject
- ConfigurationServiceImpl(final Configuration configuration) throws IllegalAccessException, InvalidConfigurationValueException {
- this.configuration = configuration;
- this.isSecure = this.configuration.getBoolean(ConfigurationKeys.FS_AZURE_SECURE_MODE, false);
-
- validateStorageAccountKeys();
- Field[] fields = this.getClass().getDeclaredFields();
- for (Field field : fields) {
- field.setAccessible(true);
- if (field.isAnnotationPresent(IntegerConfigurationValidatorAnnotation.class)) {
- field.set(this, validateInt(field));
- } else if (field.isAnnotationPresent(LongConfigurationValidatorAnnotation.class)) {
- field.set(this, validateLong(field));
- } else if (field.isAnnotationPresent(StringConfigurationValidatorAnnotation.class)) {
- field.set(this, validateString(field));
- } else if (field.isAnnotationPresent(Base64StringConfigurationValidatorAnnotation.class)) {
- field.set(this, validateBase64String(field));
- } else if (field.isAnnotationPresent(BooleanConfigurationValidatorAnnotation.class)) {
- field.set(this, validateBoolean(field));
- }
- }
- }
-
- @Override
- public boolean isEmulator() {
- return this.getConfiguration().getBoolean(ConfigurationKeys.FS_AZURE_EMULATOR_ENABLED, false);
- }
-
- @Override
- public boolean isSecureMode() {
- return this.isSecure;
- }
-
- @Override
- public String getStorageAccountKey(final String accountName) throws ConfigurationPropertyNotFoundException {
- String accountKey = this.storageAccountKeys.get(ConfigurationKeys.FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME + accountName);
- if (accountKey == null) {
- throw new ConfigurationPropertyNotFoundException(accountName);
- }
-
- return accountKey;
- }
-
- @Override
- public Configuration getConfiguration() {
- return this.configuration;
- }
-
- @Override
- public int getWriteBufferSize() {
- return this.writeBufferSize;
- }
-
- @Override
- public int getReadBufferSize() {
- return this.readBufferSize;
- }
-
- @Override
- public int getMinBackoffIntervalMilliseconds() {
- return this.minBackoffInterval;
- }
-
- @Override
- public int getMaxBackoffIntervalMilliseconds() {
- return this.maxBackoffInterval;
- }
-
- @Override
- public int getBackoffIntervalMilliseconds() {
- return this.backoffInterval;
- }
-
- @Override
- public int getMaxIoRetries() {
- return this.maxIoRetries;
- }
-
- @Override
- public long getAzureBlockSize() {
- return this.azureBlockSize;
- }
-
- @Override
- public String getAzureBlockLocationHost() {
- return this.azureBlockLocationHost;
- }
-
- @Override
- public int getMaxConcurrentWriteThreads() {
- return this.maxConcurrentWriteThreads;
- }
-
- @Override
- public int getMaxConcurrentReadThreads() {
- return this.maxConcurrentReadThreads;
- }
-
- @Override
- public boolean getTolerateOobAppends() {
- return this.tolerateOobAppends;
- }
-
- @Override
- public String getAzureAtomicRenameDirs() {
- return this.azureAtomicDirs;
- }
-
- @Override
- public boolean getCreateRemoteFileSystemDuringInitialization() {
- return this.createRemoteFileSystemDuringInitialization;
- }
-
- @Override
- public int getReadAheadQueueDepth() {
- return this.readAheadQueueDepth;
- }
-
- void validateStorageAccountKeys() throws InvalidConfigurationValueException {
- Base64StringConfigurationBasicValidator validator = new Base64StringConfigurationBasicValidator(
- ConfigurationKeys.FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME, "", true);
- this.storageAccountKeys = this.configuration.getValByRegex(ConfigurationKeys.FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME_REGX);
-
- for (Map.Entry<String, String> account : this.storageAccountKeys.entrySet()) {
- validator.validate(account.getValue());
- }
- }
-
- int validateInt(Field field) throws IllegalAccessException, InvalidConfigurationValueException {
- IntegerConfigurationValidatorAnnotation validator = field.getAnnotation(IntegerConfigurationValidatorAnnotation.class);
- String value = this.configuration.get(validator.ConfigurationKey());
-
- // validate
- return new IntegerConfigurationBasicValidator(
- validator.MinValue(),
- validator.MaxValue(),
- validator.DefaultValue(),
- validator.ConfigurationKey(),
- validator.ThrowIfInvalid()).validate(value);
- }
-
- long validateLong(Field field) throws IllegalAccessException, InvalidConfigurationValueException {
- LongConfigurationValidatorAnnotation validator = field.getAnnotation(LongConfigurationValidatorAnnotation.class);
- String value = this.configuration.get(validator.ConfigurationKey());
-
- // validate
- return new LongConfigurationBasicValidator(
- validator.MinValue(),
- validator.MaxValue(),
- validator.DefaultValue(),
- validator.ConfigurationKey(),
- validator.ThrowIfInvalid()).validate(value);
- }
-
- String validateString(Field field) throws IllegalAccessException, InvalidConfigurationValueException {
- StringConfigurationValidatorAnnotation validator = field.getAnnotation(StringConfigurationValidatorAnnotation.class);
- String value = this.configuration.get(validator.ConfigurationKey());
-
- // validate
- return new StringConfigurationBasicValidator(
- validator.ConfigurationKey(),
- validator.DefaultValue(),
- validator.ThrowIfInvalid()).validate(value);
- }
-
- String validateBase64String(Field field) throws IllegalAccessException, InvalidConfigurationValueException {
- Base64StringConfigurationValidatorAnnotation validator = field.getAnnotation((Base64StringConfigurationValidatorAnnotation.class));
- String value = this.configuration.get(validator.ConfigurationKey());
-
- // validate
- return new Base64StringConfigurationBasicValidator(
- validator.ConfigurationKey(),
- validator.DefaultValue(),
- validator.ThrowIfInvalid()).validate(value);
- }
-
- boolean validateBoolean(Field field) throws IllegalAccessException, InvalidConfigurationValueException {
- BooleanConfigurationValidatorAnnotation validator = field.getAnnotation(BooleanConfigurationValidatorAnnotation.class);
- String value = this.configuration.get(validator.ConfigurationKey());
-
- // validate
- return new BooleanConfigurationBasicValidator(
- validator.ConfigurationKey(),
- validator.DefaultValue(),
- validator.ThrowIfInvalid()).validate(value);
- }
-
- @VisibleForTesting
- void setReadBufferSize(int bufferSize) {
- this.readBufferSize = bufferSize;
- }
-
- @VisibleForTesting
- void setWriteBufferSize(int bufferSize) {
- this.writeBufferSize = bufferSize;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/62ea8ad9/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ExponentialRetryPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ExponentialRetryPolicy.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ExponentialRetryPolicy.java
index 0c92612..54aa1ab 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ExponentialRetryPolicy.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ExponentialRetryPolicy.java
@@ -21,7 +21,10 @@ package org.apache.hadoop.fs.azurebfs.services;
import java.util.Random;
import java.net.HttpURLConnection;
-class ExponentialRetryPolicy {
+/**
+ * Retry policy used by AbfsClient.
+ * */
+public class ExponentialRetryPolicy {
/**
* Represents the default number of retry attempts.
*/
@@ -83,7 +86,7 @@ class ExponentialRetryPolicy {
/**
* Initializes a new instance of the {@link ExponentialRetryPolicy} class.
*/
- ExponentialRetryPolicy() {
+ public ExponentialRetryPolicy() {
this(DEFAULT_CLIENT_RETRY_COUNT, DEFAULT_MIN_BACKOFF, DEFAULT_MAX_BACKOFF, DEFAULT_CLIENT_BACKOFF);
}
@@ -96,7 +99,7 @@ class ExponentialRetryPolicy {
* @param deltaBackoff The value that will be used to calculate a random delta in the exponential delay
* between retries.
*/
- ExponentialRetryPolicy(final int retryCount, final int minBackoff, final int maxBackoff, final int deltaBackoff) {
+ public ExponentialRetryPolicy(final int retryCount, final int minBackoff, final int maxBackoff, final int deltaBackoff) {
this.retryCount = retryCount;
this.minBackoff = minBackoff;
this.maxBackoff = maxBackoff;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/62ea8ad9/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/LoggerSpanReceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/LoggerSpanReceiver.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/LoggerSpanReceiver.java
deleted file mode 100644
index 99190e6..0000000
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/LoggerSpanReceiver.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.fs.azurebfs.services;
-
-import java.io.IOException;
-
-import com.google.common.base.Preconditions;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
-import org.apache.htrace.core.HTraceConfiguration;
-import org.apache.htrace.core.Span;
-import org.apache.htrace.core.SpanReceiver;
-import org.apache.htrace.fasterxml.jackson.core.JsonProcessingException;
-import org.apache.htrace.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.htrace.fasterxml.jackson.databind.ObjectWriter;
-import org.apache.htrace.fasterxml.jackson.databind.SerializationFeature;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * LoggerSpanReceiver is a layer between HTrace and log4j only used for {@link org.apache.hadoop.fs.azurebfs.contracts.services.TracingService}
- */
-@InterfaceAudience.Public
-@InterfaceStability.Evolving
-public class LoggerSpanReceiver extends SpanReceiver {
- private static final ObjectWriter JSON_WRITER =
- new ObjectMapper()
- .configure(SerializationFeature.INDENT_OUTPUT, true)
- .configure(SerializationFeature.WRITE_BIGDECIMAL_AS_PLAIN, true)
- .configure(SerializationFeature.WRITE_EMPTY_JSON_ARRAYS, false)
- .configure(SerializationFeature.USE_EQUALITY_FOR_OBJECT_ID, false)
- .writer();
-
- public LoggerSpanReceiver(HTraceConfiguration hTraceConfiguration) {
- Preconditions.checkNotNull(hTraceConfiguration, "hTraceConfiguration");
- }
-
- @Override
- public void receiveSpan(final Span span) {
- String jsonValue;
-
- Logger logger = LoggerFactory.getLogger(AzureBlobFileSystem.class);
-
- try {
- jsonValue = JSON_WRITER.writeValueAsString(span);
- logger.trace(jsonValue);
- } catch (JsonProcessingException e) {
- logger.error("Json processing error: " + e.getMessage());
- }
- }
-
- @Override
- public void close() throws IOException {
- // No-Op
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/62ea8ad9/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/TracingServiceImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/TracingServiceImpl.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/TracingServiceImpl.java
deleted file mode 100644
index 57b6463..0000000
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/TracingServiceImpl.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.fs.azurebfs.services;
-
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.util.Objects;
-
-import com.google.common.base.Preconditions;
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
-import org.apache.hadoop.fs.azurebfs.contracts.services.TracingService;
-import org.apache.htrace.core.HTraceConfiguration;
-import org.apache.htrace.core.Sampler;
-import org.apache.htrace.core.SpanId;
-import org.apache.htrace.core.TraceScope;
-import org.apache.htrace.core.Tracer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@Singleton
-@InterfaceAudience.Private
-@InterfaceStability.Evolving
-final class TracingServiceImpl implements TracingService {
- private static final Logger LOG = LoggerFactory.getLogger(TracingService.class);
-
- private final Tracer tracer;
- private final ThreadLocal<SpanId> currentScopeId;
-
- @Inject
- TracingServiceImpl(
- final Configuration configuration) {
- Preconditions.checkNotNull(configuration, "configuration");
-
- this.currentScopeId = new ThreadLocal<>();
-
- this.tracer = new Tracer.Builder(TracingService.class.getSimpleName()).
- conf(new HTraceConfiguration() {
- @Override
- public String get(String key) {
- if (Objects.equals(key, Tracer.SPAN_RECEIVER_CLASSES_KEY)) {
- return LoggerSpanReceiver.class.getName();
- }
- return null;
- }
-
- @Override
- public String get(String key, String defaultValue) {
- String value = get(key);
- if (value != null) {
- return value;
- }
- return defaultValue;
- }
- }).
- build();
-
- this.tracer.addSampler(Sampler.ALWAYS);
- }
-
- @Override
- public TraceScope traceBegin(String description) {
- if (this.LOG.isTraceEnabled()) {
- TraceScope traceScope = this.tracer.newScope(description);
- this.currentScopeId.set(traceScope.getSpanId());
- return traceScope;
- }
-
- return null;
- }
-
- @Override
- public TraceScope traceBegin(String description, SpanId parentSpanId) {
- if (this.LOG.isTraceEnabled()) {
- TraceScope traceScope = this.tracer.newScope(description, parentSpanId);
- this.currentScopeId.set(traceScope.getSpanId());
- return traceScope;
- }
-
- return null;
- }
-
- @Override
- public void traceException(TraceScope traceScope, AzureBlobFileSystemException azureBlobFileSystemException) {
- if (this.LOG.isTraceEnabled()) {
- Preconditions.checkNotNull(traceScope, "traceScope");
- Preconditions.checkNotNull(azureBlobFileSystemException, "azureBlobFileSystemException");
-
- StringWriter stringWriter = new StringWriter();
- PrintWriter printWriter = new PrintWriter(stringWriter);
- azureBlobFileSystemException.printStackTrace(printWriter);
- printWriter.flush();
-
- traceScope.addKVAnnotation("Exception", stringWriter.toString());
- }
- }
-
- @Override
- public SpanId getCurrentTraceScopeSpanId() {
- return this.currentScopeId.get();
- }
-
- @Override
- public void traceEnd(TraceScope traceScope) {
- if (this.LOG.isTraceEnabled()) {
- Preconditions.checkNotNull(traceScope, "traceScope");
-
- SpanId[] parents = traceScope.getSpan().getParents();
- this.currentScopeId.set(parents != null && parents.length > 0 ? parents[parents.length - 1] : null);
- traceScope.close();
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/62ea8ad9/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/DependencyInjectedTest.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/DependencyInjectedTest.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/DependencyInjectedTest.java
index 5ec1e2e..74a530c 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/DependencyInjectedTest.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/DependencyInjectedTest.java
@@ -23,11 +23,9 @@ import java.util.UUID;
import java.util.concurrent.Callable;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
-import org.apache.hadoop.fs.azurebfs.services.AbfsServiceProviderImpl;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
-import org.mockito.internal.util.MockUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
@@ -38,12 +36,6 @@ import org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation;
import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys;
import org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes;
import org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys;
-import org.apache.hadoop.fs.azurebfs.contracts.services.AbfsHttpClientFactory;
-import org.apache.hadoop.fs.azurebfs.contracts.services.AbfsHttpService;
-import org.apache.hadoop.fs.azurebfs.contracts.services.ConfigurationService;
-import org.apache.hadoop.fs.azurebfs.services.MockAbfsHttpClientFactoryImpl;
-import org.apache.hadoop.fs.azurebfs.services.MockAbfsServiceInjectorImpl;
-import org.apache.hadoop.fs.azurebfs.services.MockServiceProviderImpl;
import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.FILE_SYSTEM_NOT_FOUND;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
@@ -54,7 +46,6 @@ import static org.junit.Assume.assumeNotNull;
* Provide dependencies for AzureBlobFileSystem tests.
*/
public abstract class DependencyInjectedTest {
- private final MockAbfsServiceInjectorImpl mockServiceInjector;
private final boolean isEmulator;
private NativeAzureFileSystem wasb;
private String abfsScheme;
@@ -64,21 +55,19 @@ public abstract class DependencyInjectedTest {
private String accountName;
private String testUrl;
+ public static final String TEST_CONTAINER_PREFIX = "abfs-testcontainer-";
+
public DependencyInjectedTest(final boolean secure) {
this(secure ? FileSystemUriSchemes.ABFS_SECURE_SCHEME : FileSystemUriSchemes.ABFS_SCHEME);
}
- public MockAbfsServiceInjectorImpl getMockServiceInjector() {
- return this.mockServiceInjector;
- }
-
protected DependencyInjectedTest() {
this(FileSystemUriSchemes.ABFS_SCHEME);
}
private DependencyInjectedTest(final String scheme) {
abfsScheme = scheme;
- fileSystemName = UUID.randomUUID().toString();
+ fileSystemName = TEST_CONTAINER_PREFIX + UUID.randomUUID().toString();
configuration = new Configuration();
configuration.addResource("azure-bfs-test.xml");
@@ -98,18 +87,14 @@ public abstract class DependencyInjectedTest {
this.testUrl = defaultUri.toString();
configuration.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, defaultUri.toString());
configuration.setBoolean(ConfigurationKeys.AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION, true);
- this.mockServiceInjector = new MockAbfsServiceInjectorImpl(configuration);
this.isEmulator = this.configuration.getBoolean(ConfigurationKeys.FS_AZURE_EMULATOR_ENABLED, false);
this.accountName = this.configuration.get(TestConfigurationKeys.FS_AZURE_TEST_ACCOUNT_NAME);
}
@Before
public void initialize() throws Exception {
- if (this.isEmulator) {
- this.mockServiceInjector.replaceProvider(AbfsHttpClientFactory.class, MockAbfsHttpClientFactoryImpl.class);
- }
-
- MockServiceProviderImpl.create(this.mockServiceInjector);
+ //Create filesystem first to make sure getWasbFileSystem() can return an existed filesystem.
+ this.getFileSystem();
if (!this.isEmulator) {
final URI wasbUri = new URI(abfsUrlToWasbUrl(this.getTestUrl()));
@@ -133,28 +118,24 @@ public abstract class DependencyInjectedTest {
FileSystem.closeAll();
final AzureBlobFileSystem fs = this.getFileSystem();
- final AbfsHttpService abfsHttpService = AbfsServiceProviderImpl.instance().get(AbfsHttpService.class);
- abfsHttpService.deleteFilesystem(fs);
-
- if (!(new MockUtil().isMock(abfsHttpService))) {
- AbfsRestOperationException ex = intercept(
- AbfsRestOperationException.class,
- new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- abfsHttpService.getFilesystemProperties(fs);
- return null;
- }
- });
-
- assertEquals(FILE_SYSTEM_NOT_FOUND.getStatusCode(), ex.getStatusCode());
- }
+ final AzureBlobFileSystemStore abfsStore = fs.getAbfsStore();
+ abfsStore.deleteFilesystem();
+
+ AbfsRestOperationException ex = intercept(
+ AbfsRestOperationException.class,
+ new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ fs.getAbfsStore().getFilesystemProperties();
+ return null;
+ }
+ });
+
+ assertEquals(FILE_SYSTEM_NOT_FOUND.getStatusCode(), ex.getStatusCode());
}
public AzureBlobFileSystem getFileSystem() throws Exception {
- final Configuration configuration = AbfsServiceProviderImpl.instance().get(ConfigurationService.class).getConfiguration();
- final AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.get(configuration);
- return fs;
+ return (AzureBlobFileSystem) FileSystem.get(this.configuration);
}
protected NativeAzureFileSystem getWasbFileSystem() {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/62ea8ad9/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemE2E.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemE2E.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemE2E.java
index 4985f58..ad22f99 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemE2E.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemE2E.java
@@ -22,15 +22,12 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.Random;
-
-import org.apache.hadoop.fs.azurebfs.services.AbfsServiceProviderImpl;
import org.junit.Test;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.azurebfs.contracts.services.ConfigurationService;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys;
@@ -52,8 +49,6 @@ public class ITestAzureBlobFileSystemE2E extends DependencyInjectedTest {
super();
Configuration configuration = this.getConfiguration();
configuration.set(ConfigurationKeys.FS_AZURE_READ_AHEAD_QUEUE_DEPTH, "0");
- this.getMockServiceInjector().replaceInstance(Configuration.class, configuration);
-
}
@Test
@@ -82,7 +77,7 @@ public class ITestAzureBlobFileSystemE2E extends DependencyInjectedTest {
@Test (expected = IOException.class)
public void testOOBWrites() throws Exception {
final AzureBlobFileSystem fs = this.getFileSystem();
- int readBufferSize = AbfsServiceProviderImpl.instance().get(ConfigurationService.class).getReadBufferSize();
+ int readBufferSize = fs.getAbfsStore().getAbfsConfiguration().getReadBufferSize();
fs.create(TEST_FILE);
FSDataOutputStream writeStream = fs.create(TEST_FILE);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/62ea8ad9/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRandomRead.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRandomRead.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRandomRead.java
index 9477587..8b96c69 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRandomRead.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRandomRead.java
@@ -535,17 +535,16 @@ public class ITestAzureBlobFileSystemRandomRead extends DependencyInjectedTest {
character = (character == 'z') ? 'a' : (char) ((int) character + 1);
}
- System.out.println(("Creating test file {} of size: {} " + TEST_FILE_PATH
- + TEST_FILE_SIZE));
+ System.out.println(String.format("Creating test file %s of size: %d ", TEST_FILE_PATH, TEST_FILE_SIZE));
ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
- try(FSDataOutputStream outputStream = fs.create(TEST_FILE_PATH)) {
+ try (FSDataOutputStream outputStream = fs.create(TEST_FILE_PATH)) {
int bytesWritten = 0;
while (bytesWritten < TEST_FILE_SIZE) {
outputStream.write(buffer);
bytesWritten += buffer.length;
}
- System.out.println("Closing stream {}" + outputStream);
+ System.out.println(String.format("Closing stream %s", outputStream));
ContractTestUtils.NanoTimer closeTimer
= new ContractTestUtils.NanoTimer();
outputStream.close();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/62ea8ad9/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestFileSystemInitialization.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestFileSystemInitialization.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestFileSystemInitialization.java
index aa30a85..29af1b8 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestFileSystemInitialization.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestFileSystemInitialization.java
@@ -22,18 +22,10 @@ import java.net.URI;
import org.junit.Assert;
import org.junit.Test;
-import org.mockito.Mockito;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes;
-import org.apache.hadoop.fs.azurebfs.contracts.services.AbfsHttpService;
-import org.apache.hadoop.fs.azurebfs.services.AbfsServiceProviderImpl;
-
-import static org.mockito.Matchers.anyObject;
-import static org.mockito.Mockito.doReturn;
/**
* Test AzureBlobFileSystem initialization.
@@ -41,18 +33,11 @@ import static org.mockito.Mockito.doReturn;
public class ITestFileSystemInitialization extends DependencyInjectedTest {
public ITestFileSystemInitialization() {
super();
-
- this.getMockServiceInjector().removeProvider(AbfsHttpService.class);
- this.getMockServiceInjector().replaceInstance(AbfsHttpService.class, Mockito.mock(AbfsHttpService.class));
}
@Test
public void ensureAzureBlobFileSystemIsInitialized() throws Exception {
- doReturn(new FileStatus(0, true, 0, 0, 0, new Path("/blah")))
- .when(AbfsServiceProviderImpl.instance().get(AbfsHttpService.class))
- .getFileStatus((AzureBlobFileSystem) anyObject(), (Path) anyObject());
-
- final FileSystem fs = FileSystem.get(this.getConfiguration());
+ final FileSystem fs = this.getFileSystem();
final String accountName = this.getAccountName();
final String filesystem = this.getFileSystemName();
@@ -62,16 +47,12 @@ public class ITestFileSystemInitialization extends DependencyInjectedTest {
@Test
public void ensureSecureAzureBlobFileSystemIsInitialized() throws Exception {
- doReturn(new FileStatus(0, true, 0, 0, 0, new Path("/blah")))
- .when(AbfsServiceProviderImpl.instance().get(AbfsHttpService.class))
- .getFileStatus((AzureBlobFileSystem) anyObject(), (Path) anyObject());
-
final String accountName = this.getAccountName();
final String filesystem = this.getFileSystemName();
final URI defaultUri = new URI(FileSystemUriSchemes.ABFS_SECURE_SCHEME, filesystem + "@" + accountName, null, null, null);
this.getConfiguration().set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, defaultUri.toString());
- final FileSystem fs = FileSystem.get(this.getConfiguration());
+ final FileSystem fs = this.getFileSystem();
Assert.assertEquals(fs.getUri(), new URI(FileSystemUriSchemes.ABFS_SECURE_SCHEME, filesystem + "@" + accountName, null, null, null));
Assert.assertNotNull(fs.getWorkingDirectory());
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/62ea8ad9/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestFileSystemProperties.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestFileSystemProperties.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestFileSystemProperties.java
new file mode 100644
index 0000000..62d967e
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestFileSystemProperties.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs;
+
+import java.util.Hashtable;
+
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test FileSystemProperties.
+ */
+public class ITestFileSystemProperties extends DependencyInjectedTest {
+ private static final int TEST_DATA = 100;
+ private static final Path TEST_PATH = new Path("/testfile");
+ public ITestFileSystemProperties() {
+ super();
+ }
+
+ @Test
+ public void testReadWriteBytesToFileAndEnsureThreadPoolCleanup() throws Exception {
+ final AzureBlobFileSystem fs = this.getFileSystem();
+ testWriteOneByteToFileAndEnsureThreadPoolCleanup();
+
+ FSDataInputStream inputStream = fs.open(TEST_PATH, 4 * 1024 * 1024);
+ int i = inputStream.read();
+
+ assertEquals(TEST_DATA, i);
+ }
+
+ @Test
+ public void testWriteOneByteToFileAndEnsureThreadPoolCleanup() throws Exception {
+ final AzureBlobFileSystem fs = this.getFileSystem();
+ FSDataOutputStream stream = fs.create(TEST_PATH);
+
+ stream.write(TEST_DATA);
+ stream.close();
+
+ FileStatus fileStatus = fs.getFileStatus(TEST_PATH);
+ assertEquals(1, fileStatus.getLen());
+ }
+
+ @Test
+ @Ignore("JDK7 doesn't support PATCH, so PUT is used. Fix is applied in latest test tenant")
+ public void testBase64FileSystemProperties() throws Exception {
+ final AzureBlobFileSystem fs = this.getFileSystem();
+
+ final Hashtable<String, String> properties = new Hashtable<>();
+ properties.put("key", "{ value: value }");
+ fs.getAbfsStore().setFilesystemProperties(properties);
+ Hashtable<String, String> fetchedProperties = fs.getAbfsStore().getFilesystemProperties();
+
+ Assert.assertEquals(properties, fetchedProperties);
+ }
+
+ @Test
+ public void testBase64PathProperties() throws Exception {
+ final AzureBlobFileSystem fs = this.getFileSystem();
+ final Hashtable<String, String> properties = new Hashtable<>();
+ properties.put("key", "{ value: valueTest }");
+ fs.create(TEST_PATH);
+ fs.getAbfsStore().setPathProperties(TEST_PATH, properties);
+ Hashtable<String, String> fetchedProperties =
+ fs.getAbfsStore().getPathProperties(TEST_PATH);
+
+ Assert.assertEquals(properties, fetchedProperties);
+ }
+
+ @Test (expected = Exception.class)
+ public void testBase64InvalidFileSystemProperties() throws Exception {
+ final AzureBlobFileSystem fs = this.getFileSystem();
+ final Hashtable<String, String> properties = new Hashtable<>();
+ properties.put("key", "{ value: valueæ² }");
+ fs.getAbfsStore().setFilesystemProperties(properties);
+ Hashtable<String, String> fetchedProperties = fs.getAbfsStore().getFilesystemProperties();
+
+ Assert.assertEquals(properties, fetchedProperties);
+ }
+
+ @Test (expected = Exception.class)
+ public void testBase64InvalidPathProperties() throws Exception {
+ final AzureBlobFileSystem fs = this.getFileSystem();
+ final Hashtable<String, String> properties = new Hashtable<>();
+ properties.put("key", "{ value: valueTestå…© }");
+ fs.create(TEST_PATH);
+ fs.getAbfsStore().setPathProperties(TEST_PATH, properties);
+ Hashtable<String, String> fetchedProperties = fs.getAbfsStore().getPathProperties(TEST_PATH);
+
+ Assert.assertEquals(properties, fetchedProperties);
+ }
+
+ @Test
+ public void testSetFileSystemProperties() throws Exception {
+ final AzureBlobFileSystem fs = this.getFileSystem();
+ final Hashtable<String, String> properties = new Hashtable<>();
+ properties.put("containerForDevTest", "true");
+ fs.getAbfsStore().setFilesystemProperties(properties);
+ Hashtable<String, String> fetchedProperties = fs.getAbfsStore().getFilesystemProperties();
+
+ Assert.assertEquals(properties, fetchedProperties);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/62ea8ad9/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestFileSystemRegistration.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestFileSystemRegistration.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestFileSystemRegistration.java
index a55599b..ef61e52 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestFileSystemRegistration.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestFileSystemRegistration.java
@@ -20,22 +20,14 @@ package org.apache.hadoop.fs.azurebfs;
import java.net.URI;
-import org.apache.hadoop.fs.azurebfs.services.AbfsServiceProviderImpl;
import org.junit.Assert;
import org.junit.Test;
-import org.mockito.Mockito;
import org.apache.hadoop.fs.AbstractFileSystem;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileContext;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes;
-import org.apache.hadoop.fs.azurebfs.contracts.services.AbfsHttpService;
-
-import static org.mockito.Matchers.anyObject;
-import static org.mockito.Mockito.doReturn;
/**
* Test AzureBlobFileSystem registration.
@@ -43,17 +35,10 @@ import static org.mockito.Mockito.doReturn;
public class ITestFileSystemRegistration extends DependencyInjectedTest {
public ITestFileSystemRegistration() throws Exception {
super();
-
- this.getMockServiceInjector().removeProvider(AbfsHttpService.class);
- this.getMockServiceInjector().replaceInstance(AbfsHttpService.class, Mockito.mock(AbfsHttpService.class));
}
@Test
public void ensureAzureBlobFileSystemIsDefaultFileSystem() throws Exception {
- doReturn(new FileStatus(0, true, 0, 0, 0, new Path("/blah")))
- .when(AbfsServiceProviderImpl.instance().get(AbfsHttpService.class))
- .getFileStatus((AzureBlobFileSystem) anyObject(), (Path) anyObject());
-
FileSystem fs = FileSystem.get(this.getConfiguration());
Assert.assertTrue(fs instanceof AzureBlobFileSystem);
@@ -63,14 +48,10 @@ public class ITestFileSystemRegistration extends DependencyInjectedTest {
@Test
public void ensureSecureAzureBlobFileSystemIsDefaultFileSystem() throws Exception {
- doReturn(new FileStatus(0, true, 0, 0, 0, new Path("/blah")))
- .when(AbfsServiceProviderImpl.instance().get(AbfsHttpService.class))
- .getFileStatus((AzureBlobFileSystem) anyObject(), (Path) anyObject());
-
final String accountName = this.getAccountName();
- final String filesystem = this.getFileSystemName();
+ final String fileSystemName = this.getFileSystemName();
- final URI defaultUri = new URI(FileSystemUriSchemes.ABFS_SECURE_SCHEME, filesystem + "@" + accountName, null, null, null);
+ final URI defaultUri = new URI(FileSystemUriSchemes.ABFS_SECURE_SCHEME, fileSystemName + "@" + accountName, null, null, null);
this.getConfiguration().set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, defaultUri.toString());
FileSystem fs = FileSystem.get(this.getConfiguration());
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org