You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by tm...@apache.org on 2018/08/11 05:37:48 UTC
[44/50] [abbrv] hadoop git commit: HADOOP-15407. HADOOP-15540.
Support Windows Azure Storage - Blob file system "ABFS" in Hadoop: Core
Commit.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fdf5f4c3/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
new file mode 100644
index 0000000..c17a5c1
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
@@ -0,0 +1,402 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.UnsupportedEncodingException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLEncoder;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriException;
+import org.apache.hadoop.fs.azurebfs.contracts.services.ConfigurationService;
+import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
+import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
+import org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * AbfsClient
+ */
+public class AbfsClient {
+ public static final Logger LOG = LoggerFactory.getLogger(AbfsClient.class);
+ private final URL baseUrl;
+ private final SharedKeyCredentials sharedKeyCredentials;
+ private final String xMsVersion = "2018-03-28";
+ private final ExponentialRetryPolicy retryPolicy;
+ private final String filesystem;
+ private final ConfigurationService configurationService;
+ private final String userAgent;
+
+ public AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials,
+ final ConfigurationService configurationService,
+ final ExponentialRetryPolicy exponentialRetryPolicy) {
+ this.baseUrl = baseUrl;
+ this.sharedKeyCredentials = sharedKeyCredentials;
+ String baseUrlString = baseUrl.toString();
+ this.filesystem = baseUrlString.substring(baseUrlString.lastIndexOf(AbfsHttpConstants.FORWARD_SLASH) + 1);
+ this.configurationService = configurationService;
+ this.retryPolicy = exponentialRetryPolicy;
+ this.userAgent = initializeUserAgent();
+ }
+
+ public String getFileSystem() {
+ return filesystem;
+ }
+
+ ExponentialRetryPolicy getRetryPolicy() {
+ return retryPolicy;
+ }
+
+ SharedKeyCredentials getSharedKeyCredentials() {
+ return sharedKeyCredentials;
+ }
+
+ List<AbfsHttpHeader> createDefaultHeaders() {
+ final List<AbfsHttpHeader> requestHeaders = new ArrayList<AbfsHttpHeader>();
+ requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_VERSION, xMsVersion));
+ requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.ACCEPT, AbfsHttpConstants.APPLICATION_JSON
+ + AbfsHttpConstants.COMMA + AbfsHttpConstants.SINGLE_WHITE_SPACE + AbfsHttpConstants.APPLICATION_OCTET_STREAM));
+ requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.ACCEPT_CHARSET,
+ AbfsHttpConstants.UTF_8));
+ requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.CONTENT_TYPE, AbfsHttpConstants.EMPTY_STRING));
+ requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.USER_AGENT, userAgent));
+ return requestHeaders;
+ }
+
+ AbfsUriQueryBuilder createDefaultUriQueryBuilder() {
+ final AbfsUriQueryBuilder abfsUriQueryBuilder = new AbfsUriQueryBuilder();
+ abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_TIMEOUT, AbfsHttpConstants.DEFAULT_TIMEOUT);
+ return abfsUriQueryBuilder;
+ }
+
+ public AbfsRestOperation createFilesystem() throws AzureBlobFileSystemException {
+ final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+
+ final AbfsUriQueryBuilder abfsUriQueryBuilder = new AbfsUriQueryBuilder();
+ abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_RESOURCE, AbfsHttpConstants.FILESYSTEM);
+
+ final URL url = createRequestUrl(abfsUriQueryBuilder.toString());
+ final AbfsRestOperation op = new AbfsRestOperation(
+ this,
+ AbfsHttpConstants.HTTP_METHOD_PUT,
+ url,
+ requestHeaders);
+ op.execute();
+ return op;
+ }
+
+ public AbfsRestOperation setFilesystemProperties(final String properties) throws AzureBlobFileSystemException {
+ final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+ // JDK7 does not support PATCH, so to workaround the issue we will use
+ // PUT and specify the real method in the X-Http-Method-Override header.
+ requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_HTTP_METHOD_OVERRIDE,
+ AbfsHttpConstants.HTTP_METHOD_PATCH));
+
+ requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_PROPERTIES,
+ properties));
+
+ final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
+ abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_RESOURCE, AbfsHttpConstants.FILESYSTEM);
+
+ final URL url = createRequestUrl(abfsUriQueryBuilder.toString());
+ final AbfsRestOperation op = new AbfsRestOperation(
+ this,
+ AbfsHttpConstants.HTTP_METHOD_PUT,
+ url,
+ requestHeaders);
+ op.execute();
+ return op;
+ }
+
+ public AbfsRestOperation listPath(final String relativePath, final boolean recursive, final int listMaxResults,
+ final String continuation) throws AzureBlobFileSystemException {
+ final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+
+ final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
+ abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_RESOURCE, AbfsHttpConstants.FILESYSTEM);
+ abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_DIRECTORY, relativePath == null ? "" : urlEncode(relativePath));
+ abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_RECURSIVE, String.valueOf(recursive));
+ abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_CONTINUATION, continuation);
+ abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_MAXRESULTS, String.valueOf(listMaxResults));
+
+ final URL url = createRequestUrl(abfsUriQueryBuilder.toString());
+ final AbfsRestOperation op = new AbfsRestOperation(
+ this,
+ AbfsHttpConstants.HTTP_METHOD_GET,
+ url,
+ requestHeaders);
+ op.execute();
+ return op;
+ }
+
+ public AbfsRestOperation getFilesystemProperties() throws AzureBlobFileSystemException {
+ final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+
+ final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
+ abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_RESOURCE, AbfsHttpConstants.FILESYSTEM);
+
+ final URL url = createRequestUrl(abfsUriQueryBuilder.toString());
+ final AbfsRestOperation op = new AbfsRestOperation(
+ this,
+ AbfsHttpConstants.HTTP_METHOD_HEAD,
+ url,
+ requestHeaders);
+ op.execute();
+ return op;
+ }
+
+ public AbfsRestOperation deleteFilesystem() throws AzureBlobFileSystemException {
+ final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+
+ final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
+ abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_RESOURCE, AbfsHttpConstants.FILESYSTEM);
+
+ final URL url = createRequestUrl(abfsUriQueryBuilder.toString());
+ final AbfsRestOperation op = new AbfsRestOperation(
+ this,
+ AbfsHttpConstants.HTTP_METHOD_DELETE,
+ url,
+ requestHeaders);
+ op.execute();
+ return op;
+ }
+
+ public AbfsRestOperation createPath(final String path, final boolean isFile, final boolean overwrite)
+ throws AzureBlobFileSystemException {
+ final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+ if (!overwrite) {
+ requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.IF_NONE_MATCH, "*"));
+ }
+
+ final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
+ abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_RESOURCE, isFile ? AbfsHttpConstants.FILE : AbfsHttpConstants.DIRECTORY);
+
+ final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
+ final AbfsRestOperation op = new AbfsRestOperation(
+ this,
+ AbfsHttpConstants.HTTP_METHOD_PUT,
+ url,
+ requestHeaders);
+ op.execute();
+ return op;
+ }
+
+ public AbfsRestOperation renamePath(final String source, final String destination, final String continuation)
+ throws AzureBlobFileSystemException {
+ final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+
+ final String encodedRenameSource = urlEncode(AbfsHttpConstants.FORWARD_SLASH + this.getFileSystem() + source);
+ requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_RENAME_SOURCE, encodedRenameSource));
+ requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.IF_NONE_MATCH, AbfsHttpConstants.STAR));
+
+ final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
+ abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_CONTINUATION, continuation);
+
+ final URL url = createRequestUrl(destination, abfsUriQueryBuilder.toString());
+ final AbfsRestOperation op = new AbfsRestOperation(
+ this,
+ AbfsHttpConstants.HTTP_METHOD_PUT,
+ url,
+ requestHeaders);
+ op.execute();
+ return op;
+ }
+
+ public AbfsRestOperation append(final String path, final long position, final byte[] buffer, final int offset,
+ final int length) throws AzureBlobFileSystemException {
+ final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+ // JDK7 does not support PATCH, so to workaround the issue we will use
+ // PUT and specify the real method in the X-Http-Method-Override header.
+ requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_HTTP_METHOD_OVERRIDE,
+ AbfsHttpConstants.HTTP_METHOD_PATCH));
+
+ final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
+ abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_ACTION, AbfsHttpConstants.APPEND_ACTION);
+ abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_POSITION, Long.toString(position));
+
+ final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
+ final AbfsRestOperation op = new AbfsRestOperation(
+ this,
+ AbfsHttpConstants.HTTP_METHOD_PUT,
+ url,
+ requestHeaders, buffer, offset, length);
+ op.execute();
+ return op;
+ }
+
+
+ public AbfsRestOperation flush(final String path, final long position, boolean retainUncommittedData) throws AzureBlobFileSystemException {
+ final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+ // JDK7 does not support PATCH, so to workaround the issue we will use
+ // PUT and specify the real method in the X-Http-Method-Override header.
+ requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_HTTP_METHOD_OVERRIDE,
+ AbfsHttpConstants.HTTP_METHOD_PATCH));
+
+ final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
+ abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_ACTION, AbfsHttpConstants.FLUSH_ACTION);
+ abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_POSITION, Long.toString(position));
+ abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_RETAIN_UNCOMMITTED_DATA, String.valueOf(retainUncommittedData));
+
+ final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
+ final AbfsRestOperation op = new AbfsRestOperation(
+ this,
+ AbfsHttpConstants.HTTP_METHOD_PUT,
+ url,
+ requestHeaders);
+ op.execute();
+ return op;
+ }
+
+ public AbfsRestOperation setPathProperties(final String path, final String properties) throws AzureBlobFileSystemException {
+ final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+ // JDK7 does not support PATCH, so to workaround the issue we will use
+ // PUT and specify the real method in the X-Http-Method-Override header.
+ requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_HTTP_METHOD_OVERRIDE,
+ AbfsHttpConstants.HTTP_METHOD_PATCH));
+
+ requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_PROPERTIES, properties));
+
+ final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
+ abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_ACTION, AbfsHttpConstants.SET_PROPERTIES_ACTION);
+
+ final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
+ final AbfsRestOperation op = new AbfsRestOperation(
+ this,
+ AbfsHttpConstants.HTTP_METHOD_PUT,
+ url,
+ requestHeaders);
+ op.execute();
+ return op;
+ }
+
+ public AbfsRestOperation getPathProperties(final String path) throws AzureBlobFileSystemException {
+ final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+
+ final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
+
+ final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
+ final AbfsRestOperation op = new AbfsRestOperation(
+ this,
+ AbfsHttpConstants.HTTP_METHOD_HEAD,
+ url,
+ requestHeaders);
+ op.execute();
+ return op;
+ }
+
+ public AbfsRestOperation read(final String path, final long position, final byte[] buffer, final int bufferOffset,
+ final int bufferLength, final String eTag) throws AzureBlobFileSystemException {
+ final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+ requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.RANGE,
+ String.format("bytes=%d-%d", position, position + bufferLength - 1)));
+ requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.IF_MATCH, eTag));
+
+ final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
+
+ final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
+
+ final AbfsRestOperation op = new AbfsRestOperation(
+ this,
+ AbfsHttpConstants.HTTP_METHOD_GET,
+ url,
+ requestHeaders,
+ buffer,
+ bufferOffset,
+ bufferLength);
+ op.execute();
+
+ return op;
+ }
+
+ public AbfsRestOperation deletePath(final String path, final boolean recursive, final String continuation)
+ throws AzureBlobFileSystemException {
+ final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+
+ final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
+ abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_RECURSIVE, String.valueOf(recursive));
+ abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_CONTINUATION, continuation);
+
+ final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
+ final AbfsRestOperation op = new AbfsRestOperation(
+ this,
+ AbfsHttpConstants.HTTP_METHOD_DELETE,
+ url,
+ requestHeaders);
+ op.execute();
+ return op;
+ }
+
+ private URL createRequestUrl(final String query) throws AzureBlobFileSystemException {
+ return createRequestUrl(AbfsHttpConstants.EMPTY_STRING, query);
+ }
+
+ private URL createRequestUrl(final String path, final String query)
+ throws AzureBlobFileSystemException {
+ final String base = baseUrl.toString();
+ String encodedPath = path;
+ try {
+ encodedPath = urlEncode(path);
+ } catch (AzureBlobFileSystemException ex) {
+ this.LOG.debug(
+ "Unexpected error.", ex);
+ }
+
+ final StringBuilder sb = new StringBuilder();
+ sb.append(base);
+ sb.append(encodedPath);
+ sb.append(query);
+
+ final URL url;
+ try {
+ url = new URL(sb.toString());
+ } catch (MalformedURLException ex) {
+ throw new InvalidUriException(sb.toString());
+ }
+ return url;
+ }
+
+ private static String urlEncode(final String value) throws AzureBlobFileSystemException {
+ String encodedString = null;
+ try {
+ encodedString = URLEncoder.encode(value, AbfsHttpConstants.UTF_8)
+ .replace(AbfsHttpConstants.PLUS, AbfsHttpConstants.PLUS_ENCODE)
+ .replace(AbfsHttpConstants.FORWARD_SLASH_ENCODE, AbfsHttpConstants.FORWARD_SLASH);
+ } catch (UnsupportedEncodingException ex) {
+ throw new InvalidUriException(value);
+ }
+
+ return encodedString;
+ }
+
+ private String initializeUserAgent() {
+ final String userAgentComment = String.format(Locale.ROOT,
+ "(JavaJRE %s; %s %s)",
+ System.getProperty(AbfsHttpConstants.JAVA_VERSION),
+ System.getProperty(AbfsHttpConstants.OS_NAME)
+ .replaceAll(AbfsHttpConstants.SINGLE_WHITE_SPACE, AbfsHttpConstants.EMPTY_STRING),
+ System.getProperty(AbfsHttpConstants.OS_VERSION));
+
+ return String.format(AbfsHttpConstants.CLIENT_VERSION + " %s", userAgentComment);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fdf5f4c3/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpClientFactoryImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpClientFactoryImpl.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpClientFactoryImpl.java
new file mode 100644
index 0000000..9e4c27b
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpClientFactoryImpl.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.net.URL;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
+import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
+import org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
+import org.apache.hadoop.fs.azurebfs.contracts.services.AbfsHttpClientFactory;
+import org.apache.hadoop.fs.azurebfs.contracts.services.ConfigurationService;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriAuthorityException;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriException;
+
+@Singleton
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+class AbfsHttpClientFactoryImpl implements AbfsHttpClientFactory {
+ private final ConfigurationService configurationService;
+
+ @Inject
+ AbfsHttpClientFactoryImpl(
+ final ConfigurationService configurationService) {
+
+ Preconditions.checkNotNull(configurationService, "configurationService");
+
+ this.configurationService = configurationService;
+ }
+
+ @VisibleForTesting
+ URIBuilder getURIBuilder(final String hostName, final FileSystem fs) {
+ final AzureBlobFileSystem abfs = (AzureBlobFileSystem) fs;
+
+ String scheme = FileSystemUriSchemes.HTTP_SCHEME;
+
+ if (abfs.isSecure()) {
+ scheme = FileSystemUriSchemes.HTTPS_SCHEME;
+ }
+
+ final URIBuilder uriBuilder = new URIBuilder();
+ uriBuilder.setScheme(scheme);
+ uriBuilder.setHost(hostName);
+
+ return uriBuilder;
+ }
+
+ public AbfsClient create(final AzureBlobFileSystem fs) throws AzureBlobFileSystemException {
+ final URI uri = fs.getUri();
+ final String authority = uri.getRawAuthority();
+ if (null == authority) {
+ throw new InvalidUriAuthorityException(uri.toString());
+ }
+
+ if (!authority.contains(AbfsHttpConstants.AZURE_DISTRIBUTED_FILE_SYSTEM_AUTHORITY_DELIMITER)) {
+ throw new InvalidUriAuthorityException(uri.toString());
+ }
+
+ final String[] authorityParts = authority.split(AbfsHttpConstants.AZURE_DISTRIBUTED_FILE_SYSTEM_AUTHORITY_DELIMITER, 2);
+
+ if (authorityParts.length < 2 || "".equals(authorityParts[0])) {
+ final String errMsg = String
+ .format("URI '%s' has a malformed authority, expected container name. "
+ + "Authority takes the form "+ FileSystemUriSchemes.ABFS_SCHEME + "://[<container name>@]<account name>",
+ uri.toString());
+ throw new InvalidUriException(errMsg);
+ }
+
+ final String fileSystemName = authorityParts[0];
+ final String accountName = authorityParts[1];
+
+ final URIBuilder uriBuilder = getURIBuilder(accountName, fs);
+
+ final String url = uriBuilder.toString() + AbfsHttpConstants.FORWARD_SLASH + fileSystemName;
+
+ URL baseUrl;
+ try {
+ baseUrl = new URL(url);
+ } catch (MalformedURLException e) {
+ throw new InvalidUriException(String.format("URI '%s' is malformed", uri.toString()));
+ }
+
+ SharedKeyCredentials creds =
+ new SharedKeyCredentials(accountName.substring(0, accountName.indexOf(AbfsHttpConstants.DOT)),
+ this.configurationService.getStorageAccountKey(accountName));
+
+ return new AbfsClient(baseUrl, creds, configurationService, new ExponentialRetryPolicy());
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fdf5f4c3/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpHeader.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpHeader.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpHeader.java
new file mode 100644
index 0000000..46b4c6d
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpHeader.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+/**
+ * The Http Request / Response Headers for Rest AbfsClient
+ */
+public class AbfsHttpHeader {
+ private final String name;
+ private final String value;
+
+ public AbfsHttpHeader(final String name, final String value) {
+ this.name = name;
+ this.value = value;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public String getValue() {
+ return value;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fdf5f4c3/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java
new file mode 100644
index 0000000..0ea9365
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java
@@ -0,0 +1,430 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.List;
+import java.util.UUID;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
+import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
+import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Represents an HTTP operation.
+ */
+public class AbfsHttpOperation {
+ private static final Logger LOG = LoggerFactory.getLogger(AbfsHttpOperation.class);
+
+ private static final int CONNECT_TIMEOUT = 30 * 1000;
+ private static final int READ_TIMEOUT = 30 * 1000;
+
+ private static final int CLEAN_UP_BUFFER_SIZE = 64 * 1024;
+
+ private static final int ONE_THOUSAND = 1000;
+ private static final int ONE_MILLION = ONE_THOUSAND * ONE_THOUSAND;
+
+ private final String method;
+ private final URL url;
+
+ private HttpURLConnection connection;
+ private int statusCode;
+ private String statusDescription;
+ private String storageErrorCode = "";
+ private String storageErrorMessage = "";
+ private String clientRequestId = "";
+ private String requestId = "";
+ private ListResultSchema listResultSchema = null;
+
+ // metrics
+ private int bytesSent;
+ private long bytesReceived;
+
+ // optional trace enabled metrics
+ private final boolean isTraceEnabled;
+ private long connectionTimeMs;
+ private long sendRequestTimeMs;
+ private long recvResponseTimeMs;
+
+ protected HttpURLConnection getConnection() {
+ return connection;
+ }
+
+ public String getMethod() {
+ return method;
+ }
+
+ public URL getUrl() {
+ return url;
+ }
+
+ public int getStatusCode() {
+ return statusCode;
+ }
+
+ public String getStatusDescription() {
+ return statusDescription;
+ }
+
+ public String getStorageErrorCode() {
+ return storageErrorCode;
+ }
+
+ public String getStorageErrorMessage() {
+ return storageErrorMessage;
+ }
+
+ public String getClientRequestId() {
+ return clientRequestId;
+ }
+
+ public String getRequestId() {
+ return requestId;
+ }
+
+ public int getBytesSent() {
+ return bytesSent;
+ }
+
+ public long getBytesReceived() {
+ return bytesReceived;
+ }
+
+ public ListResultSchema getListResultSchema() {
+ return listResultSchema;
+ }
+
+ public String getResponseHeader(String httpHeader) {
+ return connection.getHeaderField(httpHeader);
+ }
+
+ // Returns a trace message for the request
+ @Override
+ public String toString() {
+ final String urlStr = url.toString();
+ final StringBuilder sb = new StringBuilder();
+ sb.append(statusCode);
+ sb.append(",");
+ sb.append(storageErrorCode);
+ sb.append(",cid=");
+ sb.append(clientRequestId);
+ sb.append(",rid=");
+ sb.append(requestId);
+ if (isTraceEnabled) {
+ sb.append(",connMs=");
+ sb.append(connectionTimeMs);
+ sb.append(",sendMs=");
+ sb.append(sendRequestTimeMs);
+ sb.append(",recvMs=");
+ sb.append(recvResponseTimeMs);
+ }
+ sb.append(",sent=");
+ sb.append(bytesSent);
+ sb.append(",recv=");
+ sb.append(bytesReceived);
+ sb.append(",");
+ sb.append(method);
+ sb.append(",");
+ sb.append(urlStr);
+ return sb.toString();
+ }
+
+ /**
+ * Initializes a new HTTP request and opens the connection.
+ *
+ * @param url The full URL including query string parameters.
+ * @param method The HTTP method (PUT, PATCH, POST, GET, HEAD, or DELETE).
+ * @param requestHeaders The HTTP request headers.READ_TIMEOUT
+ *
+ * @throws IOException if an error occurs.
+ */
+ public AbfsHttpOperation(final URL url, final String method, final List<AbfsHttpHeader> requestHeaders)
+ throws IOException {
+ this.isTraceEnabled = this.LOG.isTraceEnabled();
+ this.url = url;
+ this.method = method;
+ this.clientRequestId = UUID.randomUUID().toString();
+
+ this.connection = openConnection();
+
+ this.connection.setConnectTimeout(CONNECT_TIMEOUT);
+ this.connection.setReadTimeout(READ_TIMEOUT);
+
+ this.connection.setRequestMethod(method);
+
+ for (AbfsHttpHeader header : requestHeaders) {
+ this.connection.setRequestProperty(header.getName(), header.getValue());
+ }
+
+ this.connection.setRequestProperty(HttpHeaderConfigurations.X_MS_CLIENT_REQUEST_ID, clientRequestId);
+ }
+
+ /**
+ * Sends the HTTP request. Note that HttpUrlConnection requires that an
+ * empty buffer be sent in order to set the "Content-Length: 0" header, which
+ * is required by our endpoint.
+ *
+ * @param buffer the request entity body.
+ * @param offset an offset into the buffer where the data beings.
+ * @param length the length of the data in the buffer.
+ *
+ * @throws IOException if an error occurs.
+ */
+ public void sendRequest(byte[] buffer, int offset, int length) throws IOException {
+ this.connection.setDoOutput(true);
+ this.connection.setFixedLengthStreamingMode(length);
+ if (buffer == null) {
+ // An empty buffer is sent to set the "Content-Length: 0" header, which
+ // is required by our endpoint.
+ buffer = new byte[]{};
+ offset = 0;
+ length = 0;
+ }
+
+ // send the request body
+
+ long startTime = 0;
+ if (this.isTraceEnabled) {
+ startTime = System.nanoTime();
+ }
+ try (OutputStream outputStream = this.connection.getOutputStream()) {
+ // update bytes sent before they are sent so we may observe
+ // attempted sends as well as successful sends via the
+ // accompanying statusCode
+ this.bytesSent = length;
+ outputStream.write(buffer, offset, length);
+ } finally {
+ if (this.isTraceEnabled) {
+ this.sendRequestTimeMs = elapsedTimeMs(startTime);
+ }
+ }
+ }
+
/**
 * Gets and processes the HTTP response.
 *
 * Reads the status line, message and request-id header, then either parses
 * the storage error body (status >= 400) or consumes the response entity:
 * copying up to {@code length} bytes into {@code buffer} (when supplied) or
 * parsing it as a list-files result, and draining any remainder so the
 * underlying connection can be reused.
 *
 * @param buffer destination for the response entity, or null.
 * @param offset offset into the buffer where data should be written.
 * @param length maximum number of bytes to copy into the buffer.
 * @throws IOException if an error occurs.
 */
public void processResponse(final byte[] buffer, final int offset, final int length) throws IOException {

  // get the response
  long startTime = 0;
  if (this.isTraceEnabled) {
    startTime = System.nanoTime();
  }

  this.statusCode = this.connection.getResponseCode();

  if (this.isTraceEnabled) {
    this.recvResponseTimeMs = elapsedTimeMs(startTime);
  }

  this.statusDescription = this.connection.getResponseMessage();

  this.requestId = this.connection.getHeaderField(HttpHeaderConfigurations.X_MS_REQUEST_ID);
  if (this.requestId == null) {
    this.requestId = AbfsHttpConstants.EMPTY_STRING;
  }

  if (AbfsHttpConstants.HTTP_METHOD_HEAD.equals(this.method)) {
    // HEAD responses have no entity body to consume, even on error.
    return;
  }

  if (this.isTraceEnabled) {
    startTime = System.nanoTime();
  }

  if (statusCode >= HttpURLConnection.HTTP_BAD_REQUEST) {
    processStorageErrorResponse();
    if (this.isTraceEnabled) {
      this.recvResponseTimeMs += elapsedTimeMs(startTime);
    }
    this.bytesReceived = this.connection.getHeaderFieldLong(HttpHeaderConfigurations.CONTENT_LENGTH, 0);
  } else {
    // consume the input stream to release resources
    int totalBytesRead = 0;

    try (InputStream stream = this.connection.getInputStream()) {
      if (isNullInputStream(stream)) {
        return;
      }
      boolean endOfStream = false;

      // this is a list operation and need to retrieve the data
      // need a better solution
      if (AbfsHttpConstants.HTTP_METHOD_GET.equals(this.method) && buffer == null) {
        parseListFilesResponse(stream);
      } else {
        if (buffer != null) {
          while (totalBytesRead < length) {
            int bytesRead = stream.read(buffer, offset + totalBytesRead, length - totalBytesRead);
            if (bytesRead == -1) {
              endOfStream = true;
              break;
            }
            totalBytesRead += bytesRead;
          }
        }
        // NOTE(review): the single probe byte read here is not counted in
        // totalBytesRead, so bytesReceived can under-report by one byte
        // when trailing data exists — confirm whether this is intentional.
        if (!endOfStream && stream.read() != -1) {
          // read and discard
          int bytesRead = 0;
          byte[] b = new byte[CLEAN_UP_BUFFER_SIZE];
          while ((bytesRead = stream.read(b)) >= 0) {
            totalBytesRead += bytesRead;
          }
        }
      }
    } catch (IOException ex) {
      this.LOG.error("UnexpectedError: ", ex);
      throw ex;
    } finally {
      if (this.isTraceEnabled) {
        this.recvResponseTimeMs += elapsedTimeMs(startTime);
      }
      this.bytesReceived = totalBytesRead;
    }
  }
}
+
+
+ /**
+ * Open the HTTP connection.
+ *
+ * @throws IOException if an error occurs.
+ */
+ private HttpURLConnection openConnection() throws IOException {
+ if (!isTraceEnabled) {
+ return (HttpURLConnection) url.openConnection();
+ }
+ long start = System.nanoTime();
+ try {
+ return (HttpURLConnection) url.openConnection();
+ } finally {
+ connectionTimeMs = elapsedTimeMs(start);
+ }
+ }
+
/**
 * When the request fails, this function is used to parse the response
 * and extract the storageErrorCode and storageErrorMessage. Any errors
 * encountered while attempting to process the error response are logged,
 * but otherwise ignored.
 *
 * For storage errors, the response body *usually* has the following format:
 *
 * {
 *   "error":
 *   {
 *     "code": "string",
 *     "message": "string"
 *   }
 * }
 *
 */
private void processStorageErrorResponse() {
  try (InputStream stream = connection.getErrorStream()) {
    if (stream == null) {
      return;
    }
    JsonFactory jf = new JsonFactory();
    try (JsonParser jp = jf.createParser(stream)) {
      String fieldName, fieldValue;
      jp.nextToken(); // START_OBJECT - {
      jp.nextToken(); // FIELD_NAME - "error":
      jp.nextToken(); // START_OBJECT - {
      jp.nextToken();
      while (jp.hasCurrentToken()) {
        if (jp.getCurrentToken() == JsonToken.FIELD_NAME) {
          fieldName = jp.getCurrentName();
          jp.nextToken();
          fieldValue = jp.getText();
          switch (fieldName) {
            case "code":
              storageErrorCode = fieldValue;
              break;
            case "message":
              storageErrorMessage = fieldValue;
              break;
            default:
              break;
          }
        }
        jp.nextToken();
      }
    }
  } catch (IOException ex) {
    // Ignore errors that occur while attempting to parse the storage
    // error, since the response may have been handled by the HTTP driver
    // or for other reasons have an unexpected format.
    this.LOG.debug("ExpectedError: ", ex);
  }
}
+
+ /**
+ * Returns the elapsed time in milliseconds.
+ */
+ private long elapsedTimeMs(final long startTime) {
+ return (System.nanoTime() - startTime) / ONE_MILLION;
+ }
+
/**
 * Parse the list file response.
 *
 * @param stream InputStream contains the list results.
 * @throws IOException if the response body cannot be deserialized.
 */
private void parseListFilesResponse(final InputStream stream) throws IOException {
  if (stream == null) {
    return;
  }

  if (listResultSchema != null) {
    // already parsed the response; keep the first result
    return;
  }

  try {
    final ObjectMapper objectMapper = new ObjectMapper();
    this.listResultSchema = objectMapper.readValue(stream, ListResultSchema.class);
  } catch (IOException ex) {
    this.LOG.error("Unable to deserialize list results", ex);
    throw ex;
  }
}
+
+ /**
+ * Check null stream, this is to pass findbugs's redundant check for NULL
+ * @param stream InputStream
+ */
+ private boolean isNullInputStream(InputStream stream) {
+ return stream == null ? true : false;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fdf5f4c3/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpServiceImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpServiceImpl.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpServiceImpl.java
new file mode 100644
index 0000000..06e1a8a
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpServiceImpl.java
@@ -0,0 +1,693 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import javax.xml.bind.DatatypeConverter;
+import java.io.File;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.Charset;
+import java.nio.charset.CharsetDecoder;
+import java.nio.charset.CharsetEncoder;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.Hashtable;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import com.google.common.base.Preconditions;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidAbfsRestOperationException;
+import org.joda.time.DateTime;
+import org.joda.time.format.DateTimeFormat;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+
+import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.TimeoutException;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidFileSystemPropertyException;
+import org.apache.hadoop.fs.azurebfs.contracts.services.AbfsHttpService;
+import org.apache.hadoop.fs.azurebfs.contracts.services.AbfsHttpClientFactory;
+import org.apache.hadoop.fs.azurebfs.contracts.services.ConfigurationService;
+import org.apache.hadoop.fs.azurebfs.contracts.services.TracingService;
+import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultEntrySchema;
+import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultSchema;
+import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
+import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.hadoop.util.Time.now;
+
+@Singleton
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+final class AbfsHttpServiceImpl implements AbfsHttpService {
+ public static final Logger LOG = LoggerFactory.getLogger(AbfsHttpService.class);
+ private static final String DATE_TIME_PATTERN = "E, dd MMM yyyy HH:mm:ss 'GMT'";
+ private static final String XMS_PROPERTIES_ENCODING = "ISO-8859-1";
+ private static final int LIST_MAX_RESULTS = 5000;
+ private static final int DELETE_DIRECTORY_TIMEOUT_MILISECONDS = 180000;
+ private static final int RENAME_TIMEOUT_MILISECONDS = 180000;
+
+ private final AbfsHttpClientFactory abfsHttpClientFactory;
+ private final ConcurrentHashMap<AzureBlobFileSystem, AbfsClient> clientCache;
+ private final ConfigurationService configurationService;
+ private final Set<String> azureAtomicRenameDirSet;
+
/**
 * Injected constructor: validates dependencies and builds the per-filesystem
 * client cache plus the set of configured atomic-rename directories.
 */
@Inject
AbfsHttpServiceImpl(
    final ConfigurationService configurationService,
    final AbfsHttpClientFactory abfsHttpClientFactory,
    final TracingService tracingService) {
  Preconditions.checkNotNull(configurationService, "configurationService");
  Preconditions.checkNotNull(abfsHttpClientFactory, "abfsHttpClientFactory");
  Preconditions.checkNotNull(tracingService, "tracingService");

  this.configurationService = configurationService;
  this.abfsHttpClientFactory = abfsHttpClientFactory;
  this.clientCache = new ConcurrentHashMap<>();
  this.azureAtomicRenameDirSet = new HashSet<>(
      Arrays.asList(
          configurationService.getAzureAtomicRenameDirs()
              .split(AbfsHttpConstants.COMMA)));
}
+
+ @Override
+ public Hashtable<String, String> getFilesystemProperties(final AzureBlobFileSystem azureBlobFileSystem)
+ throws AzureBlobFileSystemException{
+ final AbfsClient client = this.getOrCreateClient(azureBlobFileSystem);
+
+ this.LOG.debug(
+ "getFilesystemProperties for filesystem: {}",
+ client.getFileSystem());
+
+ final Hashtable<String, String> parsedXmsProperties;
+
+ final AbfsRestOperation op = client.getFilesystemProperties();
+ final String xMsProperties = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_PROPERTIES);
+
+ parsedXmsProperties = parseCommaSeparatedXmsProperties(xMsProperties);
+
+ return parsedXmsProperties;
+ }
+
+ @Override
+ public void setFilesystemProperties(final AzureBlobFileSystem azureBlobFileSystem, final Hashtable<String, String> properties) throws
+ AzureBlobFileSystemException {
+ if (properties == null || properties.size() == 0) {
+ return;
+ }
+
+ final AbfsClient client = this.getOrCreateClient(azureBlobFileSystem);
+
+ this.LOG.debug(
+ "setFilesystemProperties for filesystem: {} with properties: {}",
+ client.getFileSystem(),
+ properties);
+
+ final String commaSeparatedProperties;
+ try {
+ commaSeparatedProperties = convertXmsPropertiesToCommaSeparatedString(properties);
+ } catch (CharacterCodingException ex) {
+ throw new InvalidAbfsRestOperationException(ex);
+ }
+ client.setFilesystemProperties(commaSeparatedProperties);
+ }
+
+ @Override
+ public Hashtable<String, String> getPathProperties(final AzureBlobFileSystem azureBlobFileSystem, final Path path) throws
+ AzureBlobFileSystemException {
+ final AbfsClient client = this.getOrCreateClient(azureBlobFileSystem);
+
+ this.LOG.debug(
+ "getPathProperties for filesystem: {} path: {}",
+ client.getFileSystem(),
+ path.toString());
+
+ final Hashtable<String, String> parsedXmsProperties;
+ final AbfsRestOperation op = client.getPathProperties(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path));
+
+ final String xMsProperties = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_PROPERTIES);
+
+ parsedXmsProperties = parseCommaSeparatedXmsProperties(xMsProperties);
+
+ return parsedXmsProperties;
+ }
+
+ @Override
+ public void setPathProperties(final AzureBlobFileSystem azureBlobFileSystem, final Path path, final Hashtable<String,
+ String> properties) throws
+ AzureBlobFileSystemException {
+ final AbfsClient client = this.getOrCreateClient(azureBlobFileSystem);
+
+ this.LOG.debug(
+ "setFilesystemProperties for filesystem: {} path: {} with properties: {}",
+ client.getFileSystem(),
+ path.toString(),
+ properties);
+
+ final String commaSeparatedProperties;
+ try {
+ commaSeparatedProperties = convertXmsPropertiesToCommaSeparatedString(properties);
+ } catch (CharacterCodingException ex) {
+ throw new InvalidAbfsRestOperationException(ex);
+ }
+ client.setPathProperties("/" + getRelativePath(path), commaSeparatedProperties);
+ }
+
+ @Override
+ public void createFilesystem(final AzureBlobFileSystem azureBlobFileSystem) throws AzureBlobFileSystemException {
+ final AbfsClient client = this.getOrCreateClient(azureBlobFileSystem);
+
+ this.LOG.debug(
+ "createFilesystem for filesystem: {}",
+ client.getFileSystem());
+
+ client.createFilesystem();
+ }
+
+ @Override
+ public void deleteFilesystem(final AzureBlobFileSystem azureBlobFileSystem) throws AzureBlobFileSystemException {
+ final AbfsClient client = this.getOrCreateClient(azureBlobFileSystem);
+
+ this.LOG.debug(
+ "deleteFilesystem for filesystem: {}",
+ client.getFileSystem());
+
+ client.deleteFilesystem();
+ }
+
+ @Override
+ public OutputStream createFile(final AzureBlobFileSystem azureBlobFileSystem, final Path path, final boolean overwrite) throws
+ AzureBlobFileSystemException {
+ final AbfsClient client = this.getOrCreateClient(azureBlobFileSystem);
+
+ this.LOG.debug(
+ "createFile filesystem: {} path: {} overwrite: {}",
+ client.getFileSystem(),
+ path.toString(),
+ overwrite);
+
+ client.createPath(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), true, overwrite);
+
+ final OutputStream outputStream;
+ outputStream = new FSDataOutputStream(
+ new AbfsOutputStream(client, AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), 0,
+ configurationService.getWriteBufferSize()), null);
+ return outputStream;
+ }
+
+ @Override
+ public Void createDirectory(final AzureBlobFileSystem azureBlobFileSystem, final Path path) throws AzureBlobFileSystemException {
+ final AbfsClient client = this.getOrCreateClient(azureBlobFileSystem);
+
+ this.LOG.debug(
+ "createDirectory filesystem: {} path: {} overwrite: {}",
+ client.getFileSystem(),
+ path.toString());
+
+ client.createPath("/" + getRelativePath(path), false, true);
+
+ return null;
+ }
+
/**
 * Opens the file at {@code path} for reading.
 *
 * Queries the path properties first to learn the content length and ETag,
 * and rejects directories.
 *
 * @throws AzureBlobFileSystemException if the path is a directory or the
 *         properties request fails.
 */
@Override
public InputStream openFileForRead(final AzureBlobFileSystem azureBlobFileSystem, final Path path,
    final FileSystem.Statistics statistics) throws AzureBlobFileSystemException {
  final AbfsClient client = getOrCreateClient(azureBlobFileSystem);

  this.LOG.debug(
      "openFileForRead filesystem: {} path: {}",
      client.getFileSystem(),
      path.toString());

  final AbfsRestOperation op = client.getPathProperties(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path));

  final String resourceType = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_RESOURCE_TYPE);
  final long contentLength = Long.parseLong(op.getResult().getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH));
  // The ETag is captured at open time so later reads can detect
  // out-of-band modification.
  final String eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG);

  if (parseIsDirectory(resourceType)) {
    throw new AbfsRestOperationException(
        AzureServiceErrorCode.PATH_NOT_FOUND.getStatusCode(),
        AzureServiceErrorCode.PATH_NOT_FOUND.getErrorCode(),
        "openFileForRead must be used with files and not directories",
        null);
  }

  // Add statistics for InputStream
  return new FSDataInputStream(
      new AbfsInputStream(client, statistics, AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), contentLength,
          configurationService.getReadBufferSize(), configurationService.getReadAheadQueueDepth(), eTag));
}
+
+ @Override
+ public OutputStream openFileForWrite(final AzureBlobFileSystem azureBlobFileSystem, final Path path, final boolean overwrite) throws
+ AzureBlobFileSystemException {
+ final AbfsClient client = getOrCreateClient(azureBlobFileSystem);
+
+ this.LOG.debug(
+ "openFileForWrite filesystem: {} path: {} overwrite: {}",
+ client.getFileSystem(),
+ path.toString(),
+ overwrite);
+
+ final AbfsRestOperation op = client.getPathProperties(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path));
+
+ final String resourceType = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_RESOURCE_TYPE);
+ final Long contentLength = Long.valueOf(op.getResult().getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH));
+
+ if (parseIsDirectory(resourceType)) {
+ throw new AbfsRestOperationException(
+ AzureServiceErrorCode.PATH_NOT_FOUND.getStatusCode(),
+ AzureServiceErrorCode.PATH_NOT_FOUND.getErrorCode(),
+ "openFileForRead must be used with files and not directories",
+ null);
+ }
+
+ final long offset = overwrite ? 0 : contentLength;
+
+ final OutputStream outputStream;
+ outputStream = new FSDataOutputStream(
+ new AbfsOutputStream(client, AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path),
+ offset, configurationService.getWriteBufferSize()), null);
+ return outputStream;
+ }
+
/**
 * Renames {@code source} to {@code destination}.
 *
 * The service may return a continuation token for large directory renames;
 * the loop keeps issuing the rename until no continuation remains or the
 * overall deadline ({@code RENAME_TIMEOUT_MILISECONDS}) is exceeded.
 *
 * @throws TimeoutException when the deadline passes before completion.
 */
