You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by tm...@apache.org on 2018/08/11 05:37:47 UTC
[43/50] [abbrv] hadoop git commit: HADOOP-15407. HADOOP-15540.
Support Windows Azure Storage - Blob file system "ABFS" in Hadoop: Core
Commit.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fdf5f4c3/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
new file mode 100644
index 0000000..de5c934
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
@@ -0,0 +1,335 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.hadoop.fs.Syncable;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
+
+/**
+ * The BlobFsOutputStream for Rest AbfsClient
+ */
+public class AbfsOutputStream extends OutputStream implements Syncable {
+ private final AbfsClient client;
+ private final String path;
+ private long position;
+ private boolean closed;
+ private volatile IOException lastError;
+
+ private long lastFlushOffset;
+ private long lastTotalAppendOffset = 0;
+
+ private final int bufferSize;
+ private byte[] buffer;
+ private int bufferIndex;
+ private final int maxConcurrentRequestCount;
+
+ private ConcurrentLinkedDeque<WriteOperation> writeOperations;
+ private final ThreadPoolExecutor threadExecutor;
+ private final ExecutorCompletionService<Void> completionService;
+
+ public AbfsOutputStream(
+ final AbfsClient client,
+ final String path,
+ final long position,
+ final int bufferSize) {
+ this.client = client;
+ this.path = path;
+ this.position = position;
+ this.closed = false;
+ this.lastError = null;
+ this.lastFlushOffset = 0;
+ this.bufferSize = bufferSize;
+ this.buffer = new byte[bufferSize];
+ this.bufferIndex = 0;
+ this.writeOperations = new ConcurrentLinkedDeque<>();
+
+ this.maxConcurrentRequestCount = 4 * Runtime.getRuntime().availableProcessors();
+
+ this.threadExecutor
+ = new ThreadPoolExecutor(maxConcurrentRequestCount,
+ maxConcurrentRequestCount,
+ 10L,
+ TimeUnit.SECONDS,
+ new LinkedBlockingQueue());
+ this.completionService = new ExecutorCompletionService(this.threadExecutor);
+ }
+
+ /**
+ * Writes the specified byte to this output stream. The general contract for
+ * write is that one byte is written to the output stream. The byte to be
+ * written is the eight low-order bits of the argument b. The 24 high-order
+ * bits of b are ignored.
+ *
+ * @param byteVal the byteValue to write.
+ * @throws IOException if an I/O error occurs. In particular, an IOException may be
+ * thrown if the output stream has been closed.
+ */
+ @Override
+ public void write(final int byteVal) throws IOException {
+ write(new byte[]{(byte) (byteVal & 0xFF)});
+ }
+
+ /**
+ * Writes length bytes from the specified byte array starting at off to
+ * this output stream.
+ *
+ * @param data the byte array to write.
+ * @param off the start off in the data.
+ * @param length the number of bytes to write.
+ * @throws IOException if an I/O error occurs. In particular, an IOException may be
+ * thrown if the output stream has been closed.
+ */
+ @Override
+ public synchronized void write(final byte[] data, final int off, final int length)
+ throws IOException {
+ if (this.lastError != null) {
+ throw this.lastError;
+ }
+
+ Preconditions.checkArgument(data != null, "null data");
+
+ if (off < 0 || length < 0 || length > data.length - off) {
+ throw new IndexOutOfBoundsException();
+ }
+
+ int currentOffset = off;
+ int writableBytes = bufferSize - bufferIndex;
+ int numberOfBytesToWrite = length;
+
+ while (numberOfBytesToWrite > 0) {
+ if (writableBytes <= numberOfBytesToWrite) {
+ System.arraycopy(data, currentOffset, buffer, bufferIndex, writableBytes);
+ bufferIndex += writableBytes;
+ writeCurrentBufferToService();
+ currentOffset += writableBytes;
+ numberOfBytesToWrite = numberOfBytesToWrite - writableBytes;
+ } else {
+ System.arraycopy(data, currentOffset, buffer, bufferIndex, numberOfBytesToWrite);
+ bufferIndex += numberOfBytesToWrite;
+ numberOfBytesToWrite = 0;
+ }
+
+ writableBytes = bufferSize - bufferIndex;
+ }
+ }
+
+ /**
+ * Flushes this output stream and forces any buffered output bytes to be
+ * written out. If any data remains in the payload it is committed to the
+ * service. Data is queued for writing and forced out to the service
+ * before the call returns.
+ */
+ @Override
+ public void flush() throws IOException {
+ this.flushInternalAsync();
+ }
+
+ /** Similar to posix fsync, flush out the data in client's user buffer
+ * all the way to the disk device (but the disk may have it in its cache).
+ * @throws IOException if error occurs
+ */
+ @Override
+ public void hsync() throws IOException {
+ this.flushInternal();
+ }
+
+ /** Flush out the data in client's user buffer. After the return of
+ * this call, new readers will see the data.
+ * @throws IOException if any error occurs
+ */
+ @Override
+ public void hflush() throws IOException {
+ this.flushInternal();
+ }
+
+ /**
+ * Force all data in the output stream to be written to Azure storage.
+ * Wait to return until this is complete. Close the access to the stream and
+ * shutdown the upload thread pool.
+ * If the blob was created, its lease will be released.
+ * Any error encountered caught in threads and stored will be rethrown here
+ * after cleanup.
+ */
+ @Override
+ public synchronized void close() throws IOException {
+ if (closed) {
+ return;
+ }
+
+ try {
+ this.flushInternal();
+ this.threadExecutor.shutdown();
+ } finally {
+ this.lastError = new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
+ this.buffer = null;
+ this.bufferIndex = 0;
+ this.closed = true;
+ this.writeOperations.clear();
+ if (!this.threadExecutor.isShutdown()) {
+ this.threadExecutor.shutdownNow();
+ }
+ }
+ }
+
+ private synchronized void flushInternal() throws IOException {
+ if (this.lastError != null) {
+ throw this.lastError;
+ }
+ this.writeCurrentBufferToService();
+ this.flushWrittenBytesToService();
+ }
+
+ private synchronized void flushInternalAsync() throws IOException {
+ if (this.lastError != null) {
+ throw this.lastError;
+ }
+ this.writeCurrentBufferToService();
+ this.flushWrittenBytesToServiceAsync();
+ }
+
+ private synchronized void writeCurrentBufferToService() throws IOException {
+ if (bufferIndex == 0) {
+ return;
+ }
+
+ final byte[] bytes = this.buffer;
+ final int bytesLength = bufferIndex;
+
+ this.buffer = new byte[bufferSize];
+ this.bufferIndex = 0;
+ final long offset = this.position;
+ this.position += bytesLength;
+
+ if (this.threadExecutor.getQueue().size() >= maxConcurrentRequestCount * 2) {
+ this.waitForTaskToComplete();
+ }
+
+ final Future job = this.completionService.submit(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ client.append(path, offset, bytes, 0,
+ bytesLength);
+ return null;
+ }
+ });
+
+ this.writeOperations.add(new WriteOperation(job, offset, bytesLength));
+
+ // Try to shrink the queue
+ shrinkWriteOperationQueue();
+ }
+
+ private synchronized void flushWrittenBytesToService() throws IOException {
+ for (WriteOperation writeOperation : this.writeOperations) {
+ try {
+ writeOperation.task.get();
+ } catch (Exception ex) {
+ if (AzureBlobFileSystemException.class.isInstance(ex.getCause())) {
+ ex = AzureBlobFileSystemException.class.cast(ex.getCause());
+ }
+ this.lastError = new IOException(ex);
+ throw this.lastError;
+ }
+ }
+ flushWrittenBytesToServiceInternal(this.position, false);
+ }
+
+ private synchronized void flushWrittenBytesToServiceAsync() throws IOException {
+ shrinkWriteOperationQueue();
+
+ if (this.lastTotalAppendOffset > this.lastFlushOffset) {
+ this.flushWrittenBytesToServiceInternal(this.lastTotalAppendOffset, true);
+ }
+
+ this.lastTotalAppendOffset = 0;
+ }
+
+ private synchronized void flushWrittenBytesToServiceInternal(final long offset, final boolean retainUncommitedData) throws IOException {
+ try {
+ client.flush(path, offset, retainUncommitedData);
+ } catch (AzureBlobFileSystemException ex) {
+ throw new IOException(ex);
+ }
+ this.lastFlushOffset = offset;
+ }
+
+ /**
+ * Try to remove the completed write operations from the beginning of write
+ * operation FIFO queue.
+ */
+ private synchronized void shrinkWriteOperationQueue() throws IOException {
+ try {
+ while (this.writeOperations.peek() != null && this.writeOperations.peek().task.isDone()) {
+ this.writeOperations.peek().task.get();
+ this.lastTotalAppendOffset += this.writeOperations.peek().length;
+ this.writeOperations.remove();
+ }
+ } catch (Exception e) {
+ if (AzureBlobFileSystemException.class.isInstance(e.getCause())) {
+ this.lastError = IOException.class.cast(e.getCause());
+ } else {
+ this.lastError = new IOException(e);
+ }
+ throw this.lastError;
+ }
+ }
+
+ private void waitForTaskToComplete() throws IOException {
+ boolean completed;
+ for (completed = false; this.completionService.poll() != null; completed = true) {}
+
+ if (!completed) {
+ try {
+ this.completionService.take();
+ } catch (InterruptedException e) {
+ this.lastError = new IOException(e);
+ throw this.lastError;
+ }
+ }
+ }
+
+ private static class WriteOperation {
+ private final Future<Void> task;
+ private final long startOffset;
+ private final long length;
+
+ WriteOperation(final Future<Void> task, final long startOffset, final long length) {
+ Preconditions.checkNotNull(task, "task");
+ Preconditions.checkArgument(startOffset >= 0, "startOffset");
+ Preconditions.checkArgument(length >= 0, "length");
+
+ this.task = task;
+ this.startOffset = startOffset;
+ this.length = length;
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fdf5f4c3/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
new file mode 100644
index 0000000..17fc35a
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.List;
+
+import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidAbfsRestOperationException;
+
+import org.slf4j.Logger;
+
+/**
+ * The AbfsRestOperation for Rest AbfsClient
+ */
+public class AbfsRestOperation {
+ // Blob FS client, which has the credentials, retry policy, and logs.
+ private final AbfsClient client;
+ // the HTTP method (PUT, PATCH, POST, GET, HEAD, or DELETE)
+ private final String method;
+ // full URL including query parameters
+ private final URL url;
+ // all the custom HTTP request headers provided by the caller
+ private final List<AbfsHttpHeader> requestHeaders;
+
+ // This is a simple operation class, where all the upload methods have a
+ // request body and all the download methods have a response body.
+ private final boolean hasRequestBody;
+
+ private final Logger logger;
+
+ // For uploads, this is the request entity body. For downloads,
+ // this will hold the response entity body.
+ private byte[] buffer;
+ private int bufferOffset;
+ private int bufferLength;
+
+ private AbfsHttpOperation result;
+
+ public AbfsHttpOperation getResult() {
+ return result;
+ }
+
+ /**
+ * Initializes a new REST operation.
+ *
+ * @param client The Blob FS client.
+ * @param method The HTTP method (PUT, PATCH, POST, GET, HEAD, or DELETE).
+ * @param url The full URL including query string parameters.
+ * @param requestHeaders The HTTP request headers.
+ */
+ AbfsRestOperation(final AbfsClient client,
+ final String method,
+ final URL url,
+ final List<AbfsHttpHeader> requestHeaders) {
+ this.client = client;
+ this.method = method;
+ this.url = url;
+ this.requestHeaders = requestHeaders;
+ this.hasRequestBody = (AbfsHttpConstants.HTTP_METHOD_PUT.equals(method)
+ || AbfsHttpConstants.HTTP_METHOD_PATCH.equals(method));
+ this.logger = client.LOG;
+ }
+
+ /**
+ * Initializes a new REST operation.
+ *
+ * @param client The Blob FS client.
+ * @param method The HTTP method (PUT, PATCH, POST, GET, HEAD, or DELETE).
+ * @param url The full URL including query string parameters.
+ * @param requestHeaders The HTTP request headers.
+ * @param buffer For uploads, this is the request entity body. For downloads,
+ * this will hold the response entity body.
+ * @param bufferOffset An offset into the buffer where the data beings.
+ * @param bufferLength The length of the data in the buffer.
+ */
+ AbfsRestOperation(AbfsClient client,
+ String method,
+ URL url,
+ List<AbfsHttpHeader> requestHeaders,
+ byte[] buffer,
+ int bufferOffset,
+ int bufferLength) {
+ this(client, method, url, requestHeaders);
+ this.buffer = buffer;
+ this.bufferOffset = bufferOffset;
+ this.bufferLength = bufferLength;
+ }
+
+ /**
+ * Executes the REST operation with retry, by issuing one or more
+ * HTTP operations.
+ */
+ void execute() throws AzureBlobFileSystemException {
+ int retryCount = 0;
+ while (!executeHttpOperation(retryCount++)) {
+ try {
+ Thread.sleep(client.getRetryPolicy().getRetryInterval(retryCount));
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ if (result.getStatusCode() > HttpURLConnection.HTTP_BAD_REQUEST) {
+ throw new AbfsRestOperationException(result.getStatusCode(), result.getStorageErrorCode(),
+ result.getStorageErrorMessage(), null, result);
+ }
+ }
+
+ /**
+ * Executes a single HTTP operation to complete the REST operation. If it
+ * fails, there may be a retry. The retryCount is incremented with each
+ * attempt.
+ */
+ private boolean executeHttpOperation(final int retryCount) throws AzureBlobFileSystemException {
+ AbfsHttpOperation httpOperation = null;
+ try {
+ // initialize the HTTP request and open the connection
+ httpOperation = new AbfsHttpOperation(url, method, requestHeaders);
+
+ // sign the HTTP request
+ client.getSharedKeyCredentials().signRequest(
+ httpOperation.getConnection(),
+ hasRequestBody ? bufferLength : 0);
+
+ if (hasRequestBody) {
+ // HttpUrlConnection requires
+ httpOperation.sendRequest(buffer, bufferOffset, bufferLength);
+ }
+
+ httpOperation.processResponse(buffer, bufferOffset, bufferLength);
+ } catch (IOException ex) {
+ if (logger.isDebugEnabled()) {
+ if (httpOperation != null) {
+ logger.debug("HttpRequestFailure: " + httpOperation.toString(), ex);
+ } else {
+ logger.debug("HttpRequestFailure: " + method + "," + url, ex);
+ }
+ }
+ if (!client.getRetryPolicy().shouldRetry(retryCount, -1)) {
+ throw new InvalidAbfsRestOperationException(ex);
+ }
+ return false;
+ }
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("HttpRequest: " + httpOperation.toString());
+ }
+
+ if (client.getRetryPolicy().shouldRetry(retryCount, httpOperation.getStatusCode())) {
+ return false;
+ }
+
+ result = httpOperation;
+
+ return true;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fdf5f4c3/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsServiceInjectorImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsServiceInjectorImpl.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsServiceInjectorImpl.java
new file mode 100644
index 0000000..1cbf6b5
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsServiceInjectorImpl.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import com.google.inject.AbstractModule;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.azurebfs.contracts.services.AbfsHttpClientFactory;
+import org.apache.hadoop.fs.azurebfs.contracts.services.AbfsHttpService;
+import org.apache.hadoop.fs.azurebfs.contracts.services.ConfigurationService;
+import org.apache.hadoop.fs.azurebfs.contracts.services.TracingService;
+
+/**
+ * Guice module that wires up the services used by Azure Blob File System.
+ * It keeps two registries: service types bound to fixed instances, and
+ * service types bound to implementation classes.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+class AbfsServiceInjectorImpl extends AbstractModule {
+  private final Configuration configuration;
+  private final Map<Class, Class> providers;
+  private final Map<Class, Object> instances;
+
+  /**
+   * Registers the given configuration as an instance binding and the default
+   * implementation class for each service interface.
+   *
+   * @param configuration the Hadoop configuration exposed to the services.
+   */
+  AbfsServiceInjectorImpl(Configuration configuration) {
+    this.configuration = configuration;
+
+    // Service type -> fixed instance.
+    this.instances = new HashMap<>();
+    this.instances.put(Configuration.class, this.configuration);
+
+    // Service type -> implementation class.
+    this.providers = new HashMap<>();
+    this.providers.put(ConfigurationService.class, ConfigurationServiceImpl.class);
+    this.providers.put(AbfsHttpService.class, AbfsHttpServiceImpl.class);
+    this.providers.put(AbfsHttpClientFactory.class, AbfsHttpClientFactoryImpl.class);
+    this.providers.put(TracingService.class, TracingServiceImpl.class);
+  }
+
+  /**
+   * Binds every registered instance and every registered implementation
+   * class to its service type.
+   */
+  @Override
+  protected void configure() {
+    for (Class serviceType : this.instances.keySet()) {
+      bind(serviceType).toInstance(this.instances.get(serviceType));
+    }
+
+    for (Class serviceType : this.providers.keySet()) {
+      bind(serviceType).to(this.providers.get(serviceType));
+    }
+  }
+
+  /** @return the configuration this module was created with. */
+  protected Configuration getConfiguration() {
+    return this.configuration;
+  }
+
+  /** @return the live map of service type to implementation class. */
+  protected Map<Class, Class> getProviders() {
+    return this.providers;
+  }
+
+  /** @return the live map of service type to bound instance. */
+  protected Map<Class, Object> getInstances() {
+    return this.instances;
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fdf5f4c3/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsServiceProviderImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsServiceProviderImpl.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsServiceProviderImpl.java
new file mode 100644
index 0000000..8560620
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsServiceProviderImpl.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.ServiceResolutionException;
+import org.apache.hadoop.fs.azurebfs.contracts.services.AbfsServiceProvider;
+import org.apache.hadoop.fs.azurebfs.contracts.services.InjectableService;
+
+/**
+ * Dependency injected Azure Storage services provider. A single shared
+ * instance is created through {@link #create(Configuration)} and retrieved
+ * through {@link #instance()}.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public final class AbfsServiceProviderImpl implements AbfsServiceProvider {
+  // volatile: instance() reads this field without holding the class lock that
+  // the create methods write it under.
+  private static volatile AbfsServiceProviderImpl abfsServiceProvider;
+  private final Injector abfsServiceInjector;
+
+  private AbfsServiceProviderImpl(final Configuration configuration) {
+    this.abfsServiceInjector = Guice.createInjector(
+        new AbfsServiceInjectorImpl(Preconditions.checkNotNull(configuration, "configuration")));
+  }
+
+  @VisibleForTesting
+  private AbfsServiceProviderImpl(final Injector abfsServiceInjector) {
+    Preconditions.checkNotNull(abfsServiceInjector, "abfsServiceInjector");
+    this.abfsServiceInjector = abfsServiceInjector;
+  }
+
+  /**
+   * Create an instance or returns existing instance of service provider.
+   * This method must be marked as synchronized to ensure thread-safety.
+   * If an instance already exists, the given configuration is not used.
+   * @param configuration hadoop configuration.
+   * @return AbfsServiceProvider the service provider instance.
+   */
+  public static synchronized AbfsServiceProvider create(final Configuration configuration) {
+    if (abfsServiceProvider == null) {
+      abfsServiceProvider = new AbfsServiceProviderImpl(configuration);
+    }
+
+    return abfsServiceProvider;
+  }
+
+  /**
+   * Returns current instance of service provider.
+   * @return AbfsServiceProvider the service provider instance, or
+   * {@code null} if none has been created yet.
+   */
+  public static AbfsServiceProvider instance() {
+    return abfsServiceProvider;
+  }
+
+  /**
+   * Replaces the current instance with one backed by the given injector.
+   * Unlike {@link #create(Configuration)}, this always creates a new instance.
+   * @param serviceInjector the injector used to resolve services.
+   * @return AbfsServiceProvider the new service provider instance.
+   */
+  @VisibleForTesting
+  static synchronized AbfsServiceProvider create(Injector serviceInjector) {
+    abfsServiceProvider = new AbfsServiceProviderImpl(serviceInjector);
+    return abfsServiceProvider;
+  }
+
+  /**
+   * Returns an instance of resolved injectable service by class name.
+   * The injectable service must be configured first to be resolvable.
+   * @param clazz the injectable service which is expected to be returned.
+   * @param <T> The type of injectable service.
+   * @return T instance
+   * @throws ServiceResolutionException if the service is not resolvable.
+   */
+  @Override
+  public <T extends InjectableService> T get(final Class<T> clazz) throws ServiceResolutionException {
+    try {
+      return this.abfsServiceInjector.getInstance(clazz);
+    } catch (Exception ex) {
+      throw new ServiceResolutionException(clazz.getSimpleName(), ex);
+    }
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fdf5f4c3/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsUriQueryBuilder.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsUriQueryBuilder.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsUriQueryBuilder.java
new file mode 100644
index 0000000..bac66af
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsUriQueryBuilder.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Collects query parameters for a REST request URL and renders them as a
+ * query string.
+ */
+public class AbfsUriQueryBuilder {
+  private Map<String, String> parameters;
+
+  public AbfsUriQueryBuilder() {
+    this.parameters = new HashMap<>();
+  }
+
+  /**
+   * Adds a query parameter, replacing any earlier value stored under the
+   * same name. Null and empty values are ignored.
+   *
+   * @param name the parameter name.
+   * @param value the parameter value.
+   */
+  public void addQuery(final String name, final String value) {
+    if (value == null || value.isEmpty()) {
+      return;
+    }
+    this.parameters.put(name, value);
+  }
+
+  /**
+   * Renders the collected parameters as a query string: the question mark
+   * constant before the first parameter and the "and" constant between
+   * parameters. Names and values are appended as stored, without any
+   * escaping.
+   *
+   * @return the query string, or an empty string when there are no parameters.
+   */
+  @Override
+  public String toString() {
+    final StringBuilder query = new StringBuilder();
+    boolean needsSeparator = false;
+
+    for (Map.Entry<String, String> parameter : parameters.entrySet()) {
+      if (needsSeparator) {
+        query.append(AbfsHttpConstants.AND_MARK);
+      } else {
+        query.append(AbfsHttpConstants.QUESTION_MARK);
+        needsSeparator = true;
+      }
+      query.append(parameter.getKey());
+      query.append(AbfsHttpConstants.EQUAL);
+      query.append(parameter.getValue());
+    }
+    return query.toString();
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fdf5f4c3/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ConfigurationServiceImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ConfigurationServiceImpl.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ConfigurationServiceImpl.java
new file mode 100644
index 0000000..568ee5d
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ConfigurationServiceImpl.java
@@ -0,0 +1,317 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.lang.reflect.Field;
+import java.util.Map;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys;
+import org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations;
+import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.IntegerConfigurationValidatorAnnotation;
+import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.LongConfigurationValidatorAnnotation;
+import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.StringConfigurationValidatorAnnotation;
+import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.Base64StringConfigurationValidatorAnnotation;
+import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.BooleanConfigurationValidatorAnnotation;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.ConfigurationPropertyNotFoundException;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidConfigurationValueException;
+import org.apache.hadoop.fs.azurebfs.contracts.services.ConfigurationService;
+import org.apache.hadoop.fs.azurebfs.diagnostics.Base64StringConfigurationBasicValidator;
+import org.apache.hadoop.fs.azurebfs.diagnostics.BooleanConfigurationBasicValidator;
+import org.apache.hadoop.fs.azurebfs.diagnostics.IntegerConfigurationBasicValidator;
+import org.apache.hadoop.fs.azurebfs.diagnostics.LongConfigurationBasicValidator;
+import org.apache.hadoop.fs.azurebfs.diagnostics.StringConfigurationBasicValidator;
+
+/**
+ * {@link ConfigurationService} implementation backed by a Hadoop {@link Configuration}.
+ * Fields carrying one of the *ConfigurationValidatorAnnotation annotations are assigned
+ * reflectively by the constructor, using the configuration key and the limits/defaults
+ * declared on the annotation.
+ */
+@Singleton
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+class ConfigurationServiceImpl implements ConfigurationService {
+ private final Configuration configuration;
+ private final boolean isSecure;
+
+ // The annotated fields below are not assigned directly; the constructor fills them
+ // through reflection with the value returned by the matching validate* helper.
+ @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_WRITE_BUFFER_SIZE,
+ MinValue = FileSystemConfigurations.MIN_BUFFER_SIZE,
+ MaxValue = FileSystemConfigurations.MAX_BUFFER_SIZE,
+ DefaultValue = FileSystemConfigurations.DEFAULT_WRITE_BUFFER_SIZE)
+ private int writeBufferSize;
+
+ @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_READ_BUFFER_SIZE,
+ MinValue = FileSystemConfigurations.MIN_BUFFER_SIZE,
+ MaxValue = FileSystemConfigurations.MAX_BUFFER_SIZE,
+ DefaultValue = FileSystemConfigurations.DEFAULT_READ_BUFFER_SIZE)
+ private int readBufferSize;
+
+ @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_MIN_BACKOFF_INTERVAL,
+ DefaultValue = FileSystemConfigurations.DEFAULT_MIN_BACKOFF_INTERVAL)
+ private int minBackoffInterval;
+
+ @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_MAX_BACKOFF_INTERVAL,
+ DefaultValue = FileSystemConfigurations.DEFAULT_MAX_BACKOFF_INTERVAL)
+ private int maxBackoffInterval;
+
+ @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_BACKOFF_INTERVAL,
+ DefaultValue = FileSystemConfigurations.DEFAULT_BACKOFF_INTERVAL)
+ private int backoffInterval;
+
+ @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_MAX_IO_RETRIES,
+ MinValue = 0,
+ DefaultValue = FileSystemConfigurations.DEFAULT_MAX_RETRY_ATTEMPTS)
+ private int maxIoRetries;
+
+ @LongConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_BLOCK_SIZE_PROPERTY_NAME,
+ MinValue = 0,
+ MaxValue = FileSystemConfigurations.MAX_AZURE_BLOCK_SIZE,
+ DefaultValue = FileSystemConfigurations.MAX_AZURE_BLOCK_SIZE)
+ private long azureBlockSize;
+
+ @StringConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_BLOCK_LOCATION_HOST_PROPERTY_NAME,
+ DefaultValue = FileSystemConfigurations.AZURE_BLOCK_LOCATION_HOST_DEFAULT)
+ private String azureBlockLocationHost;
+
+ @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_CONCURRENT_CONNECTION_VALUE_OUT,
+ MinValue = 1,
+ DefaultValue = FileSystemConfigurations.MAX_CONCURRENT_WRITE_THREADS)
+ private int maxConcurrentWriteThreads;
+
+ @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_CONCURRENT_CONNECTION_VALUE_IN,
+ MinValue = 1,
+ DefaultValue = FileSystemConfigurations.MAX_CONCURRENT_READ_THREADS)
+ private int maxConcurrentReadThreads;
+
+ @BooleanConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_TOLERATE_CONCURRENT_APPEND,
+ DefaultValue = FileSystemConfigurations.DEFAULT_READ_TOLERATE_CONCURRENT_APPEND)
+ private boolean tolerateOobAppends;
+
+ @StringConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.FS_AZURE_ATOMIC_RENAME_KEY,
+ DefaultValue = FileSystemConfigurations.DEFAULT_FS_AZURE_ATOMIC_RENAME_DIRECTORIES)
+ private String azureAtomicDirs;
+
+ @BooleanConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION,
+ DefaultValue = FileSystemConfigurations.DEFAULT_AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION)
+ private boolean createRemoteFileSystemDuringInitialization;
+
+ @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.FS_AZURE_READ_AHEAD_QUEUE_DEPTH,
+ DefaultValue = FileSystemConfigurations.DEFAULT_READ_AHEAD_QUEUE_DEPTH)
+ private int readAheadQueueDepth;
+
+ // Configuration entries whose names match FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME_REGX;
+ // assigned by validateStorageAccountKeys().
+ private Map<String, String> storageAccountKeys;
+
+ /**
+ * Stores the configuration, reads the secure-mode flag (false when unset), validates the
+ * storage account keys and then assigns every annotated field declared on the runtime
+ * class of this instance with the result of the matching validate* helper.
+ *
+ * @param configuration the Hadoop configuration the settings are read from.
+ * @throws IllegalAccessException declared for the reflective field assignment.
+ * @throws InvalidConfigurationValueException declared by the validation helpers invoked here.
+ */
+ @Inject
+ ConfigurationServiceImpl(final Configuration configuration) throws IllegalAccessException, InvalidConfigurationValueException {
+ this.configuration = configuration;
+ this.isSecure = this.configuration.getBoolean(ConfigurationKeys.FS_AZURE_SECURE_MODE, false);
+
+ validateStorageAccountKeys();
+ Field[] fields = this.getClass().getDeclaredFields();
+ for (Field field : fields) {
+ // Made accessible for every declared field, annotated or not, before the annotation checks.
+ field.setAccessible(true);
+ // At most one branch runs per field: the first matching annotation type wins.
+ if (field.isAnnotationPresent(IntegerConfigurationValidatorAnnotation.class)) {
+ field.set(this, validateInt(field));
+ } else if (field.isAnnotationPresent(LongConfigurationValidatorAnnotation.class)) {
+ field.set(this, validateLong(field));
+ } else if (field.isAnnotationPresent(StringConfigurationValidatorAnnotation.class)) {
+ field.set(this, validateString(field));
+ } else if (field.isAnnotationPresent(Base64StringConfigurationValidatorAnnotation.class)) {
+ field.set(this, validateBase64String(field));
+ } else if (field.isAnnotationPresent(BooleanConfigurationValidatorAnnotation.class)) {
+ field.set(this, validateBoolean(field));
+ }
+ }
+ }
+
+ /**
+ * Reads FS_AZURE_EMULATOR_ENABLED from the configuration on every call.
+ *
+ * @return the configured flag, or false when the key is not set.
+ */
+ @Override
+ public boolean isEmulator() {
+ return this.getConfiguration().getBoolean(ConfigurationKeys.FS_AZURE_EMULATOR_ENABLED, false);
+ }
+
+ /**
+ * @return the FS_AZURE_SECURE_MODE flag as read once in the constructor.
+ */
+ @Override
+ public boolean isSecureMode() {
+ return this.isSecure;
+ }
+
+ /**
+ * Looks up the storage account key stored under
+ * FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME concatenated with the account name.
+ *
+ * @param accountName the account whose key is requested.
+ * @return the key found for the account.
+ * @throws ConfigurationPropertyNotFoundException when no entry exists for the account.
+ */
+ @Override
+ public String getStorageAccountKey(final String accountName) throws ConfigurationPropertyNotFoundException {
+ String accountKey = this.storageAccountKeys.get(ConfigurationKeys.FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME + accountName);
+ if (accountKey == null) {
+ throw new ConfigurationPropertyNotFoundException(accountName);
+ }
+
+ return accountKey;
+ }
+
+ /**
+ * @return the configuration instance passed to the constructor.
+ */
+ @Override
+ public Configuration getConfiguration() {
+ return this.configuration;
+ }
+
+ // The accessors below return the field values resolved by the constructor
+ // (or later overridden through the @VisibleForTesting setters).
+ @Override
+ public int getWriteBufferSize() {
+ return this.writeBufferSize;
+ }
+
+ @Override
+ public int getReadBufferSize() {
+ return this.readBufferSize;
+ }
+
+ @Override
+ public int getMinBackoffIntervalMilliseconds() {
+ return this.minBackoffInterval;
+ }
+
+ @Override
+ public int getMaxBackoffIntervalMilliseconds() {
+ return this.maxBackoffInterval;
+ }
+
+ @Override
+ public int getBackoffIntervalMilliseconds() {
+ return this.backoffInterval;
+ }
+
+ @Override
+ public int getMaxIoRetries() {
+ return this.maxIoRetries;
+ }
+
+ @Override
+ public long getAzureBlockSize() {
+ return this.azureBlockSize;
+ }
+
+ @Override
+ public String getAzureBlockLocationHost() {
+ return this.azureBlockLocationHost;
+ }
+
+ @Override
+ public int getMaxConcurrentWriteThreads() {
+ return this.maxConcurrentWriteThreads;
+ }
+
+ @Override
+ public int getMaxConcurrentReadThreads() {
+ return this.maxConcurrentReadThreads;
+ }
+
+ @Override
+ public boolean getTolerateOobAppends() {
+ return this.tolerateOobAppends;
+ }
+
+ @Override
+ public String getAzureAtomicRenameDirs() {
+ return this.azureAtomicDirs;
+ }
+
+ @Override
+ public boolean getCreateRemoteFileSystemDuringInitialization() {
+ return this.createRemoteFileSystemDuringInitialization;
+ }
+
+ @Override
+ public int getReadAheadQueueDepth() {
+ return this.readAheadQueueDepth;
+ }
+
+ /**
+ * Loads every configuration entry whose name matches FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME_REGX
+ * into {@code storageAccountKeys} and passes each value through a
+ * Base64StringConfigurationBasicValidator.
+ *
+ * @throws InvalidConfigurationValueException declared by the validator's validate call.
+ */
+ void validateStorageAccountKeys() throws InvalidConfigurationValueException {
+ // NOTE(review): the trailing "true" is presumably the throw-if-invalid flag -- confirm against the validator.
+ Base64StringConfigurationBasicValidator validator = new Base64StringConfigurationBasicValidator(
+ ConfigurationKeys.FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME, "", true);
+ this.storageAccountKeys = this.configuration.getValByRegex(ConfigurationKeys.FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME_REGX);
+
+ for (Map.Entry<String, String> account : this.storageAccountKeys.entrySet()) {
+ validator.validate(account.getValue());
+ }
+ }
+
+ /**
+ * Resolves the value for a field annotated with IntegerConfigurationValidatorAnnotation:
+ * reads the raw string stored under the annotation's ConfigurationKey and hands it to an
+ * IntegerConfigurationBasicValidator built from the annotation's attributes.
+ *
+ * @param field the annotated field being populated.
+ * @return the value produced by the validator.
+ */
+ int validateInt(Field field) throws IllegalAccessException, InvalidConfigurationValueException {
+ IntegerConfigurationValidatorAnnotation validator = field.getAnnotation(IntegerConfigurationValidatorAnnotation.class);
+ String value = this.configuration.get(validator.ConfigurationKey());
+
+ // validate
+ return new IntegerConfigurationBasicValidator(
+ validator.MinValue(),
+ validator.MaxValue(),
+ validator.DefaultValue(),
+ validator.ConfigurationKey(),
+ validator.ThrowIfInvalid()).validate(value);
+ }
+
+ /**
+ * Long counterpart of {@link #validateInt(Field)}, driven by LongConfigurationValidatorAnnotation
+ * and LongConfigurationBasicValidator.
+ *
+ * @param field the annotated field being populated.
+ * @return the value produced by the validator.
+ */
+ long validateLong(Field field) throws IllegalAccessException, InvalidConfigurationValueException {
+ LongConfigurationValidatorAnnotation validator = field.getAnnotation(LongConfigurationValidatorAnnotation.class);
+ String value = this.configuration.get(validator.ConfigurationKey());
+
+ // validate
+ return new LongConfigurationBasicValidator(
+ validator.MinValue(),
+ validator.MaxValue(),
+ validator.DefaultValue(),
+ validator.ConfigurationKey(),
+ validator.ThrowIfInvalid()).validate(value);
+ }
+
+ /**
+ * Resolves the value for a field annotated with StringConfigurationValidatorAnnotation by
+ * passing the raw configuration string to a StringConfigurationBasicValidator built from the
+ * annotation's ConfigurationKey, DefaultValue and ThrowIfInvalid attributes.
+ *
+ * @param field the annotated field being populated.
+ * @return the value produced by the validator.
+ */
+ String validateString(Field field) throws IllegalAccessException, InvalidConfigurationValueException {
+ StringConfigurationValidatorAnnotation validator = field.getAnnotation(StringConfigurationValidatorAnnotation.class);
+ String value = this.configuration.get(validator.ConfigurationKey());
+
+ // validate
+ return new StringConfigurationBasicValidator(
+ validator.ConfigurationKey(),
+ validator.DefaultValue(),
+ validator.ThrowIfInvalid()).validate(value);
+ }
+
+ /**
+ * Base64 counterpart of {@link #validateString(Field)}, driven by
+ * Base64StringConfigurationValidatorAnnotation and Base64StringConfigurationBasicValidator.
+ * No field of this class currently carries that annotation.
+ *
+ * @param field the annotated field being populated.
+ * @return the value produced by the validator.
+ */
+ String validateBase64String(Field field) throws IllegalAccessException, InvalidConfigurationValueException {
+ Base64StringConfigurationValidatorAnnotation validator = field.getAnnotation((Base64StringConfigurationValidatorAnnotation.class));
+ String value = this.configuration.get(validator.ConfigurationKey());
+
+ // validate
+ return new Base64StringConfigurationBasicValidator(
+ validator.ConfigurationKey(),
+ validator.DefaultValue(),
+ validator.ThrowIfInvalid()).validate(value);
+ }
+
+ /**
+ * Boolean counterpart of {@link #validateString(Field)}, driven by
+ * BooleanConfigurationValidatorAnnotation and BooleanConfigurationBasicValidator.
+ *
+ * @param field the annotated field being populated.
+ * @return the value produced by the validator.
+ */
+ boolean validateBoolean(Field field) throws IllegalAccessException, InvalidConfigurationValueException {
+ BooleanConfigurationValidatorAnnotation validator = field.getAnnotation(BooleanConfigurationValidatorAnnotation.class);
+ String value = this.configuration.get(validator.ConfigurationKey());
+
+ // validate
+ return new BooleanConfigurationBasicValidator(
+ validator.ConfigurationKey(),
+ validator.DefaultValue(),
+ validator.ThrowIfInvalid()).validate(value);
+ }
+
+ /**
+ * Test hook: overwrites the read buffer size directly, without any validation.
+ */
+ @VisibleForTesting
+ void setReadBufferSize(int bufferSize) {
+ this.readBufferSize = bufferSize;
+ }
+
+ /**
+ * Test hook: overwrites the write buffer size directly, without any validation.
+ */
+ @VisibleForTesting
+ void setWriteBufferSize(int bufferSize) {
+ this.writeBufferSize = bufferSize;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fdf5f4c3/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ExponentialRetryPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ExponentialRetryPolicy.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ExponentialRetryPolicy.java
new file mode 100644
index 0000000..0c92612
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ExponentialRetryPolicy.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.util.Random;
+import java.net.HttpURLConnection;
+
+/**
+ * Retry policy that decides whether a failed request is retried and computes a
+ * randomized, exponentially growing delay between attempts.
+ */
+class ExponentialRetryPolicy {
+  /**
+   * Represents the default number of retry attempts.
+   */
+  private static final int DEFAULT_CLIENT_RETRY_COUNT = 30;
+
+  /**
+   * Represents the default amount of time used when calculating a random delta in the exponential
+   * delay between retries.
+   */
+  private static final int DEFAULT_CLIENT_BACKOFF = 1000 * 3;
+
+  /**
+   * Represents the default maximum amount of time used when calculating the exponential
+   * delay between retries.
+   */
+  private static final int DEFAULT_MAX_BACKOFF = 1000 * 30;
+
+  /**
+   * Represents the default minimum amount of time used when calculating the exponential
+   * delay between retries.
+   */
+  private static final int DEFAULT_MIN_BACKOFF = 1000 * 3;
+
+  /**
+   * The minimum random ratio used for delay interval calculation.
+   */
+  private static final double MIN_RANDOM_RATIO = 0.8;
+
+  /**
+   * The maximum random ratio used for delay interval calculation.
+   */
+  private static final double MAX_RANDOM_RATIO = 1.2;
+
+  /**
+   * Holds the random number generator used to calculate randomized backoff intervals.
+   */
+  private final Random randRef = new Random();
+
+  /**
+   * The value that will be used to calculate a random delta in the exponential delay interval.
+   */
+  private final int deltaBackoff;
+
+  /**
+   * The maximum backoff time.
+   */
+  private final int maxBackoff;
+
+  /**
+   * The minimum backoff time.
+   */
+  private final int minBackoff;
+
+  /**
+   * The maximum number of retry attempts.
+   */
+  private final int retryCount;
+
+  /**
+   * Initializes a new instance of the {@link ExponentialRetryPolicy} class.
+   */
+  ExponentialRetryPolicy() {
+    this(DEFAULT_CLIENT_RETRY_COUNT, DEFAULT_MIN_BACKOFF, DEFAULT_MAX_BACKOFF, DEFAULT_CLIENT_BACKOFF);
+  }
+
+  /**
+   * Initializes a new instance of the {@link ExponentialRetryPolicy} class.
+   *
+   * @param retryCount The maximum number of retry attempts.
+   * @param minBackoff The minimum backoff time.
+   * @param maxBackoff The maximum backoff time.
+   * @param deltaBackoff The value that will be used to calculate a random delta in the exponential delay
+   *                     between retries.
+   */
+  ExponentialRetryPolicy(final int retryCount, final int minBackoff, final int maxBackoff, final int deltaBackoff) {
+    this.retryCount = retryCount;
+    this.minBackoff = minBackoff;
+    this.maxBackoff = maxBackoff;
+    this.deltaBackoff = deltaBackoff;
+  }
+
+  /**
+   * Returns if a request should be retried based on the retry count, current response,
+   * and the current strategy.
+   *
+   * @param retryCount The current retry attempt count.
+   * @param statusCode The status code of the response, or -1 for socket error.
+   * @return true if the request should be retried; false otherwise.
+   */
+  public boolean shouldRetry(final int retryCount, final int statusCode) {
+    return retryCount < this.retryCount
+        && (statusCode == -1
+        || statusCode == HttpURLConnection.HTTP_CLIENT_TIMEOUT
+        || (statusCode >= HttpURLConnection.HTTP_INTERNAL_ERROR
+        && statusCode != HttpURLConnection.HTTP_NOT_IMPLEMENTED
+        && statusCode != HttpURLConnection.HTTP_VERSION));
+  }
+
+  /**
+   * Returns backoff interval between 80% and 120% of the desired backoff,
+   * multiplied by 2^(n-1) for exponential growth, added to the minimum backoff
+   * and capped at the maximum backoff.
+   *
+   * <p>When the configured delta is so small that the 80%..120% window is empty
+   * (for example a delta of 0), the lower bound of the window is used without
+   * randomization instead of failing.
+   *
+   * @param retryCount The current retry attempt count.
+   * @return backoff Interval time
+   */
+  public long getRetryInterval(final int retryCount) {
+    final int jitterFloor = (int) (this.deltaBackoff * MIN_RANDOM_RATIO);
+    final int jitterRange = (int) (this.deltaBackoff * MAX_RANDOM_RATIO) - jitterFloor;
+
+    // Random.nextInt(bound) throws IllegalArgumentException unless bound is
+    // positive, so only randomize when the window actually has width.
+    final long boundedRandDelta = jitterRange > 0
+        ? jitterFloor + this.randRef.nextInt(jitterRange)
+        : jitterFloor;
+
+    final double incrementDelta = (Math.pow(2, retryCount - 1)) * boundedRandDelta;
+
+    // The value is capped by maxBackoff (an int), so narrowing the rounded result is safe.
+    final long retryInterval = (int) Math.round(Math.min(this.minBackoff + incrementDelta, maxBackoff));
+
+    return retryInterval;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fdf5f4c3/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/LoggerSpanReceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/LoggerSpanReceiver.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/LoggerSpanReceiver.java
new file mode 100644
index 0000000..99190e6
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/LoggerSpanReceiver.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
+import org.apache.htrace.core.HTraceConfiguration;
+import org.apache.htrace.core.Span;
+import org.apache.htrace.core.SpanReceiver;
+import org.apache.htrace.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.htrace.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.htrace.fasterxml.jackson.databind.ObjectWriter;
+import org.apache.htrace.fasterxml.jackson.databind.SerializationFeature;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * LoggerSpanReceiver is a layer between HTrace and SLF4J logging, only used for
+ * {@link org.apache.hadoop.fs.azurebfs.contracts.services.TracingService}.
+ * Each received span is serialized to JSON and written at TRACE level to the
+ * logger named after {@link AzureBlobFileSystem}.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class LoggerSpanReceiver extends SpanReceiver {
+  /** Spans are reported under the file system's logger, not this class's own. */
+  private static final Logger LOG = LoggerFactory.getLogger(AzureBlobFileSystem.class);
+
+  private static final ObjectWriter JSON_WRITER =
+      new ObjectMapper()
+          .configure(SerializationFeature.INDENT_OUTPUT, true)
+          .configure(SerializationFeature.WRITE_BIGDECIMAL_AS_PLAIN, true)
+          .configure(SerializationFeature.WRITE_EMPTY_JSON_ARRAYS, false)
+          .configure(SerializationFeature.USE_EQUALITY_FOR_OBJECT_ID, false)
+          .writer();
+
+  /**
+   * Creates the receiver. The configuration is only checked for null; none of
+   * its values are read.
+   *
+   * @param hTraceConfiguration the HTrace configuration; must not be null.
+   */
+  public LoggerSpanReceiver(HTraceConfiguration hTraceConfiguration) {
+    Preconditions.checkNotNull(hTraceConfiguration, "hTraceConfiguration");
+  }
+
+  /**
+   * Writes the span as JSON at TRACE level. A serialization failure is logged
+   * at ERROR level together with the exception rather than propagated.
+   *
+   * @param span the span to log.
+   */
+  @Override
+  public void receiveSpan(final Span span) {
+    // Serializing to indented JSON is wasted work when the output would be discarded.
+    if (!LOG.isTraceEnabled()) {
+      return;
+    }
+
+    try {
+      LOG.trace(JSON_WRITER.writeValueAsString(span));
+    } catch (JsonProcessingException e) {
+      // Pass the exception itself so the stack trace and cause are not lost.
+      LOG.error("Json processing error: " + e.getMessage(), e);
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    // No-Op
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fdf5f4c3/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBuffer.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBuffer.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBuffer.java
new file mode 100644
index 0000000..1fac13d
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBuffer.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus;
+
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * Mutable holder describing one read request/buffer: the stream and file
+ * offset it belongs to, its lengths, the backing byte array, its status and
+ * the bookkeeping flags used by eviction logic.
+ */
+class ReadBuffer {
+
+  /** Stream this buffer belongs to. */
+  private AbfsInputStream stream;
+  /** Offset within the file for the buffer. */
+  private long offset;
+  /** Actual length, set after the buffer is filled. */
+  private int length;
+  /** Requested length of the read. */
+  private int requestedLength;
+  /** The buffer itself. */
+  private byte[] buffer;
+  /** Index in the buffers array in the buffer manager; -1 until assigned. */
+  private int bufferindex = -1;
+  /** Status of the buffer. */
+  private ReadBufferStatus status;
+  /**
+   * Signaled when the buffer is done reading, so any client waiting on this
+   * buffer gets unblocked.
+   */
+  private CountDownLatch latch;
+
+  // Fields to help with eviction logic.
+  /** Tick at which the buffer became available to read. */
+  private long timeStamp;
+  private boolean isFirstByteConsumed;
+  private boolean isLastByteConsumed;
+  private boolean isAnyByteConsumed;
+
+  public AbfsInputStream getStream() {
+    return this.stream;
+  }
+
+  public void setStream(AbfsInputStream stream) {
+    this.stream = stream;
+  }
+
+  public long getOffset() {
+    return this.offset;
+  }
+
+  public void setOffset(long offset) {
+    this.offset = offset;
+  }
+
+  public int getLength() {
+    return this.length;
+  }
+
+  public void setLength(int length) {
+    this.length = length;
+  }
+
+  public int getRequestedLength() {
+    return this.requestedLength;
+  }
+
+  public void setRequestedLength(int requestedLength) {
+    this.requestedLength = requestedLength;
+  }
+
+  public byte[] getBuffer() {
+    return this.buffer;
+  }
+
+  public void setBuffer(byte[] buffer) {
+    this.buffer = buffer;
+  }
+
+  public int getBufferindex() {
+    return this.bufferindex;
+  }
+
+  public void setBufferindex(int bufferindex) {
+    this.bufferindex = bufferindex;
+  }
+
+  public ReadBufferStatus getStatus() {
+    return this.status;
+  }
+
+  public void setStatus(ReadBufferStatus status) {
+    this.status = status;
+  }
+
+  public CountDownLatch getLatch() {
+    return this.latch;
+  }
+
+  public void setLatch(CountDownLatch latch) {
+    this.latch = latch;
+  }
+
+  public long getTimeStamp() {
+    return this.timeStamp;
+  }
+
+  public void setTimeStamp(long timeStamp) {
+    this.timeStamp = timeStamp;
+  }
+
+  public boolean isFirstByteConsumed() {
+    return this.isFirstByteConsumed;
+  }
+
+  public void setFirstByteConsumed(boolean isFirstByteConsumed) {
+    this.isFirstByteConsumed = isFirstByteConsumed;
+  }
+
+  public boolean isLastByteConsumed() {
+    return this.isLastByteConsumed;
+  }
+
+  public void setLastByteConsumed(boolean isLastByteConsumed) {
+    this.isLastByteConsumed = isLastByteConsumed;
+  }
+
+  public boolean isAnyByteConsumed() {
+    return this.isAnyByteConsumed;
+  }
+
+  public void setAnyByteConsumed(boolean isAnyByteConsumed) {
+    this.isAnyByteConsumed = isAnyByteConsumed;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fdf5f4c3/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java
new file mode 100644
index 0000000..164e549
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java
@@ -0,0 +1,391 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.azurebfs.services;
+
+import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.Stack;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * The Read Buffer Manager for Rest AbfsClient
+ */
+final class ReadBufferManager {
+ private static final Logger LOGGER = LoggerFactory.getLogger(ReadBufferManager.class);
+
+ // Number of byte[] buffers allocated by init(); every index starts out on the free list.
+ private static final int NUM_BUFFERS = 16;
+ // Size in bytes of each pooled buffer (4 MB).
+ private static final int BLOCK_SIZE = 4 * 1024 * 1024;
+ // Number of ReadBufferWorker threads started by init().
+ private static final int NUM_THREADS = 8;
+ // Minimum age, in milliseconds, before tryEvict() will reclaim a completed buffer that nobody has read from.
+ private static final int THRESHOLD_AGE_MILLISECONDS = 3000; // have to see if 3 seconds is a good threshold
+
+ // The worker threads created in init(); stored here but not otherwise used in this class.
+ private Thread[] threads = new Thread[NUM_THREADS];
+ private byte[][] buffers; // array of byte[] buffers, to hold the data that is read
+ private Stack<Integer> freeList = new Stack<Integer>(); // indices in buffers[] array that are available
+
+ // A ReadBuffer moves readAheadQueue -> inProgressList -> completedReadList (or back to freeList on failure).
+ // After init(), the methods below touch these collections only while holding this object's monitor.
+ private Queue<ReadBuffer> readAheadQueue = new LinkedList<ReadBuffer>(); // queue of requests that are not picked up by any worker thread yet
+ private LinkedList<ReadBuffer> inProgressList = new LinkedList<ReadBuffer>(); // requests being processed by worker threads
+ private LinkedList<ReadBuffer> completedReadList = new LinkedList<ReadBuffer>(); // buffers available for reading
+ private static final ReadBufferManager BUFFER_MANAGER; // singleton, initialized in static initialization block
+
+ // Creates the singleton and then initializes it: init() allocates the buffer pool and starts the
+ // worker threads, so both happen once, when this class is first initialized.
+ static {
+ BUFFER_MANAGER = new ReadBufferManager();
+ BUFFER_MANAGER.init();
+ }
+
+ /**
+ * Returns the single shared ReadBufferManager instance created by the static initializer.
+ *
+ * @return the singleton {@link ReadBufferManager}
+ */
+ static ReadBufferManager getBufferManager() {
+ return BUFFER_MANAGER;
+ }
+
+ private void init() {
+ buffers = new byte[NUM_BUFFERS][];
+ for (int i = 0; i < NUM_BUFFERS; i++) {
+ buffers[i] = new byte[BLOCK_SIZE]; // same buffers are reused. The byte array never goes back to GC
+ freeList.add(i);
+ }
+ for (int i = 0; i < NUM_THREADS; i++) {
+ Thread t = new Thread(new ReadBufferWorker(i));
+ t.setDaemon(true);
+ threads[i] = t;
+ t.setName("ABFS-prefetch-" + i);
+ t.start();
+ }
+ ReadBufferWorker.UNLEASH_WORKERS.countDown();
+ }
+
+ // hide instance constructor: the only instance is the singleton built in the static initializer.
+ // All set-up work is done in init(), not here.
+ private ReadBufferManager() {
+ }
+
+
+ /*
+ *
+ * AbfsInputStream-facing methods
+ *
+ */
+
+
+ /**
+ * {@link AbfsInputStream} calls this method to queue read-aheads
+ *
+ * @param stream The {@link AbfsInputStream} for which to do the read-ahead
+ * @param requestedOffset The offset in the file which shoukd be read
+ * @param requestedLength The length to read
+ */
+ void queueReadAhead(final AbfsInputStream stream, final long requestedOffset, final int requestedLength) {
+ if (LOGGER.isTraceEnabled()) {
+ LOGGER.trace("Start Queueing readAhead for " + stream.getPath() + " offset " + requestedOffset
+ + " length " + requestedLength);
+ }
+ ReadBuffer buffer;
+ synchronized (this) {
+ if (isAlreadyQueued(stream, requestedOffset)) {
+ return; // already queued, do not queue again
+ }
+ if (freeList.size() == 0 && !tryEvict()) {
+ return; // no buffers available, cannot queue anything
+ }
+
+ buffer = new ReadBuffer();
+ buffer.setStream(stream);
+ buffer.setOffset(requestedOffset);
+ buffer.setLength(0);
+ buffer.setRequestedLength(requestedLength);
+ buffer.setStatus(ReadBufferStatus.NOT_AVAILABLE);
+ buffer.setLatch(new CountDownLatch(1));
+
+ Integer bufferIndex = freeList.pop(); // will return a value, since we have checked size > 0 already
+
+ buffer.setBuffer(buffers[bufferIndex]);
+ buffer.setBufferindex(bufferIndex);
+ readAheadQueue.add(buffer);
+ notifyAll();
+ }
+ if (LOGGER.isTraceEnabled()) {
+ LOGGER.trace("Done q-ing readAhead for file " + stream.getPath() + " offset " + requestedOffset
+ + " buffer idx " + buffer.getBufferindex());
+ }
+ }
+
+
+ /**
+ * {@link AbfsInputStream} calls this method read any bytes already available in a buffer (thereby saving a
+ * remote read). This returns the bytes if the data already exists in buffer. If there is a buffer that is reading
+ * the requested offset, then this method blocks until that read completes. If the data is queued in a read-ahead
+ * but not picked up by a worker thread yet, then it cancels that read-ahead and reports cache miss. This is because
+ * depending on worker thread availability, the read-ahead may take a while - the calling thread can do it's own
+ * read to get the data faster (copmared to the read waiting in queue for an indeterminate amount of time).
+ *
+ * @param stream the file to read bytes for
+ * @param position the offset in the file to do a read for
+ * @param length the length to read
+ * @param buffer the buffer to read data into. Note that the buffer will be written into from offset 0.
+ * @return the number of bytes read
+ */
+ int getBlock(final AbfsInputStream stream, final long position, final int length, final byte[] buffer) {
+ // not synchronized, so have to be careful with locking
+ if (LOGGER.isTraceEnabled()) {
+ LOGGER.trace("getBlock for file " + stream.getPath() + " position " + position + " thread " + Thread.currentThread().getName());
+ }
+
+ waitForProcess(stream, position);
+
+ int bytesRead = 0;
+ synchronized (this) {
+ bytesRead = getBlockFromCompletedQueue(stream, position, length, buffer);
+ }
+ if (bytesRead > 0) {
+ if (LOGGER.isTraceEnabled()) {
+ LOGGER.trace("Done read from Cache for " + stream.getPath() + " position " + position + " length " + bytesRead);
+ }
+ return bytesRead;
+ }
+
+ // otherwise, just say we got nothing - calling thread can do it's own read
+ return 0;
+ }
+
+ /*
+ *
+ * Internal methods
+ *
+ */
+
+ private void waitForProcess(final AbfsInputStream stream, final long position) {
+ ReadBuffer readBuf;
+ synchronized (this) {
+ clearFromReadAheadQueue(stream, position);
+ readBuf = getFromList(inProgressList, stream, position);
+ }
+ if (readBuf != null) { // if in in-progress queue, then block for it
+ try {
+ if (LOGGER.isTraceEnabled()) {
+ LOGGER.trace("got a relevant read buffer for file " + stream.getPath() + " offset " + readBuf.getOffset()
+ + " buffer idx " + readBuf.getBufferindex());
+ }
+ readBuf.getLatch().await(); // blocking wait on the caller stream's thread
+ // Note on correctness: readBuf gets out of inProgressList only in 1 place: after worker thread
+ // is done processing it (in doneReading). There, the latch is set after removing the buffer from
+ // inProgressList. So this latch is safe to be outside the synchronized block.
+ // Putting it in synchronized would result in a deadlock, since this thread would be holding the lock
+ // while waiting, so no one will be able to change any state. If this becomes more complex in the future,
+ // then the latch cane be removed and replaced with wait/notify whenever inProgressList is touched.
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ }
+ if (LOGGER.isTraceEnabled()) {
+ LOGGER.trace("latch done for file " + stream.getPath() + " buffer idx " + readBuf.getBufferindex()
+ + " length " + readBuf.getLength());
+ }
+ }
+ }
+
+ /**
+ * If any buffer in the completedlist can be reclaimed then reclaim it and return the buffer to free list.
+ * The objective is to find just one buffer - there is no advantage to evicting more than one.
+ *
+ * @return whether the eviction succeeeded - i.e., were we able to free up one buffer
+ */
+ private synchronized boolean tryEvict() {
+ ReadBuffer nodeToEvict = null;
+ if (completedReadList.size() <= 0) {
+ return false; // there are no evict-able buffers
+ }
+
+ // first, try buffers where all bytes have been consumed (approximated as first and last bytes consumed)
+ for (ReadBuffer buf : completedReadList) {
+ if (buf.isFirstByteConsumed() && buf.isLastByteConsumed()) {
+ nodeToEvict = buf;
+ break;
+ }
+ }
+ if (nodeToEvict != null) {
+ return evict(nodeToEvict);
+ }
+
+ // next, try buffers where any bytes have been consumed (may be a bad idea? have to experiment and see)
+ for (ReadBuffer buf : completedReadList) {
+ if (buf.isAnyByteConsumed()) {
+ nodeToEvict = buf;
+ break;
+ }
+ }
+
+ if (nodeToEvict != null) {
+ return evict(nodeToEvict);
+ }
+
+ // next, try any old nodes that have not been consumed
+ long earliestBirthday = Long.MAX_VALUE;
+ for (ReadBuffer buf : completedReadList) {
+ if (buf.getTimeStamp() < earliestBirthday) {
+ nodeToEvict = buf;
+ earliestBirthday = buf.getTimeStamp();
+ }
+ }
+ if ((currentTimeMillis() - earliestBirthday > THRESHOLD_AGE_MILLISECONDS) && (nodeToEvict != null)) {
+ return evict(nodeToEvict);
+ }
+
+ // nothing can be evicted
+ return false;
+ }
+
+ private boolean evict(final ReadBuffer buf) {
+ freeList.push(buf.getBufferindex());
+ completedReadList.remove(buf);
+ if (LOGGER.isTraceEnabled()) {
+ LOGGER.trace("Evicting buffer idx " + buf.getBufferindex() + "; was used for file " + buf.getStream().getPath()
+ + " offset " + buf.getOffset() + " length " + buf.getLength());
+ }
+ return true;
+ }
+
+  /**
+   * Checks whether {@code requestedOffset} of {@code stream} is already covered by a buffer that is
+   * waiting in the read-ahead queue, being read by a worker, or sitting in the completed list.
+   *
+   * @param stream          the stream the read-ahead is for
+   * @param requestedOffset the file offset of the proposed read-ahead
+   * @return {@code true} if any of the three collections already covers the offset
+   */
+  private boolean isAlreadyQueued(final AbfsInputStream stream, final long requestedOffset) {
+    if (isInList(readAheadQueue, stream, requestedOffset)) {
+      return true;
+    }
+    if (isInList(inProgressList, stream, requestedOffset)) {
+      return true;
+    }
+    return isInList(completedReadList, stream, requestedOffset);
+  }
+
+ private boolean isInList(final Collection<ReadBuffer> list, final AbfsInputStream stream, final long requestedOffset) {
+ return (getFromList(list, stream, requestedOffset) != null);
+ }
+
+ private ReadBuffer getFromList(final Collection<ReadBuffer> list, final AbfsInputStream stream, final long requestedOffset) {
+ for (ReadBuffer buffer : list) {
+ if (buffer.getStream() == stream) {
+ if (buffer.getStatus() == ReadBufferStatus.AVAILABLE
+ && requestedOffset >= buffer.getOffset()
+ && requestedOffset < buffer.getOffset() + buffer.getLength()) {
+ return buffer;
+ } else if (requestedOffset >= buffer.getOffset()
+ && requestedOffset < buffer.getOffset() + buffer.getRequestedLength()) {
+ return buffer;
+ }
+ }
+ }
+ return null;
+ }
+
+ private void clearFromReadAheadQueue(final AbfsInputStream stream, final long requestedOffset) {
+ ReadBuffer buffer = getFromList(readAheadQueue, stream, requestedOffset);
+ if (buffer != null) {
+ readAheadQueue.remove(buffer);
+ notifyAll(); // lock is held in calling method
+ freeList.push(buffer.getBufferindex());
+ }
+ }
+
+ private int getBlockFromCompletedQueue(final AbfsInputStream stream, final long position, final int length,
+ final byte[] buffer) {
+ ReadBuffer buf = getFromList(completedReadList, stream, position);
+ if (buf == null || position >= buf.getOffset() + buf.getLength()) {
+ return 0;
+ }
+ int cursor = (int) (position - buf.getOffset());
+ int availableLengthInBuffer = buf.getLength() - cursor;
+ int lengthToCopy = Math.min(length, availableLengthInBuffer);
+ System.arraycopy(buf.getBuffer(), cursor, buffer, 0, lengthToCopy);
+ if (cursor == 0) {
+ buf.setFirstByteConsumed(true);
+ }
+ if (cursor + lengthToCopy == buf.getLength()) {
+ buf.setLastByteConsumed(true);
+ }
+ buf.setAnyByteConsumed(true);
+ return lengthToCopy;
+ }
+
+ /*
+ *
+ * ReadBufferWorker-thread-facing methods
+ *
+ */
+
+ /**
+ * ReadBufferWorker thread calls this to get the next buffer that it should work on.
+ *
+ * @return {@link ReadBuffer}
+ * @throws InterruptedException if thread is interrupted
+ */
+ ReadBuffer getNextBlockToRead() throws InterruptedException {
+ ReadBuffer buffer = null;
+ synchronized (this) {
+ //buffer = readAheadQueue.take(); // blocking method
+ while (readAheadQueue.size() == 0) {
+ wait();
+ }
+ buffer = readAheadQueue.remove();
+ notifyAll();
+ if (buffer == null) {
+ return null; // should never happen
+ }
+ buffer.setStatus(ReadBufferStatus.READING_IN_PROGRESS);
+ inProgressList.add(buffer);
+ }
+ if (LOGGER.isTraceEnabled()) {
+ LOGGER.trace("ReadBufferWorker picked file " + buffer.getStream().getPath() + " for offset " + buffer.getOffset());
+ }
+ return buffer;
+ }
+
+ /**
+ * ReadBufferWorker thread calls this method to post completion
+ *
+ * @param buffer the buffer whose read was completed
+ * @param result the {@link ReadBufferStatus} after the read operation in the worker thread
+ * @param bytesActuallyRead the number of bytes that the worker thread was actually able to read
+ */
+ void doneReading(final ReadBuffer buffer, final ReadBufferStatus result, final int bytesActuallyRead) {
+ if (LOGGER.isTraceEnabled()) {
+ LOGGER.trace("ReadBufferWorker completed file " + buffer.getStream().getPath() + " for offset " + buffer.getOffset()
+ + " bytes " + bytesActuallyRead);
+ }
+ synchronized (this) {
+ inProgressList.remove(buffer);
+ if (result == ReadBufferStatus.AVAILABLE && bytesActuallyRead > 0) {
+ buffer.setStatus(ReadBufferStatus.AVAILABLE);
+ buffer.setTimeStamp(currentTimeMillis());
+ buffer.setLength(bytesActuallyRead);
+ completedReadList.add(buffer);
+ } else {
+ freeList.push(buffer.getBufferindex());
+ // buffer should go out of scope after the end of the calling method in ReadBufferWorker, and eligible for GC
+ }
+ }
+ //outside the synchronized, since anyone receiving a wake-up from the latch must see safe-published results
+ buffer.getLatch().countDown(); // wake up waiting threads (if any)
+ }
+
+ /**
+ * Similar to System.currentTimeMillis, except implemented with System.nanoTime().
+ * System.currentTimeMillis can go backwards when system clock is changed (e.g., with NTP time synchronization),
+ * making it unsuitable for measuring time intervals. nanotime is strictly monotonically increasing,
+ * so it is much more suitable to measuring intervals.
+ *
+ * @return current time in milliseconds
+ */
+ private long currentTimeMillis() {
+ return System.nanoTime() / 1000 / 1000;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fdf5f4c3/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferWorker.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferWorker.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferWorker.java
new file mode 100644
index 0000000..2d0c96e
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferWorker.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus;
+
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * Background worker that services read-ahead requests queued in {@link ReadBufferManager}.
+ * Each worker repeatedly takes the next queued {@link ReadBuffer}, performs the remote read for it,
+ * and reports the outcome back to the manager.
+ */
+class ReadBufferWorker implements Runnable {
+
+  // Start gate: workers wait on this latch before their first dequeue; ReadBufferManager.init()
+  // counts it down once all worker threads have been started.
+  protected static final CountDownLatch UNLEASH_WORKERS = new CountDownLatch(1);
+  private final int id;
+
+  /**
+   * @param id the identifier assigned to this worker
+   */
+  ReadBufferWorker(final int id) {
+    this.id = id;
+  }
+
+  /**
+   * return the ID of ReadBufferWorker.
+   */
+  public int getId() {
+    return this.id;
+  }
+
+  /**
+   * Waits until a buffer becomes available in ReadAheadQueue.
+   * Once a buffer becomes available, reads the file specified in it and then posts results back to buffer manager.
+   * Rinse and repeat. Forever.
+   * The loop ends only when the thread is interrupted while waiting for the next buffer.
+   */
+  @Override
+  public void run() {
+    try {
+      UNLEASH_WORKERS.await();
+    } catch (InterruptedException ex) {
+      // restore the flag; the blocking wait in getNextBlockToRead() below will observe it
+      Thread.currentThread().interrupt();
+    }
+    final ReadBufferManager bufferManager = ReadBufferManager.getBufferManager();
+    ReadBuffer buffer;
+    while (true) {
+      try {
+        buffer = bufferManager.getNextBlockToRead();   // blocks, until a buffer is available for this thread
+      } catch (InterruptedException ex) {
+        Thread.currentThread().interrupt();
+        return;
+      }
+      if (buffer != null) {
+        try {
+          // do the actual read, from the file.
+          int bytesRead = buffer.getStream().readRemote(buffer.getOffset(), buffer.getBuffer(), 0, buffer.getRequestedLength());
+          bufferManager.doneReading(buffer, ReadBufferStatus.AVAILABLE, bytesRead);  // post result back to ReadBufferManager
+        } catch (Exception ex) {
+          // Read-ahead is best effort: the failure is deliberately not propagated. Reporting READ_FAILED
+          // releases the buffer and wakes any thread waiting on it, which then sees a cache miss.
+          bufferManager.doneReading(buffer, ReadBufferStatus.READ_FAILED, 0);
+        }
+      }
+    }
+  }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org