Posted to common-commits@hadoop.apache.org by tm...@apache.org on 2018/08/11 05:37:47 UTC

[43/50] [abbrv] hadoop git commit: HADOOP-15407. HADOOP-15540. Support Windows Azure Storage - Blob file system "ABFS" in Hadoop: Core Commit.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fdf5f4c3/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
new file mode 100644
index 0000000..de5c934
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
@@ -0,0 +1,335 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.hadoop.fs.Syncable;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
+
+/**
+ * The BlobFsOutputStream for the Rest AbfsClient.
+ */
+public class AbfsOutputStream extends OutputStream implements Syncable {
+  private final AbfsClient client;
+  private final String path;
+  private long position;
+  private boolean closed;
+  private volatile IOException lastError;
+
+  private long lastFlushOffset;
+  private long lastTotalAppendOffset = 0;
+
+  private final int bufferSize;
+  private byte[] buffer;
+  private int bufferIndex;
+  private final int maxConcurrentRequestCount;
+
+  private ConcurrentLinkedDeque<WriteOperation> writeOperations;
+  private final ThreadPoolExecutor threadExecutor;
+  private final ExecutorCompletionService<Void> completionService;
+
+  public AbfsOutputStream(
+      final AbfsClient client,
+      final String path,
+      final long position,
+      final int bufferSize) {
+    this.client = client;
+    this.path = path;
+    this.position = position;
+    this.closed = false;
+    this.lastError = null;
+    this.lastFlushOffset = 0;
+    this.bufferSize = bufferSize;
+    this.buffer = new byte[bufferSize];
+    this.bufferIndex = 0;
+    this.writeOperations = new ConcurrentLinkedDeque<>();
+
+    this.maxConcurrentRequestCount = 4 * Runtime.getRuntime().availableProcessors();
+
+    this.threadExecutor
+        = new ThreadPoolExecutor(maxConcurrentRequestCount,
+        maxConcurrentRequestCount,
+        10L,
+        TimeUnit.SECONDS,
+        new LinkedBlockingQueue<>());
+    this.completionService = new ExecutorCompletionService<>(this.threadExecutor);
+  }
+
+  /**
+   * Writes the specified byte to this output stream. The general contract for
+   * write is that one byte is written to the output stream. The byte to be
+   * written is the eight low-order bits of the argument byteVal. The 24
+   * high-order bits of byteVal are ignored.
+   *
+   * @param byteVal the byte value to write.
+   * @throws IOException if an I/O error occurs. In particular, an IOException may be
+   *                     thrown if the output stream has been closed.
+   */
+  @Override
+  public void write(final int byteVal) throws IOException {
+    write(new byte[]{(byte) (byteVal & 0xFF)});
+  }
+
+  /**
+   * Writes length bytes from the specified byte array starting at offset off to
+   * this output stream.
+   *
+   * @param data   the byte array to write.
+   * @param off the start offset in the data.
+   * @param length the number of bytes to write.
+   * @throws IOException if an I/O error occurs. In particular, an IOException may be
+   *                     thrown if the output stream has been closed.
+   */
+  @Override
+  public synchronized void write(final byte[] data, final int off, final int length)
+      throws IOException {
+    if (this.lastError != null) {
+      throw this.lastError;
+    }
+
+    Preconditions.checkArgument(data != null, "null data");
+
+    if (off < 0 || length < 0 || length > data.length - off) {
+      throw new IndexOutOfBoundsException();
+    }
+
+    int currentOffset = off;
+    int writableBytes = bufferSize - bufferIndex;
+    int numberOfBytesToWrite = length;
+
+    while (numberOfBytesToWrite > 0) {
+      if (writableBytes <= numberOfBytesToWrite) {
+        System.arraycopy(data, currentOffset, buffer, bufferIndex, writableBytes);
+        bufferIndex += writableBytes;
+        writeCurrentBufferToService();
+        currentOffset += writableBytes;
+        numberOfBytesToWrite = numberOfBytesToWrite - writableBytes;
+      } else {
+        System.arraycopy(data, currentOffset, buffer, bufferIndex, numberOfBytesToWrite);
+        bufferIndex += numberOfBytesToWrite;
+        numberOfBytesToWrite = 0;
+      }
+
+      writableBytes = bufferSize - bufferIndex;
+    }
+  }
+
+  /**
+   * Flushes this output stream and forces any buffered output bytes to be
+   * written out. Any data remaining in the buffer is queued for upload, and
+   * bytes already uploaded by earlier appends are committed on the service
+   * before the call returns.
+   */
+  @Override
+  public void flush() throws IOException {
+    this.flushInternalAsync();
+  }
+
+  /** Similar to posix fsync, flush out the data in client's user buffer
+   * all the way to the disk device (but the disk may have it in its cache).
+   * @throws IOException if error occurs
+   */
+  @Override
+  public void hsync() throws IOException {
+    this.flushInternal();
+  }
+
+  /** Flush out the data in client's user buffer. After the return of
+   * this call, new readers will see the data.
+   * @throws IOException if any error occurs
+   */
+  @Override
+  public void hflush() throws IOException {
+    this.flushInternal();
+  }
+
+  /**
+   * Force all data in the output stream to be written to Azure storage.
+   * Wait to return until this is complete. Close the access to the stream and
+   * shut down the upload thread pool.
+   * If the blob was created, its lease will be released.
+   * Any error encountered in the upload threads and stored will be rethrown
+   * here after cleanup.
+   */
+  @Override
+  public synchronized void close() throws IOException {
+    if (closed) {
+      return;
+    }
+
+    try {
+      this.flushInternal();
+      this.threadExecutor.shutdown();
+    } finally {
+      this.lastError = new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
+      this.buffer = null;
+      this.bufferIndex = 0;
+      this.closed = true;
+      this.writeOperations.clear();
+      if (!this.threadExecutor.isShutdown()) {
+        this.threadExecutor.shutdownNow();
+      }
+    }
+  }
+
+  private synchronized void flushInternal() throws IOException {
+    if (this.lastError != null) {
+      throw this.lastError;
+    }
+    this.writeCurrentBufferToService();
+    this.flushWrittenBytesToService();
+  }
+
+  private synchronized void flushInternalAsync() throws IOException {
+    if (this.lastError != null) {
+      throw this.lastError;
+    }
+    this.writeCurrentBufferToService();
+    this.flushWrittenBytesToServiceAsync();
+  }
+
+  private synchronized void writeCurrentBufferToService() throws IOException {
+    if (bufferIndex == 0) {
+      return;
+    }
+
+    final byte[] bytes = this.buffer;
+    final int bytesLength = bufferIndex;
+
+    this.buffer = new byte[bufferSize];
+    this.bufferIndex = 0;
+    final long offset = this.position;
+    this.position += bytesLength;
+
+    if (this.threadExecutor.getQueue().size() >= maxConcurrentRequestCount * 2) {
+      this.waitForTaskToComplete();
+    }
+
+    final Future<Void> job = this.completionService.submit(new Callable<Void>() {
+      @Override
+      public Void call() throws Exception {
+        client.append(path, offset, bytes, 0,
+            bytesLength);
+        return null;
+      }
+    });
+
+    this.writeOperations.add(new WriteOperation(job, offset, bytesLength));
+
+    // Try to shrink the queue
+    shrinkWriteOperationQueue();
+  }
+
+  private synchronized void flushWrittenBytesToService() throws IOException {
+    for (WriteOperation writeOperation : this.writeOperations) {
+      try {
+        writeOperation.task.get();
+      } catch (Exception ex) {
+        if (ex.getCause() instanceof AzureBlobFileSystemException) {
+          ex = (AzureBlobFileSystemException) ex.getCause();
+        }
+        this.lastError = new IOException(ex);
+        throw this.lastError;
+      }
+    }
+    flushWrittenBytesToServiceInternal(this.position, false);
+  }
+
+  private synchronized void flushWrittenBytesToServiceAsync() throws IOException {
+    shrinkWriteOperationQueue();
+
+    if (this.lastTotalAppendOffset > this.lastFlushOffset) {
+      this.flushWrittenBytesToServiceInternal(this.lastTotalAppendOffset, true);
+    }
+
+    this.lastTotalAppendOffset = 0;
+  }
+
+  private synchronized void flushWrittenBytesToServiceInternal(final long offset,
+      final boolean retainUncommittedData) throws IOException {
+    try {
+      client.flush(path, offset, retainUncommittedData);
+    } catch (AzureBlobFileSystemException ex) {
+      throw new IOException(ex);
+    }
+    this.lastFlushOffset = offset;
+  }
+
+  /**
+   * Try to remove the completed write operations from the beginning of write
+   * operation FIFO queue.
+   */
+  private synchronized void shrinkWriteOperationQueue() throws IOException {
+    try {
+      while (this.writeOperations.peek() != null && this.writeOperations.peek().task.isDone()) {
+        this.writeOperations.peek().task.get();
+        this.lastTotalAppendOffset += this.writeOperations.peek().length;
+        this.writeOperations.remove();
+      }
+    } catch (Exception e) {
+      if (e.getCause() instanceof AzureBlobFileSystemException) {
+        this.lastError = (IOException) e.getCause();
+      } else {
+        this.lastError = new IOException(e);
+      }
+      throw this.lastError;
+    }
+  }
+
+  private void waitForTaskToComplete() throws IOException {
+    boolean completed;
+    // Drain every already-completed task; if none had finished, block below.
+    for (completed = false; this.completionService.poll() != null; completed = true) {}
+
+    if (!completed) {
+      try {
+        this.completionService.take();
+      } catch (InterruptedException e) {
+        this.lastError = new IOException(e);
+        throw this.lastError;
+      }
+    }
+  }
+
+  private static class WriteOperation {
+    private final Future<Void> task;
+    private final long startOffset;
+    private final long length;
+
+    WriteOperation(final Future<Void> task, final long startOffset, final long length) {
+      Preconditions.checkNotNull(task, "task");
+      Preconditions.checkArgument(startOffset >= 0, "startOffset");
+      Preconditions.checkArgument(length >= 0, "length");
+
+      this.task = task;
+      this.startOffset = startOffset;
+      this.length = length;
+    }
+  }
+}
\ No newline at end of file
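
A minimal usage sketch of the Syncable semantics implemented above, driven
through the standard Hadoop FileSystem API; the abfs URI, container and
account names are placeholders, not part of this patch:

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class AbfsWriteSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(
            URI.create("abfs://container@account.dfs.core.windows.net/"), conf);
        try (FSDataOutputStream out = fs.create(new Path("/demo/part-0000"))) {
          out.write("hello".getBytes("UTF-8"));
          out.hflush();  // new readers can now see the flushed bytes
          out.write(" world".getBytes("UTF-8"));
          out.hsync();   // for this stream, the same path as hflush(): flushInternal()
        }
      }
    }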

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fdf5f4c3/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
new file mode 100644
index 0000000..17fc35a
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.List;
+
+import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidAbfsRestOperationException;
+
+import org.slf4j.Logger;
+
+/**
+ * The AbfsRestOperation for the Rest AbfsClient.
+ */
+public class AbfsRestOperation {
+  // Blob FS client, which has the credentials, retry policy, and logs.
+  private final AbfsClient client;
+  // the HTTP method (PUT, PATCH, POST, GET, HEAD, or DELETE)
+  private final String method;
+  // full URL including query parameters
+  private final URL url;
+  // all the custom HTTP request headers provided by the caller
+  private final List<AbfsHttpHeader> requestHeaders;
+
+  // This is a simple operation class, where all the upload methods have a
+  // request body and all the download methods have a response body.
+  private final boolean hasRequestBody;
+
+  private final Logger logger;
+
+  // For uploads, this is the request entity body.  For downloads,
+  // this will hold the response entity body.
+  private byte[] buffer;
+  private int bufferOffset;
+  private int bufferLength;
+
+  private AbfsHttpOperation result;
+
+  public AbfsHttpOperation getResult() {
+    return result;
+  }
+
+  /**
+   * Initializes a new REST operation.
+   *
+   * @param client The Blob FS client.
+   * @param method The HTTP method (PUT, PATCH, POST, GET, HEAD, or DELETE).
+   * @param url The full URL including query string parameters.
+   * @param requestHeaders The HTTP request headers.
+   */
+  AbfsRestOperation(final AbfsClient client,
+                    final String method,
+                    final URL url,
+                    final List<AbfsHttpHeader> requestHeaders) {
+    this.client = client;
+    this.method = method;
+    this.url = url;
+    this.requestHeaders = requestHeaders;
+    this.hasRequestBody = (AbfsHttpConstants.HTTP_METHOD_PUT.equals(method)
+            || AbfsHttpConstants.HTTP_METHOD_PATCH.equals(method));
+    this.logger = client.LOG;
+  }
+
+  /**
+   * Initializes a new REST operation.
+   *
+   * @param client The Blob FS client.
+   * @param method The HTTP method (PUT, PATCH, POST, GET, HEAD, or DELETE).
+   * @param url The full URL including query string parameters.
+   * @param requestHeaders The HTTP request headers.
+   * @param buffer For uploads, this is the request entity body.  For downloads,
+   *               this will hold the response entity body.
+   * @param bufferOffset An offset into the buffer where the data begins.
+   * @param bufferLength The length of the data in the buffer.
+   */
+  AbfsRestOperation(AbfsClient client,
+                    String method,
+                    URL url,
+                    List<AbfsHttpHeader> requestHeaders,
+                    byte[] buffer,
+                    int bufferOffset,
+                    int bufferLength) {
+    this(client, method, url, requestHeaders);
+    this.buffer = buffer;
+    this.bufferOffset = bufferOffset;
+    this.bufferLength = bufferLength;
+  }
+
+  /**
+   * Executes the REST operation with retry, by issuing one or more
+   * HTTP operations.
+   */
+  void execute() throws AzureBlobFileSystemException {
+    int retryCount = 0;
+    while (!executeHttpOperation(retryCount++)) {
+      try {
+        Thread.sleep(client.getRetryPolicy().getRetryInterval(retryCount));
+      } catch (InterruptedException ex) {
+        Thread.currentThread().interrupt();
+      }
+    }
+
+    if (result.getStatusCode() >= HttpURLConnection.HTTP_BAD_REQUEST) {
+      throw new AbfsRestOperationException(result.getStatusCode(), result.getStorageErrorCode(),
+          result.getStorageErrorMessage(), null, result);
+    }
+  }
+
+  /**
+   * Executes a single HTTP operation to complete the REST operation.  If it
+   * fails, there may be a retry.  The retryCount is incremented with each
+   * attempt.
+   */
+  private boolean executeHttpOperation(final int retryCount) throws AzureBlobFileSystemException {
+    AbfsHttpOperation httpOperation = null;
+    try {
+      // initialize the HTTP request and open the connection
+      httpOperation = new AbfsHttpOperation(url, method, requestHeaders);
+
+      // sign the HTTP request
+      client.getSharedKeyCredentials().signRequest(
+          httpOperation.getConnection(),
+          hasRequestBody ? bufferLength : 0);
+
+      if (hasRequestBody) {
+        // HttpUrlConnection requires the request body to be sent before the
+        // response can be read.
+        httpOperation.sendRequest(buffer, bufferOffset, bufferLength);
+      }
+
+      httpOperation.processResponse(buffer, bufferOffset, bufferLength);
+    } catch (IOException ex) {
+      if (logger.isDebugEnabled()) {
+        if (httpOperation != null) {
+          logger.debug("HttpRequestFailure: " + httpOperation.toString(), ex);
+        } else {
+          logger.debug("HttpRequestFailure: " + method + "," + url, ex);
+        }
+      }
+      if (!client.getRetryPolicy().shouldRetry(retryCount, -1)) {
+        throw new InvalidAbfsRestOperationException(ex);
+      }
+      return false;
+    }
+
+    if (logger.isDebugEnabled()) {
+      logger.debug("HttpRequest: " + httpOperation.toString());
+    }
+
+    if (client.getRetryPolicy().shouldRetry(retryCount, httpOperation.getStatusCode())) {
+      return false;
+    }
+
+    result = httpOperation;
+
+    return true;
+  }
+}
\ No newline at end of file
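
The execute()/executeHttpOperation() pair above is a classic bounded retry
loop: the per-attempt method returns false to request another attempt, and
the loop sleeps for the policy's interval in between. A self-contained
sketch of the same control flow, with attempt.test(n) standing in for
executeHttpOperation(n):

    import java.util.function.IntPredicate;
    import java.util.function.IntToLongFunction;

    final class RetryLoopSketch {
      // attempt.test(n) returns true on success, false to ask for a retry;
      // the attempt itself is expected to throw once its retry budget is spent.
      static void run(IntPredicate attempt, IntToLongFunction retryIntervalMs)
          throws InterruptedException {
        int retryCount = 0;
        while (!attempt.test(retryCount++)) {
          Thread.sleep(retryIntervalMs.applyAsLong(retryCount));
        }
      }
    }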

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fdf5f4c3/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsServiceInjectorImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsServiceInjectorImpl.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsServiceInjectorImpl.java
new file mode 100644
index 0000000..1cbf6b5
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsServiceInjectorImpl.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import com.google.inject.AbstractModule;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.azurebfs.contracts.services.AbfsHttpClientFactory;
+import org.apache.hadoop.fs.azurebfs.contracts.services.AbfsHttpService;
+import org.apache.hadoop.fs.azurebfs.contracts.services.ConfigurationService;
+import org.apache.hadoop.fs.azurebfs.contracts.services.TracingService;
+
+/**
+ * This class is responsible for configuring all the services used by the Azure Blob File System.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+class AbfsServiceInjectorImpl extends AbstractModule {
+  private final Configuration configuration;
+  private final Map<Class, Class> providers;
+  private final Map<Class, Object> instances;
+
+  AbfsServiceInjectorImpl(Configuration configuration) {
+    this.providers = new HashMap<>();
+    this.instances = new HashMap<>();
+    this.configuration = configuration;
+
+    this.instances.put(Configuration.class, this.configuration);
+
+    this.providers.put(ConfigurationService.class, ConfigurationServiceImpl.class);
+
+    this.providers.put(AbfsHttpService.class, AbfsHttpServiceImpl.class);
+    this.providers.put(AbfsHttpClientFactory.class, AbfsHttpClientFactoryImpl.class);
+
+    this.providers.put(TracingService.class, TracingServiceImpl.class);
+  }
+
+  @Override
+  protected void configure() {
+    for (Map.Entry<Class, Object> entry : this.instances.entrySet()) {
+      bind(entry.getKey()).toInstance(entry.getValue());
+    }
+
+    for (Map.Entry<Class, Class> entry : this.providers.entrySet()) {
+      bind(entry.getKey()).to(entry.getValue());
+    }
+  }
+
+  protected Configuration getConfiguration() {
+    return this.configuration;
+  }
+
+  protected Map<Class, Class> getProviders() {
+    return this.providers;
+  }
+
+  protected Map<Class, Object> getInstances() {
+    return this.instances;
+  }
+}
\ No newline at end of file
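
For readers unfamiliar with Guice, the two loops in configure() expand to
exactly the bindings registered in the constructor; written out by hand (in
a hypothetical module named ExpandedModule) they would read:

    class ExpandedModule extends com.google.inject.AbstractModule {
      private final org.apache.hadoop.conf.Configuration configuration;

      ExpandedModule(org.apache.hadoop.conf.Configuration configuration) {
        this.configuration = configuration;
      }

      @Override
      protected void configure() {
        bind(org.apache.hadoop.conf.Configuration.class).toInstance(configuration);
        bind(ConfigurationService.class).to(ConfigurationServiceImpl.class);
        bind(AbfsHttpService.class).to(AbfsHttpServiceImpl.class);
        bind(AbfsHttpClientFactory.class).to(AbfsHttpClientFactoryImpl.class);
        bind(TracingService.class).to(TracingServiceImpl.class);
      }
    }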

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fdf5f4c3/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsServiceProviderImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsServiceProviderImpl.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsServiceProviderImpl.java
new file mode 100644
index 0000000..8560620
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsServiceProviderImpl.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.ServiceResolutionException;
+import org.apache.hadoop.fs.azurebfs.contracts.services.AbfsServiceProvider;
+import org.apache.hadoop.fs.azurebfs.contracts.services.InjectableService;
+
+/**
+ * Dependency injected Azure Storage services provider.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public final class AbfsServiceProviderImpl implements AbfsServiceProvider {
+  private static AbfsServiceProviderImpl abfsServiceProvider;
+  private final Injector abfsServiceInjector;
+
+  private AbfsServiceProviderImpl(final Configuration configuration) {
+    this.abfsServiceInjector = Guice.createInjector(new AbfsServiceInjectorImpl(Preconditions.checkNotNull(configuration, "configuration")));
+  }
+
+  @VisibleForTesting
+  private AbfsServiceProviderImpl(final Injector abfsServiceInjector) {
+    Preconditions.checkNotNull(abfsServiceInjector, "abfsServiceInjector");
+    this.abfsServiceInjector = abfsServiceInjector;
+  }
+
+  /**
+   * Creates an instance or returns the existing instance of the service provider.
+   * This method must be marked as synchronized to ensure thread-safety.
+   * @param configuration hadoop configuration.
+   * @return AbfsServiceProvider the service provider instance.
+   */
+  public static synchronized AbfsServiceProvider create(final Configuration configuration) {
+    if (abfsServiceProvider == null) {
+      abfsServiceProvider = new AbfsServiceProviderImpl(configuration);
+    }
+
+    return abfsServiceProvider;
+  }
+
+  /**
+   * Returns the current instance of the service provider.
+   * @return AbfsServiceProvider the service provider instance.
+   */
+  public static AbfsServiceProvider instance() {
+    return abfsServiceProvider;
+  }
+
+  @VisibleForTesting
+  static synchronized AbfsServiceProvider create(Injector serviceInjector) {
+    abfsServiceProvider = new AbfsServiceProviderImpl(serviceInjector);
+    return abfsServiceProvider;
+  }
+
+  /**
+   * Returns an instance of the resolved injectable service for the given class.
+   * The injectable service must be configured first to be resolvable.
+   * @param clazz the injectable service which is expected to be returned.
+   * @param <T> The type of injectable service.
+   * @return T instance
+   * @throws ServiceResolutionException if the service is not resolvable.
+   */
+  @Override
+  public <T extends InjectableService> T get(final Class<T> clazz) throws ServiceResolutionException {
+    try {
+      return this.abfsServiceInjector.getInstance(clazz);
+    } catch (Exception ex) {
+      throw new ServiceResolutionException(clazz.getSimpleName(), ex);
+    }
+  }
+}
\ No newline at end of file
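
A short usage sketch of the singleton provider above (the class name
ProviderSketch is hypothetical; the services are the ones bound by
AbfsServiceInjectorImpl in this patch):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.azurebfs.contracts.services.AbfsServiceProvider;
    import org.apache.hadoop.fs.azurebfs.contracts.services.ConfigurationService;
    import org.apache.hadoop.fs.azurebfs.services.AbfsServiceProviderImpl;

    public class ProviderSketch {
      public static void main(String[] args) throws Exception {
        // The first call builds the injector; later calls reuse the instance.
        AbfsServiceProvider provider = AbfsServiceProviderImpl.create(new Configuration());
        // Resolution throws ServiceResolutionException for unregistered services.
        ConfigurationService config = provider.get(ConfigurationService.class);
        System.out.println("read buffer size: " + config.getReadBufferSize());
      }
    }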

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fdf5f4c3/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsUriQueryBuilder.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsUriQueryBuilder.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsUriQueryBuilder.java
new file mode 100644
index 0000000..bac66af
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsUriQueryBuilder.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
+
+/**
+ * The UrlQueryBuilder for the Rest AbfsClient.
+ */
+public class AbfsUriQueryBuilder {
+  private Map<String, String> parameters;
+
+  public AbfsUriQueryBuilder() {
+    this.parameters = new HashMap<>();
+  }
+
+  public void addQuery(final String name, final String value) {
+    if (value != null && !value.isEmpty()) {
+      this.parameters.put(name, value);
+    }
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    boolean first = true;
+
+    for (Map.Entry<String, String> entry : parameters.entrySet()) {
+      if (first) {
+        sb.append(AbfsHttpConstants.QUESTION_MARK);
+        first = false;
+      } else {
+        sb.append(AbfsHttpConstants.AND_MARK);
+      }
+      sb.append(entry.getKey()).append(AbfsHttpConstants.EQUAL).append(entry.getValue());
+    }
+    return sb.toString();
+  }
+}
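
A small usage sketch; the parameter names below are illustrative, not a
statement of the service's actual query parameters. Note that values are
appended verbatim (no URL encoding here) and that the backing HashMap leaves
the parameter order unspecified:

    public class QuerySketch {
      public static void main(String[] args) {
        AbfsUriQueryBuilder qb = new AbfsUriQueryBuilder();
        qb.addQuery("resource", "filesystem");  // illustrative name/value
        qb.addQuery("timeout", "90");
        qb.addQuery("continuation", "");        // empty value: silently dropped
        System.out.println(qb.toString());      // e.g. "?timeout=90&resource=filesystem"
      }
    }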

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fdf5f4c3/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ConfigurationServiceImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ConfigurationServiceImpl.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ConfigurationServiceImpl.java
new file mode 100644
index 0000000..568ee5d
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ConfigurationServiceImpl.java
@@ -0,0 +1,317 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.lang.reflect.Field;
+import java.util.Map;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys;
+import org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations;
+import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.IntegerConfigurationValidatorAnnotation;
+import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.LongConfigurationValidatorAnnotation;
+import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.StringConfigurationValidatorAnnotation;
+import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.Base64StringConfigurationValidatorAnnotation;
+import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.BooleanConfigurationValidatorAnnotation;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.ConfigurationPropertyNotFoundException;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidConfigurationValueException;
+import org.apache.hadoop.fs.azurebfs.contracts.services.ConfigurationService;
+import org.apache.hadoop.fs.azurebfs.diagnostics.Base64StringConfigurationBasicValidator;
+import org.apache.hadoop.fs.azurebfs.diagnostics.BooleanConfigurationBasicValidator;
+import org.apache.hadoop.fs.azurebfs.diagnostics.IntegerConfigurationBasicValidator;
+import org.apache.hadoop.fs.azurebfs.diagnostics.LongConfigurationBasicValidator;
+import org.apache.hadoop.fs.azurebfs.diagnostics.StringConfigurationBasicValidator;
+
+@Singleton
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+class ConfigurationServiceImpl implements ConfigurationService {
+  private final Configuration configuration;
+  private final boolean isSecure;
+
+  @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_WRITE_BUFFER_SIZE,
+      MinValue = FileSystemConfigurations.MIN_BUFFER_SIZE,
+      MaxValue = FileSystemConfigurations.MAX_BUFFER_SIZE,
+      DefaultValue = FileSystemConfigurations.DEFAULT_WRITE_BUFFER_SIZE)
+  private int writeBufferSize;
+
+  @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_READ_BUFFER_SIZE,
+      MinValue = FileSystemConfigurations.MIN_BUFFER_SIZE,
+      MaxValue = FileSystemConfigurations.MAX_BUFFER_SIZE,
+      DefaultValue = FileSystemConfigurations.DEFAULT_READ_BUFFER_SIZE)
+  private int readBufferSize;
+
+  @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_MIN_BACKOFF_INTERVAL,
+      DefaultValue = FileSystemConfigurations.DEFAULT_MIN_BACKOFF_INTERVAL)
+  private int minBackoffInterval;
+
+  @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_MAX_BACKOFF_INTERVAL,
+      DefaultValue = FileSystemConfigurations.DEFAULT_MAX_BACKOFF_INTERVAL)
+  private int maxBackoffInterval;
+
+  @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_BACKOFF_INTERVAL,
+      DefaultValue = FileSystemConfigurations.DEFAULT_BACKOFF_INTERVAL)
+  private int backoffInterval;
+
+  @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_MAX_IO_RETRIES,
+      MinValue = 0,
+      DefaultValue = FileSystemConfigurations.DEFAULT_MAX_RETRY_ATTEMPTS)
+  private int maxIoRetries;
+
+  @LongConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_BLOCK_SIZE_PROPERTY_NAME,
+      MinValue = 0,
+      MaxValue = FileSystemConfigurations.MAX_AZURE_BLOCK_SIZE,
+      DefaultValue = FileSystemConfigurations.MAX_AZURE_BLOCK_SIZE)
+  private long azureBlockSize;
+
+  @StringConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_BLOCK_LOCATION_HOST_PROPERTY_NAME,
+      DefaultValue = FileSystemConfigurations.AZURE_BLOCK_LOCATION_HOST_DEFAULT)
+  private String azureBlockLocationHost;
+
+  @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_CONCURRENT_CONNECTION_VALUE_OUT,
+      MinValue = 1,
+      DefaultValue = FileSystemConfigurations.MAX_CONCURRENT_WRITE_THREADS)
+  private int maxConcurrentWriteThreads;
+
+  @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_CONCURRENT_CONNECTION_VALUE_IN,
+      MinValue = 1,
+      DefaultValue = FileSystemConfigurations.MAX_CONCURRENT_READ_THREADS)
+  private int maxConcurrentReadThreads;
+
+  @BooleanConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_TOLERATE_CONCURRENT_APPEND,
+      DefaultValue = FileSystemConfigurations.DEFAULT_READ_TOLERATE_CONCURRENT_APPEND)
+  private boolean tolerateOobAppends;
+
+  @StringConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.FS_AZURE_ATOMIC_RENAME_KEY,
+          DefaultValue = FileSystemConfigurations.DEFAULT_FS_AZURE_ATOMIC_RENAME_DIRECTORIES)
+  private String azureAtomicDirs;
+
+  @BooleanConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION,
+      DefaultValue = FileSystemConfigurations.DEFAULT_AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION)
+  private boolean createRemoteFileSystemDuringInitialization;
+
+  @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.FS_AZURE_READ_AHEAD_QUEUE_DEPTH,
+      DefaultValue = FileSystemConfigurations.DEFAULT_READ_AHEAD_QUEUE_DEPTH)
+  private int readAheadQueueDepth;
+
+  private Map<String, String> storageAccountKeys;
+
+  @Inject
+  ConfigurationServiceImpl(final Configuration configuration) throws IllegalAccessException, InvalidConfigurationValueException {
+    this.configuration = configuration;
+    this.isSecure = this.configuration.getBoolean(ConfigurationKeys.FS_AZURE_SECURE_MODE, false);
+
+    validateStorageAccountKeys();
+    Field[] fields = this.getClass().getDeclaredFields();
+    for (Field field : fields) {
+      field.setAccessible(true);
+      if (field.isAnnotationPresent(IntegerConfigurationValidatorAnnotation.class)) {
+        field.set(this, validateInt(field));
+      } else if (field.isAnnotationPresent(LongConfigurationValidatorAnnotation.class)) {
+        field.set(this, validateLong(field));
+      } else if (field.isAnnotationPresent(StringConfigurationValidatorAnnotation.class)) {
+        field.set(this, validateString(field));
+      } else if (field.isAnnotationPresent(Base64StringConfigurationValidatorAnnotation.class)) {
+        field.set(this, validateBase64String(field));
+      } else if (field.isAnnotationPresent(BooleanConfigurationValidatorAnnotation.class)) {
+        field.set(this, validateBoolean(field));
+      }
+    }
+  }
+
+  @Override
+  public boolean isEmulator() {
+    return this.getConfiguration().getBoolean(ConfigurationKeys.FS_AZURE_EMULATOR_ENABLED, false);
+  }
+
+  @Override
+  public boolean isSecureMode() {
+    return this.isSecure;
+  }
+
+  @Override
+  public String getStorageAccountKey(final String accountName) throws ConfigurationPropertyNotFoundException {
+    String accountKey = this.storageAccountKeys.get(ConfigurationKeys.FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME + accountName);
+    if (accountKey == null) {
+      throw new ConfigurationPropertyNotFoundException(accountName);
+    }
+
+    return accountKey;
+  }
+
+  @Override
+  public Configuration getConfiguration() {
+    return this.configuration;
+  }
+
+  @Override
+  public int getWriteBufferSize() {
+    return this.writeBufferSize;
+  }
+
+  @Override
+  public int getReadBufferSize() {
+    return this.readBufferSize;
+  }
+
+  @Override
+  public int getMinBackoffIntervalMilliseconds() {
+    return this.minBackoffInterval;
+  }
+
+  @Override
+  public int getMaxBackoffIntervalMilliseconds() {
+    return this.maxBackoffInterval;
+  }
+
+  @Override
+  public int getBackoffIntervalMilliseconds() {
+    return this.backoffInterval;
+  }
+
+  @Override
+  public int getMaxIoRetries() {
+    return this.maxIoRetries;
+  }
+
+  @Override
+  public long getAzureBlockSize() {
+    return this.azureBlockSize;
+  }
+
+  @Override
+  public String getAzureBlockLocationHost() {
+    return this.azureBlockLocationHost;
+  }
+
+  @Override
+  public int getMaxConcurrentWriteThreads() {
+    return this.maxConcurrentWriteThreads;
+  }
+
+  @Override
+  public int getMaxConcurrentReadThreads() {
+    return this.maxConcurrentReadThreads;
+  }
+
+  @Override
+  public boolean getTolerateOobAppends() {
+    return this.tolerateOobAppends;
+  }
+
+  @Override
+  public String getAzureAtomicRenameDirs() {
+    return this.azureAtomicDirs;
+  }
+
+  @Override
+  public boolean getCreateRemoteFileSystemDuringInitialization() {
+    return this.createRemoteFileSystemDuringInitialization;
+  }
+
+  @Override
+  public int getReadAheadQueueDepth() {
+    return this.readAheadQueueDepth;
+  }
+
+  void validateStorageAccountKeys() throws InvalidConfigurationValueException {
+    Base64StringConfigurationBasicValidator validator = new Base64StringConfigurationBasicValidator(
+        ConfigurationKeys.FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME, "", true);
+    this.storageAccountKeys = this.configuration.getValByRegex(ConfigurationKeys.FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME_REGX);
+
+    for (Map.Entry<String, String> account : this.storageAccountKeys.entrySet()) {
+      validator.validate(account.getValue());
+    }
+  }
+
+  int validateInt(Field field) throws IllegalAccessException, InvalidConfigurationValueException {
+    IntegerConfigurationValidatorAnnotation validator = field.getAnnotation(IntegerConfigurationValidatorAnnotation.class);
+    String value = this.configuration.get(validator.ConfigurationKey());
+
+    // validate
+    return new IntegerConfigurationBasicValidator(
+        validator.MinValue(),
+        validator.MaxValue(),
+        validator.DefaultValue(),
+        validator.ConfigurationKey(),
+        validator.ThrowIfInvalid()).validate(value);
+  }
+
+  long validateLong(Field field) throws IllegalAccessException, InvalidConfigurationValueException {
+    LongConfigurationValidatorAnnotation validator = field.getAnnotation(LongConfigurationValidatorAnnotation.class);
+    String value = this.configuration.get(validator.ConfigurationKey());
+
+    // validate
+    return new LongConfigurationBasicValidator(
+        validator.MinValue(),
+        validator.MaxValue(),
+        validator.DefaultValue(),
+        validator.ConfigurationKey(),
+        validator.ThrowIfInvalid()).validate(value);
+  }
+
+  String validateString(Field field) throws IllegalAccessException, InvalidConfigurationValueException {
+    StringConfigurationValidatorAnnotation validator = field.getAnnotation(StringConfigurationValidatorAnnotation.class);
+    String value = this.configuration.get(validator.ConfigurationKey());
+
+    // validate
+    return new StringConfigurationBasicValidator(
+        validator.ConfigurationKey(),
+        validator.DefaultValue(),
+        validator.ThrowIfInvalid()).validate(value);
+  }
+
+  String validateBase64String(Field field) throws IllegalAccessException, InvalidConfigurationValueException {
+    Base64StringConfigurationValidatorAnnotation validator = field.getAnnotation((Base64StringConfigurationValidatorAnnotation.class));
+    String value = this.configuration.get(validator.ConfigurationKey());
+
+    // validate
+    return new Base64StringConfigurationBasicValidator(
+        validator.ConfigurationKey(),
+        validator.DefaultValue(),
+        validator.ThrowIfInvalid()).validate(value);
+  }
+
+  boolean validateBoolean(Field field) throws IllegalAccessException, InvalidConfigurationValueException {
+    BooleanConfigurationValidatorAnnotation validator = field.getAnnotation(BooleanConfigurationValidatorAnnotation.class);
+    String value = this.configuration.get(validator.ConfigurationKey());
+
+    // validate
+    return new BooleanConfigurationBasicValidator(
+        validator.ConfigurationKey(),
+        validator.DefaultValue(),
+        validator.ThrowIfInvalid()).validate(value);
+  }
+
+  @VisibleForTesting
+  void setReadBufferSize(int bufferSize) {
+    this.readBufferSize = bufferSize;
+  }
+
+  @VisibleForTesting
+  void setWriteBufferSize(int bufferSize) {
+    this.writeBufferSize = bufferSize;
+  }
+}
\ No newline at end of file
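
A sketch of how the reflective validation above plays out, using only the
constants and classes introduced by this patch; the 8 MB value is an
arbitrary example assumed to lie within the MIN/MAX buffer bounds, and the
helper method is hypothetical (ConfigurationServiceImpl is package-private,
so this would live in the same package):

    static ConfigurationService loadConfig() throws Exception {
      Configuration conf = new Configuration();
      conf.setInt(ConfigurationKeys.AZURE_WRITE_BUFFER_SIZE, 8 * 1024 * 1024);
      // The constructor walks the annotated fields, runs the matching
      // validator (min/max/default/throwIfInvalid), and assigns the result.
      ConfigurationService config = new ConfigurationServiceImpl(conf);
      assert config.getWriteBufferSize() == 8 * 1024 * 1024;
      return config;
    }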

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fdf5f4c3/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ExponentialRetryPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ExponentialRetryPolicy.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ExponentialRetryPolicy.java
new file mode 100644
index 0000000..0c92612
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ExponentialRetryPolicy.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.util.Random;
+import java.net.HttpURLConnection;
+
+class ExponentialRetryPolicy {
+  /**
+   * Represents the default number of retry attempts.
+   */
+  private static final int DEFAULT_CLIENT_RETRY_COUNT = 30;
+
+  /**
+   * Represents the default amount of time used when calculating a random delta in the exponential
+   * delay between retries.
+   */
+  private static final int DEFAULT_CLIENT_BACKOFF = 1000 * 3;
+
+  /**
+   * Represents the default maximum amount of time used when calculating the exponential
+   * delay between retries.
+   */
+  private static final int DEFAULT_MAX_BACKOFF = 1000 * 30;
+
+  /**
+   * Represents the default minimum amount of time used when calculating the exponential
+   * delay between retries.
+   */
+  private static final int DEFAULT_MIN_BACKOFF = 1000 * 3;
+
+  /**
+   *  The minimum random ratio used for delay interval calculation.
+   */
+  private static final double MIN_RANDOM_RATIO = 0.8;
+
+  /**
+   *  The maximum random ratio used for delay interval calculation.
+   */
+  private static final double MAX_RANDOM_RATIO = 1.2;
+
+  /**
+   *  Holds the random number generator used to calculate randomized backoff intervals.
+   */
+  private final Random randRef = new Random();
+
+  /**
+   * The value that will be used to calculate a random delta in the exponential delay interval.
+   */
+  private final int deltaBackoff;
+
+  /**
+   * The maximum backoff time.
+   */
+  private final int maxBackoff;
+
+  /**
+   * The minimum backoff time.
+   */
+  private final int minBackoff;
+
+  /**
+   * The maximum number of retry attempts.
+   */
+  private final int retryCount;
+
+  /**
+   * Initializes a new instance of the {@link ExponentialRetryPolicy} class.
+   */
+  ExponentialRetryPolicy() {
+    this(DEFAULT_CLIENT_RETRY_COUNT, DEFAULT_MIN_BACKOFF, DEFAULT_MAX_BACKOFF, DEFAULT_CLIENT_BACKOFF);
+  }
+
+  /**
+   * Initializes a new instance of the {@link ExponentialRetryPolicy} class.
+   *
+   * @param retryCount The maximum number of retry attempts.
+   * @param minBackoff The minimum backoff time.
+   * @param maxBackoff The maximum backoff time.
+   * @param deltaBackoff The value that will be used to calculate a random delta in the exponential delay
+   *                     between retries.
+   */
+  ExponentialRetryPolicy(final int retryCount, final int minBackoff, final int maxBackoff, final int deltaBackoff) {
+    this.retryCount = retryCount;
+    this.minBackoff = minBackoff;
+    this.maxBackoff = maxBackoff;
+    this.deltaBackoff = deltaBackoff;
+  }
+
+  /**
+   * Returns whether a request should be retried, based on the retry count, the current response,
+   * and the current strategy.
+   *
+   * @param retryCount The current retry attempt count.
+   * @param statusCode The status code of the response, or -1 for socket error.
+   * @return true if the request should be retried; false otherwise.
+   */
+  public boolean shouldRetry(final int retryCount, final int statusCode) {
+    return retryCount < this.retryCount
+        && (statusCode == -1
+        || statusCode == HttpURLConnection.HTTP_CLIENT_TIMEOUT
+        || (statusCode >= HttpURLConnection.HTTP_INTERNAL_ERROR
+            && statusCode != HttpURLConnection.HTTP_NOT_IMPLEMENTED
+            && statusCode != HttpURLConnection.HTTP_VERSION));
+  }
+
+  /**
+   * Returns a backoff interval between 80% and 120% of the desired backoff,
+   * multiplied by 2^(n-1) for exponential growth, where n is the retry count.
+   *
+   * @param retryCount The current retry attempt count.
+   * @return backoff Interval time
+   */
+  public long getRetryInterval(final int retryCount) {
+    final long boundedRandDelta = (int) (this.deltaBackoff * MIN_RANDOM_RATIO)
+        + this.randRef.nextInt((int) (this.deltaBackoff * MAX_RANDOM_RATIO)
+        - (int) (this.deltaBackoff * MIN_RANDOM_RATIO));
+
+    final double incrementDelta = (Math.pow(2, retryCount - 1)) * boundedRandDelta;
+
+    final long retryInterval = (int) Math.round(Math.min(this.minBackoff + incrementDelta, maxBackoff));
+
+    return retryInterval;
+  }
+}
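
A worked example of getRetryInterval() under the defaults above (minBackoff
3 s, maxBackoff 30 s, deltaBackoff 3 s): boundedRandDelta is uniform over
roughly [2400, 3600) ms, and the delta doubles with each attempt until the
30 s cap takes over:

    // retryCount = 1:  3000 + 1 * [2400, 3600)            -> [ 5400,  6600) ms
    // retryCount = 2:  3000 + 2 * [2400, 3600)            -> [ 7800, 10200) ms
    // retryCount = 3:  3000 + 4 * [2400, 3600)            -> [12600, 17400) ms
    // retryCount = 4:  min(3000 + 8 * [2400, 3600), 30000) -> [22200, 30000] ms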

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fdf5f4c3/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/LoggerSpanReceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/LoggerSpanReceiver.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/LoggerSpanReceiver.java
new file mode 100644
index 0000000..99190e6
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/LoggerSpanReceiver.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
+import org.apache.htrace.core.HTraceConfiguration;
+import org.apache.htrace.core.Span;
+import org.apache.htrace.core.SpanReceiver;
+import org.apache.htrace.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.htrace.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.htrace.fasterxml.jackson.databind.ObjectWriter;
+import org.apache.htrace.fasterxml.jackson.databind.SerializationFeature;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * LoggerSpanReceiver is a layer between HTrace and SLF4J logging, used only
+ * by {@link org.apache.hadoop.fs.azurebfs.contracts.services.TracingService}.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class LoggerSpanReceiver extends SpanReceiver {
+  private static final ObjectWriter JSON_WRITER =
+      new ObjectMapper()
+          .configure(SerializationFeature.INDENT_OUTPUT, true)
+          .configure(SerializationFeature.WRITE_BIGDECIMAL_AS_PLAIN, true)
+          .configure(SerializationFeature.WRITE_EMPTY_JSON_ARRAYS, false)
+          .configure(SerializationFeature.USE_EQUALITY_FOR_OBJECT_ID, false)
+          .writer();
+
+  public LoggerSpanReceiver(HTraceConfiguration hTraceConfiguration) {
+    Preconditions.checkNotNull(hTraceConfiguration, "hTraceConfiguration");
+  }
+
+  @Override
+  public void receiveSpan(final Span span) {
+    String jsonValue;
+
+    Logger logger = LoggerFactory.getLogger(AzureBlobFileSystem.class);
+
+    try {
+      jsonValue = JSON_WRITER.writeValueAsString(span);
+      logger.trace(jsonValue);
+    } catch (JsonProcessingException e) {
+      logger.error("Json processing error: " + e.getMessage());
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    // No-Op
+  }
+}
\ No newline at end of file
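
A hedged sketch of wiring this receiver into an htrace-core4 Tracer. The
"span.receiver.classes" and "sampler.classes" keys are htrace-core4
configuration properties, and the AlwaysSampler value is an assumption for
the sake of the example, not something this patch configures:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.htrace.core.HTraceConfiguration;
    import org.apache.htrace.core.TraceScope;
    import org.apache.htrace.core.Tracer;

    public class TracingSketch {
      public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put("span.receiver.classes", LoggerSpanReceiver.class.getName());
        conf.put("sampler.classes", "AlwaysSampler");  // assumed sampler
        Tracer tracer = new Tracer.Builder("TracingSketch")
            .conf(HTraceConfiguration.fromMap(conf))
            .build();
        try (TraceScope scope = tracer.newScope("demo")) {
          // work done here runs under the span; when the scope closes,
          // receiveSpan() above serializes the span to JSON at TRACE level
        }
      }
    }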

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fdf5f4c3/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBuffer.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBuffer.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBuffer.java
new file mode 100644
index 0000000..1fac13d
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBuffer.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus;
+
+class ReadBuffer {
+
+  private AbfsInputStream stream;
+  private long offset;                   // offset within the file for the buffer
+  private int length;                    // actual length, set after the buffer is filled
+  private int requestedLength;           // requested length of the read
+  private byte[] buffer;                 // the buffer itself
+  private int bufferindex = -1;          // index in the buffers array in Buffer manager
+  private ReadBufferStatus status;             // status of the buffer
+  private CountDownLatch latch = null;   // signaled when the buffer is done reading, so any client
+  // waiting on this buffer gets unblocked
+
+  // fields to help with eviction logic
+  private long timeStamp = 0;  // tick at which buffer became available to read
+  private boolean isFirstByteConsumed = false;
+  private boolean isLastByteConsumed = false;
+  private boolean isAnyByteConsumed = false;
+
+  public AbfsInputStream getStream() {
+    return stream;
+  }
+
+  public void setStream(AbfsInputStream stream) {
+    this.stream = stream;
+  }
+
+  public long getOffset() {
+    return offset;
+  }
+
+  public void setOffset(long offset) {
+    this.offset = offset;
+  }
+
+  public int getLength() {
+    return length;
+  }
+
+  public void setLength(int length) {
+    this.length = length;
+  }
+
+  public int getRequestedLength() {
+    return requestedLength;
+  }
+
+  public void setRequestedLength(int requestedLength) {
+    this.requestedLength = requestedLength;
+  }
+
+  public byte[] getBuffer() {
+    return buffer;
+  }
+
+  public void setBuffer(byte[] buffer) {
+    this.buffer = buffer;
+  }
+
+  public int getBufferindex() {
+    return bufferindex;
+  }
+
+  public void setBufferindex(int bufferindex) {
+    this.bufferindex = bufferindex;
+  }
+
+  public ReadBufferStatus getStatus() {
+    return status;
+  }
+
+  public void setStatus(ReadBufferStatus status) {
+    this.status = status;
+  }
+
+  public CountDownLatch getLatch() {
+    return latch;
+  }
+
+  public void setLatch(CountDownLatch latch) {
+    this.latch = latch;
+  }
+
+  public long getTimeStamp() {
+    return timeStamp;
+  }
+
+  public void setTimeStamp(long timeStamp) {
+    this.timeStamp = timeStamp;
+  }
+
+  public boolean isFirstByteConsumed() {
+    return isFirstByteConsumed;
+  }
+
+  public void setFirstByteConsumed(boolean isFirstByteConsumed) {
+    this.isFirstByteConsumed = isFirstByteConsumed;
+  }
+
+  public boolean isLastByteConsumed() {
+    return isLastByteConsumed;
+  }
+
+  public void setLastByteConsumed(boolean isLastByteConsumed) {
+    this.isLastByteConsumed = isLastByteConsumed;
+  }
+
+  public boolean isAnyByteConsumed() {
+    return isAnyByteConsumed;
+  }
+
+  public void setAnyByteConsumed(boolean isAnyByteConsumed) {
+    this.isAnyByteConsumed = isAnyByteConsumed;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fdf5f4c3/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java
new file mode 100644
index 0000000..164e549
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java
@@ -0,0 +1,391 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.azurebfs.services;
+
+import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.Stack;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * The Read Buffer Manager for Rest AbfsClient
+ */
+final class ReadBufferManager {
+  private static final Logger LOGGER = LoggerFactory.getLogger(ReadBufferManager.class);
+
+  private static final int NUM_BUFFERS = 16;
+  private static final int BLOCK_SIZE = 4 * 1024 * 1024;
+  private static final int NUM_THREADS = 8;
+  private static final int THRESHOLD_AGE_MILLISECONDS = 3000; // TODO: evaluate whether 3 seconds is a good threshold
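+  // with these settings the pool pins NUM_BUFFERS * BLOCK_SIZE = 64 MB of heap for the
+  // lifetime of the process, shared by all AbfsInputStream instances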
+
+  private Thread[] threads = new Thread[NUM_THREADS];
+  private byte[][] buffers;    // array of byte[] buffers, to hold the data that is read
+  private Stack<Integer> freeList = new Stack<Integer>();   // indices in buffers[] array that are available
+
+  private Queue<ReadBuffer> readAheadQueue = new LinkedList<ReadBuffer>(); // queue of requests that are not picked up by any worker thread yet
+  private LinkedList<ReadBuffer> inProgressList = new LinkedList<ReadBuffer>(); // requests being processed by worker threads
+  private LinkedList<ReadBuffer> completedReadList = new LinkedList<ReadBuffer>(); // buffers available for reading
+  private static final ReadBufferManager BUFFER_MANAGER; // singleton, initialized in static initialization block
+
+  static {
+    BUFFER_MANAGER = new ReadBufferManager();
+    BUFFER_MANAGER.init();
+  }
+
+  static ReadBufferManager getBufferManager() {
+    return BUFFER_MANAGER;
+  }
+
+  private void init() {
+    buffers = new byte[NUM_BUFFERS][];
+    for (int i = 0; i < NUM_BUFFERS; i++) {
+      buffers[i] = new byte[BLOCK_SIZE];  // buffers are allocated once and reused; the byte arrays never become eligible for GC
+      freeList.add(i);
+    }
+    for (int i = 0; i < NUM_THREADS; i++) {
+      Thread t = new Thread(new ReadBufferWorker(i));
+      t.setDaemon(true);
+      threads[i] = t;
+      t.setName("ABFS-prefetch-" + i);
+      t.start();
+    }
+    ReadBufferWorker.UNLEASH_WORKERS.countDown();
+  }
+
+  // hide instance constructor
+  private ReadBufferManager() {
+  }
+
+
+  /*
+   *
+   *  AbfsInputStream-facing methods
+   *
+   */
+
+
+  /**
+   * {@link AbfsInputStream} calls this method to queue read-aheads
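+   * <p>
+   * Illustrative call sequence (a simplified sketch; {@code position} and {@code blockSize}
+   * are assumed caller-side variables, not part of this class):
+   * <pre>{@code
+   *   ReadBufferManager mgr = ReadBufferManager.getBufferManager();
+   *   // queue the next two blocks following the current read position
+   *   mgr.queueReadAhead(stream, position + blockSize, blockSize);
+   *   mgr.queueReadAhead(stream, position + 2L * blockSize, blockSize);
+   * }</pre>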
+   *
+   * @param stream          The {@link AbfsInputStream} for which to do the read-ahead
+   * @param requestedOffset The offset in the file which should be read
+   * @param requestedLength The length to read
+   */
+  void queueReadAhead(final AbfsInputStream stream, final long requestedOffset, final int requestedLength) {
+    if (LOGGER.isTraceEnabled()) {
+      LOGGER.trace("Start Queueing readAhead for " + stream.getPath() + " offset " + requestedOffset
+          + " length " + requestedLength);
+    }
+    ReadBuffer buffer;
+    synchronized (this) {
+      if (isAlreadyQueued(stream, requestedOffset)) {
+        return; // already queued, do not queue again
+      }
+      if (freeList.isEmpty() && !tryEvict()) {
+        return; // no buffers available, cannot queue anything
+      }
+
+      buffer = new ReadBuffer();
+      buffer.setStream(stream);
+      buffer.setOffset(requestedOffset);
+      buffer.setLength(0);
+      buffer.setRequestedLength(requestedLength);
+      buffer.setStatus(ReadBufferStatus.NOT_AVAILABLE);
+      buffer.setLatch(new CountDownLatch(1));
+
+      Integer bufferIndex = freeList.pop();  // will return a value, since we have checked size > 0 already
+
+      buffer.setBuffer(buffers[bufferIndex]);
+      buffer.setBufferindex(bufferIndex);
+      readAheadQueue.add(buffer);
+      notifyAll();
+    }
+    if (LOGGER.isTraceEnabled()) {
+      LOGGER.trace("Done q-ing readAhead for file " + stream.getPath() + " offset " + requestedOffset
+          + " buffer idx " + buffer.getBufferindex());
+    }
+  }
+
+
+  /**
+   * {@link AbfsInputStream} calls this method to read any bytes already available in a buffer (thereby saving a
+   * remote read). This returns the bytes if the data already exists in a buffer. If there is a buffer that is reading
+   * the requested offset, then this method blocks until that read completes. If the data is queued in a read-ahead
+   * but not picked up by a worker thread yet, then it cancels that read-ahead and reports a cache miss. This is because,
+   * depending on worker thread availability, the read-ahead may take a while - the calling thread can do its own
+   * read to get the data faster (compared to the read waiting in the queue for an indeterminate amount of time).
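+   * <p>
+   * Illustrative caller pattern (a simplified sketch, not the exact AbfsInputStream code;
+   * exception handling is elided):
+   * <pre>{@code
+   *   // try the read-ahead cache first; fall back to a direct remote read on a miss
+   *   int received = ReadBufferManager.getBufferManager().getBlock(stream, position, length, b);
+   *   if (received == 0) {
+   *     received = stream.readRemote(position, b, 0, length);
+   *   }
+   * }</pre>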
+   *
+   * @param stream   the file to read bytes for
+   * @param position the offset in the file to do a read for
+   * @param length   the length to read
+   * @param buffer   the buffer to read data into. Note that the buffer will be written into from offset 0.
+   * @return the number of bytes read
+   */
+  int getBlock(final AbfsInputStream stream, final long position, final int length, final byte[] buffer) {
+    // not synchronized, so have to be careful with locking
+    if (LOGGER.isTraceEnabled()) {
+      LOGGER.trace("getBlock for file " + stream.getPath() + " position " + position + " thread " + Thread.currentThread().getName());
+    }
+
+    waitForProcess(stream, position);
+
+    int bytesRead = 0;
+    synchronized (this) {
+      bytesRead = getBlockFromCompletedQueue(stream, position, length, buffer);
+    }
+    if (bytesRead > 0) {
+      if (LOGGER.isTraceEnabled()) {
+        LOGGER.trace("Done read from Cache for " + stream.getPath() + " position " + position + " length " + bytesRead);
+      }
+      return bytesRead;
+    }
+
+    // otherwise, just say we got nothing - calling thread can do its own read
+    return 0;
+  }
+
+  /*
+   *
+   *  Internal methods
+   *
+   */
+
+  private void waitForProcess(final AbfsInputStream stream, final long position) {
+    ReadBuffer readBuf;
+    synchronized (this) {
+      clearFromReadAheadQueue(stream, position);
+      readBuf = getFromList(inProgressList, stream, position);
+    }
+    if (readBuf != null) {         // if in in-progress queue, then block for it
+      try {
+        if (LOGGER.isTraceEnabled()) {
+          LOGGER.trace("got a relevant read buffer for file " + stream.getPath() + " offset " + readBuf.getOffset()
+                  + " buffer idx " + readBuf.getBufferindex());
+        }
+        readBuf.getLatch().await();  // blocking wait on the caller stream's thread
+        // Note on correctness: readBuf gets out of inProgressList only in 1 place: after worker thread
+        // is done processing it (in doneReading). There, the latch is set after removing the buffer from
+        // inProgressList. So this latch is safe to be outside the synchronized block.
+        // Putting it in synchronized would result in a deadlock, since this thread would be holding the lock
+        // while waiting, so no one would be able to change any state. If this becomes more complex in the future,
+        // then the latch can be removed and replaced with wait/notify whenever inProgressList is touched.
+      } catch (InterruptedException ex) {
+        Thread.currentThread().interrupt();
+      }
+      if (LOGGER.isTraceEnabled()) {
+        LOGGER.trace("latch done for file " + stream.getPath() + " buffer idx " + readBuf.getBufferindex()
+                + " length " + readBuf.getLength());
+      }
+    }
+  }
+
+  /**
+   * If any buffer in the completed list can be reclaimed then reclaim it and return the buffer to the free list.
+   * The objective is to find just one buffer - there is no advantage to evicting more than one.
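+   * <p>
+   * Eviction preference, in order: (1) a buffer whose first and last bytes have both been
+   * consumed, (2) a buffer with any byte consumed, (3) failing those, the oldest completed
+   * buffer, but only once it is older than THRESHOLD_AGE_MILLISECONDS.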
+   *
+   * @return whether the eviction succeeded - i.e., were we able to free up one buffer
+   */
+  private synchronized boolean tryEvict() {
+    ReadBuffer nodeToEvict = null;
+    if (completedReadList.isEmpty()) {
+      return false;  // there are no evict-able buffers
+    }
+
+    // first, try buffers where all bytes have been consumed (approximated as first and last bytes consumed)
+    for (ReadBuffer buf : completedReadList) {
+      if (buf.isFirstByteConsumed() && buf.isLastByteConsumed()) {
+        nodeToEvict = buf;
+        break;
+      }
+    }
+    if (nodeToEvict != null) {
+      return evict(nodeToEvict);
+    }
+
+    // next, try buffers where any bytes have been consumed (possibly a bad heuristic - needs experimentation)
+    for (ReadBuffer buf : completedReadList) {
+      if (buf.isAnyByteConsumed()) {
+        nodeToEvict = buf;
+        break;
+      }
+    }
+
+    if (nodeToEvict != null) {
+      return evict(nodeToEvict);
+    }
+
+    // next, try any old nodes that have not been consumed
+    long earliestBirthday = Long.MAX_VALUE;
+    for (ReadBuffer buf : completedReadList) {
+      if (buf.getTimeStamp() < earliestBirthday) {
+        nodeToEvict = buf;
+        earliestBirthday = buf.getTimeStamp();
+      }
+    }
+    if ((currentTimeMillis() - earliestBirthday > THRESHOLD_AGE_MILLISECONDS) && (nodeToEvict != null)) {
+      return evict(nodeToEvict);
+    }
+
+    // nothing can be evicted
+    return false;
+  }
+
+  private boolean evict(final ReadBuffer buf) {
+    freeList.push(buf.getBufferindex());
+    completedReadList.remove(buf);
+    if (LOGGER.isTraceEnabled()) {
+      LOGGER.trace("Evicting buffer idx " + buf.getBufferindex() + "; was used for file " + buf.getStream().getPath()
+          + " offset " + buf.getOffset() + " length " + buf.getLength());
+    }
+    return true;
+  }
+
+  private boolean isAlreadyQueued(final AbfsInputStream stream, final long requestedOffset) {
+    // returns true if any part of the buffer is already queued
+    return (isInList(readAheadQueue, stream, requestedOffset)
+        || isInList(inProgressList, stream, requestedOffset)
+        || isInList(completedReadList, stream, requestedOffset));
+  }
+
+  private boolean isInList(final Collection<ReadBuffer> list, final AbfsInputStream stream, final long requestedOffset) {
+    return (getFromList(list, stream, requestedOffset) != null);
+  }
+
+  private ReadBuffer getFromList(final Collection<ReadBuffer> list, final AbfsInputStream stream, final long requestedOffset) {
+    for (ReadBuffer buffer : list) {
+      if (buffer.getStream() == stream) {
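+        // completed buffers match on the actual length read; queued and in-progress
+        // buffers match on the requested length, since the actual length is not known yet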
+        if (buffer.getStatus() == ReadBufferStatus.AVAILABLE
+            && requestedOffset >= buffer.getOffset()
+            && requestedOffset < buffer.getOffset() + buffer.getLength()) {
+          return buffer;
+        } else if (requestedOffset >= buffer.getOffset()
+            && requestedOffset < buffer.getOffset() + buffer.getRequestedLength()) {
+          return buffer;
+        }
+      }
+    }
+    return null;
+  }
+
+  private void clearFromReadAheadQueue(final AbfsInputStream stream, final long requestedOffset) {
+    ReadBuffer buffer = getFromList(readAheadQueue, stream, requestedOffset);
+    if (buffer != null) {
+      readAheadQueue.remove(buffer);
+      notifyAll();   // lock is held in calling method
+      freeList.push(buffer.getBufferindex());
+    }
+  }
+
+  private int getBlockFromCompletedQueue(final AbfsInputStream stream, final long position, final int length,
+                                         final byte[] buffer) {
+    ReadBuffer buf = getFromList(completedReadList, stream, position);
+    if (buf == null || position >= buf.getOffset() + buf.getLength()) {
+      return 0;
+    }
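+    // compute how far into the buffer the requested position falls; for example, a buffer
+    // holding offsets [4 MB, 8 MB) and a request at position 5 MB gives cursor = 1 MB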
+    int cursor = (int) (position - buf.getOffset());
+    int availableLengthInBuffer = buf.getLength() - cursor;
+    int lengthToCopy = Math.min(length, availableLengthInBuffer);
+    System.arraycopy(buf.getBuffer(), cursor, buffer, 0, lengthToCopy);
+    if (cursor == 0) {
+      buf.setFirstByteConsumed(true);
+    }
+    if (cursor + lengthToCopy == buf.getLength()) {
+      buf.setLastByteConsumed(true);
+    }
+    buf.setAnyByteConsumed(true);
+    return lengthToCopy;
+  }
+
+  /*
+   *
+   *  ReadBufferWorker-thread-facing methods
+   *
+   */
+
+  /**
+   * ReadBufferWorker thread calls this to get the next buffer that it should work on.
+   *
+   * @return {@link ReadBuffer}
+   * @throws InterruptedException if thread is interrupted
+   */
+  ReadBuffer getNextBlockToRead() throws InterruptedException {
+    ReadBuffer buffer = null;
+    synchronized (this) {
+      while (readAheadQueue.isEmpty()) {
+        wait();
+      }
+      buffer = readAheadQueue.remove();
+      notifyAll();
+      if (buffer == null) {
+        return null;            // should never happen
+      }
+      buffer.setStatus(ReadBufferStatus.READING_IN_PROGRESS);
+      inProgressList.add(buffer);
+    }
+    if (LOGGER.isTraceEnabled()) {
+      LOGGER.trace("ReadBufferWorker picked file " + buffer.getStream().getPath() + " for offset " + buffer.getOffset());
+    }
+    return buffer;
+  }
+
+  /**
+   * ReadBufferWorker thread calls this method to post completion
+   *
+   * @param buffer            the buffer whose read was completed
+   * @param result            the {@link ReadBufferStatus} after the read operation in the worker thread
+   * @param bytesActuallyRead the number of bytes that the worker thread was actually able to read
+   */
+  void doneReading(final ReadBuffer buffer, final ReadBufferStatus result, final int bytesActuallyRead) {
+    if (LOGGER.isTraceEnabled()) {
+      LOGGER.trace("ReadBufferWorker completed file " + buffer.getStream().getPath() + " for offset " + buffer.getOffset()
+          + " bytes " + bytesActuallyRead);
+    }
+    synchronized (this) {
+      inProgressList.remove(buffer);
+      if (result == ReadBufferStatus.AVAILABLE && bytesActuallyRead > 0) {
+        buffer.setStatus(ReadBufferStatus.AVAILABLE);
+        buffer.setTimeStamp(currentTimeMillis());
+        buffer.setLength(bytesActuallyRead);
+        completedReadList.add(buffer);
+      } else {
+        freeList.push(buffer.getBufferindex());
+        // buffer should go out of scope after the end of the calling method in ReadBufferWorker, and eligible for GC
+      }
+    }
+    // outside the synchronized block, since anyone receiving a wake-up from the latch must see safely published results
+    buffer.getLatch().countDown(); // wake up waiting threads (if any)
+  }
+
+  /**
+   * Similar to System.currentTimeMillis, except implemented with System.nanoTime().
+   * System.currentTimeMillis can go backwards when system clock is changed (e.g., with NTP time synchronization),
+   * making it unsuitable for measuring time intervals. System.nanoTime() is monotonically increasing,
+   * so it is much more suitable for measuring intervals.
+   *
+   * @return current time in milliseconds
+   */
+  private long currentTimeMillis() {
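+    // equivalent to TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); the result is only
+    // meaningful for comparing intervals, not as wall-clock time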
+    return System.nanoTime() / 1000 / 1000;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fdf5f4c3/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferWorker.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferWorker.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferWorker.java
new file mode 100644
index 0000000..2d0c96e
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferWorker.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus;
+
+import java.util.concurrent.CountDownLatch;
+
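+/**
+ * Daemon worker that pulls queued read-aheads from {@link ReadBufferManager}, performs the
+ * remote read, and posts the outcome back through doneReading(). (Descriptive summary
+ * added for clarity; see run() below.)
+ */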
+class ReadBufferWorker implements Runnable {
+
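+  // released by ReadBufferManager.init() once the buffer pool and worker threads are fully set up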
+  protected static final CountDownLatch UNLEASH_WORKERS = new CountDownLatch(1);
+  private int id;
+
+  ReadBufferWorker(final int id) {
+    this.id = id;
+  }
+
+  /**
+   * Returns the ID of this ReadBufferWorker.
+   */
+  public int getId() {
+    return this.id;
+  }
+
+  /**
+   * Waits until a buffer becomes available in ReadAheadQueue.
+   * Once a buffer becomes available, reads the file specified in it and then posts results back to the buffer manager.
+   * Rinse and repeat. Forever.
+   */
+  public void run() {
+    try {
+      UNLEASH_WORKERS.await();
+    } catch (InterruptedException ex) {
+      Thread.currentThread().interrupt();
+    }
+    ReadBufferManager bufferManager = ReadBufferManager.getBufferManager();
+    ReadBuffer buffer;
+    while (true) {
+      try {
+        buffer = bufferManager.getNextBlockToRead();   // blocks, until a buffer is available for this thread
+      } catch (InterruptedException ex) {
+        Thread.currentThread().interrupt();
+        return;
+      }
+      if (buffer != null) {
+        try {
+          // do the actual read, from the file.
+          int bytesRead = buffer.getStream().readRemote(buffer.getOffset(), buffer.getBuffer(), 0, buffer.getRequestedLength());
+          bufferManager.doneReading(buffer, ReadBufferStatus.AVAILABLE, bytesRead);  // post result back to ReadBufferManager
+        } catch (Exception ex) {
+          bufferManager.doneReading(buffer, ReadBufferStatus.READ_FAILED, 0);
+        }
+      }
+    }
+  }
+}