@Override
public void rename(final AzureBlobFileSystem azureBlobFileSystem, final Path source, final Path destination) throws
    AzureBlobFileSystemException {

  if (isAtomicRenameKey(source.getName())) {
    this.LOG.warn("The atomic rename feature is not supported by the ABFS scheme; however rename,"
        +" create and delete operations are atomic if Namespace is enabled for your Azure Storage account.");
  }

  final AbfsClient client = this.getOrCreateClient(azureBlobFileSystem);

  this.LOG.debug(
      "renameAsync filesystem: {} source: {} destination: {}",
      client.getFileSystem(),
      source.toString(),
      destination.toString());

  String continuation = null;
  long deadline = now() + RENAME_TIMEOUT_MILISECONDS;

  do {
    if (now() > deadline) {
      LOG.debug(
          "Rename {} to {} timed out.",
          source,
          destination);

      throw new TimeoutException("Rename timed out.");
    }

    AbfsRestOperation op = client.renamePath(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(source),
        AbfsHttpConstants.FORWARD_SLASH + getRelativePath(destination), continuation);
    continuation = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_CONTINUATION);

  } while (continuation != null && !continuation.isEmpty());
}
+
/**
 * Deletes the given path, optionally recursively.
 *
 * Like rename, the service may return continuation tokens for large
 * directories; the loop re-issues the delete until none remains or the
 * deadline ({@code DELETE_DIRECTORY_TIMEOUT_MILISECONDS}) is exceeded.
 *
 * @throws TimeoutException when the deadline passes before completion.
 */
