You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by tm...@apache.org on 2018/08/11 05:37:40 UTC
[36/50] [abbrv] hadoop git commit: HADOOP-15446. ABFS: tune imports &
javadocs; stabilise tests. Contributed by Steve Loughran and Da Zhou.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/75b184c6/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
index a78e7af..2b3ccc0 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
@@ -26,14 +26,17 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
-import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
-import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriException;
-import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
-import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
-import org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriException;
+
+
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.*;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.*;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.*;
+
/**
* AbfsClient
*/
@@ -53,7 +56,7 @@ public class AbfsClient {
this.baseUrl = baseUrl;
this.sharedKeyCredentials = sharedKeyCredentials;
String baseUrlString = baseUrl.toString();
- this.filesystem = baseUrlString.substring(baseUrlString.lastIndexOf(AbfsHttpConstants.FORWARD_SLASH) + 1);
+ this.filesystem = baseUrlString.substring(baseUrlString.lastIndexOf(FORWARD_SLASH) + 1);
this.abfsConfiguration = abfsConfiguration;
this.retryPolicy = exponentialRetryPolicy;
this.userAgent = initializeUserAgent();
@@ -73,19 +76,19 @@ public class AbfsClient {
List<AbfsHttpHeader> createDefaultHeaders() {
final List<AbfsHttpHeader> requestHeaders = new ArrayList<AbfsHttpHeader>();
- requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_VERSION, xMsVersion));
- requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.ACCEPT, AbfsHttpConstants.APPLICATION_JSON
- + AbfsHttpConstants.COMMA + AbfsHttpConstants.SINGLE_WHITE_SPACE + AbfsHttpConstants.APPLICATION_OCTET_STREAM));
- requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.ACCEPT_CHARSET,
- AbfsHttpConstants.UTF_8));
- requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.CONTENT_TYPE, AbfsHttpConstants.EMPTY_STRING));
- requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.USER_AGENT, userAgent));
+ requestHeaders.add(new AbfsHttpHeader(X_MS_VERSION, xMsVersion));
+ requestHeaders.add(new AbfsHttpHeader(ACCEPT, APPLICATION_JSON
+ + COMMA + SINGLE_WHITE_SPACE + APPLICATION_OCTET_STREAM));
+ requestHeaders.add(new AbfsHttpHeader(ACCEPT_CHARSET,
+ UTF_8));
+ requestHeaders.add(new AbfsHttpHeader(CONTENT_TYPE, EMPTY_STRING));
+ requestHeaders.add(new AbfsHttpHeader(USER_AGENT, userAgent));
return requestHeaders;
}
AbfsUriQueryBuilder createDefaultUriQueryBuilder() {
final AbfsUriQueryBuilder abfsUriQueryBuilder = new AbfsUriQueryBuilder();
- abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_TIMEOUT, AbfsHttpConstants.DEFAULT_TIMEOUT);
+ abfsUriQueryBuilder.addQuery(QUERY_PARAM_TIMEOUT, DEFAULT_TIMEOUT);
return abfsUriQueryBuilder;
}
@@ -93,12 +96,12 @@ public class AbfsClient {
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
final AbfsUriQueryBuilder abfsUriQueryBuilder = new AbfsUriQueryBuilder();
- abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_RESOURCE, AbfsHttpConstants.FILESYSTEM);
+ abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESOURCE, FILESYSTEM);
final URL url = createRequestUrl(abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
this,
- AbfsHttpConstants.HTTP_METHOD_PUT,
+ HTTP_METHOD_PUT,
url,
requestHeaders);
op.execute();
@@ -109,19 +112,19 @@ public class AbfsClient {
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
// JDK7 does not support PATCH, so to workaround the issue we will use
// PUT and specify the real method in the X-Http-Method-Override header.
- requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_HTTP_METHOD_OVERRIDE,
- AbfsHttpConstants.HTTP_METHOD_PATCH));
+ requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE,
+ HTTP_METHOD_PATCH));
- requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_PROPERTIES,
+ requestHeaders.add(new AbfsHttpHeader(X_MS_PROPERTIES,
properties));
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
- abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_RESOURCE, AbfsHttpConstants.FILESYSTEM);
+ abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESOURCE, FILESYSTEM);
final URL url = createRequestUrl(abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
this,
- AbfsHttpConstants.HTTP_METHOD_PUT,
+ HTTP_METHOD_PUT,
url,
requestHeaders);
op.execute();
@@ -133,16 +136,16 @@ public class AbfsClient {
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
- abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_RESOURCE, AbfsHttpConstants.FILESYSTEM);
- abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_DIRECTORY, relativePath == null ? "" : urlEncode(relativePath));
- abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_RECURSIVE, String.valueOf(recursive));
- abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_CONTINUATION, continuation);
- abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_MAXRESULTS, String.valueOf(listMaxResults));
+ abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESOURCE, FILESYSTEM);
+ abfsUriQueryBuilder.addQuery(QUERY_PARAM_DIRECTORY, relativePath == null ? "" : urlEncode(relativePath));
+ abfsUriQueryBuilder.addQuery(QUERY_PARAM_RECURSIVE, String.valueOf(recursive));
+ abfsUriQueryBuilder.addQuery(QUERY_PARAM_CONTINUATION, continuation);
+ abfsUriQueryBuilder.addQuery(QUERY_PARAM_MAXRESULTS, String.valueOf(listMaxResults));
final URL url = createRequestUrl(abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
this,
- AbfsHttpConstants.HTTP_METHOD_GET,
+ HTTP_METHOD_GET,
url,
requestHeaders);
op.execute();
@@ -153,12 +156,12 @@ public class AbfsClient {
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
- abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_RESOURCE, AbfsHttpConstants.FILESYSTEM);
+ abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESOURCE, FILESYSTEM);
final URL url = createRequestUrl(abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
this,
- AbfsHttpConstants.HTTP_METHOD_HEAD,
+ HTTP_METHOD_HEAD,
url,
requestHeaders);
op.execute();
@@ -169,12 +172,12 @@ public class AbfsClient {
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
- abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_RESOURCE, AbfsHttpConstants.FILESYSTEM);
+ abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESOURCE, FILESYSTEM);
final URL url = createRequestUrl(abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
this,
- AbfsHttpConstants.HTTP_METHOD_DELETE,
+ HTTP_METHOD_DELETE,
url,
requestHeaders);
op.execute();
@@ -185,16 +188,16 @@ public class AbfsClient {
throws AzureBlobFileSystemException {
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
if (!overwrite) {
- requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.IF_NONE_MATCH, "*"));
+ requestHeaders.add(new AbfsHttpHeader(IF_NONE_MATCH, "*"));
}
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
- abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_RESOURCE, isFile ? AbfsHttpConstants.FILE : AbfsHttpConstants.DIRECTORY);
+ abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESOURCE, isFile ? FILE : DIRECTORY);
final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
this,
- AbfsHttpConstants.HTTP_METHOD_PUT,
+ HTTP_METHOD_PUT,
url,
requestHeaders);
op.execute();
@@ -205,17 +208,17 @@ public class AbfsClient {
throws AzureBlobFileSystemException {
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
- final String encodedRenameSource = urlEncode(AbfsHttpConstants.FORWARD_SLASH + this.getFileSystem() + source);
- requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_RENAME_SOURCE, encodedRenameSource));
- requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.IF_NONE_MATCH, AbfsHttpConstants.STAR));
+ final String encodedRenameSource = urlEncode(FORWARD_SLASH + this.getFileSystem() + source);
+ requestHeaders.add(new AbfsHttpHeader(X_MS_RENAME_SOURCE, encodedRenameSource));
+ requestHeaders.add(new AbfsHttpHeader(IF_NONE_MATCH, STAR));
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
- abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_CONTINUATION, continuation);
+ abfsUriQueryBuilder.addQuery(QUERY_PARAM_CONTINUATION, continuation);
final URL url = createRequestUrl(destination, abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
this,
- AbfsHttpConstants.HTTP_METHOD_PUT,
+ HTTP_METHOD_PUT,
url,
requestHeaders);
op.execute();
@@ -227,17 +230,17 @@ public class AbfsClient {
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
// JDK7 does not support PATCH, so to workaround the issue we will use
// PUT and specify the real method in the X-Http-Method-Override header.
- requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_HTTP_METHOD_OVERRIDE,
- AbfsHttpConstants.HTTP_METHOD_PATCH));
+ requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE,
+ HTTP_METHOD_PATCH));
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
- abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_ACTION, AbfsHttpConstants.APPEND_ACTION);
- abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_POSITION, Long.toString(position));
+ abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION, APPEND_ACTION);
+ abfsUriQueryBuilder.addQuery(QUERY_PARAM_POSITION, Long.toString(position));
final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
this,
- AbfsHttpConstants.HTTP_METHOD_PUT,
+ HTTP_METHOD_PUT,
url,
requestHeaders, buffer, offset, length);
op.execute();
@@ -245,44 +248,46 @@ public class AbfsClient {
}
- public AbfsRestOperation flush(final String path, final long position, boolean retainUncommittedData) throws AzureBlobFileSystemException {
+ public AbfsRestOperation flush(final String path, final long position, boolean retainUncommittedData)
+ throws AzureBlobFileSystemException {
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
// JDK7 does not support PATCH, so to workaround the issue we will use
// PUT and specify the real method in the X-Http-Method-Override header.
- requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_HTTP_METHOD_OVERRIDE,
- AbfsHttpConstants.HTTP_METHOD_PATCH));
+ requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE,
+ HTTP_METHOD_PATCH));
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
- abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_ACTION, AbfsHttpConstants.FLUSH_ACTION);
- abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_POSITION, Long.toString(position));
- abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_RETAIN_UNCOMMITTED_DATA, String.valueOf(retainUncommittedData));
+ abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION, FLUSH_ACTION);
+ abfsUriQueryBuilder.addQuery(QUERY_PARAM_POSITION, Long.toString(position));
+ abfsUriQueryBuilder.addQuery(QUERY_PARAM_RETAIN_UNCOMMITTED_DATA, String.valueOf(retainUncommittedData));
final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
this,
- AbfsHttpConstants.HTTP_METHOD_PUT,
+ HTTP_METHOD_PUT,
url,
requestHeaders);
op.execute();
return op;
}
- public AbfsRestOperation setPathProperties(final String path, final String properties) throws AzureBlobFileSystemException {
+ public AbfsRestOperation setPathProperties(final String path, final String properties)
+ throws AzureBlobFileSystemException {
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
// JDK7 does not support PATCH, so to workaround the issue we will use
// PUT and specify the real method in the X-Http-Method-Override header.
- requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_HTTP_METHOD_OVERRIDE,
- AbfsHttpConstants.HTTP_METHOD_PATCH));
+ requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE,
+ HTTP_METHOD_PATCH));
- requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_PROPERTIES, properties));
+ requestHeaders.add(new AbfsHttpHeader(X_MS_PROPERTIES, properties));
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
- abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_ACTION, AbfsHttpConstants.SET_PROPERTIES_ACTION);
+ abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION, SET_PROPERTIES_ACTION);
final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
this,
- AbfsHttpConstants.HTTP_METHOD_PUT,
+ HTTP_METHOD_PUT,
url,
requestHeaders);
op.execute();
@@ -297,7 +302,7 @@ public class AbfsClient {
final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
this,
- AbfsHttpConstants.HTTP_METHOD_HEAD,
+ HTTP_METHOD_HEAD,
url,
requestHeaders);
op.execute();
@@ -307,9 +312,9 @@ public class AbfsClient {
public AbfsRestOperation read(final String path, final long position, final byte[] buffer, final int bufferOffset,
final int bufferLength, final String eTag) throws AzureBlobFileSystemException {
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
- requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.RANGE,
+ requestHeaders.add(new AbfsHttpHeader(RANGE,
String.format("bytes=%d-%d", position, position + bufferLength - 1)));
- requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.IF_MATCH, eTag));
+ requestHeaders.add(new AbfsHttpHeader(IF_MATCH, eTag));
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
@@ -317,7 +322,7 @@ public class AbfsClient {
final AbfsRestOperation op = new AbfsRestOperation(
this,
- AbfsHttpConstants.HTTP_METHOD_GET,
+ HTTP_METHOD_GET,
url,
requestHeaders,
buffer,
@@ -333,13 +338,13 @@ public class AbfsClient {
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
- abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_RECURSIVE, String.valueOf(recursive));
- abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_CONTINUATION, continuation);
+ abfsUriQueryBuilder.addQuery(QUERY_PARAM_RECURSIVE, String.valueOf(recursive));
+ abfsUriQueryBuilder.addQuery(QUERY_PARAM_CONTINUATION, continuation);
final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
this,
- AbfsHttpConstants.HTTP_METHOD_DELETE,
+ HTTP_METHOD_DELETE,
url,
requestHeaders);
op.execute();
@@ -347,7 +352,7 @@ public class AbfsClient {
}
private URL createRequestUrl(final String query) throws AzureBlobFileSystemException {
- return createRequestUrl(AbfsHttpConstants.EMPTY_STRING, query);
+ return createRequestUrl(EMPTY_STRING, query);
}
private URL createRequestUrl(final String path, final String query)
@@ -357,8 +362,8 @@ public class AbfsClient {
try {
encodedPath = urlEncode(path);
} catch (AzureBlobFileSystemException ex) {
- this.LOG.debug(
- "Unexpected error.", ex);
+ LOG.debug("Unexpected error.", ex);
+ throw new InvalidUriException(path);
}
final StringBuilder sb = new StringBuilder();
@@ -378,9 +383,9 @@ public class AbfsClient {
private static String urlEncode(final String value) throws AzureBlobFileSystemException {
String encodedString = null;
try {
- encodedString = URLEncoder.encode(value, AbfsHttpConstants.UTF_8)
- .replace(AbfsHttpConstants.PLUS, AbfsHttpConstants.PLUS_ENCODE)
- .replace(AbfsHttpConstants.FORWARD_SLASH_ENCODE, AbfsHttpConstants.FORWARD_SLASH);
+ encodedString = URLEncoder.encode(value, UTF_8)
+ .replace(PLUS, PLUS_ENCODE)
+ .replace(FORWARD_SLASH_ENCODE, FORWARD_SLASH);
} catch (UnsupportedEncodingException ex) {
throw new InvalidUriException(value);
}
@@ -391,11 +396,11 @@ public class AbfsClient {
private String initializeUserAgent() {
final String userAgentComment = String.format(Locale.ROOT,
"(JavaJRE %s; %s %s)",
- System.getProperty(AbfsHttpConstants.JAVA_VERSION),
- System.getProperty(AbfsHttpConstants.OS_NAME)
- .replaceAll(AbfsHttpConstants.SINGLE_WHITE_SPACE, AbfsHttpConstants.EMPTY_STRING),
- System.getProperty(AbfsHttpConstants.OS_VERSION));
+ System.getProperty(JAVA_VERSION),
+ System.getProperty(OS_NAME)
+ .replaceAll(SINGLE_WHITE_SPACE, EMPTY_STRING),
+ System.getProperty(OS_VERSION));
- return String.format(AbfsHttpConstants.CLIENT_VERSION + " %s", userAgentComment);
+ return String.format(CLIENT_VERSION + " %s", userAgentComment);
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/75b184c6/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java
index 0ea9365..53f6900 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java
@@ -30,12 +30,12 @@ import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.databind.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultSchema;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* Represents an HTTP operation.
@@ -427,4 +427,4 @@ public class AbfsHttpOperation {
private boolean isNullInputStream(InputStream stream) {
return stream == null ? true : false;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/75b184c6/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
index 6554380..848ce8a 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
@@ -29,7 +29,7 @@ import org.apache.hadoop.fs.FileSystem.Statistics;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
/**
- * The AbfsInputStream for AbfsClient
+ * The AbfsInputStream for AbfsClient.
*/
public class AbfsInputStream extends FSInputStream {
private final AbfsClient client;
@@ -59,7 +59,6 @@ public class AbfsInputStream extends FSInputStream {
final int bufferSize,
final int readAheadQueueDepth,
final String eTag) {
- super();
this.client = client;
this.statistics = statistics;
this.path = path;
@@ -379,4 +378,4 @@ public class AbfsInputStream extends FSInputStream {
public boolean markSupported() {
return false;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/75b184c6/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
index de5c934..2dbcee5 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.fs.azurebfs.services;
import java.io.IOException;
+import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.LinkedBlockingQueue;
@@ -35,7 +36,7 @@ import org.apache.hadoop.fs.Syncable;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
/**
- * The BlobFsOutputStream for Rest AbfsClient
+ * The BlobFsOutputStream for Rest AbfsClient.
*/
public class AbfsOutputStream extends OutputStream implements Syncable {
private final AbfsClient client;
@@ -79,8 +80,8 @@ public class AbfsOutputStream extends OutputStream implements Syncable {
maxConcurrentRequestCount,
10L,
TimeUnit.SECONDS,
- new LinkedBlockingQueue());
- this.completionService = new ExecutorCompletionService(this.threadExecutor);
+ new LinkedBlockingQueue<>());
+ this.completionService = new ExecutorCompletionService<>(this.threadExecutor);
}
/**
@@ -111,9 +112,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable {
@Override
public synchronized void write(final byte[] data, final int off, final int length)
throws IOException {
- if (this.lastError != null) {
- throw this.lastError;
- }
+ maybeThrowLastError();
Preconditions.checkArgument(data != null, "null data");
@@ -143,6 +142,19 @@ public class AbfsOutputStream extends OutputStream implements Syncable {
}
/**
+ * Throw the last error recorded if not null.
+ * After the stream is closed, this is always set to
+ * an exception, so acts as a guard against method invocation once
+ * closed.
+ * @throws IOException if lastError is set
+ */
+ private void maybeThrowLastError() throws IOException {
+ if (lastError != null) {
+ throw lastError;
+ }
+ }
+
+ /**
* Flushes this output stream and forces any buffered output bytes to be
* written out. If any data remains in the payload it is committed to the
* service. Data is queued for writing and forced out to the service
@@ -150,7 +162,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable {
*/
@Override
public void flush() throws IOException {
- this.flushInternalAsync();
+ flushInternalAsync();
}
/** Similar to posix fsync, flush out the data in client's user buffer
@@ -159,7 +171,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable {
*/
@Override
public void hsync() throws IOException {
- this.flushInternal();
+ flushInternal();
}
/** Flush out the data in client's user buffer. After the return of
@@ -168,7 +180,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable {
*/
@Override
public void hflush() throws IOException {
- this.flushInternal();
+ flushInternal();
}
/**
@@ -186,34 +198,30 @@ public class AbfsOutputStream extends OutputStream implements Syncable {
}
try {
- this.flushInternal();
- this.threadExecutor.shutdown();
+ flushInternal();
+ threadExecutor.shutdown();
} finally {
- this.lastError = new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
- this.buffer = null;
- this.bufferIndex = 0;
- this.closed = true;
- this.writeOperations.clear();
- if (!this.threadExecutor.isShutdown()) {
- this.threadExecutor.shutdownNow();
+ lastError = new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
+ buffer = null;
+ bufferIndex = 0;
+ closed = true;
+ writeOperations.clear();
+ if (!threadExecutor.isShutdown()) {
+ threadExecutor.shutdownNow();
}
}
}
private synchronized void flushInternal() throws IOException {
- if (this.lastError != null) {
- throw this.lastError;
- }
- this.writeCurrentBufferToService();
- this.flushWrittenBytesToService();
+ maybeThrowLastError();
+ writeCurrentBufferToService();
+ flushWrittenBytesToService();
}
private synchronized void flushInternalAsync() throws IOException {
- if (this.lastError != null) {
- throw this.lastError;
- }
- this.writeCurrentBufferToService();
- this.flushWrittenBytesToServiceAsync();
+ maybeThrowLastError();
+ writeCurrentBufferToService();
+ flushWrittenBytesToServiceAsync();
}
private synchronized void writeCurrentBufferToService() throws IOException {
@@ -221,19 +229,19 @@ public class AbfsOutputStream extends OutputStream implements Syncable {
return;
}
- final byte[] bytes = this.buffer;
+ final byte[] bytes = buffer;
final int bytesLength = bufferIndex;
- this.buffer = new byte[bufferSize];
- this.bufferIndex = 0;
- final long offset = this.position;
- this.position += bytesLength;
+ buffer = new byte[bufferSize];
+ bufferIndex = 0;
+ final long offset = position;
+ position += bytesLength;
- if (this.threadExecutor.getQueue().size() >= maxConcurrentRequestCount * 2) {
- this.waitForTaskToComplete();
+ if (threadExecutor.getQueue().size() >= maxConcurrentRequestCount * 2) {
+ waitForTaskToComplete();
}
- final Future job = this.completionService.submit(new Callable<Void>() {
+ final Future<Void> job = completionService.submit(new Callable<Void>() {
@Override
public Void call() throws Exception {
client.append(path, offset, bytes, 0,
@@ -242,25 +250,25 @@ public class AbfsOutputStream extends OutputStream implements Syncable {
}
});
- this.writeOperations.add(new WriteOperation(job, offset, bytesLength));
+ writeOperations.add(new WriteOperation(job, offset, bytesLength));
// Try to shrink the queue
shrinkWriteOperationQueue();
}
private synchronized void flushWrittenBytesToService() throws IOException {
- for (WriteOperation writeOperation : this.writeOperations) {
+ for (WriteOperation writeOperation : writeOperations) {
try {
writeOperation.task.get();
} catch (Exception ex) {
- if (AzureBlobFileSystemException.class.isInstance(ex.getCause())) {
- ex = AzureBlobFileSystemException.class.cast(ex.getCause());
+ if (ex.getCause() instanceof AzureBlobFileSystemException) {
+ ex = (AzureBlobFileSystemException)ex.getCause();
}
- this.lastError = new IOException(ex);
- throw this.lastError;
+ lastError = new IOException(ex);
+ throw lastError;
}
}
- flushWrittenBytesToServiceInternal(this.position, false);
+ flushWrittenBytesToServiceInternal(position, false);
}
private synchronized void flushWrittenBytesToServiceAsync() throws IOException {
@@ -273,7 +281,8 @@ public class AbfsOutputStream extends OutputStream implements Syncable {
this.lastTotalAppendOffset = 0;
}
- private synchronized void flushWrittenBytesToServiceInternal(final long offset, final boolean retainUncommitedData) throws IOException {
+ private synchronized void flushWrittenBytesToServiceInternal(final long offset,
+ final boolean retainUncommitedData) throws IOException {
try {
client.flush(path, offset, retainUncommitedData);
} catch (AzureBlobFileSystemException ex) {
@@ -288,31 +297,33 @@ public class AbfsOutputStream extends OutputStream implements Syncable {
*/
private synchronized void shrinkWriteOperationQueue() throws IOException {
try {
- while (this.writeOperations.peek() != null && this.writeOperations.peek().task.isDone()) {
- this.writeOperations.peek().task.get();
- this.lastTotalAppendOffset += this.writeOperations.peek().length;
- this.writeOperations.remove();
+ while (writeOperations.peek() != null && writeOperations.peek().task.isDone()) {
+ writeOperations.peek().task.get();
+ lastTotalAppendOffset += writeOperations.peek().length;
+ writeOperations.remove();
}
} catch (Exception e) {
- if (AzureBlobFileSystemException.class.isInstance(e.getCause())) {
- this.lastError = IOException.class.cast(e.getCause());
+ if (e.getCause() instanceof AzureBlobFileSystemException) {
+ lastError = (AzureBlobFileSystemException)e.getCause();
} else {
- this.lastError = new IOException(e);
+ lastError = new IOException(e);
}
- throw this.lastError;
+ throw lastError;
}
}
private void waitForTaskToComplete() throws IOException {
boolean completed;
- for (completed = false; this.completionService.poll() != null; completed = true) {}
+ for (completed = false; completionService.poll() != null; completed = true) {
+ // keep polling until there is no data
+ }
if (!completed) {
try {
- this.completionService.take();
+ completionService.take();
} catch (InterruptedException e) {
- this.lastError = new IOException(e);
- throw this.lastError;
+ lastError = (IOException)new InterruptedIOException(e.toString()).initCause(e);
+ throw lastError;
}
}
}
@@ -332,4 +343,4 @@ public class AbfsOutputStream extends OutputStream implements Syncable {
this.length = length;
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/75b184c6/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
index 17fc35a..6126398 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
@@ -23,15 +23,16 @@ import java.net.HttpURLConnection;
import java.net.URL;
import java.util.List;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
-import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidAbfsRestOperationException;
-import org.slf4j.Logger;
-
/**
- * The AbfsRestOperation for Rest AbfsClient
+ * The AbfsRestOperation for Rest AbfsClient.
*/
public class AbfsRestOperation {
// Blob FS client, which has the credentials, retry policy, and logs.
@@ -47,7 +48,7 @@ public class AbfsRestOperation {
// request body and all the download methods have a response body.
private final boolean hasRequestBody;
- private final Logger logger;
+ private final Logger LOG = LoggerFactory.getLogger(AbfsClient.class);
// For uploads, this is the request entity body. For downloads,
// this will hold the response entity body.
@@ -79,7 +80,6 @@ public class AbfsRestOperation {
this.requestHeaders = requestHeaders;
this.hasRequestBody = (AbfsHttpConstants.HTTP_METHOD_PUT.equals(method)
|| AbfsHttpConstants.HTTP_METHOD_PATCH.equals(method));
- this.logger = client.LOG;
}
/**
@@ -150,11 +150,11 @@ public class AbfsRestOperation {
httpOperation.processResponse(buffer, bufferOffset, bufferLength);
} catch (IOException ex) {
- if (logger.isDebugEnabled()) {
+ if (LOG.isDebugEnabled()) {
if (httpOperation != null) {
- logger.debug("HttpRequestFailure: " + httpOperation.toString(), ex);
+ LOG.debug("HttpRequestFailure: " + httpOperation.toString(), ex);
} else {
- logger.debug("HttpRequestFailure: " + method + "," + url, ex);
+ LOG.debug("HttpRequestFailure: " + method + "," + url, ex);
}
}
if (!client.getRetryPolicy().shouldRetry(retryCount, -1)) {
@@ -163,8 +163,8 @@ public class AbfsRestOperation {
return false;
}
- if (logger.isDebugEnabled()) {
- logger.debug("HttpRequest: " + httpOperation.toString());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("HttpRequest: " + httpOperation.toString());
}
if (client.getRetryPolicy().shouldRetry(retryCount, httpOperation.getStatusCode())) {
@@ -175,4 +175,4 @@ public class AbfsRestOperation {
return true;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/75b184c6/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsUriQueryBuilder.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsUriQueryBuilder.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsUriQueryBuilder.java
index bac66af..3624853 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsUriQueryBuilder.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsUriQueryBuilder.java
@@ -18,13 +18,13 @@
package org.apache.hadoop.fs.azurebfs.services;
-import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
-
import java.util.HashMap;
import java.util.Map;
+import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
+
/**
- * The UrlQueryBuilder for Rest AbfsClient
+ * The UrlQueryBuilder for Rest AbfsClient.
*/
public class AbfsUriQueryBuilder {
private Map<String, String> parameters;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/75b184c6/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ExponentialRetryPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ExponentialRetryPolicy.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ExponentialRetryPolicy.java
index 54aa1ab..5eb7a66 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ExponentialRetryPolicy.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ExponentialRetryPolicy.java
@@ -43,7 +43,7 @@ public class ExponentialRetryPolicy {
private static final int DEFAULT_MAX_BACKOFF = 1000 * 30;
/**
- *Represents the default minimum amount of time used when calculating the exponential
+ * Represents the default minimum amount of time used when calculating the exponential
* delay between retries.
*/
private static final int DEFAULT_MIN_BACKOFF = 1000 * 3;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/75b184c6/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBuffer.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBuffer.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBuffer.java
index 1fac13d..00e4f00 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBuffer.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBuffer.java
@@ -18,10 +18,10 @@
package org.apache.hadoop.fs.azurebfs.services;
-import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus;
-
import java.util.concurrent.CountDownLatch;
+import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus;
+
class ReadBuffer {
private AbfsInputStream stream;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/75b184c6/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java
index 164e549..5b71cf0 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java
@@ -28,7 +28,7 @@ import java.util.Stack;
import java.util.concurrent.CountDownLatch;
/**
- * The Read Buffer Manager for Rest AbfsClient
+ * The Read Buffer Manager for Rest AbfsClient.
*/
final class ReadBufferManager {
private static final Logger LOGGER = LoggerFactory.getLogger(ReadBufferManager.class);
@@ -40,11 +40,11 @@ final class ReadBufferManager {
private Thread[] threads = new Thread[NUM_THREADS];
private byte[][] buffers; // array of byte[] buffers, to hold the data that is read
- private Stack<Integer> freeList = new Stack<Integer>(); // indices in buffers[] array that are available
+ private Stack<Integer> freeList = new Stack<>(); // indices in buffers[] array that are available
- private Queue<ReadBuffer> readAheadQueue = new LinkedList<ReadBuffer>(); // queue of requests that are not picked up by any worker thread yet
- private LinkedList<ReadBuffer> inProgressList = new LinkedList<ReadBuffer>(); // requests being processed by worker threads
- private LinkedList<ReadBuffer> completedReadList = new LinkedList<ReadBuffer>(); // buffers available for reading
+ private Queue<ReadBuffer> readAheadQueue = new LinkedList<>(); // queue of requests that are not picked up by any worker thread yet
+ private LinkedList<ReadBuffer> inProgressList = new LinkedList<>(); // requests being processed by worker threads
+ private LinkedList<ReadBuffer> completedReadList = new LinkedList<>(); // buffers available for reading
private static final ReadBufferManager BUFFER_MANAGER; // singleton, initialized in static initialization block
static {
@@ -85,7 +85,7 @@ final class ReadBufferManager {
/**
- * {@link AbfsInputStream} calls this method to queue read-aheads
+ * {@link AbfsInputStream} calls this method to queue read-aheads.
*
* @param stream The {@link AbfsInputStream} for which to do the read-ahead
* @param requestedOffset The offset in the file which shoukd be read
@@ -93,15 +93,15 @@ final class ReadBufferManager {
*/
void queueReadAhead(final AbfsInputStream stream, final long requestedOffset, final int requestedLength) {
if (LOGGER.isTraceEnabled()) {
- LOGGER.trace("Start Queueing readAhead for " + stream.getPath() + " offset " + requestedOffset
- + " length " + requestedLength);
+ LOGGER.trace("Start Queueing readAhead for {} offset {} length {}",
+ stream.getPath(), requestedOffset, requestedLength);
}
ReadBuffer buffer;
synchronized (this) {
if (isAlreadyQueued(stream, requestedOffset)) {
return; // already queued, do not queue again
}
- if (freeList.size() == 0 && !tryEvict()) {
+ if (freeList.isEmpty() && !tryEvict()) {
return; // no buffers available, cannot queue anything
}
@@ -121,8 +121,8 @@ final class ReadBufferManager {
notifyAll();
}
if (LOGGER.isTraceEnabled()) {
- LOGGER.trace("Done q-ing readAhead for file " + stream.getPath() + " offset " + requestedOffset
- + " buffer idx " + buffer.getBufferindex());
+ LOGGER.trace("Done q-ing readAhead for file {} offset {} buffer idx {}",
+ stream.getPath(), requestedOffset, buffer.getBufferindex());
}
}
@@ -144,7 +144,8 @@ final class ReadBufferManager {
int getBlock(final AbfsInputStream stream, final long position, final int length, final byte[] buffer) {
// not synchronized, so have to be careful with locking
if (LOGGER.isTraceEnabled()) {
- LOGGER.trace("getBlock for file " + stream.getPath() + " position " + position + " thread " + Thread.currentThread().getName());
+ LOGGER.trace("getBlock for file {} position {} thread {}",
+ stream.getPath(), position, Thread.currentThread().getName());
}
waitForProcess(stream, position);
@@ -155,12 +156,13 @@ final class ReadBufferManager {
}
if (bytesRead > 0) {
if (LOGGER.isTraceEnabled()) {
- LOGGER.trace("Done read from Cache for " + stream.getPath() + " position " + position + " length " + bytesRead);
+ LOGGER.trace("Done read from Cache for {} position {} length {}",
+ stream.getPath(), position, bytesRead);
}
return bytesRead;
}
- // otherwise, just say we got nothing - calling thread can do it's own read
+ // otherwise, just say we got nothing - calling thread can do its own read
return 0;
}
@@ -179,8 +181,8 @@ final class ReadBufferManager {
if (readBuf != null) { // if in in-progress queue, then block for it
try {
if (LOGGER.isTraceEnabled()) {
- LOGGER.trace("got a relevant read buffer for file " + stream.getPath() + " offset " + readBuf.getOffset()
- + " buffer idx " + readBuf.getBufferindex());
+ LOGGER.trace("got a relevant read buffer for file {} offset {} buffer idx {}",
+ stream.getPath(), readBuf.getOffset(), readBuf.getBufferindex());
}
readBuf.getLatch().await(); // blocking wait on the caller stream's thread
// Note on correctness: readBuf gets out of inProgressList only in 1 place: after worker thread
@@ -193,8 +195,8 @@ final class ReadBufferManager {
Thread.currentThread().interrupt();
}
if (LOGGER.isTraceEnabled()) {
- LOGGER.trace("latch done for file " + stream.getPath() + " buffer idx " + readBuf.getBufferindex()
- + " length " + readBuf.getLength());
+ LOGGER.trace("latch done for file {} buffer idx {} length {}",
+ stream.getPath(), readBuf.getBufferindex(), readBuf.getLength());
}
}
}
@@ -254,8 +256,8 @@ final class ReadBufferManager {
freeList.push(buf.getBufferindex());
completedReadList.remove(buf);
if (LOGGER.isTraceEnabled()) {
- LOGGER.trace("Evicting buffer idx " + buf.getBufferindex() + "; was used for file " + buf.getStream().getPath()
- + " offset " + buf.getOffset() + " length " + buf.getLength());
+ LOGGER.trace("Evicting buffer idx {}; was used for file {} offset {} length {}",
+ buf.getBufferindex(), buf.getStream().getPath(), buf.getOffset(), buf.getLength());
}
return true;
}
@@ -344,13 +346,14 @@ final class ReadBufferManager {
inProgressList.add(buffer);
}
if (LOGGER.isTraceEnabled()) {
- LOGGER.trace("ReadBufferWorker picked file " + buffer.getStream().getPath() + " for offset " + buffer.getOffset());
+ LOGGER.trace("ReadBufferWorker picked file {} for offset {}",
+ buffer.getStream().getPath(), buffer.getOffset());
}
return buffer;
}
/**
- * ReadBufferWorker thread calls this method to post completion
+ * ReadBufferWorker thread calls this method to post completion.
*
* @param buffer the buffer whose read was completed
* @param result the {@link ReadBufferStatus} after the read operation in the worker thread
@@ -358,8 +361,8 @@ final class ReadBufferManager {
*/
void doneReading(final ReadBuffer buffer, final ReadBufferStatus result, final int bytesActuallyRead) {
if (LOGGER.isTraceEnabled()) {
- LOGGER.trace("ReadBufferWorker completed file " + buffer.getStream().getPath() + " for offset " + buffer.getOffset()
- + " bytes " + bytesActuallyRead);
+ LOGGER.trace("ReadBufferWorker completed file {} for offset {} bytes {}",
+ buffer.getStream().getPath(), buffer.getOffset(), bytesActuallyRead);
}
synchronized (this) {
inProgressList.remove(buffer);
@@ -380,8 +383,9 @@ final class ReadBufferManager {
/**
* Similar to System.currentTimeMillis, except implemented with System.nanoTime().
* System.currentTimeMillis can go backwards when system clock is changed (e.g., with NTP time synchronization),
- * making it unsuitable for measuring time intervals. nanotime is strictly monotonically increasing,
- * so it is much more suitable to measuring intervals.
+ * making it unsuitable for measuring time intervals. nanotime is strictly monotonically increasing per CPU core.
+ * Note: it is not monotonic across Sockets, and even within a CPU, it's only the
+ * more recent parts which share a clock across all cores.
*
* @return current time in milliseconds
*/
http://git-wip-us.apache.org/repos/asf/hadoop/blob/75b184c6/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferWorker.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferWorker.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferWorker.java
index 2d0c96e..af69de0 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferWorker.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferWorker.java
@@ -18,10 +18,10 @@
package org.apache.hadoop.fs.azurebfs.services;
-import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus;
-
import java.util.concurrent.CountDownLatch;
+import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus;
+
class ReadBufferWorker implements Runnable {
protected static final CountDownLatch UNLEASH_WORKERS = new CountDownLatch(1);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/75b184c6/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/SharedKeyCredentials.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/SharedKeyCredentials.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/SharedKeyCredentials.java
index dd59892..105a1a2 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/SharedKeyCredentials.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/SharedKeyCredentials.java
@@ -22,6 +22,7 @@ import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import java.io.UnsupportedEncodingException;
import java.net.HttpURLConnection;
+import java.net.URL;
import java.net.URLDecoder;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
@@ -38,11 +39,11 @@ import java.util.TimeZone;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import org.apache.commons.codec.Charsets;
+import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
-import org.apache.commons.codec.binary.Base64;
-import org.apache.commons.codec.Charsets;
/**
* Represents the shared key credentials used to access an Azure Storage
* account.
@@ -89,7 +90,7 @@ public class SharedKeyCredentials {
}
private String computeHmac256(final String stringToSign) {
- byte[] utf8Bytes = null;
+ byte[] utf8Bytes;
try {
utf8Bytes = stringToSign.getBytes(AbfsHttpConstants.UTF_8);
} catch (final UnsupportedEncodingException e) {
@@ -158,7 +159,7 @@ public class SharedKeyCredentials {
}
/**
- * Initialie the HmacSha256 associated with the account key.
+ * Initialize the HmacSha256 associated with the account key.
*/
private void initializeMac() {
// Initializes the HMAC-SHA256 Mac and SecretKey.
@@ -171,7 +172,7 @@ public class SharedKeyCredentials {
}
/**
- * Append a string to a string builder with a newline constant
+ * Append a string to a string builder with a newline constant.
*
* @param builder the StringBuilder object
* @param element the string to append.
@@ -194,9 +195,10 @@ public class SharedKeyCredentials {
* @param conn the HttpURLConnection for the operation.
* @return A canonicalized string.
*/
- private static String canonicalizeHttpRequest(final java.net.URL address, final String accountName,
- final String method, final String contentType, final long contentLength, final String date,
- final HttpURLConnection conn) throws UnsupportedEncodingException {
+ private static String canonicalizeHttpRequest(final URL address,
+ final String accountName, final String method, final String contentType,
+ final long contentLength, final String date, final HttpURLConnection conn)
+ throws UnsupportedEncodingException {
// The first element should be the Method of the request.
// I.e. GET, POST, PUT, or HEAD.
@@ -246,7 +248,8 @@ public class SharedKeyCredentials {
* @param accountName the account name for the request.
* @return the canonicalized resource string.
*/
- private static String getCanonicalizedResource(final java.net.URL address, final String accountName) throws UnsupportedEncodingException {
+ private static String getCanonicalizedResource(final URL address,
+ final String accountName) throws UnsupportedEncodingException {
// Resource path
final StringBuilder resourcepath = new StringBuilder(AbfsHttpConstants.FORWARD_SLASH);
resourcepath.append(accountName);
@@ -263,7 +266,7 @@ public class SharedKeyCredentials {
final Map<String, String[]> queryVariables = parseQueryString(address.getQuery());
- final Map<String, String> lowercasedKeyNameValue = new HashMap<String, String>();
+ final Map<String, String> lowercasedKeyNameValue = new HashMap<>();
for (final Entry<String, String[]> entry : queryVariables.entrySet()) {
// sort the value and organize it as comma separated values
@@ -303,14 +306,17 @@ public class SharedKeyCredentials {
}
/**
- * Gets all the values for the given header in the one to many map, performs a trimStart() on each return value
+ * Gets all the values for the given header in the one to many map,
+ * performs a trimStart() on each return value.
*
* @param headers a one to many map of key / values representing the header values for the connection.
* @param headerName the name of the header to lookup
* @return an ArrayList<String> of all trimmed values corresponding to the requested headerName. This may be empty
* if the header is not found.
*/
- private static ArrayList<String> getHeaderValues(final Map<String, List<String>> headers, final String headerName) {
+ private static ArrayList<String> getHeaderValues(
+ final Map<String, List<String>> headers,
+ final String headerName) {
final ArrayList<String> arrayOfValues = new ArrayList<String>();
List<String> values = null;
@@ -338,7 +344,7 @@ public class SharedKeyCredentials {
* @return a HashMap<String, String[]> of the key values.
*/
private static HashMap<String, String[]> parseQueryString(String parseString) throws UnsupportedEncodingException {
- final HashMap<String, String[]> retVals = new HashMap<String, String[]>();
+ final HashMap<String, String[]> retVals = new HashMap<>();
if (parseString == null || parseString.isEmpty()) {
return retVals;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/75b184c6/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md b/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md
new file mode 100644
index 0000000..a4b3483
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md
@@ -0,0 +1,72 @@
+<!---
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License. See accompanying LICENSE file.
+-->
+
+# Hadoop Azure Support: ABFS — Azure Data Lake Storage Gen2
+
+<!-- MACRO{toc|fromDepth=1|toDepth=3} -->
+
+## Introduction
+
+The `hadoop-azure` module provides support for the Azure Data Lake Storage Gen2
+storage layer through the "abfs" connector.
+
+To make it part of Apache Hadoop's default classpath, simply make sure that
+`HADOOP_OPTIONAL_TOOLS` in `hadoop-env.sh` has `hadoop-azure` in the list.
+
+## Features
+
+* Read and write data stored in an Azure Blob Storage account.
+* *Fully Consistent* view of the storage across all clients.
+* Can read data written through the wasb: connector.
+* Present a hierarchical file system view by implementing the standard Hadoop
+ [`FileSystem`](../api/org/apache/hadoop/fs/FileSystem.html) interface.
+* Supports configuration of multiple Azure Blob Storage accounts.
+* Can act as a source or destination of data in Hadoop MapReduce, Apache Hive, and Apache Spark.
+* Tested at scale on both Linux and Windows.
+* Can be used as a replacement for HDFS on Hadoop clusters deployed in Azure infrastructure.
+
+
+
+## Limitations
+
+* File last access time is not tracked.
+
+
+## Technical notes
+
+### Security
+
+### Consistency and Concurrency
+
+*TODO*: complete/review
+
+The abfs client has a fully consistent view of the store, which has complete Create, Read, Update and Delete consistency for data and metadata.
+(Compare and contrast with S3 which only offers Create consistency; S3Guard adds CRUD to metadata, but not the underlying data).
+
+### Performance
+
+*TODO*: check these.
+
+* File Rename: `O(1)`.
+* Directory Rename: `O(files)`.
+* Directory Delete: `O(files)`.
+
+## Testing ABFS
+
+See the relevant section in [Testing Azure](testing_azure.html).
+
+## References
+
+* [A closer look at Azure Data Lake Storage Gen2](https://azure.microsoft.com/en-gb/blog/a-closer-look-at-azure-data-lake-storage-gen2/);
+MSDN Article from June 28, 2018.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/75b184c6/hadoop-tools/hadoop-azure/src/site/markdown/testing_azure.md
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/site/markdown/testing_azure.md b/hadoop-tools/hadoop-azure/src/site/markdown/testing_azure.md
index b58e68b..c148807 100644
--- a/hadoop-tools/hadoop-azure/src/site/markdown/testing_azure.md
+++ b/hadoop-tools/hadoop-azure/src/site/markdown/testing_azure.md
@@ -574,3 +574,79 @@ mvn test -Dtest=CleanupTestContainers
This will delete the containers; the output log of the test run will
provide the details and summary of the operation.
+
+
+## Testing ABFS
+
+The ABFS Connector tests share the same account as the wasb tests; this is
+needed for cross-connector compatibility tests.
+
+This makes for a somewhat complex set of configuration options.
+
+Here are the settings for an account `ACCOUNTNAME`:
+
+```xml
+<property>
+ <name>abfs.account.name</name>
+ <value>ACCOUNTNAME</value>
+</property>
+
+<property>
+ <name>abfs.account.full.name</name>
+ <value>${abfs.account.name}.dfs.core.windows.net</value>
+</property>
+
+<property>
+ <name>abfs.account.key</name>
+ <value>SECRETKEY==</value>
+</property>
+
+<property>
+ <name>fs.azure.account.key.ACCOUNTNAME.dfs.core.windows.net</name>
+ <value>${abfs.account.key}</value>
+</property>
+
+<property>
+ <name>fs.azure.account.key.ACCOUNTNAME.blob.core.windows.net</name>
+ <value>${abfs.account.key}</value>
+</property>
+
+<property>
+ <name>fs.azure.test.account.key.ACCOUNTNAME.dfs.core.windows.net</name>
+ <value>${abfs.account.key}</value>
+</property>
+
+<property>
+ <name>fs.azure.test.account.key.ACCOUNTNAME.blob.core.windows.net</name>
+ <value>${abfs.account.key}</value>
+</property>
+
+<property>
+ <name>fs.azure.account.key.ACCOUNTNAME</name>
+ <value>${abfs.account.key}</value>
+</property>
+
+<property>
+ <name>fs.azure.test.account.key.ACCOUNTNAME</name>
+ <value>${abfs.account.key}</value>
+</property>
+
+<property>
+ <name>fs.azure.test.account.name</name>
+ <value>${abfs.account.full.name}</value>
+</property>
+
+<property>
+ <name>fs.contract.test.fs.abfs</name>
+ <value>abfs://TESTCONTAINER@ACCOUNTNAME.dfs.core.windows.net</value>
+ <description>Container for contract tests</description>
+</property>
+
+<property>
+ <name>fs.contract.test.fs.abfss</name>
+ <value>abfss://TESTCONTAINER@ACCOUNTNAME.dfs.core.windows.net</value>
+ <description>Container for contract tests</description>
+</property>
+
+
+```
http://git-wip-us.apache.org/repos/asf/hadoop/blob/75b184c6/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/contract/ITestAzureNativeContractAppend.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/contract/ITestAzureNativeContractAppend.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/contract/ITestAzureNativeContractAppend.java
index fd21bd2..db4a843 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/contract/ITestAzureNativeContractAppend.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/contract/ITestAzureNativeContractAppend.java
@@ -18,10 +18,19 @@
package org.apache.hadoop.fs.azure.contract;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import org.junit.Test;
+
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.AbstractContractAppendTest;
import org.apache.hadoop.fs.contract.AbstractFSContract;
import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
/**
* Append test, skipping one of them.
@@ -38,4 +47,18 @@ public class ITestAzureNativeContractAppend extends AbstractContractAppendTest {
public void testRenameFileBeingAppended() throws Throwable {
skip("Skipping as renaming an opened file is not supported");
}
+
+ /**
+ * Wasb returns a different exception, so change the intercept logic here.
+ */
+ @Override
+ @Test
+ public void testAppendDirectory() throws Exception {
+ final FileSystem fs = getFileSystem();
+
+ final Path folderPath = path("testAppendDirectory");
+ fs.mkdirs(folderPath);
+ intercept(FileNotFoundException.class,
+ () -> fs.append(folderPath));
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/75b184c6/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java
new file mode 100644
index 0000000..106fa09
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java
@@ -0,0 +1,304 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Hashtable;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+
+import com.google.common.base.Preconditions;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azure.AbstractWasbTestWithTimeout;
+import org.apache.hadoop.fs.azure.AzureNativeFileSystemStore;
+import org.apache.hadoop.fs.azure.NativeAzureFileSystem;
+import org.apache.hadoop.fs.azure.integration.AzureTestConstants;
+import org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation;
+import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys;
+import org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
+import org.apache.hadoop.fs.azurebfs.utils.UriUtils;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.io.IOUtils;
+
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.*;
+import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.FILE_SYSTEM_NOT_FOUND;
+import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.*;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+import static org.junit.Assume.assumeTrue;
+import static org.hamcrest.CoreMatchers.*;
+
+/**
+ * Base for AzureBlobFileSystem Integration tests.
+ *
+ * <I>Important: This is for integration tests only.</I>
+ */
+public abstract class AbstractAbfsIntegrationTest extends
+ AbstractWasbTestWithTimeout {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(AbstractAbfsIntegrationTest.class);
+
+ private final boolean isEmulator;
+ private NativeAzureFileSystem wasb;
+ private AzureBlobFileSystem abfs;
+ private String abfsScheme;
+
+ private Configuration configuration;
+ private String fileSystemName;
+ private String accountName;
+ private String testUrl;
+
+ protected AbstractAbfsIntegrationTest(final boolean secure) {
+ this(secure ? FileSystemUriSchemes.ABFS_SECURE_SCHEME : FileSystemUriSchemes.ABFS_SCHEME);
+ }
+
+ protected AbstractAbfsIntegrationTest() {
+ this(FileSystemUriSchemes.ABFS_SCHEME);
+ }
+
+ private AbstractAbfsIntegrationTest(final String scheme) {
+ abfsScheme = scheme;
+ fileSystemName = ABFS_TEST_CONTAINER_PREFIX + UUID.randomUUID().toString();
+ configuration = new Configuration();
+ configuration.addResource(ABFS_TEST_RESOURCE_XML);
+
+ String accountName = configuration.get(FS_AZURE_TEST_ACCOUNT_NAME, "");
+ assumeTrue("Not set: " + FS_AZURE_TEST_ACCOUNT_NAME,
+ !accountName.isEmpty());
+ assertThat("The key in " + FS_AZURE_TEST_ACCOUNT_KEY_PREFIX
+ + " is not bound to an ABFS account",
+ accountName, containsString("dfs.core.windows.net"));
+ String fullKey = FS_AZURE_TEST_ACCOUNT_KEY_PREFIX
+ + accountName;
+ assumeTrue("Not set: " + fullKey,
+ configuration.get(fullKey) != null);
+
+ final String abfsUrl = this.getFileSystemName() + "@" + this.getAccountName();
+ URI defaultUri = null;
+
+ try {
+ defaultUri = new URI(abfsScheme, abfsUrl, null, null, null);
+ } catch (Exception ex) {
+ throw new AssertionError(ex);
+ }
+
+ this.testUrl = defaultUri.toString();
+ configuration.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, defaultUri.toString());
+ configuration.setBoolean(AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION, true);
+ this.isEmulator = this.configuration.getBoolean(FS_AZURE_EMULATOR_ENABLED, false);
+ this.accountName = this.configuration.get(FS_AZURE_TEST_ACCOUNT_NAME);
+ }
+
+
+ @Before
+ public void setup() throws Exception {
+ //Create filesystem first to make sure getWasbFileSystem() can return an existing filesystem.
+ createFileSystem();
+
+ if (!isEmulator) {
+ final URI wasbUri = new URI(abfsUrlToWasbUrl(getTestUrl()));
+ final AzureNativeFileSystemStore azureNativeFileSystemStore =
+ new AzureNativeFileSystemStore();
+ azureNativeFileSystemStore.initialize(
+ wasbUri,
+ getConfiguration(),
+ new AzureFileSystemInstrumentation(getConfiguration()));
+
+ wasb = new NativeAzureFileSystem(azureNativeFileSystemStore);
+ wasb.initialize(wasbUri, configuration);
+ }
+ }
+
+ @After
+ public void teardown() throws Exception {
+ try {
+ IOUtils.closeStream(wasb);
+ wasb = null;
+
+ if (abfs == null) {
+ return;
+ }
+
+ final AzureBlobFileSystemStore abfsStore = abfs.getAbfsStore();
+ abfsStore.deleteFilesystem();
+
+ AbfsRestOperationException ex = intercept(
+ AbfsRestOperationException.class,
+ new Callable<Hashtable<String, String>>() {
+ @Override
+ public Hashtable<String, String> call() throws Exception {
+ return abfsStore.getFilesystemProperties();
+ }
+ });
+ if (FILE_SYSTEM_NOT_FOUND.getStatusCode() != ex.getStatusCode()) {
+ LOG.warn("Deleted test filesystem may still exist: {}", abfs, ex);
+ }
+ } catch (Exception e) {
+ LOG.warn("During cleanup: {}", e, e);
+ } finally {
+ IOUtils.closeStream(abfs);
+ abfs = null;
+ }
+ }
+
+ public AzureBlobFileSystem getFileSystem() throws IOException {
+ return abfs;
+ }
+
+ /**
+ * Creates the filesystem; updates the {@link #abfs} field.
+ * @return the created filesystem.
+ * @throws IOException failure during create/init.
+ */
+ public AzureBlobFileSystem createFileSystem() throws IOException {
+ Preconditions.checkState(abfs == null,
+ "existing ABFS instance exists: %s", abfs);
+ abfs = (AzureBlobFileSystem) FileSystem.newInstance(configuration);
+ return abfs;
+ }
+
+
+ protected NativeAzureFileSystem getWasbFileSystem() {
+ return wasb;
+ }
+
+ protected String getHostName() {
+ return configuration.get(FS_AZURE_TEST_HOST_NAME);
+ }
+
+ protected void setTestUrl(String testUrl) {
+ this.testUrl = testUrl;
+ }
+
+ protected String getTestUrl() {
+ return testUrl;
+ }
+
+ protected void setFileSystemName(String fileSystemName) {
+ this.fileSystemName = fileSystemName;
+ }
+ protected String getFileSystemName() {
+ return fileSystemName;
+ }
+
+ protected String getAccountName() {
+ return configuration.get(FS_AZURE_TEST_ACCOUNT_NAME);
+ }
+
+ protected String getAccountKey() {
+ return configuration.get(
+ FS_AZURE_TEST_ACCOUNT_KEY_PREFIX
+ + getAccountName());
+ }
+
+ protected Configuration getConfiguration() {
+ return configuration;
+ }
+
+ protected boolean isEmulator() {
+ return isEmulator;
+ }
+
+ /**
+ * Write a buffer to a file.
+ * @param path path of the file to write
+ * @param buffer bytes to write to the file
+ * @throws IOException failure
+ */
+ protected void write(Path path, byte[] buffer) throws IOException {
+ ContractTestUtils.writeDataset(getFileSystem(), path, buffer, buffer.length,
+ CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT, false);
+ }
+
+ /**
+ * Touch a file in the test store. Will overwrite any existing file.
+ * @param path path of the file to touch
+ * @throws IOException failure.
+ */
+ protected void touch(Path path) throws IOException {
+ ContractTestUtils.touch(getFileSystem(), path);
+ }
+
+ protected static String wasbUrlToAbfsUrl(final String wasbUrl) {
+ return convertTestUrls(
+ wasbUrl, FileSystemUriSchemes.WASB_SCHEME, FileSystemUriSchemes.WASB_SECURE_SCHEME, FileSystemUriSchemes.WASB_DNS_PREFIX,
+ FileSystemUriSchemes.ABFS_SCHEME, FileSystemUriSchemes.ABFS_SECURE_SCHEME, FileSystemUriSchemes.ABFS_DNS_PREFIX);
+ }
+
+ protected static String abfsUrlToWasbUrl(final String abfsUrl) {
+ return convertTestUrls(
+ abfsUrl, FileSystemUriSchemes.ABFS_SCHEME, FileSystemUriSchemes.ABFS_SECURE_SCHEME, FileSystemUriSchemes.ABFS_DNS_PREFIX,
+ FileSystemUriSchemes.WASB_SCHEME, FileSystemUriSchemes.WASB_SECURE_SCHEME, FileSystemUriSchemes.WASB_DNS_PREFIX);
+ }
+
+ private static String convertTestUrls(
+ final String url,
+ final String fromNonSecureScheme,
+ final String fromSecureScheme,
+ final String fromDnsPrefix,
+ final String toNonSecureScheme,
+ final String toSecureScheme,
+ final String toDnsPrefix) {
+ String data = null;
+ if (url.startsWith(fromNonSecureScheme + "://")) {
+ data = url.replace(fromNonSecureScheme + "://", toNonSecureScheme + "://");
+ } else if (url.startsWith(fromSecureScheme + "://")) {
+ data = url.replace(fromSecureScheme + "://", toSecureScheme + "://");
+ }
+
+
+ if (data != null) {
+ data = data.replace("." + fromDnsPrefix + ".",
+ "." + toDnsPrefix + ".");
+ }
+ return data;
+ }
+
+ public Path getTestPath() {
+ Path path = new Path(UriUtils.generateUniqueTestPath());
+ return path;
+ }
+
+ /**
+ * Create a path under the test path provided by
+ * {@link #getTestPath()}.
+ * @param filepath path string to resolve under the test path
+ * @return a path qualified by the test filesystem
+ * @throws IOException IO problems
+ */
+ protected Path path(String filepath) throws IOException {
+ return getFileSystem().makeQualified(
+ new Path(getTestPath(), filepath));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/75b184c6/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsScaleTest.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsScaleTest.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsScaleTest.java
new file mode 100644
index 0000000..cfda7a7
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsScaleTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.azure.integration.AzureTestConstants;
+
+import static org.apache.hadoop.fs.azure.integration.AzureTestUtils.assumeScaleTestsEnabled;
+
+/**
+ * Integration tests at bigger scale; configurable as to
+ * size, off by default.
+ */
+public class AbstractAbfsScaleTest extends AbstractAbfsIntegrationTest {
+
+ protected static final Logger LOG =
+ LoggerFactory.getLogger(AbstractAbfsScaleTest.class);
+
+ @Override
+ protected int getTestTimeoutMillis() {
+ return AzureTestConstants.SCALE_TEST_TIMEOUT_MILLIS;
+ }
+
+ @Override
+ public void setup() throws Exception {
+ super.setup();
+ LOG.debug("Scale test operation count = {}", getOperationCount());
+ assumeScaleTestsEnabled(getConfiguration());
+ }
+
+ protected long getOperationCount() {
+ return getConfiguration().getLong(AzureTestConstants.KEY_OPERATION_COUNT,
+ AzureTestConstants.DEFAULT_OPERATION_COUNT);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/75b184c6/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/DependencyInjectedTest.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/DependencyInjectedTest.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/DependencyInjectedTest.java
deleted file mode 100644
index 74a530c..0000000
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/DependencyInjectedTest.java
+++ /dev/null
@@ -1,206 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.fs.azurebfs;
-
-import java.net.URI;
-import java.util.UUID;
-import java.util.concurrent.Callable;
-
-import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.azure.AzureNativeFileSystemStore;
-import org.apache.hadoop.fs.azure.NativeAzureFileSystem;
-import org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation;
-import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys;
-import org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes;
-import org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys;
-
-import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.FILE_SYSTEM_NOT_FOUND;
-import static org.apache.hadoop.test.LambdaTestUtils.intercept;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assume.assumeNotNull;
-
-/**
- * Provide dependencies for AzureBlobFileSystem tests.
- */
-public abstract class DependencyInjectedTest {
- private final boolean isEmulator;
- private NativeAzureFileSystem wasb;
- private String abfsScheme;
-
- private Configuration configuration;
- private String fileSystemName;
- private String accountName;
- private String testUrl;
-
- public static final String TEST_CONTAINER_PREFIX = "abfs-testcontainer-";
-
- public DependencyInjectedTest(final boolean secure) {
- this(secure ? FileSystemUriSchemes.ABFS_SECURE_SCHEME : FileSystemUriSchemes.ABFS_SCHEME);
- }
-
- protected DependencyInjectedTest() {
- this(FileSystemUriSchemes.ABFS_SCHEME);
- }
-
- private DependencyInjectedTest(final String scheme) {
- abfsScheme = scheme;
- fileSystemName = TEST_CONTAINER_PREFIX + UUID.randomUUID().toString();
- configuration = new Configuration();
- configuration.addResource("azure-bfs-test.xml");
-
- assumeNotNull(configuration.get(TestConfigurationKeys.FS_AZURE_TEST_ACCOUNT_NAME));
- assumeNotNull(configuration.get(TestConfigurationKeys.FS_AZURE_TEST_ACCOUNT_KEY_PREFIX + configuration.get(TestConfigurationKeys
- .FS_AZURE_TEST_ACCOUNT_NAME)));
-
- final String abfsUrl = this.getFileSystemName() + "@" + this.getAccountName();
- URI defaultUri = null;
-
- try {
- defaultUri = new URI(abfsScheme, abfsUrl, null, null, null);
- } catch (Exception ex) {
- Assert.fail(ex.getMessage());
- }
-
- this.testUrl = defaultUri.toString();
- configuration.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, defaultUri.toString());
- configuration.setBoolean(ConfigurationKeys.AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION, true);
- this.isEmulator = this.configuration.getBoolean(ConfigurationKeys.FS_AZURE_EMULATOR_ENABLED, false);
- this.accountName = this.configuration.get(TestConfigurationKeys.FS_AZURE_TEST_ACCOUNT_NAME);
- }
-
- @Before
- public void initialize() throws Exception {
- //Create filesystem first to make sure getWasbFileSystem() can return an existed filesystem.
- this.getFileSystem();
-
- if (!this.isEmulator) {
- final URI wasbUri = new URI(abfsUrlToWasbUrl(this.getTestUrl()));
- final AzureNativeFileSystemStore azureNativeFileSystemStore = new AzureNativeFileSystemStore();
- azureNativeFileSystemStore.initialize(
- wasbUri,
- this.getConfiguration(),
- new AzureFileSystemInstrumentation(this.getConfiguration()));
-
- this.wasb = new NativeAzureFileSystem(azureNativeFileSystemStore);
- this.wasb.initialize(wasbUri, configuration);
- }
- }
-
- @After
- public void testCleanup() throws Exception {
- if (this.wasb != null) {
- this.wasb.close();
- }
-
- FileSystem.closeAll();
-
- final AzureBlobFileSystem fs = this.getFileSystem();
- final AzureBlobFileSystemStore abfsStore = fs.getAbfsStore();
- abfsStore.deleteFilesystem();
-
- AbfsRestOperationException ex = intercept(
- AbfsRestOperationException.class,
- new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- fs.getAbfsStore().getFilesystemProperties();
- return null;
- }
- });
-
- assertEquals(FILE_SYSTEM_NOT_FOUND.getStatusCode(), ex.getStatusCode());
- }
-
- public AzureBlobFileSystem getFileSystem() throws Exception {
- return (AzureBlobFileSystem) FileSystem.get(this.configuration);
- }
-
- protected NativeAzureFileSystem getWasbFileSystem() {
- return this.wasb;
- }
-
- protected String getHostName() {
- return configuration.get(TestConfigurationKeys.FS_AZURE_TEST_HOST_NAME);
- }
-
- protected void updateTestUrl(String testUrl) {
- this.testUrl = testUrl;
- }
- protected String getTestUrl() {
- return testUrl;
- }
-
- protected void updateFileSystemName(String fileSystemName) {
- this.fileSystemName = fileSystemName;
- }
- protected String getFileSystemName() {
- return fileSystemName;
- }
-
- protected String getAccountName() {
- return configuration.get(TestConfigurationKeys.FS_AZURE_TEST_ACCOUNT_NAME);
- }
-
- protected String getAccountKey() {
- return configuration.get(
- TestConfigurationKeys.FS_AZURE_TEST_ACCOUNT_KEY_PREFIX
- + getAccountName());
- }
-
- protected Configuration getConfiguration() {
- return this.configuration;
- }
-
- protected boolean isEmulator() {
- return isEmulator;
- }
-
- protected static String wasbUrlToAbfsUrl(final String wasbUrl) {
- return convertTestUrls(
- wasbUrl, FileSystemUriSchemes.WASB_SCHEME, FileSystemUriSchemes.WASB_SECURE_SCHEME, FileSystemUriSchemes.WASB_DNS_PREFIX,
- FileSystemUriSchemes.ABFS_SCHEME, FileSystemUriSchemes.ABFS_SECURE_SCHEME, FileSystemUriSchemes.ABFS_DNS_PREFIX);
- }
-
- protected static String abfsUrlToWasbUrl(final String abfsUrl) {
- return convertTestUrls(
- abfsUrl, FileSystemUriSchemes.ABFS_SCHEME, FileSystemUriSchemes.ABFS_SECURE_SCHEME, FileSystemUriSchemes.ABFS_DNS_PREFIX,
- FileSystemUriSchemes.WASB_SCHEME, FileSystemUriSchemes.WASB_SECURE_SCHEME, FileSystemUriSchemes.WASB_DNS_PREFIX);
- }
-
- private static String convertTestUrls(
- final String url, final String fromNonSecureScheme, final String fromSecureScheme, final String fromDnsPrefix,
- final String toNonSecureScheme, final String toSecureScheme, final String toDnsPrefix) {
- String data = null;
- if (url.startsWith(fromNonSecureScheme + "://")) {
- data = url.replace(fromNonSecureScheme + "://", toNonSecureScheme + "://");
- } else if (url.startsWith(fromSecureScheme + "://")) {
- data = url.replace(fromSecureScheme + "://", toSecureScheme + "://");
- }
-
- data = data.replace("." + fromDnsPrefix + ".", "." + toDnsPrefix + ".");
- return data;
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/75b184c6/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.java
index 10d42d1..f2e26ec 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.java
@@ -25,13 +25,13 @@ import org.junit.Test;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
-
-import static org.junit.Assert.assertEquals;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
/**
* Test append operations.
*/
-public class ITestAzureBlobFileSystemAppend extends DependencyInjectedTest {
+public class ITestAzureBlobFileSystemAppend extends
+ AbstractAbfsIntegrationTest {
private static final Path TEST_FILE_PATH = new Path("testfile");
private static final Path TEST_FOLDER_PATH = new Path("testFolder");
public ITestAzureBlobFileSystemAppend() {
@@ -40,7 +40,7 @@ public class ITestAzureBlobFileSystemAppend extends DependencyInjectedTest {
@Test(expected = FileNotFoundException.class)
public void testAppendDirShouldFail() throws Exception {
- final AzureBlobFileSystem fs = this.getFileSystem();
+ final AzureBlobFileSystem fs = getFileSystem();
final Path filePath = TEST_FILE_PATH;
fs.mkdirs(filePath);
fs.append(filePath, 0);
@@ -48,21 +48,21 @@ public class ITestAzureBlobFileSystemAppend extends DependencyInjectedTest {
@Test
public void testAppendWithLength0() throws Exception {
- final AzureBlobFileSystem fs = this.getFileSystem();
- FSDataOutputStream stream = fs.create(TEST_FILE_PATH);
- final byte[] b = new byte[1024];
- new Random().nextBytes(b);
- stream.write(b, 1000, 0);
-
- assertEquals(0, stream.getPos());
+ final AzureBlobFileSystem fs = getFileSystem();
+ try(FSDataOutputStream stream = fs.create(TEST_FILE_PATH)) {
+ final byte[] b = new byte[1024];
+ new Random().nextBytes(b);
+ stream.write(b, 1000, 0);
+ assertEquals(0, stream.getPos());
+ }
}
@Test(expected = FileNotFoundException.class)
public void testAppendFileAfterDelete() throws Exception {
- final AzureBlobFileSystem fs = this.getFileSystem();
+ final AzureBlobFileSystem fs = getFileSystem();
final Path filePath = TEST_FILE_PATH;
- fs.create(filePath);
+ ContractTestUtils.touch(fs, filePath);
fs.delete(filePath, false);
fs.append(filePath);
@@ -70,7 +70,7 @@ public class ITestAzureBlobFileSystemAppend extends DependencyInjectedTest {
@Test(expected = FileNotFoundException.class)
public void testAppendDirectory() throws Exception {
- final AzureBlobFileSystem fs = this.getFileSystem();
+ final AzureBlobFileSystem fs = getFileSystem();
final Path folderPath = TEST_FOLDER_PATH;
fs.mkdirs(folderPath);
fs.append(folderPath);
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org