Posted to common-commits@hadoop.apache.org by tm...@apache.org on 2018/08/11 05:37:48 UTC

[44/50] [abbrv] hadoop git commit: HADOOP-15407. HADOOP-15540. Support Windows Azure Storage - Blob file system "ABFS" in Hadoop: Core Commit.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fdf5f4c3/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
new file mode 100644
index 0000000..c17a5c1
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
@@ -0,0 +1,402 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.UnsupportedEncodingException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLEncoder;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriException;
+import org.apache.hadoop.fs.azurebfs.contracts.services.ConfigurationService;
+import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
+import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
+import org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * AbfsClient is the REST client used to issue ABFS service calls.
+ */
+public class AbfsClient {
+  public static final Logger LOG = LoggerFactory.getLogger(AbfsClient.class);
+  private final URL baseUrl;
+  private final SharedKeyCredentials sharedKeyCredentials;
+  private final String xMsVersion = "2018-03-28";
+  private final ExponentialRetryPolicy retryPolicy;
+  private final String filesystem;
+  private final ConfigurationService configurationService;
+  private final String userAgent;
+
+  public AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials,
+                    final ConfigurationService configurationService,
+                    final ExponentialRetryPolicy exponentialRetryPolicy) {
+    this.baseUrl = baseUrl;
+    this.sharedKeyCredentials = sharedKeyCredentials;
+    String baseUrlString = baseUrl.toString();
+    this.filesystem = baseUrlString.substring(baseUrlString.lastIndexOf(AbfsHttpConstants.FORWARD_SLASH) + 1);
+    this.configurationService = configurationService;
+    this.retryPolicy = exponentialRetryPolicy;
+    this.userAgent = initializeUserAgent();
+  }
+
+  public String getFileSystem() {
+    return filesystem;
+  }
+
+  ExponentialRetryPolicy getRetryPolicy() {
+    return retryPolicy;
+  }
+
+  SharedKeyCredentials getSharedKeyCredentials() {
+    return sharedKeyCredentials;
+  }
+
+  List<AbfsHttpHeader> createDefaultHeaders() {
+    final List<AbfsHttpHeader> requestHeaders = new ArrayList<AbfsHttpHeader>();
+    requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_VERSION, xMsVersion));
+    requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.ACCEPT, AbfsHttpConstants.APPLICATION_JSON
+            + AbfsHttpConstants.COMMA + AbfsHttpConstants.SINGLE_WHITE_SPACE + AbfsHttpConstants.APPLICATION_OCTET_STREAM));
+    requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.ACCEPT_CHARSET,
+            AbfsHttpConstants.UTF_8));
+    requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.CONTENT_TYPE, AbfsHttpConstants.EMPTY_STRING));
+    requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.USER_AGENT, userAgent));
+    return requestHeaders;
+  }
+
+  AbfsUriQueryBuilder createDefaultUriQueryBuilder() {
+    final AbfsUriQueryBuilder abfsUriQueryBuilder = new AbfsUriQueryBuilder();
+    abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_TIMEOUT, AbfsHttpConstants.DEFAULT_TIMEOUT);
+    return abfsUriQueryBuilder;
+  }
+
+  public AbfsRestOperation createFilesystem() throws AzureBlobFileSystemException {
+    final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+
+    final AbfsUriQueryBuilder abfsUriQueryBuilder = new AbfsUriQueryBuilder();
+    abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_RESOURCE, AbfsHttpConstants.FILESYSTEM);
+
+    final URL url = createRequestUrl(abfsUriQueryBuilder.toString());
+    final AbfsRestOperation op = new AbfsRestOperation(
+            this,
+            AbfsHttpConstants.HTTP_METHOD_PUT,
+            url,
+            requestHeaders);
+    op.execute();
+    return op;
+  }
+
+  public AbfsRestOperation setFilesystemProperties(final String properties) throws AzureBlobFileSystemException {
+    final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+    // JDK7 does not support PATCH, so to workaround the issue we will use
+    // PUT and specify the real method in the X-Http-Method-Override header.
+    requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_HTTP_METHOD_OVERRIDE,
+            AbfsHttpConstants.HTTP_METHOD_PATCH));
+
+    requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_PROPERTIES,
+            properties));
+
+    final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
+    abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_RESOURCE, AbfsHttpConstants.FILESYSTEM);
+
+    final URL url = createRequestUrl(abfsUriQueryBuilder.toString());
+    final AbfsRestOperation op = new AbfsRestOperation(
+            this,
+            AbfsHttpConstants.HTTP_METHOD_PUT,
+            url,
+            requestHeaders);
+    op.execute();
+    return op;
+  }
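
A standalone sketch of the PATCH-over-PUT workaround used above, with plain
JDK types only; the endpoint URL is hypothetical and not part of ABFS:

    import java.io.IOException;
    import java.net.HttpURLConnection;
    import java.net.URL;

    class PatchOverPutSketch {
      // JDK7's HttpURLConnection rejects setRequestMethod("PATCH"), so the
      // request is sent as PUT and the service reads the real verb from the
      // X-Http-Method-Override header.
      static int patchViaPut(URL url) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("PUT");
        conn.setRequestProperty("X-Http-Method-Override", "PATCH");
        return conn.getResponseCode();
      }
    }
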
+
+  public AbfsRestOperation listPath(final String relativePath, final boolean recursive, final int listMaxResults,
+                                    final String continuation) throws AzureBlobFileSystemException {
+    final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+
+    final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
+    abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_RESOURCE, AbfsHttpConstants.FILESYSTEM);
+    abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_DIRECTORY, relativePath == null ? "" : urlEncode(relativePath));
+    abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_RECURSIVE, String.valueOf(recursive));
+    abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_CONTINUATION, continuation);
+    abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_MAXRESULTS, String.valueOf(listMaxResults));
+
+    final URL url = createRequestUrl(abfsUriQueryBuilder.toString());
+    final AbfsRestOperation op = new AbfsRestOperation(
+            this,
+            AbfsHttpConstants.HTTP_METHOD_GET,
+            url,
+            requestHeaders);
+    op.execute();
+    return op;
+  }
+
+  public AbfsRestOperation getFilesystemProperties() throws AzureBlobFileSystemException {
+    final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+
+    final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
+    abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_RESOURCE, AbfsHttpConstants.FILESYSTEM);
+
+    final URL url = createRequestUrl(abfsUriQueryBuilder.toString());
+    final AbfsRestOperation op = new AbfsRestOperation(
+            this,
+            AbfsHttpConstants.HTTP_METHOD_HEAD,
+            url,
+            requestHeaders);
+    op.execute();
+    return op;
+  }
+
+  public AbfsRestOperation deleteFilesystem() throws AzureBlobFileSystemException {
+    final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+
+    final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
+    abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_RESOURCE, AbfsHttpConstants.FILESYSTEM);
+
+    final URL url = createRequestUrl(abfsUriQueryBuilder.toString());
+    final AbfsRestOperation op = new AbfsRestOperation(
+            this,
+            AbfsHttpConstants.HTTP_METHOD_DELETE,
+            url,
+            requestHeaders);
+    op.execute();
+    return op;
+  }
+
+  public AbfsRestOperation createPath(final String path, final boolean isFile, final boolean overwrite)
+          throws AzureBlobFileSystemException {
+    final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+    if (!overwrite) {
+      requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.IF_NONE_MATCH, "*"));
+    }
+
+    final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
+    abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_RESOURCE, isFile ? AbfsHttpConstants.FILE : AbfsHttpConstants.DIRECTORY);
+
+    final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
+    final AbfsRestOperation op = new AbfsRestOperation(
+            this,
+            AbfsHttpConstants.HTTP_METHOD_PUT,
+            url,
+            requestHeaders);
+    op.execute();
+    return op;
+  }
+
+  public AbfsRestOperation renamePath(final String source, final String destination, final String continuation)
+          throws AzureBlobFileSystemException {
+    final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+
+    final String encodedRenameSource = urlEncode(AbfsHttpConstants.FORWARD_SLASH + this.getFileSystem() + source);
+    requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_RENAME_SOURCE, encodedRenameSource));
+    requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.IF_NONE_MATCH, AbfsHttpConstants.STAR));
+
+    final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
+    abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_CONTINUATION, continuation);
+
+    final URL url = createRequestUrl(destination, abfsUriQueryBuilder.toString());
+    final AbfsRestOperation op = new AbfsRestOperation(
+            this,
+            AbfsHttpConstants.HTTP_METHOD_PUT,
+            url,
+            requestHeaders);
+    op.execute();
+    return op;
+  }
+
+  public AbfsRestOperation append(final String path, final long position, final byte[] buffer, final int offset,
+                                  final int length) throws AzureBlobFileSystemException {
+    final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+    // JDK7 does not support PATCH, so to workaround the issue we will use
+    // PUT and specify the real method in the X-Http-Method-Override header.
+    requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_HTTP_METHOD_OVERRIDE,
+            AbfsHttpConstants.HTTP_METHOD_PATCH));
+
+    final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
+    abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_ACTION, AbfsHttpConstants.APPEND_ACTION);
+    abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_POSITION, Long.toString(position));
+
+    final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
+    final AbfsRestOperation op = new AbfsRestOperation(
+            this,
+            AbfsHttpConstants.HTTP_METHOD_PUT,
+            url,
+            requestHeaders, buffer, offset, length);
+    op.execute();
+    return op;
+  }
+
+
+  public AbfsRestOperation flush(final String path, final long position, boolean retainUncommittedData) throws AzureBlobFileSystemException {
+    final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+    // JDK7 does not support PATCH, so to workaround the issue we will use
+    // PUT and specify the real method in the X-Http-Method-Override header.
+    requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_HTTP_METHOD_OVERRIDE,
+            AbfsHttpConstants.HTTP_METHOD_PATCH));
+
+    final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
+    abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_ACTION, AbfsHttpConstants.FLUSH_ACTION);
+    abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_POSITION, Long.toString(position));
+    abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_RETAIN_UNCOMMITTED_DATA, String.valueOf(retainUncommittedData));
+
+    final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
+    final AbfsRestOperation op = new AbfsRestOperation(
+            this,
+            AbfsHttpConstants.HTTP_METHOD_PUT,
+            url,
+            requestHeaders);
+    op.execute();
+    return op;
+  }
+
+  public AbfsRestOperation setPathProperties(final String path, final String properties) throws AzureBlobFileSystemException {
+    final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+    // JDK7 does not support PATCH, so to workaround the issue we will use
+    // PUT and specify the real method in the X-Http-Method-Override header.
+    requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_HTTP_METHOD_OVERRIDE,
+            AbfsHttpConstants.HTTP_METHOD_PATCH));
+
+    requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_PROPERTIES, properties));
+
+    final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
+    abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_ACTION, AbfsHttpConstants.SET_PROPERTIES_ACTION);
+
+    final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
+    final AbfsRestOperation op = new AbfsRestOperation(
+            this,
+            AbfsHttpConstants.HTTP_METHOD_PUT,
+            url,
+            requestHeaders);
+    op.execute();
+    return op;
+  }
+
+  public AbfsRestOperation getPathProperties(final String path) throws AzureBlobFileSystemException {
+    final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+
+    final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
+
+    final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
+    final AbfsRestOperation op = new AbfsRestOperation(
+            this,
+            AbfsHttpConstants.HTTP_METHOD_HEAD,
+            url,
+            requestHeaders);
+    op.execute();
+    return op;
+  }
+
+  public AbfsRestOperation read(final String path, final long position, final byte[] buffer, final int bufferOffset,
+                                final int bufferLength, final String eTag) throws AzureBlobFileSystemException {
+    final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+    requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.RANGE,
+            String.format("bytes=%d-%d", position, position + bufferLength - 1)));
+    requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.IF_MATCH, eTag));
+
+    final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
+
+    final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
+
+    final AbfsRestOperation op = new AbfsRestOperation(
+            this,
+            AbfsHttpConstants.HTTP_METHOD_GET,
+            url,
+            requestHeaders,
+            buffer,
+            bufferOffset,
+            bufferLength);
+    op.execute();
+
+    return op;
+  }
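
The Range header arithmetic in read() above, in isolation; the position and
buffer length values are only illustrative:

    class RangeHeaderSketch {
      public static void main(String[] args) {
        long position = 0;
        int bufferLength = 4 * 1024 * 1024;
        // Inclusive byte range [position, position + bufferLength - 1].
        System.out.println(
            String.format("bytes=%d-%d", position, position + bufferLength - 1));
        // prints: bytes=0-4194303
      }
    }
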
+
+  public AbfsRestOperation deletePath(final String path, final boolean recursive, final String continuation)
+          throws AzureBlobFileSystemException {
+    final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+
+    final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
+    abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_RECURSIVE, String.valueOf(recursive));
+    abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_CONTINUATION, continuation);
+
+    final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
+    final AbfsRestOperation op = new AbfsRestOperation(
+            this,
+            AbfsHttpConstants.HTTP_METHOD_DELETE,
+            url,
+            requestHeaders);
+    op.execute();
+    return op;
+  }
+
+  private URL createRequestUrl(final String query) throws AzureBlobFileSystemException {
+    return createRequestUrl(AbfsHttpConstants.EMPTY_STRING, query);
+  }
+
+  private URL createRequestUrl(final String path, final String query)
+          throws AzureBlobFileSystemException {
+    final String base = baseUrl.toString();
+    String encodedPath = path;
+    try {
+      encodedPath = urlEncode(path);
+    } catch (AzureBlobFileSystemException ex) {
+      this.LOG.debug(
+              "Unexpected error.", ex);
+    }
+
+    final StringBuilder sb = new StringBuilder();
+    sb.append(base);
+    sb.append(encodedPath);
+    sb.append(query);
+
+    final URL url;
+    try {
+      url = new URL(sb.toString());
+    } catch (MalformedURLException ex) {
+      throw new InvalidUriException(sb.toString());
+    }
+    return url;
+  }
+
+  private static String urlEncode(final String value) throws AzureBlobFileSystemException {
+    String encodedString = null;
+    try {
+      encodedString =  URLEncoder.encode(value, AbfsHttpConstants.UTF_8)
+          .replace(AbfsHttpConstants.PLUS, AbfsHttpConstants.PLUS_ENCODE)
+          .replace(AbfsHttpConstants.FORWARD_SLASH_ENCODE, AbfsHttpConstants.FORWARD_SLASH);
+    } catch (UnsupportedEncodingException ex) {
+        throw new InvalidUriException(value);
+    }
+
+    return encodedString;
+  }
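
What urlEncode produces, assuming PLUS_ENCODE is "%20" and
FORWARD_SLASH_ENCODE is "%2F" (both constants are defined in
AbfsHttpConstants, outside this diff):

    import java.io.UnsupportedEncodingException;
    import java.net.URLEncoder;

    class AbfsEncodeSketch {
      static String encode(String value) throws UnsupportedEncodingException {
        return URLEncoder.encode(value, "UTF-8")
            .replace("+", "%20")   // spaces become %20 rather than '+'
            .replace("%2F", "/");  // path separators stay literal
      }

      public static void main(String[] args) throws Exception {
        System.out.println(encode("dir name/file+1.txt"));
        // prints: dir%20name/file%2B1.txt
      }
    }
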
+
+  private String initializeUserAgent() {
+    final String userAgentComment = String.format(Locale.ROOT,
+            "(JavaJRE %s; %s %s)",
+            System.getProperty(AbfsHttpConstants.JAVA_VERSION),
+            System.getProperty(AbfsHttpConstants.OS_NAME)
+                    .replaceAll(AbfsHttpConstants.SINGLE_WHITE_SPACE, AbfsHttpConstants.EMPTY_STRING),
+            System.getProperty(AbfsHttpConstants.OS_VERSION));
+
+    return String.format(AbfsHttpConstants.CLIENT_VERSION + " %s", userAgentComment);
+  }
+}
\ No newline at end of file
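
The User-Agent value assembled by initializeUserAgent, approximated
standalone; "Azure Blob FS/1.0" stands in for AbfsHttpConstants.CLIENT_VERSION,
whose value is not shown in this diff:

    import java.util.Locale;

    class UserAgentSketch {
      public static void main(String[] args) {
        String comment = String.format(Locale.ROOT, "(JavaJRE %s; %s %s)",
            System.getProperty("java.version"),
            System.getProperty("os.name").replaceAll(" ", ""),
            System.getProperty("os.version"));
        // e.g. Azure Blob FS/1.0 (JavaJRE 1.8.0_181; Linux 4.15.0-32-generic)
        System.out.println("Azure Blob FS/1.0 " + comment);
      }
    }
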

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fdf5f4c3/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpClientFactoryImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpClientFactoryImpl.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpClientFactoryImpl.java
new file mode 100644
index 0000000..9e4c27b
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpClientFactoryImpl.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.net.URL;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
+import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
+import org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
+import org.apache.hadoop.fs.azurebfs.contracts.services.AbfsHttpClientFactory;
+import org.apache.hadoop.fs.azurebfs.contracts.services.ConfigurationService;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriAuthorityException;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriException;
+
+@Singleton
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+class AbfsHttpClientFactoryImpl implements AbfsHttpClientFactory {
+  private final ConfigurationService configurationService;
+
+  @Inject
+  AbfsHttpClientFactoryImpl(
+      final ConfigurationService configurationService) {
+
+    Preconditions.checkNotNull(configurationService, "configurationService");
+
+    this.configurationService = configurationService;
+  }
+
+  @VisibleForTesting
+  URIBuilder getURIBuilder(final String hostName, final FileSystem fs) {
+    final AzureBlobFileSystem abfs = (AzureBlobFileSystem) fs;
+
+    String scheme = FileSystemUriSchemes.HTTP_SCHEME;
+
+    if (abfs.isSecure()) {
+      scheme = FileSystemUriSchemes.HTTPS_SCHEME;
+    }
+
+    final URIBuilder uriBuilder = new URIBuilder();
+    uriBuilder.setScheme(scheme);
+    uriBuilder.setHost(hostName);
+
+    return uriBuilder;
+  }
+
+  public AbfsClient create(final AzureBlobFileSystem fs) throws AzureBlobFileSystemException {
+    final URI uri = fs.getUri();
+    final String authority = uri.getRawAuthority();
+    if (null == authority) {
+      throw new InvalidUriAuthorityException(uri.toString());
+    }
+
+    if (!authority.contains(AbfsHttpConstants.AZURE_DISTRIBUTED_FILE_SYSTEM_AUTHORITY_DELIMITER)) {
+      throw new InvalidUriAuthorityException(uri.toString());
+    }
+
+    final String[] authorityParts = authority.split(AbfsHttpConstants.AZURE_DISTRIBUTED_FILE_SYSTEM_AUTHORITY_DELIMITER, 2);
+
+    if (authorityParts.length < 2 || "".equals(authorityParts[0])) {
+      final String errMsg = String
+          .format("URI '%s' has a malformed authority, expected container name. "
+                  + "Authority takes the form "+ FileSystemUriSchemes.ABFS_SCHEME + "://[<container name>@]<account name>",
+              uri.toString());
+      throw new InvalidUriException(errMsg);
+    }
+
+    final String fileSystemName = authorityParts[0];
+    final String accountName = authorityParts[1];
+
+    final URIBuilder uriBuilder = getURIBuilder(accountName, fs);
+
+    final String url = uriBuilder.toString() + AbfsHttpConstants.FORWARD_SLASH + fileSystemName;
+
+    URL baseUrl;
+    try {
+      baseUrl = new URL(url);
+    } catch (MalformedURLException e) {
+      throw new InvalidUriException(String.format("URI '%s' is malformed", uri.toString()));
+    }
+
+    SharedKeyCredentials creds =
+        new SharedKeyCredentials(accountName.substring(0, accountName.indexOf(AbfsHttpConstants.DOT)),
+                this.configurationService.getStorageAccountKey(accountName));
+
+    return new AbfsClient(baseUrl, creds, configurationService, new ExponentialRetryPolicy());
+  }
+}
\ No newline at end of file
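
How create() above carves up an abfs URI authority of the form
<container>@<account host>, assuming the delimiter constant is "@" and the
dot constant is "." (both defined in AbfsHttpConstants):

    class AbfsUriSketch {
      public static void main(String[] args) {
        // From a URI such as abfs://myfs@myaccount.dfs.core.windows.net/dir/file
        String authority = "myfs@myaccount.dfs.core.windows.net";
        String[] parts = authority.split("@", 2);
        String fileSystemName = parts[0]; // "myfs"
        String accountName = parts[1];    // "myaccount.dfs.core.windows.net"
        // SharedKeyCredentials receives the short name before the first dot.
        String shortName = accountName.substring(0, accountName.indexOf('.'));
        System.out.println(fileSystemName + " @ " + shortName); // myfs @ myaccount
      }
    }
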

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fdf5f4c3/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpHeader.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpHeader.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpHeader.java
new file mode 100644
index 0000000..46b4c6d
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpHeader.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+/**
+ * A single HTTP request/response header (name/value pair) used by the
+ * REST AbfsClient.
+ */
+public class AbfsHttpHeader {
+  private final String name;
+  private final String value;
+
+  public AbfsHttpHeader(final String name, final String value) {
+    this.name = name;
+    this.value = value;
+  }
+
+  public String getName() {
+    return name;
+  }
+
+  public String getValue() {
+    return value;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fdf5f4c3/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java
new file mode 100644
index 0000000..0ea9365
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java
@@ -0,0 +1,430 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.List;
+import java.util.UUID;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
+import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
+import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Represents an HTTP operation.
+ */
+public class AbfsHttpOperation {
+  private static final Logger LOG = LoggerFactory.getLogger(AbfsHttpOperation.class);
+
+  private static final int CONNECT_TIMEOUT = 30 * 1000;
+  private static final int READ_TIMEOUT = 30 * 1000;
+
+  private static final int CLEAN_UP_BUFFER_SIZE = 64 * 1024;
+
+  private static final int ONE_THOUSAND = 1000;
+  private static final int ONE_MILLION = ONE_THOUSAND * ONE_THOUSAND;
+
+  private final String method;
+  private final URL url;
+
+  private HttpURLConnection connection;
+  private int statusCode;
+  private String statusDescription;
+  private String storageErrorCode = "";
+  private String storageErrorMessage  = "";
+  private String clientRequestId = "";
+  private String requestId  = "";
+  private ListResultSchema listResultSchema = null;
+
+  // metrics
+  private int bytesSent;
+  private long bytesReceived;
+
+  // optional trace enabled metrics
+  private final boolean isTraceEnabled;
+  private long connectionTimeMs;
+  private long sendRequestTimeMs;
+  private long recvResponseTimeMs;
+
+  protected  HttpURLConnection getConnection() {
+    return connection;
+  }
+
+  public String getMethod() {
+    return method;
+  }
+
+  public URL getUrl() {
+    return url;
+  }
+
+  public int getStatusCode() {
+    return statusCode;
+  }
+
+  public String getStatusDescription() {
+    return statusDescription;
+  }
+
+  public String getStorageErrorCode() {
+    return storageErrorCode;
+  }
+
+  public String getStorageErrorMessage() {
+    return storageErrorMessage;
+  }
+
+  public String getClientRequestId() {
+    return clientRequestId;
+  }
+
+  public String getRequestId() {
+    return requestId;
+  }
+
+  public int getBytesSent() {
+    return bytesSent;
+  }
+
+  public long getBytesReceived() {
+    return bytesReceived;
+  }
+
+  public ListResultSchema getListResultSchema() {
+    return listResultSchema;
+  }
+
+  public String getResponseHeader(String httpHeader) {
+    return connection.getHeaderField(httpHeader);
+  }
+
+  // Returns a trace message for the request
+  @Override
+  public String toString() {
+    final String urlStr = url.toString();
+    final StringBuilder sb = new StringBuilder();
+    sb.append(statusCode);
+    sb.append(",");
+    sb.append(storageErrorCode);
+    sb.append(",cid=");
+    sb.append(clientRequestId);
+    sb.append(",rid=");
+    sb.append(requestId);
+    if (isTraceEnabled) {
+      sb.append(",connMs=");
+      sb.append(connectionTimeMs);
+      sb.append(",sendMs=");
+      sb.append(sendRequestTimeMs);
+      sb.append(",recvMs=");
+      sb.append(recvResponseTimeMs);
+    }
+    sb.append(",sent=");
+    sb.append(bytesSent);
+    sb.append(",recv=");
+    sb.append(bytesReceived);
+    sb.append(",");
+    sb.append(method);
+    sb.append(",");
+    sb.append(urlStr);
+    return sb.toString();
+  }
+
+  /**
+   * Initializes a new HTTP request and opens the connection.
+   *
+   * @param url The full URL including query string parameters.
+   * @param method The HTTP method (PUT, PATCH, POST, GET, HEAD, or DELETE).
+   * @param requestHeaders The HTTP request headers.
+   *
+   * @throws IOException if an error occurs.
+   */
+  public AbfsHttpOperation(final URL url, final String method, final List<AbfsHttpHeader> requestHeaders)
+      throws IOException {
+    this.isTraceEnabled = this.LOG.isTraceEnabled();
+    this.url = url;
+    this.method = method;
+    this.clientRequestId = UUID.randomUUID().toString();
+
+    this.connection = openConnection();
+
+    this.connection.setConnectTimeout(CONNECT_TIMEOUT);
+    this.connection.setReadTimeout(READ_TIMEOUT);
+
+    this.connection.setRequestMethod(method);
+
+    for (AbfsHttpHeader header : requestHeaders) {
+      this.connection.setRequestProperty(header.getName(), header.getValue());
+    }
+
+    this.connection.setRequestProperty(HttpHeaderConfigurations.X_MS_CLIENT_REQUEST_ID, clientRequestId);
+  }
+
+  /**
+   * Sends the HTTP request.  Note that HttpUrlConnection requires that an
+   * empty buffer be sent in order to set the "Content-Length: 0" header, which
+   * is required by our endpoint.
+   *
+   * @param buffer the request entity body.
+   * @param offset an offset into the buffer where the data begins.
+   * @param length the length of the data in the buffer.
+   *
+   * @throws IOException if an error occurs.
+   */
+  public void sendRequest(byte[] buffer, int offset, int length) throws IOException {
+    this.connection.setDoOutput(true);
+    this.connection.setFixedLengthStreamingMode(length);
+    if (buffer == null) {
+      // An empty buffer is sent to set the "Content-Length: 0" header, which
+      // is required by our endpoint.
+      buffer = new byte[]{};
+      offset = 0;
+      length = 0;
+    }
+
+    // send the request body
+
+    long startTime = 0;
+    if (this.isTraceEnabled) {
+      startTime = System.nanoTime();
+    }
+    try (OutputStream outputStream = this.connection.getOutputStream()) {
+      // update bytes sent before they are sent so we may observe
+      // attempted sends as well as successful sends via the
+      // accompanying statusCode
+      this.bytesSent = length;
+      outputStream.write(buffer, offset, length);
+    } finally {
+      if (this.isTraceEnabled) {
+        this.sendRequestTimeMs = elapsedTimeMs(startTime);
+      }
+    }
+  }
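
The empty-body behavior documented above, in isolation: with a zero-length
fixed streaming mode, HttpURLConnection emits the "Content-Length: 0" header
even though nothing is written. A minimal sketch against any already-opened
connection:

    import java.io.IOException;
    import java.io.OutputStream;
    import java.net.HttpURLConnection;

    class EmptyBodySketch {
      static void sendEmptyBody(HttpURLConnection conn) throws IOException {
        conn.setDoOutput(true);
        conn.setFixedLengthStreamingMode(0);
        try (OutputStream out = conn.getOutputStream()) {
          out.write(new byte[0], 0, 0); // nothing sent; header is still set
        }
      }
    }
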
+
+  /**
+   * Gets and processes the HTTP response.
+   *
+   * @throws IOException if an error occurs.
+   */
+  public void processResponse(final byte[] buffer, final int offset, final int length) throws IOException {
+
+    // get the response
+    long startTime = 0;
+    if (this.isTraceEnabled) {
+      startTime = System.nanoTime();
+    }
+
+    this.statusCode = this.connection.getResponseCode();
+
+    if (this.isTraceEnabled) {
+      this.recvResponseTimeMs = elapsedTimeMs(startTime);
+    }
+
+    this.statusDescription = this.connection.getResponseMessage();
+
+    this.requestId = this.connection.getHeaderField(HttpHeaderConfigurations.X_MS_REQUEST_ID);
+    if (this.requestId == null) {
+      this.requestId = AbfsHttpConstants.EMPTY_STRING;
+    }
+
+    if (AbfsHttpConstants.HTTP_METHOD_HEAD.equals(this.method)) {
+      // A HEAD response has no body, so there is nothing more to read,
+      // even when the status code indicates an error.
+      return;
+    }
+
+    if (this.isTraceEnabled) {
+      startTime = System.nanoTime();
+    }
+
+    if (statusCode >= HttpURLConnection.HTTP_BAD_REQUEST) {
+      processStorageErrorResponse();
+      if (this.isTraceEnabled) {
+        this.recvResponseTimeMs += elapsedTimeMs(startTime);
+      }
+      this.bytesReceived = this.connection.getHeaderFieldLong(HttpHeaderConfigurations.CONTENT_LENGTH, 0);
+    } else {
+      // consume the input stream to release resources
+      int totalBytesRead = 0;
+
+      try (InputStream stream = this.connection.getInputStream()) {
+        if (isNullInputStream(stream)) {
+          return;
+        }
+        boolean endOfStream = false;
+
+        // this is a list operation, so the response body must be parsed
+        // here; a cleaner solution is still needed
+        if (AbfsHttpConstants.HTTP_METHOD_GET.equals(this.method) && buffer == null) {
+          parseListFilesResponse(stream);
+        } else {
+          if (buffer != null) {
+            while (totalBytesRead < length) {
+              int bytesRead = stream.read(buffer, offset + totalBytesRead, length - totalBytesRead);
+              if (bytesRead == -1) {
+                endOfStream = true;
+                break;
+              }
+              totalBytesRead += bytesRead;
+            }
+          }
+          if (!endOfStream && stream.read() != -1) {
+            // read and discard
+            int bytesRead = 0;
+            byte[] b = new byte[CLEAN_UP_BUFFER_SIZE];
+            while ((bytesRead = stream.read(b)) >= 0) {
+              totalBytesRead += bytesRead;
+            }
+          }
+        }
+      } catch (IOException ex) {
+        this.LOG.error("UnexpectedError: ", ex);
+        throw ex;
+      } finally {
+        if (this.isTraceEnabled) {
+          this.recvResponseTimeMs += elapsedTimeMs(startTime);
+        }
+        this.bytesReceived = totalBytesRead;
+      }
+    }
+  }
+
+
+  /**
+   * Open the HTTP connection.
+   *
+   * @throws IOException if an error occurs.
+   */
+  private HttpURLConnection openConnection() throws IOException {
+    if (!isTraceEnabled) {
+      return (HttpURLConnection) url.openConnection();
+    }
+    long start = System.nanoTime();
+    try {
+      return (HttpURLConnection) url.openConnection();
+    } finally {
+      connectionTimeMs = elapsedTimeMs(start);
+    }
+  }
+
+  /**
+   * When the request fails, this function is used to parse the response
+   * and extract the storageErrorCode and storageErrorMessage.  Any errors
+   * encountered while attempting to process the error response are logged,
+   * but otherwise ignored.
+   *
+   * For storage errors, the response body *usually* has the following format:
+   *
+   * {
+   *   "error":
+   *   {
+   *     "code": "string",
+   *     "message": "string"
+   *   }
+   * }
+   *
+   */
+  private void processStorageErrorResponse() {
+    try (InputStream stream = connection.getErrorStream()) {
+      if (stream == null) {
+        return;
+      }
+      JsonFactory jf = new JsonFactory();
+      try (JsonParser jp = jf.createParser(stream)) {
+        String fieldName, fieldValue;
+        jp.nextToken();  // START_OBJECT - {
+        jp.nextToken();  // FIELD_NAME - "error":
+        jp.nextToken();  // START_OBJECT - {
+        jp.nextToken();
+        while (jp.hasCurrentToken()) {
+          if (jp.getCurrentToken() == JsonToken.FIELD_NAME) {
+            fieldName = jp.getCurrentName();
+            jp.nextToken();
+            fieldValue = jp.getText();
+            switch (fieldName) {
+              case "code":
+                storageErrorCode = fieldValue;
+                break;
+              case "message":
+                storageErrorMessage = fieldValue;
+                break;
+              default:
+                break;
+            }
+          }
+          jp.nextToken();
+        }
+      }
+    } catch (IOException ex) {
+      // Ignore errors that occur while attempting to parse the storage
+      // error, since the response may have been handled by the HTTP driver
+      // or for other reasons have an unexpected format.
+      this.LOG.debug("ExpectedError: ", ex);
+    }
+  }
+
+  /**
+   * Returns the elapsed time in milliseconds.
+   */
+  private long elapsedTimeMs(final long startTime) {
+    return (System.nanoTime() - startTime) / ONE_MILLION;
+  }
+
+  /**
+   * Parse the list-files response.
+   *
+   * @param stream InputStream containing the list results.
+   * @throws IOException if the response cannot be deserialized.
+   */
+  private void parseListFilesResponse(final InputStream stream) throws IOException {
+    if (stream == null) {
+      return;
+    }
+
+    if (listResultSchema != null) {
+      // the response has already been parsed
+      return;
+    }
+
+    try {
+      final ObjectMapper objectMapper = new ObjectMapper();
+      this.listResultSchema = objectMapper.readValue(stream, ListResultSchema.class);
+    } catch (IOException ex) {
+      this.LOG.error("Unable to deserialize list results", ex);
+      throw ex;
+    }
+  }
+
+  /**
+   * Check whether the stream is null; this indirection exists to satisfy
+   * findbugs' redundant null check.
+   * @param stream InputStream to test
+   */
+  private boolean isNullInputStream(InputStream stream) {
+    return stream == null;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fdf5f4c3/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpServiceImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpServiceImpl.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpServiceImpl.java
new file mode 100644
index 0000000..06e1a8a
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpServiceImpl.java
@@ -0,0 +1,693 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import javax.xml.bind.DatatypeConverter;
+import java.io.File;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.Charset;
+import java.nio.charset.CharsetDecoder;
+import java.nio.charset.CharsetEncoder;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.Hashtable;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import com.google.common.base.Preconditions;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidAbfsRestOperationException;
+import org.joda.time.DateTime;
+import org.joda.time.format.DateTimeFormat;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+
+import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.TimeoutException;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidFileSystemPropertyException;
+import org.apache.hadoop.fs.azurebfs.contracts.services.AbfsHttpService;
+import org.apache.hadoop.fs.azurebfs.contracts.services.AbfsHttpClientFactory;
+import org.apache.hadoop.fs.azurebfs.contracts.services.ConfigurationService;
+import org.apache.hadoop.fs.azurebfs.contracts.services.TracingService;
+import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultEntrySchema;
+import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultSchema;
+import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
+import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.hadoop.util.Time.now;
+
+@Singleton
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+final class AbfsHttpServiceImpl implements AbfsHttpService {
+  public static final Logger LOG = LoggerFactory.getLogger(AbfsHttpService.class);
+  private static final String DATE_TIME_PATTERN = "E, dd MMM yyyy HH:mm:ss 'GMT'";
+  private static final String XMS_PROPERTIES_ENCODING = "ISO-8859-1";
+  private static final int LIST_MAX_RESULTS = 5000;
+  private static final int DELETE_DIRECTORY_TIMEOUT_MILLISECONDS = 180000;
+  private static final int RENAME_TIMEOUT_MILLISECONDS = 180000;
+
+  private final AbfsHttpClientFactory abfsHttpClientFactory;
+  private final ConcurrentHashMap<AzureBlobFileSystem, AbfsClient> clientCache;
+  private final ConfigurationService configurationService;
+  private final Set<String> azureAtomicRenameDirSet;
+
+  @Inject
+  AbfsHttpServiceImpl(
+      final ConfigurationService configurationService,
+      final AbfsHttpClientFactory abfsHttpClientFactory,
+      final TracingService tracingService) {
+    Preconditions.checkNotNull(abfsHttpClientFactory, "abfsHttpClientFactory");
+    Preconditions.checkNotNull(configurationService, "configurationService");
+    Preconditions.checkNotNull(tracingService, "tracingService");
+
+    this.configurationService = configurationService;
+    this.clientCache = new ConcurrentHashMap<>();
+    this.abfsHttpClientFactory = abfsHttpClientFactory;
+    this.azureAtomicRenameDirSet = new HashSet<>(Arrays.asList(configurationService.getAzureAtomicRenameDirs().split(AbfsHttpConstants.COMMA)));
+  }
+
+  @Override
+  public Hashtable<String, String> getFilesystemProperties(final AzureBlobFileSystem azureBlobFileSystem)
      throws AzureBlobFileSystemException {
+    final AbfsClient client = this.getOrCreateClient(azureBlobFileSystem);
+
+    this.LOG.debug(
+        "getFilesystemProperties for filesystem: {}",
+        client.getFileSystem());
+
+    final Hashtable<String, String> parsedXmsProperties;
+
+    final AbfsRestOperation op = client.getFilesystemProperties();
+    final String xMsProperties = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_PROPERTIES);
+
+    parsedXmsProperties = parseCommaSeparatedXmsProperties(xMsProperties);
+
+    return parsedXmsProperties;
+  }
+
+  @Override
+  public void setFilesystemProperties(final AzureBlobFileSystem azureBlobFileSystem, final Hashtable<String, String> properties) throws
+      AzureBlobFileSystemException {
+    if (properties == null || properties.size() == 0) {
+      return;
+    }
+
+    final AbfsClient client = this.getOrCreateClient(azureBlobFileSystem);
+
+    this.LOG.debug(
+        "setFilesystemProperties for filesystem: {} with properties: {}",
+        client.getFileSystem(),
+        properties);
+
+    final String commaSeparatedProperties;
+    try {
+      commaSeparatedProperties = convertXmsPropertiesToCommaSeparatedString(properties);
+    } catch (CharacterCodingException ex) {
+      throw new InvalidAbfsRestOperationException(ex);
+    }
+    client.setFilesystemProperties(commaSeparatedProperties);
+  }
+
+  @Override
+  public Hashtable<String, String> getPathProperties(final AzureBlobFileSystem azureBlobFileSystem, final Path path) throws
+      AzureBlobFileSystemException {
+    final AbfsClient client = this.getOrCreateClient(azureBlobFileSystem);
+
+    this.LOG.debug(
+        "getPathProperties for filesystem: {} path: {}",
+        client.getFileSystem(),
+        path.toString());
+
+    final Hashtable<String, String> parsedXmsProperties;
+    final AbfsRestOperation op = client.getPathProperties(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path));
+
+    final String xMsProperties = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_PROPERTIES);
+
+    parsedXmsProperties = parseCommaSeparatedXmsProperties(xMsProperties);
+
+    return parsedXmsProperties;
+  }
+
+  @Override
+  public void setPathProperties(final AzureBlobFileSystem azureBlobFileSystem, final Path path, final Hashtable<String,
+      String> properties) throws
+      AzureBlobFileSystemException {
+    final AbfsClient client = this.getOrCreateClient(azureBlobFileSystem);
+
+    this.LOG.debug(
+        "setFilesystemProperties for filesystem: {} path: {} with properties: {}",
+        client.getFileSystem(),
+        path.toString(),
+        properties);
+
+    final String commaSeparatedProperties;
+    try {
+      commaSeparatedProperties = convertXmsPropertiesToCommaSeparatedString(properties);
+    } catch (CharacterCodingException ex) {
+      throw new InvalidAbfsRestOperationException(ex);
+    }
+    client.setPathProperties("/" + getRelativePath(path), commaSeparatedProperties);
+  }
+
+  @Override
+  public void createFilesystem(final AzureBlobFileSystem azureBlobFileSystem) throws AzureBlobFileSystemException {
+    final AbfsClient client = this.getOrCreateClient(azureBlobFileSystem);
+
+    this.LOG.debug(
+        "createFilesystem for filesystem: {}",
+        client.getFileSystem());
+
+    client.createFilesystem();
+  }
+
+  @Override
+  public void deleteFilesystem(final AzureBlobFileSystem azureBlobFileSystem) throws AzureBlobFileSystemException {
+    final AbfsClient client = this.getOrCreateClient(azureBlobFileSystem);
+
+    this.LOG.debug(
+        "deleteFilesystem for filesystem: {}",
+        client.getFileSystem());
+
+    client.deleteFilesystem();
+  }
+
+  @Override
+  public OutputStream createFile(final AzureBlobFileSystem azureBlobFileSystem, final Path path, final boolean overwrite) throws
+      AzureBlobFileSystemException {
+    final AbfsClient client = this.getOrCreateClient(azureBlobFileSystem);
+
+    this.LOG.debug(
+        "createFile filesystem: {} path: {} overwrite: {}",
+        client.getFileSystem(),
+        path.toString(),
+        overwrite);
+
+    client.createPath(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), true, overwrite);
+
+    final OutputStream outputStream;
+    outputStream = new FSDataOutputStream(
+        new AbfsOutputStream(client, AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), 0,
+            configurationService.getWriteBufferSize()), null);
+    return outputStream;
+  }
+
+  @Override
+  public Void createDirectory(final AzureBlobFileSystem azureBlobFileSystem, final Path path) throws AzureBlobFileSystemException {
+    final AbfsClient client = this.getOrCreateClient(azureBlobFileSystem);
+
+    this.LOG.debug(
+        "createDirectory filesystem: {} path: {} overwrite: {}",
+        client.getFileSystem(),
+        path.toString());
+
+    client.createPath("/" + getRelativePath(path), false, true);
+
+    return null;
+  }
+
+  @Override
+  public InputStream openFileForRead(final AzureBlobFileSystem azureBlobFileSystem, final Path path,
+      final FileSystem.Statistics statistics) throws AzureBlobFileSystemException {
+    final AbfsClient client = getOrCreateClient(azureBlobFileSystem);
+
+    this.LOG.debug(
+        "openFileForRead filesystem: {} path: {}",
+        client.getFileSystem(),
+        path.toString());
+
+    final AbfsRestOperation op = client.getPathProperties(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path));
+
+    final String resourceType = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_RESOURCE_TYPE);
+    final long contentLength = Long.parseLong(op.getResult().getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH));
+    final String eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG);
+
+    if (parseIsDirectory(resourceType)) {
+      throw new AbfsRestOperationException(
+          AzureServiceErrorCode.PATH_NOT_FOUND.getStatusCode(),
+          AzureServiceErrorCode.PATH_NOT_FOUND.getErrorCode(),
+          "openFileForRead must be used with files and not directories",
+          null);
+    }
+
+    // Add statistics for InputStream
+    return new FSDataInputStream(
+        new AbfsInputStream(client, statistics, AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), contentLength,
+            configurationService.getReadBufferSize(), configurationService.getReadAheadQueueDepth(), eTag));
+  }
+
+  @Override
+  public OutputStream openFileForWrite(final AzureBlobFileSystem azureBlobFileSystem, final Path path, final boolean overwrite) throws
+      AzureBlobFileSystemException {
+    final AbfsClient client = getOrCreateClient(azureBlobFileSystem);
+
+    this.LOG.debug(
+        "openFileForWrite filesystem: {} path: {} overwrite: {}",
+        client.getFileSystem(),
+        path.toString(),
+        overwrite);
+
+    final AbfsRestOperation op = client.getPathProperties(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path));
+
+    final String resourceType = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_RESOURCE_TYPE);
+    final Long contentLength = Long.valueOf(op.getResult().getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH));
+
+    if (parseIsDirectory(resourceType)) {
+      throw new AbfsRestOperationException(
+          AzureServiceErrorCode.PATH_NOT_FOUND.getStatusCode(),
+          AzureServiceErrorCode.PATH_NOT_FOUND.getErrorCode(),
+          "openFileForRead must be used with files and not directories",
+          null);
+    }
+
+    final long offset = overwrite ? 0 : contentLength;
+
+    final OutputStream outputStream;
+    outputStream = new FSDataOutputStream(
+        new AbfsOutputStream(client, AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path),
+            offset, configurationService.getWriteBufferSize()), null);
+    return outputStream;
+  }
+
+  @Override
+  public void rename(final AzureBlobFileSystem azureBlobFileSystem, final Path source, final Path destination) throws
+      AzureBlobFileSystemException {
+
+    if (isAtomicRenameKey(source.getName())) {
+      this.LOG.warn("The atomic rename feature is not supported by the ABFS scheme; however rename,"
+          +" create and delete operations are atomic if Namespace is enabled for your Azure Storage account.");
+    }
+
+    final AbfsClient client = this.getOrCreateClient(azureBlobFileSystem);
+
+    this.LOG.debug(
+        "renameAsync filesystem: {} source: {} destination: {}",
+        client.getFileSystem(),
+        source.toString(),
+        destination.toString());
+
+    String continuation = null;
+    long deadline = now() + RENAME_TIMEOUT_MILLISECONDS;
+
+    do {
+      if (now() > deadline) {
+        LOG.debug(
+            "Rename {} to {} timed out.",
+            source,
+            destination);
+
+        throw new TimeoutException("Rename timed out.");
+      }
+
+      AbfsRestOperation op = client.renamePath(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(source),
+          AbfsHttpConstants.FORWARD_SLASH + getRelativePath(destination), continuation);
+      continuation = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_CONTINUATION);
+
+    } while (continuation != null && !continuation.isEmpty());
+  }
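
The continuation-token idiom shared by rename, delete, and listStatus above,
reduced to its shape; the simulated header values are illustrative only:

    import java.util.Arrays;
    import java.util.Iterator;

    class ContinuationLoopSketch {
      public static void main(String[] args) {
        // Stand-ins for the x-ms-continuation header of successive calls.
        Iterator<String> pages = Arrays.asList("page-2", "page-3", "").iterator();
        String continuation = null;
        int calls = 0;
        do {
          continuation = pages.next(); // one REST round trip per iteration
          calls++;
        } while (continuation != null && !continuation.isEmpty());
        System.out.println(calls + " calls"); // prints: 3 calls
      }
    }
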
+
+  @Override
+  public void delete(final AzureBlobFileSystem azureBlobFileSystem, final Path path, final boolean recursive) throws
+      AzureBlobFileSystemException {
+    final AbfsClient client = this.getOrCreateClient(azureBlobFileSystem);
+
+    this.LOG.debug(
+        "delete filesystem: {} path: {} recursive: {}",
+        client.getFileSystem(),
+        path.toString(),
+        String.valueOf(recursive));
+
+    String continuation = null;
+    long deadline = now() + DELETE_DIRECTORY_TIMEOUT_MILLISECONDS;
+
+    do {
+      if (now() > deadline) {
+        this.LOG.debug(
+            "Delete directory {} timed out.", path);
+
+        throw new TimeoutException("Delete directory timed out.");
+      }
+
+      AbfsRestOperation op = client.deletePath(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), recursive, continuation);
+      continuation = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_CONTINUATION);
+
+    } while (continuation != null && !continuation.isEmpty());
+  }
+
+  @Override
+  public FileStatus getFileStatus(final AzureBlobFileSystem azureBlobFileSystem, final Path path) throws AzureBlobFileSystemException {
+    final AbfsClient client = this.getOrCreateClient(azureBlobFileSystem);
+
+    this.LOG.debug(
+        "getFileStatus filesystem: {} path: {}",
+        client.getFileSystem(),
+        path.toString());
+
+    if (path.isRoot()) {
+      AbfsRestOperation op = client.getFilesystemProperties();
+      final long blockSize = configurationService.getAzureBlockSize();
+      final String eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG);
+      final String lastModified = op.getResult().getResponseHeader(HttpHeaderConfigurations.LAST_MODIFIED);
+      return new VersionedFileStatus(
+          azureBlobFileSystem.getOwnerUser(),
+          azureBlobFileSystem.getOwnerUserPrimaryGroup(),
+          0,
+          true,
+          1,
+          blockSize,
+          parseLastModifiedTime(lastModified).getMillis(),
+          path,
+          eTag);
+    } else {
+      AbfsRestOperation op = client.getPathProperties(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path));
+
+      final long blockSize = configurationService.getAzureBlockSize();
+      final String eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG);
+      final String lastModified = op.getResult().getResponseHeader(HttpHeaderConfigurations.LAST_MODIFIED);
+      final String contentLength = op.getResult().getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH);
+      final String resourceType = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_RESOURCE_TYPE);
+
+      return new VersionedFileStatus(
+          azureBlobFileSystem.getOwnerUser(),
+          azureBlobFileSystem.getOwnerUserPrimaryGroup(),
+          parseContentLength(contentLength),
+          parseIsDirectory(resourceType),
+          1,
+          blockSize,
+          parseLastModifiedTime(lastModified).getMillis(),
+          path,
+          eTag);
+    }
+  }
+
+  @Override
+  public FileStatus[] listStatus(final AzureBlobFileSystem azureBlobFileSystem, final Path path) throws AzureBlobFileSystemException {
+    final AbfsClient client = this.getOrCreateClient(azureBlobFileSystem);
+
+    LOG.debug(
+        "listStatus filesystem: {} path: {}",
+        client.getFileSystem(),
+        path);
+
+    String relativePath = path.isRoot() ? AbfsHttpConstants.EMPTY_STRING : getRelativePath(path);
+    String continuation = null;
+    ArrayList<FileStatus> fileStatuses = new ArrayList<>();
+
+    do {
+      AbfsRestOperation op = client.listPath(relativePath, false, LIST_MAX_RESULTS, continuation);
+      continuation = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_CONTINUATION);
+      ListResultSchema retrievedSchema = op.getResult().getListResultSchema();
+      if (retrievedSchema == null) {
+        throw new AbfsRestOperationException(
+            AzureServiceErrorCode.PATH_NOT_FOUND.getStatusCode(),
+            AzureServiceErrorCode.PATH_NOT_FOUND.getErrorCode(),
+            "listStatusAsync path not found",
+            null, op.getResult());
+      }
+
+      long blockSize = configurationService.getAzureBlockSize();
+
+      for (ListResultEntrySchema entry : retrievedSchema.paths()) {
+        long lastModifiedMillis = 0;
+        long contentLength = entry.contentLength() == null ? 0 : entry.contentLength();
+        boolean isDirectory = Boolean.TRUE.equals(entry.isDirectory());
+        if (entry.lastModified() != null && !entry.lastModified().isEmpty()) {
+          final DateTime dateTime = DateTime.parse(
+              entry.lastModified(),
+              DateTimeFormat.forPattern(DATE_TIME_PATTERN).withZoneUTC());
+          lastModifiedMillis = dateTime.getMillis();
+        }
+
+        fileStatuses.add(
+            new VersionedFileStatus(
+                azureBlobFileSystem.getOwnerUser(),
+                azureBlobFileSystem.getOwnerUserPrimaryGroup(),
+                contentLength,
+                isDirectory,
+                1,
+                blockSize,
+                lastModifiedMillis,
+                azureBlobFileSystem.makeQualified(new Path(AbfsHttpConstants.FORWARD_SLASH + entry.name())),
+                entry.eTag()));
+      }
+
+    } while (continuation != null && !continuation.isEmpty());
+
+    return fileStatuses.toArray(new FileStatus[0]);
+  }
+
+  @Override
+  public synchronized void closeFileSystem(final AzureBlobFileSystem azureBlobFileSystem) throws AzureBlobFileSystemException {
+    this.clientCache.remove(azureBlobFileSystem);
+  }
+
+  @Override
+  public boolean isAtomicRenameKey(String key) {
+    return isKeyForDirectorySet(key, azureAtomicRenameDirSet);
+  }
+
+  private String getRelativePath(final Path path) {
+    Preconditions.checkNotNull(path, "path");
+    final String relativePath = path.toUri().getPath();
+
+    if (relativePath.length() == 0) {
+      return relativePath;
+    }
+
+    if (relativePath.charAt(0) == Path.SEPARATOR_CHAR) {
+      if (relativePath.length() == 1) {
+        return AbfsHttpConstants.EMPTY_STRING;
+      }
+
+      return relativePath.substring(1);
+    }
+
+    return relativePath;
+  }
+
+  private synchronized AbfsClient getOrCreateClient(final AzureBlobFileSystem azureBlobFileSystem) throws
+      AzureBlobFileSystemException {
+    Preconditions.checkNotNull(azureBlobFileSystem, "azureBlobFileSystem");
+
+    AbfsClient client = this.clientCache.get(azureBlobFileSystem);
+
+    if (client != null) {
+      return client;
+    }
+
+    client = abfsHttpClientFactory.create(azureBlobFileSystem);
+    this.clientCache.put(
+        azureBlobFileSystem,
+        client);
+    return client;
+  }
+
+  private long parseContentLength(final String contentLength) {
+    if (contentLength == null) {
+      return -1;
+    }
+
+    return Long.parseLong(contentLength);
+  }
+
+  private boolean parseIsDirectory(final String resourceType) {
+    return resourceType != null && resourceType.equalsIgnoreCase(AbfsHttpConstants.DIRECTORY);
+  }
+
+  private DateTime parseLastModifiedTime(final String lastModifiedTime) {
+    return DateTime.parse(
+        lastModifiedTime,
+        DateTimeFormat.forPattern(DATE_TIME_PATTERN).withZoneUTC());
+  }
+
+  private String convertXmsPropertiesToCommaSeparatedString(final Hashtable<String, String> properties) throws
+      CharacterCodingException {
+    StringBuilder commaSeparatedProperties = new StringBuilder();
+
+    final CharsetEncoder encoder = Charset.forName(XMS_PROPERTIES_ENCODING).newEncoder();
+
+    for (Map.Entry<String, String> propertyEntry : properties.entrySet()) {
+      String key = propertyEntry.getKey();
+      String value = propertyEntry.getValue();
+
+      if (!encoder.canEncode(value)) {
+        throw new CharacterCodingException();
+      }
+
+      // CharsetEncoder.encode() returns a ByteBuffer whose backing array may
+      // be larger than the encoded content; copy out exactly the encoded
+      // bytes before Base64-encoding them.
+      final ByteBuffer encodedBuffer = encoder.encode(CharBuffer.wrap(value));
+      final byte[] encodedBytes = new byte[encodedBuffer.remaining()];
+      encodedBuffer.get(encodedBytes);
+      String encodedPropertyValue = DatatypeConverter.printBase64Binary(encodedBytes);
+      commaSeparatedProperties.append(key)
+              .append(AbfsHttpConstants.EQUAL)
+              .append(encodedPropertyValue);
+
+      commaSeparatedProperties.append(AbfsHttpConstants.COMMA);
+    }
+
+    if (commaSeparatedProperties.length() != 0) {
+      commaSeparatedProperties.deleteCharAt(commaSeparatedProperties.length() - 1);
+    }
+
+    return commaSeparatedProperties.toString();
+  }
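+  /*
+   * A worked example of the serialization above (illustrative keys and
+   * values): the properties {key1=value1, key2=value2} become the header
+   * string "key1=dmFsdWUx,key2=dmFsdWUy", each value being the Base64
+   * encoding of its XMS_PROPERTIES_ENCODING bytes.
+   * parseCommaSeparatedXmsProperties below performs the inverse
+   * transformation, so values round-trip.
+   */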
+
+  private Hashtable<String, String> parseCommaSeparatedXmsProperties(String xMsProperties) throws
+      InvalidFileSystemPropertyException, InvalidAbfsRestOperationException {
+    Hashtable<String, String> properties = new Hashtable<>();
+
+    final CharsetDecoder decoder = Charset.forName(XMS_PROPERTIES_ENCODING).newDecoder();
+
+    if (xMsProperties != null && !xMsProperties.isEmpty()) {
+      String[] userProperties = xMsProperties.split(AbfsHttpConstants.COMMA);
+
+      if (userProperties.length == 0) {
+        return properties;
+      }
+
+      for (String property : userProperties) {
+        if (property.isEmpty()) {
+          throw new InvalidFileSystemPropertyException(xMsProperties);
+        }
+
+        String[] nameValue = property.split(AbfsHttpConstants.EQUAL, 2);
+        if (nameValue.length != 2) {
+          throw new InvalidFileSystemPropertyException(xMsProperties);
+        }
+
+        byte[] decodedValue = DatatypeConverter.parseBase64Binary(nameValue[1]);
+
+        final String value;
+        try {
+          value = decoder.decode(ByteBuffer.wrap(decodedValue)).toString();
+        } catch (CharacterCodingException ex) {
+          throw new InvalidAbfsRestOperationException(ex);
+        }
+        properties.put(nameValue[0], value);
+      }
+    }
+
+    return properties;
+  }
+
+  private boolean isKeyForDirectorySet(String key, Set<String> dirSet) {
+    for (String dir : dirSet) {
+      if (dir.isEmpty() || key.startsWith(dir + AbfsHttpConstants.FORWARD_SLASH)) {
+        return true;
+      }
+
+      // Entries may also be configured as URIs; those without an authority
+      // component are matched on their raw string prefix as well.
+      try {
+        URI uri = new URI(dir);
+        if (null == uri.getAuthority()
+            && key.startsWith(dir + AbfsHttpConstants.FORWARD_SLASH)) {
+          return true;
+        }
+      } catch (URISyntaxException e) {
+        LOG.info("URI syntax error creating URI for {}", dir);
+      }
+    }
+
+    return false;
+  }
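+  /*
+   * Matching example (illustrative directory set): with dirSet = {"hbase"},
+   * the key "hbase/WALs/region1" matches, while "hbase-backup/WALs" does
+   * not, since a match requires the configured directory as a whole
+   * path-segment prefix followed by '/'.
+   */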
+
+  private static class VersionedFileStatus extends FileStatus {
+    private final String version;
+
+    VersionedFileStatus(
+        final String owner, final String group,
+        final long length, final boolean isdir, final int blockReplication,
+        final long blocksize, final long modificationTime, final Path path,
+        String version) {
+      super(length, isdir, blockReplication, blocksize, modificationTime, 0,
+          new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL),
+          owner,
+          group,
+          path);
+
+      this.version = version;
+    }
+
+    /** Compares this object with another object for equality.
+     * @param   obj the object to be compared.
+     * @return  true if the two file statuses have the same path and version; false otherwise.
+     */
+    @Override
+    public boolean equals(Object obj) {
+      if (obj == this) {
+        return true;
+      }
+
+      if (obj == null) {
+        return false;
+      }
+
+      if (this.getClass() == obj.getClass()) {
+        VersionedFileStatus other = (VersionedFileStatus) obj;
+        return this.getPath().equals(other.getPath()) && this.version.equals(other.version);
+      }
+
+      return false;
+    }
+
+    /**
+     * Returns a hash code value for the object, computed from the
+     * hash codes of the path name and the version.
+     *
+     * @return  a hash code value for the path name and version
+     */
+    @Override
+    public int hashCode() {
+      int hash = getPath().hashCode();
+      hash = 89 * hash + (this.version != null ? this.version.hashCode() : 0);
+      return hash;
+    }
+
+    /**
+     * Returns the version of this FileStatus.
+     *
+     * @return  a string value for the FileStatus version
+     */
+    public String getVersion() {
+      return this.version;
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fdf5f4c3/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
new file mode 100644
index 0000000..6554380
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
@@ -0,0 +1,382 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.EOFException;
+import java.io.IOException;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.fs.FileSystem.Statistics;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
+
+/**
+ * The AbfsInputStream for AbfsClient
+ */
+public class AbfsInputStream extends FSInputStream {
+  private final AbfsClient client;
+  private final Statistics statistics;
+  private final String path;
+  private final long contentLength;
+  private final int bufferSize; // size of the internal read buffer
+  private final int readAheadQueueDepth; // number of read-ahead requests to queue; set in constructor
+  private final String eTag; // eTag of the path when the InputStream was created
+  private final boolean tolerateOobAppends; // whether to tolerate out-of-band appends
+  private final boolean readAheadEnabled; // whether read-ahead is enabled
+
+  private byte[] buffer = null;            // will be initialized on first use
+
+  private long fCursor = 0;  // cursor of buffer within file - offset of next byte to read from remote server
+  private long fCursorAfterLastRead = -1;
+  private int bCursor = 0;   // cursor of read within buffer - offset of next byte to be returned from buffer
+  private int limit = 0;     // offset of next byte to be read into buffer from service,
+                             // i.e., one past the last valid byte in the buffer
+  private boolean closed = false;
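+  /*
+   * Cursor invariant (a worked example, assuming a 4 MB buffer): after the
+   * first full buffer fill of a large file, fCursor == limit == 4 MB and
+   * bCursor == 0, so the stream position fCursor - limit + bCursor is
+   * still 0; each byte handed back to the caller then advances bCursor.
+   */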
+
+  public AbfsInputStream(
+      final AbfsClient client,
+      final Statistics statistics,
+      final String path,
+      final long contentLength,
+      final int bufferSize,
+      final int readAheadQueueDepth,
+      final String eTag) {
+    super();
+    this.client = client;
+    this.statistics = statistics;
+    this.path = path;
+    this.contentLength = contentLength;
+    this.bufferSize = bufferSize;
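+    // A negative configured queue depth means "auto": default to twice the
+    // number of available processors.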
+    this.readAheadQueueDepth = (readAheadQueueDepth >= 0) ? readAheadQueueDepth : 2 * Runtime.getRuntime().availableProcessors();
+    this.eTag = eTag;
+    this.tolerateOobAppends = false;
+    this.readAheadEnabled = true;
+  }
+
+  public String getPath() {
+    return path;
+  }
+
+  @Override
+  public int read() throws IOException {
+    byte[] b = new byte[1];
+    int numberOfBytesRead = read(b, 0, 1);
+    if (numberOfBytesRead < 0) {
+      return -1;
+    } else {
+      return (b[0] & 0xFF);
+    }
+  }
+
+  @Override
+  public synchronized int read(final byte[] b, final int off, final int len) throws IOException {
+    int currentOff = off;
+    int currentLen = len;
+    int lastReadBytes;
+    int totalReadBytes = 0;
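+    // Keep calling readOneBlock until the request is satisfied or the
+    // underlying read stops making progress; return the total bytes read,
+    // or the terminal readOneBlock result (0 or -1) if nothing was read.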
+    do {
+      lastReadBytes = readOneBlock(b, currentOff, currentLen);
+      if (lastReadBytes > 0) {
+        currentOff += lastReadBytes;
+        currentLen -= lastReadBytes;
+        totalReadBytes += lastReadBytes;
+      }
+      if (currentLen <= 0 || currentLen > b.length - currentOff) {
+        break;
+      }
+    } while (lastReadBytes > 0);
+    return totalReadBytes > 0 ? totalReadBytes : lastReadBytes;
+  }
+
+  private int readOneBlock(final byte[] b, final int off, final int len) throws IOException {
+    if (closed) {
+      throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
+    }
+
+    Preconditions.checkNotNull(b);
+
+    if (len == 0) {
+      return 0;
+    }
+
+    if (this.available() == 0) {
+      return -1;
+    }
+
+    if (off < 0 || len < 0 || len > b.length - off) {
+      throw new IndexOutOfBoundsException();
+    }
+
+    //If buffer is empty, then fill the buffer.
+    if (bCursor == limit) {
+      //If EOF, then return -1
+      if (fCursor >= contentLength) {
+        return -1;
+      }
+
+      long bytesRead = 0;
+      //reset buffer to initial state - i.e., throw away existing data
+      bCursor = 0;
+      limit = 0;
+      if (buffer == null) {
+        buffer = new byte[bufferSize];
+      }
+
+      // Use read-ahead and fill the whole buffer on the first read, on
+      // sequential reads, or when the caller's buffer is at least one block;
+      // for small random reads, fetch only what was asked and bypass read-ahead.
+      if (-1 == fCursorAfterLastRead || fCursorAfterLastRead == fCursor || b.length >= bufferSize) {
+        bytesRead = readInternal(fCursor, buffer, 0, bufferSize, false);
+      } else {
+        bytesRead = readInternal(fCursor, buffer, 0, b.length, true);
+      }
+
+      if (bytesRead == -1) {
+        return -1;
+      }
+
+      limit += bytesRead;
+      fCursor += bytesRead;
+      fCursorAfterLastRead = fCursor;
+    }
+
+    //If there is anything in the buffer, then return lesser of (requested bytes) and (bytes in buffer)
+    //(bytes returned may be less than requested)
+    int bytesRemaining = limit - bCursor;
+    int bytesToRead = Math.min(len, bytesRemaining);
+    System.arraycopy(buffer, bCursor, b, off, bytesToRead);
+    bCursor += bytesToRead;
+    if (statistics != null) {
+      statistics.incrementBytesRead(bytesToRead);
+    }
+    return bytesToRead;
+  }
+
+  private int readInternal(final long position, final byte[] b, final int offset, final int length,
+                           final boolean bypassReadAhead) throws IOException {
+    if (readAheadEnabled && !bypassReadAhead) {
+      // try reading from read-ahead
+      if (offset != 0) {
+        throw new IllegalArgumentException("readahead buffers cannot have non-zero buffer offsets");
+      }
+      int receivedBytes;
+
+      // queue read-aheads
+      int numReadAheads = this.readAheadQueueDepth;
+      long nextSize;
+      long nextOffset = position;
+      while (numReadAheads > 0 && nextOffset < contentLength) {
+        nextSize = Math.min((long) bufferSize, contentLength - nextOffset);
+        ReadBufferManager.getBufferManager().queueReadAhead(this, nextOffset, (int) nextSize);
+        nextOffset = nextOffset + nextSize;
+        numReadAheads--;
+      }
+
+      // try reading from buffers first
+      receivedBytes = ReadBufferManager.getBufferManager().getBlock(this, position, length, b);
+      if (receivedBytes > 0) {
+        return receivedBytes;
+      }
+
+      // got nothing from read-ahead, do our own read now
+      receivedBytes = readRemote(position, b, offset, length);
+      return receivedBytes;
+    } else {
+      return readRemote(position, b, offset, length);
+    }
+  }
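+  /*
+   * A worked example of the read-ahead scheduling above (assuming
+   * bufferSize = 4 MB, readAheadQueueDepth = 2, contentLength = 10 MB,
+   * position = 0): two read-aheads are queued, for [0, 4 MB) and
+   * [4 MB, 8 MB). The requested block is then served from those buffers
+   * when possible; only on a miss does the stream fall back to a direct
+   * readRemote call.
+   */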
+
+  int readRemote(long position, byte[] b, int offset, int length) throws IOException {
+    if (position < 0) {
+      throw new IllegalArgumentException("attempting to read from negative offset");
+    }
+    if (position >= contentLength) {
+      return -1;  // Hadoop prefers -1 to EOFException
+    }
+    if (b == null) {
+      throw new IllegalArgumentException("null byte array passed in to read() method");
+    }
+    if (offset >= b.length) {
+      throw new IllegalArgumentException("offset greater than length of array");
+    }
+    if (length < 0) {
+      throw new IllegalArgumentException("requested read length is less than zero");
+    }
+    if (length > (b.length - offset)) {
+      throw new IllegalArgumentException("requested read length is more than will fit after requested offset in buffer");
+    }
+    final AbfsRestOperation op;
+    try {
+      op = client.read(path, position, b, offset, length, tolerateOobAppends ? "*" : eTag);
+    } catch (AzureBlobFileSystemException ex) {
+      throw new IOException(ex);
+    }
+    long bytesRead = op.getResult().getBytesReceived();
+    if (bytesRead > Integer.MAX_VALUE) {
+      throw new IOException("Unexpected Content-Length");
+    }
+    return (int) bytesRead;
+  }
+
+  /**
+   * Seek to given position in stream.
+   * @param n position to seek to
+   * @throws IOException if there is an error
+   * @throws EOFException if attempting to seek past end of file
+   */
+  @Override
+  public synchronized void seek(long n) throws IOException {
+    if (closed) {
+      throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
+    }
+    if (n < 0) {
+      throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK);
+    }
+    if (n > contentLength) {
+      throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF);
+    }
+
+    if (n >= fCursor - limit && n <= fCursor) { // within buffer
+      bCursor = (int) (n - (fCursor - limit));
+      return;
+    }
+
+    // next read will read from here
+    fCursor = n;
+
+    //invalidate buffer
+    limit = 0;
+    bCursor = 0;
+  }
+
+  @Override
+  public synchronized long skip(long n) throws IOException {
+    if (closed) {
+      throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
+    }
+    long currentPos = getPos();
+    if (currentPos == contentLength) {
+      if (n > 0) {
+        throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF);
+      }
+    }
+    long newPos = currentPos + n;
+    if (newPos < 0) {
+      newPos = 0;
+      n = newPos - currentPos;
+    }
+    if (newPos > contentLength) {
+      newPos = contentLength;
+      n = newPos - currentPos;
+    }
+    seek(newPos);
+    return n;
+  }
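+  /*
+   * Clamping example: with getPos() == 5 and a 100-byte file, skip(-10)
+   * clamps the new position to 0 and returns -5, while skip(200) clamps it
+   * to contentLength (100) and returns 95. The returned value is always the
+   * distance actually moved.
+   */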
+
+  /**
+   * Return the size of the remaining available bytes
+   * if the size is less than or equal to {@link Integer#MAX_VALUE},
+   * otherwise, return {@link Integer#MAX_VALUE}.
+   *
+   * This is to match the behavior of DFSInputStream.available(),
+   * which some clients may rely on (HBase write-ahead log reading in
+   * particular).
+   */
+  @Override
+  public synchronized int available() throws IOException {
+    if (closed) {
+      throw new IOException(
+          FSExceptionMessages.STREAM_IS_CLOSED);
+    }
+    final long remaining = this.contentLength - this.getPos();
+    return remaining <= Integer.MAX_VALUE
+        ? (int) remaining : Integer.MAX_VALUE;
+  }
+
+  /**
+   * Returns the length of the file that this stream refers to. Note that the length returned is the length
+   * as of the time the stream was opened. Specifically, if there have been subsequent appends to the file,
+   * they won't be reflected in the returned length.
+   *
+   * @return length of the file.
+   * @throws IOException if the stream is closed
+   */
+  public long length() throws IOException {
+    if (closed) {
+      throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
+    }
+    return contentLength;
+  }
+
+  /**
+   * Return the current offset from the start of the file.
+   * @throws IOException if the stream is closed
+   */
+  @Override
+  public synchronized long getPos() throws IOException {
+    if (closed) {
+      throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
+    }
+    return fCursor - limit + bCursor;
+  }
+
+  /**
+   * Attempts to seek to a different copy of the data. Returns true if a
+   * new source was found, false otherwise; ABFS exposes no alternate
+   * replicas, so this implementation always returns false.
+   * @throws IOException if there is an error
+   */
+  @Override
+  public boolean seekToNewSource(long l) throws IOException {
+    return false;
+  }
+
+  @Override
+  public synchronized void close() throws IOException {
+    closed = true;
+    buffer = null; // de-reference the buffer so it can be GC'ed sooner
+  }
+
+  /**
+   * Not supported by this stream. Throws {@link UnsupportedOperationException}
+   * @param readlimit ignored
+   */
+  @Override
+  public synchronized void mark(int readlimit) {
+    throw new UnsupportedOperationException("mark()/reset() not supported on this stream");
+  }
+
+  /**
+   * Not supported by this stream. Throws {@link UnsupportedOperationException}
+   */
+  @Override
+  public synchronized void reset() throws IOException {
+    throw new UnsupportedOperationException("mark()/reset() not supported on this stream");
+  }
+
+  /**
+   * Gets whether mark and reset are supported by {@code AbfsInputStream}. Always returns false.
+   *
+   * @return always {@code false}
+   */
+  @Override
+  public boolean markSupported() {
+    return false;
+  }
+}
\ No newline at end of file