@Override
public void delete(final AzureBlobFileSystem azureBlobFileSystem, final Path path, final boolean recursive) throws
    AzureBlobFileSystemException {
  final AbfsClient client = this.getOrCreateClient(azureBlobFileSystem);

  this.LOG.debug(
      "delete filesystem: {} path: {} recursive: {}",
      client.getFileSystem(),
      path.toString(),
      String.valueOf(recursive));

  String continuation = null;
  long deadline = now() + DELETE_DIRECTORY_TIMEOUT_MILISECONDS;

  do {
    if (now() > deadline) {
      this.LOG.debug(
          "Delete directory {} timed out.", path);

      throw new TimeoutException("Delete directory timed out.");
    }

    AbfsRestOperation op = client.deletePath(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), recursive, continuation);
    continuation = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_CONTINUATION);

  } while (continuation != null && !continuation.isEmpty());
}
+
/**
 * Returns the FileStatus for the given path.
 *
 * The root path is resolved from the filesystem properties (always a
 * directory of length 0); any other path from its path properties.
 */
@Override
public FileStatus getFileStatus(final AzureBlobFileSystem azureBlobFileSystem, final Path path) throws AzureBlobFileSystemException {
  final AbfsClient client = this.getOrCreateClient(azureBlobFileSystem);

  this.LOG.debug(
      "getFileStatus filesystem: {} path: {}",
      client.getFileSystem(),
      path.toString());

  if (path.isRoot()) {
    AbfsRestOperation op = client.getFilesystemProperties();
    final long blockSize = configurationService.getAzureBlockSize();
    final String eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG);
    final String lastModified = op.getResult().getResponseHeader(HttpHeaderConfigurations.LAST_MODIFIED);
    return new VersionedFileStatus(
        azureBlobFileSystem.getOwnerUser(),
        azureBlobFileSystem.getOwnerUserPrimaryGroup(),
        0,
        true,
        1,
        blockSize,
        parseLastModifiedTime(lastModified).getMillis(),
        path,
        eTag);
  } else {
    AbfsRestOperation op = client.getPathProperties(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path));

    final long blockSize = configurationService.getAzureBlockSize();
    final String eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG);
    final String lastModified = op.getResult().getResponseHeader(HttpHeaderConfigurations.LAST_MODIFIED);
    final String contentLength = op.getResult().getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH);
    final String resourceType = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_RESOURCE_TYPE);

    return new VersionedFileStatus(
        azureBlobFileSystem.getOwnerUser(),
        azureBlobFileSystem.getOwnerUserPrimaryGroup(),
        parseContentLength(contentLength),
        parseIsDirectory(resourceType),
        1,
        blockSize,
        parseLastModifiedTime(lastModified).getMillis(),
        path,
        eTag);
  }
}
+
+ @Override
+ public FileStatus[] listStatus(final AzureBlobFileSystem azureBlobFileSystem, final Path path) throws AzureBlobFileSystemException {
+ final AbfsClient client = this.getOrCreateClient(azureBlobFileSystem);
+
+ this.LOG.debug(
+ "listStatus filesystem: {} path: {}",
+ client.getFileSystem(),
+ path.toString());
+
+ String relativePath = path.isRoot() ? AbfsHttpConstants.EMPTY_STRING : getRelativePath(path);
+ String continuation = null;
+ ArrayList<FileStatus> fileStatuses = new ArrayList<>();
+
+ do {
+ AbfsRestOperation op = client.listPath(relativePath, false, LIST_MAX_RESULTS, continuation);
+ continuation = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_CONTINUATION);
+ ListResultSchema retrievedSchema = op.getResult().getListResultSchema();
+ if (retrievedSchema == null) {
+ throw new AbfsRestOperationException(
+ AzureServiceErrorCode.PATH_NOT_FOUND.getStatusCode(),
+ AzureServiceErrorCode.PATH_NOT_FOUND.getErrorCode(),
+ "listStatusAsync path not found",
+ null, op.getResult());
+ }
+
+ long blockSize = configurationService.getAzureBlockSize();
+
+ for (ListResultEntrySchema entry : retrievedSchema.paths()) {
+ long lastModifiedMillis = 0;
+ long contentLength = entry.contentLength() == null ? 0 : entry.contentLength();
+ boolean isDirectory = entry.isDirectory() == null ? false : entry.isDirectory();
+ if (entry.lastModified() != null && !entry.lastModified().isEmpty()) {
+ final DateTime dateTime = DateTime.parse(
+ entry.lastModified(),
+ DateTimeFormat.forPattern(DATE_TIME_PATTERN).withZoneUTC());
+ lastModifiedMillis = dateTime.getMillis();
+ }
+
+ fileStatuses.add(
+ new VersionedFileStatus(
+ azureBlobFileSystem.getOwnerUser(),
+ azureBlobFileSystem.getOwnerUserPrimaryGroup(),
+ contentLength,
+ isDirectory,
+ 1,
+ blockSize,
+ lastModifiedMillis,
+ azureBlobFileSystem.makeQualified(new Path(File.separator + entry.name())),
+ entry.eTag()));
+ }
+
+ } while (continuation != null && !continuation.isEmpty());
+
+ return fileStatuses.toArray(new FileStatus[0]);
+ }
+
/**
 * Evicts the cached AbfsClient for the given filesystem instance.
 * Synchronized to pair with the check-then-put in getOrCreateClient.
 */
@Override
public synchronized void closeFileSystem(final AzureBlobFileSystem azureBlobFileSystem) throws AzureBlobFileSystemException {
  this.clientCache.remove(azureBlobFileSystem);
}
+
/**
 * Returns true when the key falls under one of the configured
 * atomic-rename directories.
 */
@Override
public boolean isAtomicRenameKey(String key) {
  return isKeyForDirectorySet(key, azureAtomicRenameDirSet);
}
+
+ private String getRelativePath(final Path path) {
+ Preconditions.checkNotNull(path, "path");
+ final String relativePath = path.toUri().getPath();
+
+ if (relativePath.length() == 0) {
+ return relativePath;
+ }
+
+ if (relativePath.charAt(0) == Path.SEPARATOR_CHAR) {
+ if (relativePath.length() == 1) {
+ return AbfsHttpConstants.EMPTY_STRING;
+ }
+
+ return relativePath.substring(1);
+ }
+
+ return relativePath;
+ }
+
+ private synchronized AbfsClient getOrCreateClient(final AzureBlobFileSystem azureBlobFileSystem) throws
+ AzureBlobFileSystemException {
+ Preconditions.checkNotNull(azureBlobFileSystem, "azureBlobFileSystem");
+
+ AbfsClient client = this.clientCache.get(azureBlobFileSystem);
+
+ if (client != null) {
+ return client;
+ }
+
+ client = abfsHttpClientFactory.create(azureBlobFileSystem);
+ this.clientCache.put(
+ azureBlobFileSystem,
+ client);
+ return client;
+ }
+
+ private long parseContentLength(final String contentLength) {
+ if (contentLength == null) {
+ return -1;
+ }
+
+ return Long.parseLong(contentLength);
+ }
+
+ private boolean parseIsDirectory(final String resourceType) {
+ return resourceType == null ? false : resourceType.equalsIgnoreCase(AbfsHttpConstants.DIRECTORY);
+ }
+
/**
 * Parses a Last-Modified header value (format DATE_TIME_PATTERN,
 * e.g. "Tue, 03 Jul 2018 12:00:00 GMT") into a UTC DateTime.
 */
private DateTime parseLastModifiedTime(final String lastModifiedTime) {
  return DateTime.parse(
      lastModifiedTime,
      DateTimeFormat.forPattern(DATE_TIME_PATTERN).withZoneUTC());
}
+
+ private String convertXmsPropertiesToCommaSeparatedString(final Hashtable<String, String> properties) throws
+ CharacterCodingException {
+ StringBuilder commaSeparatedProperties = new StringBuilder();
+
+ final CharsetEncoder encoder = Charset.forName(XMS_PROPERTIES_ENCODING).newEncoder();
+
+ for (Map.Entry<String, String> propertyEntry : properties.entrySet()) {
+ String key = propertyEntry.getKey();
+ String value = propertyEntry.getValue();
+
+ Boolean canEncodeValue = encoder.canEncode(value);
+ if (!canEncodeValue) {
+ throw new CharacterCodingException();
+ }
+
+ String encodedPropertyValue = DatatypeConverter.printBase64Binary(encoder.encode(CharBuffer.wrap(value)).array());
+ commaSeparatedProperties.append(key)
+ .append(AbfsHttpConstants.EQUAL)
+ .append(encodedPropertyValue);
+
+ commaSeparatedProperties.append(AbfsHttpConstants.COMMA);
+ }
+
+ if (commaSeparatedProperties.length() != 0) {
+ commaSeparatedProperties.deleteCharAt(commaSeparatedProperties.length() - 1);
+ }
+
+ return commaSeparatedProperties.toString();
+ }
+
/**
 * Parses the x-ms-properties wire format (comma-separated
 * {@code key=base64(value)} pairs) back into a property table.
 *
 * @throws InvalidFileSystemPropertyException for malformed pairs.
 * @throws InvalidAbfsRestOperationException when a decoded value is not
 *         valid in {@code XMS_PROPERTIES_ENCODING}.
 */
private Hashtable<String, String> parseCommaSeparatedXmsProperties(String xMsProperties) throws
    InvalidFileSystemPropertyException, InvalidAbfsRestOperationException {
  Hashtable<String, String> properties = new Hashtable<>();

  final CharsetDecoder decoder = Charset.forName(XMS_PROPERTIES_ENCODING).newDecoder();

  if (xMsProperties != null && !xMsProperties.isEmpty()) {
    String[] userProperties = xMsProperties.split(AbfsHttpConstants.COMMA);

    if (userProperties.length == 0) {
      return properties;
    }

    for (String property : userProperties) {
      if (property.isEmpty()) {
        throw new InvalidFileSystemPropertyException(xMsProperties);
      }

      // Split on the first '=' only; the Base64 payload may contain '='.
      String[] nameValue = property.split(AbfsHttpConstants.EQUAL, 2);
      if (nameValue.length != 2) {
        throw new InvalidFileSystemPropertyException(xMsProperties);
      }

      byte[] decodedValue = DatatypeConverter.parseBase64Binary(nameValue[1]);

      final String value;
      try {
        value = decoder.decode(ByteBuffer.wrap(decodedValue)).toString();
      } catch (CharacterCodingException ex) {
        throw new InvalidAbfsRestOperationException(ex);
      }
      properties.put(nameValue[0], value);
    }
  }

  return properties;
}
+
/**
 * Returns true when {@code key} lies under any directory in {@code dirSet}
 * (or when an empty directory entry is configured, which matches all keys).
 *
 * NOTE(review): the URI branch repeats the same
 * {@code key.startsWith(dir + "/")} test as the first condition, so for an
 * authority-less dir it can never match anything the first check did not
 * already return on — confirm whether it was meant to compare against
 * {@code uri.getPath()} instead.
 */
private boolean isKeyForDirectorySet(String key, Set<String> dirSet) {
  for (String dir : dirSet) {
    if (dir.isEmpty() || key.startsWith(dir + AbfsHttpConstants.FORWARD_SLASH)) {
      return true;
    }

    try {
      URI uri = new URI(dir);
      if (null == uri.getAuthority()) {
        if (key.startsWith(dir + "/")){
          return true;
        }
      }
    } catch (URISyntaxException e) {
      this.LOG.info("URI syntax error creating URI for {}", dir);
    }
  }

  return false;
}
+
/**
 * A FileStatus that additionally carries the path's version (its ETag),
 * so equality distinguishes different revisions of the same path.
 */
private static class VersionedFileStatus extends FileStatus {
  // The ETag returned by the service when the status was captured.
  private final String version;

  VersionedFileStatus(
      final String owner, final String group,
      final long length, final boolean isdir, final int blockReplication,
      final long blocksize, final long modificationTime, final Path path,
      String version) {
    // Access time is reported as 0; permissions are fixed at 777.
    super(length, isdir, blockReplication, blocksize, modificationTime, 0,
        new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL),
        owner,
        group,
        path);

    this.version = version;
  }

  /** Compare if this object is equal to another object.
   * @param obj the object to be compared.
   * @return true if two file status has the same path name; false if not.
   */
  @Override
  public boolean equals(Object obj) {
    if (obj == this) {
      return true;
    }

    if (obj == null) {
      return false;
    }

    if (this.getClass() == obj.getClass()) {
      VersionedFileStatus other = (VersionedFileStatus) obj;
      return this.getPath().equals(other.getPath()) && this.version.equals(other.version);
    }

    return false;
  }

  /**
   * Returns a hash code value for the object, which is defined as
   * the hash code of the path name.
   *
   * @return a hash code value for the path name and version
   */
  @Override
  public int hashCode() {
    int hash = getPath().hashCode();
    hash = 89 * hash + (this.version != null ? this.version.hashCode() : 0);
    return hash;
  }

  /**
   * Returns the version of this FileStatus
   *
   * @return a string value for the FileStatus version
   */
  public String getVersion() {
    return this.version;
  }
}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fdf5f4c3/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
new file mode 100644
index 0000000..6554380
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
@@ -0,0 +1,382 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.EOFException;
+import java.io.IOException;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.fs.FileSystem.Statistics;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
+
/**
 * The AbfsInputStream for AbfsClient.
 *
 * A buffered, seekable input stream over a single ABFS path. Reads are
 * served from an internal buffer that is refilled from the service via
 * {@link AbfsClient#read}; sequential refills may additionally be
 * satisfied from the shared {@link ReadBufferManager} read-ahead queue.
 *
 * Buffer invariants: {@code fCursor} is the file offset just past the
 * buffered region, {@code limit} is the number of valid bytes in the
 * buffer, and {@code bCursor} is the next byte to hand out, so the
 * logical stream position is {@code fCursor - limit + bCursor}.
 */
public class AbfsInputStream extends FSInputStream {
  private final AbfsClient client;
  private final Statistics statistics;
  private final String path;
  private final long contentLength;
  private final int bufferSize; // default buffer size
  private final int readAheadQueueDepth; // initialized in constructor
  private final String eTag; // eTag of the path when InputStream are created
  private final boolean tolerateOobAppends; // whether tolerate Oob Appends
  private final boolean readAheadEnabled; // whether enable readAhead;

  private byte[] buffer = null; // will be initialized on first use

  private long fCursor = 0; // cursor of buffer within file - offset of next byte to read from remote server
  private long fCursorAfterLastRead = -1;
  private int bCursor = 0; // cursor of read within buffer - offset of next byte to be returned from buffer
  private int limit = 0; // offset of next byte to be read into buffer from service (i.e., upper marker+1
  // of valid bytes in buffer)
  private boolean closed = false;

  /**
   * Creates a stream over the given path with a fixed content length.
   *
   * @param client              client used to issue read requests.
   * @param statistics          filesystem statistics updated on successful
   *                            reads; may be null.
   * @param path                the remote path being read.
   * @param contentLength       file length at open time; later appends are
   *                            not reflected (see {@link #length()}).
   * @param bufferSize          size of the internal read buffer.
   * @param readAheadQueueDepth number of read-aheads queued per refill; a
   *                            negative value selects 2 * available
   *                            processors.
   * @param eTag                eTag of the path when the stream was created,
   *                            passed on reads to detect out-of-band changes.
   */
  public AbfsInputStream(
      final AbfsClient client,
      final Statistics statistics,
      final String path,
      final long contentLength,
      final int bufferSize,
      final int readAheadQueueDepth,
      final String eTag) {
    super();
    this.client = client;
    this.statistics = statistics;
    this.path = path;
    this.contentLength = contentLength;
    this.bufferSize = bufferSize;
    this.readAheadQueueDepth = (readAheadQueueDepth >= 0) ? readAheadQueueDepth : 2 * Runtime.getRuntime().availableProcessors();
    this.eTag = eTag;
    this.tolerateOobAppends = false; // hard-coded: OOB appends are never tolerated here
    this.readAheadEnabled = true; // hard-coded: read-ahead is always on
  }

  /**
   * Returns the remote path this stream reads.
   *
   * @return the path supplied at construction time.
   */
  public String getPath() {
    return path;
  }

  /**
   * Reads a single byte.
   *
   * @return the next byte as an int in [0, 255], or -1 at end of stream.
   * @throws IOException if the stream is closed or a remote read fails.
   */
  @Override
  public int read() throws IOException {
    byte[] b = new byte[1];
    int numberOfBytesRead = read(b, 0, 1);
    if (numberOfBytesRead < 0) {
      return -1;
    } else {
      return (b[0] & 0xFF); // mask to avoid sign-extension of the byte
    }
  }

  /**
   * Reads up to {@code len} bytes into {@code b}, looping over
   * {@link #readOneBlock} until the request is satisfied or end of
   * stream/data is reached.
   *
   * @return the total number of bytes read, or -1 at end of stream.
   * @throws IOException if the stream is closed or a remote read fails.
   */
  @Override
  public synchronized int read(final byte[] b, final int off, final int len) throws IOException {
    int currentOff = off;
    int currentLen = len;
    int lastReadBytes;
    int totalReadBytes = 0;
    do {
      lastReadBytes = readOneBlock(b, currentOff, currentLen);
      if (lastReadBytes > 0) {
        currentOff += lastReadBytes;
        currentLen -= lastReadBytes;
        totalReadBytes += lastReadBytes;
      }
      // Stop once the request is satisfied or the remaining request would
      // overrun the destination buffer.
      if (currentLen <= 0 || currentLen > b.length - currentOff) {
        break;
      }
    } while (lastReadBytes > 0);
    // If nothing accumulated, propagate the last per-block result (-1 or 0).
    return totalReadBytes > 0 ? totalReadBytes : lastReadBytes;
  }

  /**
   * Reads at most one buffer's worth of data, refilling the internal buffer
   * from the service when it is empty.
   *
   * @return number of bytes copied into {@code b}, 0 for a zero-length
   *         request, or -1 at end of stream.
   * @throws IOException if the stream is closed or the remote read fails.
   */
  private int readOneBlock(final byte[] b, final int off, final int len) throws IOException {
    if (closed) {
      throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
    }

    Preconditions.checkNotNull(b);

    if (len == 0) {
      return 0;
    }

    if (this.available() == 0) {
      return -1;
    }

    // NOTE(review): this bounds check runs only after the early returns
    // above, so invalid off/len arguments pass silently when len == 0 or
    // the stream is at EOF — confirm this is acceptable vs. the
    // InputStream.read contract.
    if (off < 0 || len < 0 || len > b.length - off) {
      throw new IndexOutOfBoundsException();
    }

    //If buffer is empty, then fill the buffer.
    if (bCursor == limit) {
      //If EOF, then return -1
      if (fCursor >= contentLength) {
        return -1;
      }

      long bytesRead = 0;
      //reset buffer to initial state - i.e., throw away existing data
      bCursor = 0;
      limit = 0;
      if (buffer == null) {
        buffer = new byte[bufferSize]; // lazy allocation on first real read
      }

      // Enable readAhead when reading sequentially (first read, the read
      // continues where the last one ended, or the caller's buffer is at
      // least a full block); otherwise do a smaller direct read sized to
      // the caller's buffer and bypass read-ahead.
      if (-1 == fCursorAfterLastRead || fCursorAfterLastRead == fCursor || b.length >= bufferSize) {
        bytesRead = readInternal(fCursor, buffer, 0, bufferSize, false);
      } else {
        bytesRead = readInternal(fCursor, buffer, 0, b.length, true);
      }

      if (bytesRead == -1) {
        return -1;
      }

      limit += bytesRead;
      fCursor += bytesRead;
      fCursorAfterLastRead = fCursor;
    }

    //If there is anything in the buffer, then return lesser of (requested bytes) and (bytes in buffer)
    //(bytes returned may be less than requested)
    int bytesRemaining = limit - bCursor;
    int bytesToRead = Math.min(len, bytesRemaining);
    System.arraycopy(buffer, bCursor, b, off, bytesToRead);
    bCursor += bytesToRead;
    if (statistics != null) {
      statistics.incrementBytesRead(bytesToRead);
    }
    return bytesToRead;
  }

  /**
   * Fetches data either through the read-ahead machinery or directly from
   * the service.
   *
   * When read-ahead is used, up to {@code readAheadQueueDepth} sequential
   * blocks starting at {@code position} are queued with the shared
   * {@link ReadBufferManager} before attempting to serve the request from
   * already-buffered data; on a miss, the read falls through to
   * {@link #readRemote}.
   *
   * @param position         file offset to read from.
   * @param b                destination buffer.
   * @param offset           destination offset; must be 0 on the read-ahead
   *                         path.
   * @param length           maximum number of bytes to read.
   * @param bypassReadAhead  when true, skip read-ahead and go straight to
   *                         the service (used for random-access reads).
   * @return number of bytes read, or -1 at end of file.
   * @throws IOException if the remote read fails.
   */
  private int readInternal(final long position, final byte[] b, final int offset, final int length,
                           final boolean bypassReadAhead) throws IOException {
    if (readAheadEnabled && !bypassReadAhead) {
      // try reading from read-ahead
      if (offset != 0) {
        throw new IllegalArgumentException("readahead buffers cannot have non-zero buffer offsets");
      }
      int receivedBytes;

      // queue read-aheads
      int numReadAheads = this.readAheadQueueDepth;
      long nextSize;
      long nextOffset = position;
      while (numReadAheads > 0 && nextOffset < contentLength) {
        nextSize = Math.min((long) bufferSize, contentLength - nextOffset);
        ReadBufferManager.getBufferManager().queueReadAhead(this, nextOffset, (int) nextSize);
        nextOffset = nextOffset + nextSize;
        numReadAheads--;
      }

      // try reading from buffers first
      receivedBytes = ReadBufferManager.getBufferManager().getBlock(this, position, length, b);
      if (receivedBytes > 0) {
        return receivedBytes;
      }

      // got nothing from read-ahead, do our own read now
      receivedBytes = readRemote(position, b, offset, length);
      return receivedBytes;
    } else {
      return readRemote(position, b, offset, length);
    }
  }

  /**
   * Issues a single read request to the service (no buffering, no
   * read-ahead). Package-private so the read-ahead worker can call it.
   *
   * @param position file offset to read from; must be non-negative.
   * @param b        destination buffer.
   * @param offset   destination offset within {@code b}.
   * @param length   maximum number of bytes to read.
   * @return number of bytes received, or -1 when {@code position} is at or
   *         past the recorded content length.
   * @throws IOException on argument errors or if the service call fails.
   */
  int readRemote(long position, byte[] b, int offset, int length) throws IOException {
    if (position < 0) {
      throw new IllegalArgumentException("attempting to read from negative offset");
    }
    if (position >= contentLength) {
      return -1; // Hadoop prefers -1 to EOFException
    }
    if (b == null) {
      throw new IllegalArgumentException("null byte array passed in to read() method");
    }
    if (offset >= b.length) {
      throw new IllegalArgumentException("offset greater than length of array");
    }
    if (length < 0) {
      throw new IllegalArgumentException("requested read length is less than zero");
    }
    if (length > (b.length - offset)) {
      throw new IllegalArgumentException("requested read length is more than will fit after requested offset in buffer");
    }
    final AbfsRestOperation op;
    try {
      // "*" matches any eTag, i.e. OOB appends are tolerated; otherwise the
      // stored eTag pins the read to the version seen at open time.
      op = client.read(path, position, b, offset, length, tolerateOobAppends ? "*" : eTag);
    } catch (AzureBlobFileSystemException ex) {
      throw new IOException(ex);
    }
    long bytesRead = op.getResult().getBytesReceived();
    if (bytesRead > Integer.MAX_VALUE) {
      throw new IOException("Unexpected Content-Length");
    }
    return (int) bytesRead;
  }

  /**
   * Seek to given position in stream.
   * @param n position to seek to
   * @throws IOException if there is an error
   * @throws EOFException if attempting to seek past end of file
   */
  @Override
  public synchronized void seek(long n) throws IOException {
    if (closed) {
      throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
    }
    if (n < 0) {
      throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK);
    }
    if (n > contentLength) {
      throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF);
    }

    // [fCursor - limit, fCursor] is the file range currently buffered;
    // a seek inside it only needs to move the buffer cursor.
    if (n>=fCursor-limit && n<=fCursor) { // within buffer
      bCursor = (int) (n-(fCursor-limit));
      return;
    }

    // next read will read from here
    fCursor = n;

    //invalidate buffer
    limit = 0;
    bCursor = 0;
  }

  /**
   * Skips forward (or backward, for negative {@code n}) by delegating to
   * {@link #seek(long)}, clamping the target position to [0, contentLength].
   *
   * @return the number of bytes actually skipped (may differ from the
   *         request after clamping).
   * @throws IOException if the stream is closed.
   * @throws EOFException if already at EOF and {@code n > 0}.
   */
  @Override
  public synchronized long skip(long n) throws IOException {
    if (closed) {
      throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
    }
    long currentPos = getPos();
    if (currentPos == contentLength) {
      if (n > 0) {
        throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF);
      }
    }
    long newPos = currentPos + n;
    if (newPos < 0) {
      newPos = 0;
      n = newPos - currentPos; // recompute the actual (negative) skip amount
    }
    if (newPos > contentLength) {
      newPos = contentLength;
      n = newPos - currentPos; // recompute the actual skip amount after clamping
    }
    seek(newPos);
    return n;
  }

  /**
   * Return the size of the remaining available bytes
   * if the size is less than or equal to {@link Integer#MAX_VALUE},
   * otherwise, return {@link Integer#MAX_VALUE}.
   *
   * This is to match the behavior of DFSInputStream.available(),
   * which some clients may rely on (HBase write-ahead log reading in
   * particular).
   */
  @Override
  public synchronized int available() throws IOException {
    if (closed) {
      throw new IOException(
          FSExceptionMessages.STREAM_IS_CLOSED);
    }
    final long remaining = this.contentLength - this.getPos();
    return remaining <= Integer.MAX_VALUE
        ? (int) remaining : Integer.MAX_VALUE;
  }

  /**
   * Returns the length of the file that this stream refers to. Note that the length returned is the length
   * as of the time the Stream was opened. Specifically, if there have been subsequent appends to the file,
   * they won't be reflected in the returned length.
   *
   * @return length of the file.
   * @throws IOException if the stream is closed
   */
  public long length() throws IOException {
    if (closed) {
      throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
    }
    return contentLength;
  }

  /**
   * Return the current offset from the start of the file
   * @throws IOException throws {@link IOException} if there is an error
   */
  @Override
  public synchronized long getPos() throws IOException {
    if (closed) {
      throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
    }
    // Buffered-region start plus progress within the buffer.
    return fCursor - limit + bCursor;
  }

  /**
   * Seeks a different copy of the data. Returns true if
   * found a new source, false otherwise.
   * @throws IOException throws {@link IOException} if there is an error
   */
  @Override
  public boolean seekToNewSource(long l) throws IOException {
    return false; // there is only one copy of the data on the service
  }

  /**
   * Marks the stream closed; subsequent operations throw. Closing is
   * idempotent and performs no remote call.
   */
  @Override
  public synchronized void close() throws IOException {
    closed = true;
    buffer = null; // de-reference the buffer so it can be GC'ed sooner
  }

  /**
   * Not supported by this stream. Throws {@link UnsupportedOperationException}
   * @param readlimit ignored
   */
  @Override
  public synchronized void mark(int readlimit) {
    // NOTE(review): InputStream.mark is specified as a no-op by default;
    // throwing here is a deliberate deviation (markSupported() is false).
    throw new UnsupportedOperationException("mark()/reset() not supported on this stream");
  }

  /**
   * Not supported by this stream. Throws {@link UnsupportedOperationException}
   */
  @Override
  public synchronized void reset() throws IOException {
    throw new UnsupportedOperationException("mark()/reset() not supported on this stream");
  }

  /**
   * gets whether mark and reset are supported by {@code AbfsInputStream}. Always returns false.
   *
   * @return always {@code false}
   */
  @Override
  public boolean markSupported() {
    return false;
  }
}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org