You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ma...@apache.org on 2018/09/23 03:24:05 UTC
[08/45] hadoop git commit: HADOOP-15407. HADOOP-15540. Support
Windows Azure Storage - Blob file system "ABFS" in Hadoop: Core Commit.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f044deed/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/SharedKeyCredentials.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/SharedKeyCredentials.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/SharedKeyCredentials.java
new file mode 100644
index 0000000..dd59892
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/SharedKeyCredentials.java
@@ -0,0 +1,507 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import javax.crypto.Mac;
+import javax.crypto.spec.SecretKeySpec;
+import java.io.UnsupportedEncodingException;
+import java.net.HttpURLConnection;
+import java.net.URLDecoder;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.TimeZone;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
+import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.codec.Charsets;
+/**
+ * Represents the shared key credentials used to access an Azure Storage
+ * account.
+ */
+public class SharedKeyCredentials {
+ private static final int EXPECTED_BLOB_QUEUE_CANONICALIZED_STRING_LENGTH = 300;
+ private static final Pattern CRLF = Pattern.compile("\r\n", Pattern.LITERAL);
+ private static final String HMAC_SHA256 = "HmacSHA256";
+ private static final Base64 BASE_64 = new Base64();
+
+ /**
+ * Stores a reference to the RFC1123 date/time pattern.
+ */
+ private static final String RFC1123_PATTERN = "EEE, dd MMM yyyy HH:mm:ss z";
+
+
+ private String accountName;
+ private byte[] accountKey;
+ private Mac hmacSha256;
+
+ public SharedKeyCredentials(final String accountName,
+ final String accountKey) {
+ if (accountName == null || accountName.isEmpty()) {
+ throw new IllegalArgumentException("Invalid account name.");
+ }
+ if (accountKey == null || accountKey.isEmpty()) {
+ throw new IllegalArgumentException("Invalid account key.");
+ }
+ this.accountName = accountName;
+ this.accountKey = BASE_64.decode(accountKey);
+ initializeMac();
+ }
+
+ /**
+  * Signs a request: sets the X_MS_DATE request property to the current GMT
+  * time, then sets the AUTHORIZATION request property to
+  * <code>SharedKey &lt;accountName&gt;:&lt;signature&gt;</code>.
+  *
+  * @param connection the connection whose request properties are updated
+  * @param contentLength the length of the request content in bytes
+  * @throws UnsupportedEncodingException propagated from canonicalization
+  */
+ public void signRequest(HttpURLConnection connection, final long contentLength) throws UnsupportedEncodingException {
+   // The date must be set first: canonicalization reads the request
+   // properties of the connection, including X_MS_DATE.
+   final String requestDate = getGMTTime();
+   connection.setRequestProperty(HttpHeaderConfigurations.X_MS_DATE, requestDate);
+
+   final String signature = computeHmac256(canonicalize(connection, accountName, contentLength));
+
+   final String authorization = String.format("%s %s:%s", "SharedKey", accountName, signature);
+   connection.setRequestProperty(HttpHeaderConfigurations.AUTHORIZATION, authorization);
+ }
+
+ private String computeHmac256(final String stringToSign) {
+ byte[] utf8Bytes = null;
+ try {
+ utf8Bytes = stringToSign.getBytes(AbfsHttpConstants.UTF_8);
+ } catch (final UnsupportedEncodingException e) {
+ throw new IllegalArgumentException(e);
+ }
+ byte[] hmac;
+ synchronized (this) {
+ hmac = hmacSha256.doFinal(utf8Bytes);
+ }
+ return new String(BASE_64.encode(hmac), Charsets.UTF_8);
+ }
+
+ /**
+  * Appends the request headers whose lower-cased name starts with
+  * AbfsHttpConstants.HTTP_HEADER_PREFIX to the canonicalized string, one
+  * line per header, ordered by lower-cased header name.
+  *
+  * @param conn the HttpURLConnection for the operation
+  * @param canonicalizedString the canonicalized string to add the canonicalized headers to.
+  */
+ private static void addCanonicalizedHeaders(final HttpURLConnection conn, final StringBuilder canonicalizedString) {
+   final Map<String, List<String>> requestHeaders = conn.getRequestProperties();
+
+   // Collect the lower-cased names of the matching headers, then sort them.
+   final ArrayList<String> storageHeaderNames = new ArrayList<String>();
+   for (final String name : requestHeaders.keySet()) {
+     final String lowerName = name.toLowerCase(Locale.ROOT);
+     if (lowerName.startsWith(AbfsHttpConstants.HTTP_HEADER_PREFIX)) {
+       storageHeaderNames.add(lowerName);
+     }
+   }
+   Collections.sort(storageHeaderNames);
+
+   // Emit "name:value1,value2,..." for each header, in sorted order.
+   for (final String name : storageHeaderNames) {
+     final StringBuilder line = new StringBuilder(name);
+     String separator = ":";
+     boolean hasValue = false;
+
+     for (final String rawValue : getHeaderValues(requestHeaders, name)) {
+       if (rawValue != null) {
+         hasValue = true;
+       }
+
+       // Unfolding is simply removal of CRLF.
+       // NOTE(review): a null value would still fail on the next line
+       // despite the check above.
+       final String unfolded = CRLF.matcher(rawValue)
+           .replaceAll(Matcher.quoteReplacement(""));
+
+       line.append(separator).append(unfolded);
+       separator = ",";
+     }
+
+     // Headers without any value are left out of the canonicalized string.
+     if (hasValue) {
+       appendCanonicalizedElement(canonicalizedString, line.toString());
+     }
+   }
+ }
+
+ /**
+ * Initialie the HmacSha256 associated with the account key.
+ */
+ private void initializeMac() {
+ // Initializes the HMAC-SHA256 Mac and SecretKey.
+ try {
+ hmacSha256 = Mac.getInstance(HMAC_SHA256);
+ hmacSha256.init(new SecretKeySpec(accountKey, HMAC_SHA256));
+ } catch (final Exception e) {
+ throw new IllegalArgumentException(e);
+ }
+ }
+
+ /**
+  * Appends a newline followed by the given element to the builder.
+  *
+  * @param builder the StringBuilder object
+  * @param element the string to append after the newline.
+  */
+ private static void appendCanonicalizedElement(final StringBuilder builder, final String element) {
+   builder.append("\n").append(element);
+ }
+
+ /**
+ * Constructs the canonicalized string that is signed for a request. The string consists of the request method,
+ * a fixed sequence of standard header values, the canonicalized storage headers and the canonicalized resource,
+ * each element after the first preceded by a newline.
+ *
+ * @param address the request URI
+ * @param accountName the account name associated with the request
+ * @param method the verb to be used for the HTTP request; not read by this method, which takes the verb from
+ * <code>conn.getRequestMethod()</code> instead.
+ * @param contentType the content type of the HTTP request; null is written as an empty element.
+ * @param contentLength the length of the content written to the outputstream in bytes; values less than or
+ * equal to zero (including -1 for unknown) are written as an empty element.
+ * @param date the date/time specification for the HTTP request; only used when the connection has no
+ * x-ms-date request property.
+ * @param conn the HttpURLConnection for the operation.
+ * @return A canonicalized string.
+ * @throws UnsupportedEncodingException propagated from getCanonicalizedResource.
+ */
+ private static String canonicalizeHttpRequest(final java.net.URL address, final String accountName,
+ final String method, final String contentType, final long contentLength, final String date,
+ final HttpURLConnection conn) throws UnsupportedEncodingException {
+
+ // The first element should be the Method of the request.
+ // I.e. GET, POST, PUT, or HEAD.
+ final StringBuilder canonicalizedString = new StringBuilder(EXPECTED_BLOB_QUEUE_CANONICALIZED_STRING_LENGTH);
+ canonicalizedString.append(conn.getRequestMethod());
+
+ // The next elements are the standard headers, in the fixed order below.
+ // If any element is missing it may be empty.
+ appendCanonicalizedElement(canonicalizedString,
+ getHeaderValue(conn, HttpHeaderConfigurations.CONTENT_ENCODING, AbfsHttpConstants.EMPTY_STRING));
+ appendCanonicalizedElement(canonicalizedString,
+ getHeaderValue(conn, HttpHeaderConfigurations.CONTENT_LANGUAGE, AbfsHttpConstants.EMPTY_STRING));
+ // A content length of zero or less is written as an empty element.
+ appendCanonicalizedElement(canonicalizedString,
+ contentLength <= 0 ? "" : String.valueOf(contentLength));
+ appendCanonicalizedElement(canonicalizedString,
+ getHeaderValue(conn, HttpHeaderConfigurations.CONTENT_MD5, AbfsHttpConstants.EMPTY_STRING));
+ appendCanonicalizedElement(canonicalizedString, contentType != null ? contentType : AbfsHttpConstants.EMPTY_STRING);
+
+ final String dateString = getHeaderValue(conn, HttpHeaderConfigurations.X_MS_DATE, AbfsHttpConstants.EMPTY_STRING);
+ // If x-ms-date header exists, Date should be empty string
+ // NOTE(review): when x-ms-date is absent and date is null, the text "null" is appended here.
+ appendCanonicalizedElement(canonicalizedString, dateString.equals(AbfsHttpConstants.EMPTY_STRING) ? date
+ : "");
+
+ appendCanonicalizedElement(canonicalizedString,
+ getHeaderValue(conn, HttpHeaderConfigurations.IF_MODIFIED_SINCE, AbfsHttpConstants.EMPTY_STRING));
+ appendCanonicalizedElement(canonicalizedString,
+ getHeaderValue(conn, HttpHeaderConfigurations.IF_MATCH, AbfsHttpConstants.EMPTY_STRING));
+ appendCanonicalizedElement(canonicalizedString,
+ getHeaderValue(conn, HttpHeaderConfigurations.IF_NONE_MATCH, AbfsHttpConstants.EMPTY_STRING));
+ appendCanonicalizedElement(canonicalizedString,
+ getHeaderValue(conn, HttpHeaderConfigurations.IF_UNMODIFIED_SINCE, AbfsHttpConstants.EMPTY_STRING));
+ appendCanonicalizedElement(canonicalizedString,
+ getHeaderValue(conn, HttpHeaderConfigurations.RANGE, AbfsHttpConstants.EMPTY_STRING));
+
+ // Storage-prefixed headers follow the standard headers, then the resource comes last.
+ addCanonicalizedHeaders(conn, canonicalizedString);
+
+ appendCanonicalizedElement(canonicalizedString, getCanonicalizedResource(address, accountName));
+
+ return canonicalizedString.toString();
+ }
+
+ /**
+  * Builds the canonicalized resource string for a request: a forward slash,
+  * the account name and the URL path, followed (when the URL has a query
+  * that contains AbfsHttpConstants.EQUAL) by one <code>name:values</code>
+  * line per query parameter. Parameter names are lower-cased and sorted;
+  * the values of one parameter are sorted and comma separated.
+  *
+  * @param address the resource URI.
+  * @param accountName the account name for the request.
+  * @return the canonicalized resource string.
+  * @throws UnsupportedEncodingException propagated from parseQueryString.
+  */
+ private static String getCanonicalizedResource(final java.net.URL address, final String accountName) throws UnsupportedEncodingException {
+   // Resource path. Note that AbsolutePath starts with a '/'.
+   final StringBuilder canonicalizedResource = new StringBuilder(AbfsHttpConstants.FORWARD_SLASH);
+   canonicalizedResource.append(accountName);
+   canonicalizedResource.append(address.getPath());
+
+   final String query = address.getQuery();
+   if (query == null || !query.contains(AbfsHttpConstants.EQUAL)) {
+     // no query params.
+     return canonicalizedResource.toString();
+   }
+
+   // Map each lower-cased parameter name to its sorted, comma separated values.
+   final Map<String, String> valuesByLowerName = new HashMap<String, String>();
+   for (final Entry<String, String[]> parameter : parseQueryString(query).entrySet()) {
+     final String[] parameterValues = parameter.getValue();
+     Arrays.sort(parameterValues);
+
+     final StringBuilder joined = new StringBuilder();
+     for (final String parameterValue : parameterValues) {
+       if (joined.length() > 0) {
+         joined.append(AbfsHttpConstants.COMMA);
+       }
+       joined.append(parameterValue);
+     }
+
+     // key turns out to be null for ?a&b&c&d
+     final String name = parameter.getKey();
+     final String lowerName = name == null ? null : name.toLowerCase(Locale.ROOT);
+     valuesByLowerName.put(lowerName, joined.toString());
+   }
+
+   final ArrayList<String> sortedNames = new ArrayList<String>(valuesByLowerName.keySet());
+   Collections.sort(sortedNames);
+
+   for (final String name : sortedNames) {
+     appendCanonicalizedElement(canonicalizedResource, name + ":" + valuesByLowerName.get(name));
+   }
+
+   return canonicalizedResource.toString();
+ }
+
+ /**
+  * Returns the values of the first header whose lower-cased name equals
+  * <code>headerName</code>, with leading spaces removed from each value.
+  *
+  * @param headers a one to many map of key / values representing the header values for the connection.
+  * @param headerName the name of the header to lookup; compared against the lower-cased map keys
+  * @return an ArrayList<String> of all left-trimmed values corresponding to the requested headerName. This is
+  *         empty if the header is not found or has no value list.
+  */
+ private static ArrayList<String> getHeaderValues(final Map<String, List<String>> headers, final String headerName) {
+   final ArrayList<String> trimmedValues = new ArrayList<String>();
+
+   for (final Entry<String, List<String>> header : headers.entrySet()) {
+     if (!header.getKey().toLowerCase(Locale.ROOT).equals(headerName)) {
+       continue;
+     }
+
+     final List<String> rawValues = header.getValue();
+     if (rawValues != null) {
+       for (final String rawValue : rawValues) {
+         // canonicalization formula requires the string to be left
+         // trimmed.
+         trimmedValues.add(trimStart(rawValue));
+       }
+     }
+     // Only the first matching entry is used.
+     break;
+   }
+
+   return trimmedValues;
+ }
+
+ /**
+  * Parses a query string into a one to many hashmap. Names and values are
+  * decoded with safeDecode. Pairs without an '=' or with nothing after the
+  * '=' are ignored. A name that occurs more than once keeps all of its
+  * values, in order of appearance.
+  *
+  * @param parseString the string to parse
+  * @return a HashMap<String, String[]> of the key values; empty for a null or empty input.
+  * @throws UnsupportedEncodingException propagated from safeDecode.
+  */
+ private static HashMap<String, String[]> parseQueryString(String parseString) throws UnsupportedEncodingException {
+   final HashMap<String, String[]> retVals = new HashMap<String, String[]>();
+   if (parseString == null || parseString.isEmpty()) {
+     return retVals;
+   }
+
+   // 1. Remove ? if present
+   final int queryDex = parseString.indexOf(AbfsHttpConstants.QUESTION_MARK);
+   if (queryDex >= 0) {
+     parseString = parseString.substring(queryDex + 1);
+   }
+
+   // 2. split name value pairs by splitting on the '&' character
+   //    (or on ';' when the string contains no '&')
+   final String[] valuePairs = parseString.contains(AbfsHttpConstants.AND_MARK)
+       ? parseString.split(AbfsHttpConstants.AND_MARK)
+       : parseString.split(AbfsHttpConstants.SEMICOLON);
+
+   // 3. for each field value pair parse into appropriate map entries
+   for (final String valuePair : valuePairs) {
+     final int equalDex = valuePair.indexOf(AbfsHttpConstants.EQUAL);
+
+     // Skip pairs that have no '=' or nothing after it.
+     if (equalDex < 0 || equalDex == valuePair.length() - 1) {
+       continue;
+     }
+
+     final String key = safeDecode(valuePair.substring(0, equalDex));
+     final String value = safeDecode(valuePair.substring(equalDex + 1));
+
+     if (value.isEmpty()) {
+       continue;
+     }
+
+     // 3.1 add to map. A repeated name extends the array already stored
+     // for it, so that the caller can sort and join all of its values
+     // instead of silently losing every value but the first.
+     final String[] existing = retVals.get(key);
+     if (existing == null) {
+       retVals.put(key, new String[]{value});
+     } else {
+       final String[] extended = Arrays.copyOf(existing, existing.length + 1);
+       extended[existing.length] = value;
+       retVals.put(key, extended);
+     }
+   }
+
+   return retVals;
+ }
+
+ /**
+  * Performs safe decoding of the specified string, taking care to preserve each <code>+</code> character, rather
+  * than replacing it with a space character.
+  *
+  * @param stringToDecode A <code>String</code> that represents the string to decode.
+  * @return A <code>String</code> that represents the decoded string; <code>null</code> for a <code>null</code>
+  *         input and the empty string for an empty input.
+  * @throws UnsupportedEncodingException if the encoding named by AbfsHttpConstants.UTF_8 is not supported.
+  */
+ private static String safeDecode(final String stringToDecode) throws UnsupportedEncodingException {
+   if (stringToDecode == null) {
+     return null;
+   }
+
+   final int length = stringToDecode.length();
+   if (length == 0) {
+     return "";
+   }
+
+   if (!stringToDecode.contains(AbfsHttpConstants.PLUS)) {
+     return URLDecoder.decode(stringToDecode, AbfsHttpConstants.UTF_8);
+   }
+
+   // URLDecoder turns '+' into a space, so the text between plus signs is
+   // decoded piece by piece and each plus sign is copied through as is.
+   final StringBuilder decoded = new StringBuilder();
+   int segmentStart = 0;
+   for (int plusAt = stringToDecode.indexOf('+'); plusAt >= 0;
+       plusAt = stringToDecode.indexOf('+', segmentStart)) {
+     if (plusAt > segmentStart) {
+       decoded.append(URLDecoder.decode(stringToDecode.substring(segmentStart, plusAt),
+           AbfsHttpConstants.UTF_8));
+     }
+
+     decoded.append(AbfsHttpConstants.PLUS);
+     segmentStart = plusAt + 1;
+   }
+
+   // Decode whatever follows the last plus sign.
+   if (segmentStart != length) {
+     decoded.append(URLDecoder.decode(stringToDecode.substring(segmentStart, length),
+         AbfsHttpConstants.UTF_8));
+   }
+
+   return decoded.toString();
+ }
+
+ private static String trimStart(final String value) {
+ int spaceDex = 0;
+ while (spaceDex < value.length() && value.charAt(spaceDex) == ' ') {
+ spaceDex++;
+ }
+
+ return value.substring(spaceDex);
+ }
+
+ /**
+  * Reads a request property from the connection.
+  *
+  * @param conn the connection to read from
+  * @param headerName the name of the request property
+  * @param defaultValue the value to return when the property is not set
+  * @return the property value, or <code>defaultValue</code> if it is null
+  */
+ private static String getHeaderValue(final HttpURLConnection conn, final String headerName, final String defaultValue) {
+   final String requested = conn.getRequestProperty(headerName);
+   if (requested != null) {
+     return requested;
+   }
+   return defaultValue;
+ }
+
+
+ /**
+ * Constructs a canonicalized string for signing a request.
+ *
+ * @param conn the HttpURLConnection to canonicalize
+ * @param accountName the account name associated with the request
+ * @param contentLength the length of the content written to the outputstream in bytes,
+ * -1 if unknown
+ * @return a canonicalized string.
+ */
+ private String canonicalize(final HttpURLConnection conn,
+ final String accountName,
+ final Long contentLength) throws UnsupportedEncodingException {
+
+ if (contentLength < -1) {
+ throw new IllegalArgumentException(
+ "The Content-Length header must be greater than or equal to -1.");
+ }
+
+ String contentType = getHeaderValue(conn, HttpHeaderConfigurations.CONTENT_TYPE, "");
+
+ return canonicalizeHttpRequest(conn.getURL(), accountName,
+ conn.getRequestMethod(), contentType, contentLength, null, conn);
+ }
+
+ /**
+  * The time zone applied to the RFC1123 formatters, looked up from
+  * AbfsHttpConstants.GMT_TIMEZONE.
+  */
+ public static final TimeZone GMT_ZONE = TimeZone.getTimeZone(AbfsHttpConstants.GMT_TIMEZONE);
+
+ /**
+  * Thread local for storing GMT date format. Each thread lazily gets its
+  * own formatter for RFC1123_PATTERN, set to GMT_ZONE.
+  */
+ private static ThreadLocal<DateFormat> rfc1123GmtDateTimeFormatter
+     = new ThreadLocal<DateFormat>() {
+       @Override
+       protected DateFormat initialValue() {
+         final DateFormat rfc1123Format = new SimpleDateFormat(RFC1123_PATTERN, Locale.ROOT);
+         rfc1123Format.setTimeZone(GMT_ZONE);
+         return rfc1123Format;
+       }
+     };
+
+
+ /**
+ * Returns the current GMT date/time String using the RFC1123 pattern.
+ *
+ * @return A <code>String</code> that represents the current GMT date/time using the RFC1123 pattern.
+ */
+ static String getGMTTime() {
+ return getGMTTime(new Date());
+ }
+
+ /**
+ * Returns the GTM date/time String for the specified value using the RFC1123 pattern.
+ *
+ * @param date
+ * A <code>Date</code> object that represents the date to convert to GMT date/time in the RFC1123
+ * pattern.
+ *
+ * @return A <code>String</code> that represents the GMT date/time for the specified value using the RFC1123
+ * pattern.
+ */
+ static String getGMTTime(final Date date) {
+ return rfc1123GmtDateTimeFormatter.get().format(date);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f044deed/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/TracingServiceImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/TracingServiceImpl.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/TracingServiceImpl.java
new file mode 100644
index 0000000..57b6463
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/TracingServiceImpl.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.Objects;
+
+import com.google.common.base.Preconditions;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
+import org.apache.hadoop.fs.azurebfs.contracts.services.TracingService;
+import org.apache.htrace.core.HTraceConfiguration;
+import org.apache.htrace.core.Sampler;
+import org.apache.htrace.core.SpanId;
+import org.apache.htrace.core.TraceScope;
+import org.apache.htrace.core.Tracer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link TracingService} implementation built on an HTrace {@link Tracer}.
+ *
+ * Scopes are only created and modified while the TRACE level of the logger
+ * is enabled. Otherwise {@code traceBegin} returns null and
+ * {@code traceException} / {@code traceEnd} do nothing. The id of the most
+ * recently started scope is tracked per thread.
+ */
+@Singleton
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+final class TracingServiceImpl implements TracingService {
+  private static final Logger LOG = LoggerFactory.getLogger(TracingService.class);
+
+  private final Tracer tracer;
+  private final ThreadLocal<SpanId> currentScopeId;
+
+  /**
+   * Builds the tracer. Its configuration names LoggerSpanReceiver as the
+   * span receiver class and answers null (or the supplied default) for
+   * every other key. All spans are sampled.
+   *
+   * @param configuration the Hadoop configuration; must not be null
+   */
+  @Inject
+  TracingServiceImpl(
+      final Configuration configuration) {
+    Preconditions.checkNotNull(configuration, "configuration");
+
+    this.currentScopeId = new ThreadLocal<>();
+
+    this.tracer = new Tracer.Builder(TracingService.class.getSimpleName()).
+        conf(new HTraceConfiguration() {
+          @Override
+          public String get(String key) {
+            if (Objects.equals(key, Tracer.SPAN_RECEIVER_CLASSES_KEY)) {
+              return LoggerSpanReceiver.class.getName();
+            }
+            return null;
+          }
+
+          @Override
+          public String get(String key, String defaultValue) {
+            String value = get(key);
+            if (value != null) {
+              return value;
+            }
+            return defaultValue;
+          }
+        }).
+        build();
+
+    this.tracer.addSampler(Sampler.ALWAYS);
+  }
+
+  /**
+   * Starts a new scope and records its span id as the current one for the
+   * calling thread.
+   *
+   * @param description the description of the scope
+   * @return the new scope, or null when the TRACE level is not enabled
+   */
+  @Override
+  public TraceScope traceBegin(String description) {
+    if (LOG.isTraceEnabled()) {
+      TraceScope traceScope = this.tracer.newScope(description);
+      this.currentScopeId.set(traceScope.getSpanId());
+      return traceScope;
+    }
+
+    return null;
+  }
+
+  /**
+   * Starts a new scope under the given parent span and records its span id
+   * as the current one for the calling thread.
+   *
+   * @param description the description of the scope
+   * @param parentSpanId the id of the parent span
+   * @return the new scope, or null when the TRACE level is not enabled
+   */
+  @Override
+  public TraceScope traceBegin(String description, SpanId parentSpanId) {
+    if (LOG.isTraceEnabled()) {
+      TraceScope traceScope = this.tracer.newScope(description, parentSpanId);
+      this.currentScopeId.set(traceScope.getSpanId());
+      return traceScope;
+    }
+
+    return null;
+  }
+
+  /**
+   * Attaches the stack trace of the exception to the scope as an
+   * "Exception" annotation. Does nothing when the TRACE level is not enabled.
+   *
+   * @param traceScope the scope to annotate; must not be null when tracing
+   * @param azureBlobFileSystemException the exception to record; must not be
+   *        null when tracing
+   */
+  @Override
+  public void traceException(TraceScope traceScope, AzureBlobFileSystemException azureBlobFileSystemException) {
+    if (LOG.isTraceEnabled()) {
+      Preconditions.checkNotNull(traceScope, "traceScope");
+      Preconditions.checkNotNull(azureBlobFileSystemException, "azureBlobFileSystemException");
+
+      // Render the stack trace into a string for the annotation value.
+      StringWriter stringWriter = new StringWriter();
+      PrintWriter printWriter = new PrintWriter(stringWriter);
+      azureBlobFileSystemException.printStackTrace(printWriter);
+      printWriter.flush();
+
+      traceScope.addKVAnnotation("Exception", stringWriter.toString());
+    }
+  }
+
+  /**
+   * @return the span id last recorded for the calling thread, or null if
+   *         none has been recorded
+   */
+  @Override
+  public SpanId getCurrentTraceScopeSpanId() {
+    return this.currentScopeId.get();
+  }
+
+  /**
+   * Closes the scope. Before closing, the current span id of the calling
+   * thread is reset to the last parent of the scope's span, or to null when
+   * the span has no parents. Does nothing when the TRACE level is not enabled.
+   *
+   * @param traceScope the scope to close; must not be null when tracing
+   */
+  @Override
+  public void traceEnd(TraceScope traceScope) {
+    if (LOG.isTraceEnabled()) {
+      Preconditions.checkNotNull(traceScope, "traceScope");
+
+      SpanId[] parents = traceScope.getSpan().getParents();
+      this.currentScopeId.set(parents != null && parents.length > 0 ? parents[parents.length - 1] : null);
+      traceScope.close();
+    }
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f044deed/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/package-info.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/package-info.java
new file mode 100644
index 0000000..97c1d71
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+package org.apache.hadoop.fs.azurebfs.services;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f044deed/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/UriUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/UriUtils.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/UriUtils.java
new file mode 100644
index 0000000..7652adf
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/UriUtils.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.utils;
+
+import java.util.regex.Pattern;
+
+/**
+ * Utility class to help with Abfs url transformation to blob urls.
+ */
+public final class UriUtils {
+  private static final String ABFS_URI_REGEX = "[^.]+\\.dfs\\.(preprod\\.){0,1}core\\.windows\\.net";
+  private static final Pattern ABFS_URI_PATTERN = Pattern.compile(ABFS_URI_REGEX);
+
+  private UriUtils() {
+  }
+
+  /**
+   * Checks whether a string is an abfs url, i.e. whether the whole string
+   * matches the abfs account host pattern.
+   * @param string the string to check.
+   * @return true if the string matches; false for null, empty or
+   *         non-matching input.
+   */
+  public static boolean containsAbfsUrl(final String string) {
+    return string != null
+        && !string.isEmpty()
+        && ABFS_URI_PATTERN.matcher(string).matches();
+  }
+
+  /**
+   * Extracts the raw account name from account name.
+   * @param accountName to extract the raw account name.
+   * @return the part of the account name before the first dot, or null if
+   *         the account name is null, empty or not an abfs url.
+   */
+  public static String extractRawAccountFromAccountName(final String accountName) {
+    // Null and empty names are rejected by containsAbfsUrl as well.
+    if (!containsAbfsUrl(accountName)) {
+      return null;
+    }
+
+    final String[] labels = accountName.split("\\.");
+    return labels.length == 0 ? null : labels[0];
+  }
+
+  /**
+   * Generate unique test path for multiple user tests. The path includes the
+   * value of the "test.unique.fork.id" system property when it is set.
+   *
+   * @return root test path
+   */
+  public static String generateUniqueTestPath() {
+    final String forkId = System.getProperty("test.unique.fork.id");
+    if (forkId == null) {
+      return "/test";
+    }
+    return "/" + forkId + "/test";
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f044deed/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/package-info.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/package-info.java
new file mode 100644
index 0000000..d8cc940
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+package org.apache.hadoop.fs.azurebfs.utils;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f044deed/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/DependencyInjectedTest.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/DependencyInjectedTest.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/DependencyInjectedTest.java
new file mode 100644
index 0000000..5ec1e2e
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/DependencyInjectedTest.java
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs;
+
+import java.net.URI;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
+import org.apache.hadoop.fs.azurebfs.services.AbfsServiceProviderImpl;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.mockito.internal.util.MockUtil;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.azure.AzureNativeFileSystemStore;
+import org.apache.hadoop.fs.azure.NativeAzureFileSystem;
+import org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation;
+import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys;
+import org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes;
+import org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys;
+import org.apache.hadoop.fs.azurebfs.contracts.services.AbfsHttpClientFactory;
+import org.apache.hadoop.fs.azurebfs.contracts.services.AbfsHttpService;
+import org.apache.hadoop.fs.azurebfs.contracts.services.ConfigurationService;
+import org.apache.hadoop.fs.azurebfs.services.MockAbfsHttpClientFactoryImpl;
+import org.apache.hadoop.fs.azurebfs.services.MockAbfsServiceInjectorImpl;
+import org.apache.hadoop.fs.azurebfs.services.MockServiceProviderImpl;
+
+import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.FILE_SYSTEM_NOT_FOUND;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assume.assumeNotNull;
+
+/**
+ * Provide dependencies for AzureBlobFileSystem tests.
+ *
+ * <p>Construction loads {@code azure-bfs-test.xml}, applies JUnit assumptions
+ * on the test account name and key, and points the default file system at a
+ * randomly named ABFS file system. {@link #initialize()} registers the mock
+ * service injector before each test and {@link #testCleanup()} deletes the
+ * remote file system afterwards.
+ */
+public abstract class DependencyInjectedTest {
+  private static final String SCHEME_SEPARATOR = "://";
+
+  private final MockAbfsServiceInjectorImpl mockServiceInjector;
+  private final boolean isEmulator;
+  private NativeAzureFileSystem wasb;
+  private String abfsScheme;
+
+  private Configuration configuration;
+  private String fileSystemName;
+  private String accountName;
+  private String testUrl;
+
+  /**
+   * Creates a test fixture using either the secure or the plain ABFS scheme.
+   *
+   * @param secure {@code true} to use {@link FileSystemUriSchemes#ABFS_SECURE_SCHEME},
+   *               {@code false} to use {@link FileSystemUriSchemes#ABFS_SCHEME}
+   */
+  public DependencyInjectedTest(final boolean secure) {
+    this(secure ? FileSystemUriSchemes.ABFS_SECURE_SCHEME : FileSystemUriSchemes.ABFS_SCHEME);
+  }
+
+  /**
+   * @return the service injector that tests may use to replace providers or instances
+   */
+  public MockAbfsServiceInjectorImpl getMockServiceInjector() {
+    return this.mockServiceInjector;
+  }
+
+  /**
+   * Creates a test fixture using the plain ABFS scheme.
+   */
+  protected DependencyInjectedTest() {
+    this(FileSystemUriSchemes.ABFS_SCHEME);
+  }
+
+  /**
+   * Loads the test configuration and derives the test URL for the given scheme.
+   *
+   * @param scheme URI scheme used to build the default file system URI
+   */
+  private DependencyInjectedTest(final String scheme) {
+    abfsScheme = scheme;
+    fileSystemName = UUID.randomUUID().toString();
+    configuration = new Configuration();
+    configuration.addResource("azure-bfs-test.xml");
+
+    // Without an account name and its key there is nothing to test against.
+    assumeNotNull(configuration.get(TestConfigurationKeys.FS_AZURE_TEST_ACCOUNT_NAME));
+    assumeNotNull(configuration.get(TestConfigurationKeys.FS_AZURE_TEST_ACCOUNT_KEY_PREFIX
+        + configuration.get(TestConfigurationKeys.FS_AZURE_TEST_ACCOUNT_NAME)));
+
+    final String abfsUrl = this.getFileSystemName() + "@" + this.getAccountName();
+    URI defaultUri = null;
+
+    try {
+      defaultUri = new URI(abfsScheme, abfsUrl, null, null, null);
+    } catch (Exception ex) {
+      Assert.fail(ex.getMessage());
+    }
+
+    this.testUrl = defaultUri.toString();
+    configuration.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, defaultUri.toString());
+    configuration.setBoolean(ConfigurationKeys.AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION, true);
+    this.mockServiceInjector = new MockAbfsServiceInjectorImpl(configuration);
+    this.isEmulator = this.configuration.getBoolean(ConfigurationKeys.FS_AZURE_EMULATOR_ENABLED, false);
+    this.accountName = this.configuration.get(TestConfigurationKeys.FS_AZURE_TEST_ACCOUNT_NAME);
+  }
+
+  /**
+   * Registers the mock service provider and, when not running against the
+   * emulator, initializes a WASB file system pointing at the same test URL.
+   *
+   * @throws Exception if the WASB URI is invalid or initialization fails
+   */
+  @Before
+  public void initialize() throws Exception {
+    if (this.isEmulator) {
+      this.mockServiceInjector.replaceProvider(AbfsHttpClientFactory.class, MockAbfsHttpClientFactoryImpl.class);
+    }
+
+    MockServiceProviderImpl.create(this.mockServiceInjector);
+
+    if (!this.isEmulator) {
+      final URI wasbUri = new URI(abfsUrlToWasbUrl(this.getTestUrl()));
+      final AzureNativeFileSystemStore azureNativeFileSystemStore = new AzureNativeFileSystemStore();
+      azureNativeFileSystemStore.initialize(
+          wasbUri,
+          this.getConfiguration(),
+          new AzureFileSystemInstrumentation(this.getConfiguration()));
+
+      this.wasb = new NativeAzureFileSystem(azureNativeFileSystemStore);
+      this.wasb.initialize(wasbUri, configuration);
+    }
+  }
+
+  /**
+   * Closes the file systems used by the test and deletes the remote ABFS file
+   * system. When the HTTP service is not a mock, additionally verifies that the
+   * file system is reported as not found afterwards.
+   *
+   * @throws Exception if closing or deleting fails
+   */
+  @After
+  public void testCleanup() throws Exception {
+    if (this.wasb != null) {
+      this.wasb.close();
+    }
+
+    FileSystem.closeAll();
+
+    final AzureBlobFileSystem fs = this.getFileSystem();
+    final AbfsHttpService abfsHttpService = AbfsServiceProviderImpl.instance().get(AbfsHttpService.class);
+    abfsHttpService.deleteFilesystem(fs);
+
+    if (!(new MockUtil().isMock(abfsHttpService))) {
+      AbfsRestOperationException ex = intercept(
+          AbfsRestOperationException.class,
+          new Callable<Void>() {
+            @Override
+            public Void call() throws Exception {
+              abfsHttpService.getFilesystemProperties(fs);
+              return null;
+            }
+          });
+
+      assertEquals(FILE_SYSTEM_NOT_FOUND.getStatusCode(), ex.getStatusCode());
+    }
+  }
+
+  /**
+   * @return the file system obtained from {@link FileSystem#get(Configuration)}
+   *         for the configuration held by the registered {@link ConfigurationService}
+   * @throws Exception if the file system cannot be obtained
+   */
+  public AzureBlobFileSystem getFileSystem() throws Exception {
+    final Configuration configuration = AbfsServiceProviderImpl.instance().get(ConfigurationService.class).getConfiguration();
+    final AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.get(configuration);
+    return fs;
+  }
+
+  /**
+   * @return the WASB file system created by {@link #initialize()}, or
+   *         {@code null} when running against the emulator
+   */
+  protected NativeAzureFileSystem getWasbFileSystem() {
+    return this.wasb;
+  }
+
+  /**
+   * @return the configured value of {@link TestConfigurationKeys#FS_AZURE_TEST_HOST_NAME}
+   */
+  protected String getHostName() {
+    return configuration.get(TestConfigurationKeys.FS_AZURE_TEST_HOST_NAME);
+  }
+
+  /**
+   * @param testUrl new test URL returned by {@link #getTestUrl()}
+   */
+  protected void updateTestUrl(String testUrl) {
+    this.testUrl = testUrl;
+  }
+
+  /**
+   * @return the URL of the file system under test
+   */
+  protected String getTestUrl() {
+    return testUrl;
+  }
+
+  /**
+   * @param fileSystemName new name returned by {@link #getFileSystemName()}
+   */
+  protected void updateFileSystemName(String fileSystemName) {
+    this.fileSystemName = fileSystemName;
+  }
+
+  /**
+   * @return the name of the file system under test
+   */
+  protected String getFileSystemName() {
+    return fileSystemName;
+  }
+
+  /**
+   * @return the configured value of {@link TestConfigurationKeys#FS_AZURE_TEST_ACCOUNT_NAME}
+   */
+  protected String getAccountName() {
+    return configuration.get(TestConfigurationKeys.FS_AZURE_TEST_ACCOUNT_NAME);
+  }
+
+  /**
+   * @return the configured key for the account returned by {@link #getAccountName()}
+   */
+  protected String getAccountKey() {
+    return configuration.get(
+        TestConfigurationKeys.FS_AZURE_TEST_ACCOUNT_KEY_PREFIX
+            + getAccountName());
+  }
+
+  /**
+   * @return the configuration loaded for this test
+   */
+  protected Configuration getConfiguration() {
+    return this.configuration;
+  }
+
+  /**
+   * @return whether {@link ConfigurationKeys#FS_AZURE_EMULATOR_ENABLED} was set
+   */
+  protected boolean isEmulator() {
+    return isEmulator;
+  }
+
+  /**
+   * Converts a WASB URL into the equivalent ABFS URL.
+   *
+   * @param wasbUrl URL starting with the WASB or secure WASB scheme
+   * @return the URL with scheme and DNS prefix replaced by their ABFS counterparts
+   * @throws IllegalArgumentException if {@code wasbUrl} uses neither WASB scheme
+   */
+  protected static String wasbUrlToAbfsUrl(final String wasbUrl) {
+    return convertTestUrls(
+        wasbUrl, FileSystemUriSchemes.WASB_SCHEME, FileSystemUriSchemes.WASB_SECURE_SCHEME, FileSystemUriSchemes.WASB_DNS_PREFIX,
+        FileSystemUriSchemes.ABFS_SCHEME, FileSystemUriSchemes.ABFS_SECURE_SCHEME, FileSystemUriSchemes.ABFS_DNS_PREFIX);
+  }
+
+  /**
+   * Converts an ABFS URL into the equivalent WASB URL.
+   *
+   * @param abfsUrl URL starting with the ABFS or secure ABFS scheme
+   * @return the URL with scheme and DNS prefix replaced by their WASB counterparts
+   * @throws IllegalArgumentException if {@code abfsUrl} uses neither ABFS scheme
+   */
+  protected static String abfsUrlToWasbUrl(final String abfsUrl) {
+    return convertTestUrls(
+        abfsUrl, FileSystemUriSchemes.ABFS_SCHEME, FileSystemUriSchemes.ABFS_SECURE_SCHEME, FileSystemUriSchemes.ABFS_DNS_PREFIX,
+        FileSystemUriSchemes.WASB_SCHEME, FileSystemUriSchemes.WASB_SECURE_SCHEME, FileSystemUriSchemes.WASB_DNS_PREFIX);
+  }
+
+  /**
+   * Rewrites the scheme prefix and the DNS prefix of a test URL.
+   *
+   * @param url URL to convert
+   * @param fromNonSecureScheme non-secure scheme expected on {@code url}
+   * @param fromSecureScheme secure scheme expected on {@code url}
+   * @param fromDnsPrefix DNS label to replace (matched as {@code .label.})
+   * @param toNonSecureScheme replacement for the non-secure scheme
+   * @param toSecureScheme replacement for the secure scheme
+   * @param toDnsPrefix replacement DNS label
+   * @return the converted URL
+   * @throws IllegalArgumentException if {@code url} starts with neither source scheme
+   */
+  private static String convertTestUrls(
+      final String url, final String fromNonSecureScheme, final String fromSecureScheme, final String fromDnsPrefix,
+      final String toNonSecureScheme, final String toSecureScheme, final String toDnsPrefix) {
+    final String converted;
+    // Only the leading scheme is rewritten; the rest of the URL is kept verbatim.
+    if (url.startsWith(fromNonSecureScheme + SCHEME_SEPARATOR)) {
+      converted = toNonSecureScheme + url.substring(fromNonSecureScheme.length());
+    } else if (url.startsWith(fromSecureScheme + SCHEME_SEPARATOR)) {
+      converted = toSecureScheme + url.substring(fromSecureScheme.length());
+    } else {
+      throw new IllegalArgumentException(
+          "URL '" + url + "' must start with '" + fromNonSecureScheme + SCHEME_SEPARATOR
+              + "' or '" + fromSecureScheme + SCHEME_SEPARATOR + "'");
+    }
+
+    return converted.replace("." + fromDnsPrefix + ".", "." + toDnsPrefix + ".");
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f044deed/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.java
new file mode 100644
index 0000000..10d42d1
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs;
+
+import java.io.FileNotFoundException;
+import java.util.Random;
+
+import org.junit.Test;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test append operations.
+ */
+public class ITestAzureBlobFileSystemAppend extends DependencyInjectedTest {
+  private static final Path TEST_FILE_PATH = new Path("testfile");
+  private static final Path TEST_FOLDER_PATH = new Path("testFolder");
+
+  public ITestAzureBlobFileSystemAppend() {
+    super();
+  }
+
+  /**
+   * Appending (with an explicit buffer size) to a path that is a directory
+   * must raise {@link FileNotFoundException}.
+   */
+  @Test(expected = FileNotFoundException.class)
+  public void testAppendDirShouldFail() throws Exception {
+    final AzureBlobFileSystem fs = this.getFileSystem();
+    final Path filePath = TEST_FILE_PATH;
+    fs.mkdirs(filePath);
+    fs.append(filePath, 0);
+  }
+
+  /**
+   * Writing zero bytes must not advance the stream position.
+   */
+  @Test
+  public void testAppendWithLength0() throws Exception {
+    final AzureBlobFileSystem fs = this.getFileSystem();
+    final byte[] b = new byte[1024];
+    new Random().nextBytes(b);
+
+    // try-with-resources so the stream is released even if the assertion fails.
+    try (FSDataOutputStream stream = fs.create(TEST_FILE_PATH)) {
+      stream.write(b, 1000, 0);
+
+      assertEquals(0, stream.getPos());
+    }
+  }
+
+  /**
+   * Appending to a file that has been deleted must raise
+   * {@link FileNotFoundException}.
+   */
+  @Test(expected = FileNotFoundException.class)
+  public void testAppendFileAfterDelete() throws Exception {
+    final AzureBlobFileSystem fs = this.getFileSystem();
+    final Path filePath = TEST_FILE_PATH;
+    // Close the created stream right away; only the file's existence matters here.
+    fs.create(filePath).close();
+    fs.delete(filePath, false);
+
+    fs.append(filePath);
+  }
+
+  /**
+   * Appending to a directory must raise {@link FileNotFoundException}.
+   */
+  @Test(expected = FileNotFoundException.class)
+  public void testAppendDirectory() throws Exception {
+    final AzureBlobFileSystem fs = this.getFileSystem();
+    final Path folderPath = TEST_FOLDER_PATH;
+    fs.mkdirs(folderPath);
+    fs.append(folderPath);
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f044deed/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemBackCompat.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemBackCompat.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemBackCompat.java
new file mode 100644
index 0000000..d107c9d
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemBackCompat.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs;
+
+import com.microsoft.azure.storage.CloudStorageAccount;
+import com.microsoft.azure.storage.blob.CloudBlobClient;
+import com.microsoft.azure.storage.blob.CloudBlobContainer;
+import com.microsoft.azure.storage.blob.CloudBlockBlob;
+import org.junit.Test;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test AzureBlobFileSystem back compatibility with WASB.
+ */
+public class ITestAzureBlobFileSystemBackCompat extends DependencyInjectedTest {
+  public ITestAzureBlobFileSystemBackCompat() {
+    super();
+  }
+
+  /**
+   * Uploads two empty block blobs through the Azure Storage blob client and
+   * checks that listing their common parent through ABFS reports the two
+   * intermediate path segments as zero-length directories.
+   */
+  @Test
+  public void testBlobBackCompat() throws Exception {
+    final AzureBlobFileSystem fs = this.getFileSystem();
+    String storageConnectionString = getBlobConnectionString();
+    CloudStorageAccount storageAccount = CloudStorageAccount.parse(storageConnectionString);
+    CloudBlobClient blobClient = storageAccount.createCloudBlobClient();
+    CloudBlobContainer container = blobClient.getContainerReference(this.getFileSystemName());
+    container.createIfNotExists();
+
+    CloudBlockBlob blockBlob = container.getBlockBlobReference("test/10/10/10");
+    blockBlob.uploadText("");
+
+    blockBlob = container.getBlockBlobReference("test/10/123/3/2/1/3");
+    blockBlob.uploadText("");
+
+    FileStatus[] fileStatuses = fs.listStatus(new Path("/test/10/"));
+    // JUnit's contract is assertEquals(expected, actual).
+    assertEquals(2, fileStatuses.length);
+    assertEquals("10", fileStatuses[0].getPath().getName());
+    assertTrue(fileStatuses[0].isDirectory());
+    assertEquals(0, fileStatuses[0].getLen());
+    assertEquals("123", fileStatuses[1].getPath().getName());
+    assertTrue(fileStatuses[1].isDirectory());
+    assertEquals(0, fileStatuses[1].getLen());
+  }
+
+  /**
+   * Builds the Azure Storage connection string for the blob endpoint of the
+   * test account.
+   *
+   * @return a connection string targeting the emulator host when the emulator
+   *         is enabled, otherwise the account's {@code .blob.} endpoint
+   */
+  private String getBlobConnectionString() {
+    // The first DNS label of the account name is used as the storage account name.
+    final String shortAccountName = this.getAccountName().split("\\.")[0];
+
+    final String blobEndpoint;
+    if (isEmulator()) {
+      blobEndpoint = this.getHostName() + ":8880/" + shortAccountName;
+    } else {
+      blobEndpoint = this.getAccountName().replaceFirst("\\.dfs\\.", ".blob.");
+    }
+
+    return "DefaultEndpointsProtocol=http;BlobEndpoint=http://"
+        + blobEndpoint
+        + ";AccountName=" + shortAccountName
+        + ";AccountKey=" + this.getAccountKey();
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f044deed/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCopy.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCopy.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCopy.java
new file mode 100644
index 0000000..c158e03
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCopy.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test copy operation.
+ */
+public class ITestAzureBlobFileSystemCopy extends DependencyInjectedTest {
+  public ITestAzureBlobFileSystemCopy() {
+    super();
+  }
+
+  /**
+   * Writes a small file to the local file system, copies it to ABFS with
+   * {@link FileUtil#copy} and verifies the copied content.
+   */
+  @Test
+  public void testCopyFromLocalFileSystem() throws Exception {
+    final AzureBlobFileSystem fs = this.getFileSystem();
+    Path localFilePath = new Path(System.getProperty("test.build.data",
+        "azure_test"));
+    FileSystem localFs = FileSystem.get(new Configuration());
+    localFs.delete(localFilePath, true);
+    try {
+      writeString(localFs, localFilePath, "Testing");
+      Path dstPath = new Path("copiedFromLocal");
+      assertTrue(FileUtil.copy(localFs, localFilePath, fs, dstPath, false,
+          fs.getConf()));
+      assertTrue(fs.exists(dstPath));
+      assertEquals("Testing", readString(fs, dstPath));
+      fs.delete(dstPath, true);
+    } finally {
+      localFs.delete(localFilePath, true);
+    }
+  }
+
+  /**
+   * Opens {@code testFile}, reads its content as text and closes the stream.
+   *
+   * @param fs file system holding the file
+   * @param testFile file to read
+   * @return the file content
+   * @throws IOException if reading fails or the content exceeds the read buffer
+   */
+  private String readString(FileSystem fs, Path testFile) throws IOException {
+    try (FSDataInputStream inputStream = fs.open(testFile)) {
+      return readString(inputStream);
+    }
+  }
+
+  /**
+   * Reads at most 1024 UTF-8 characters from the stream. The stream is left
+   * open; the caller owns and closes it.
+   *
+   * @param inputStream stream to read from
+   * @return the characters read, or an empty string at immediate end of stream
+   * @throws IOException if reading fails or more than 1024 characters are available
+   */
+  private String readString(FSDataInputStream inputStream) throws IOException {
+    final int bufferSize = 1024;
+    final char[] buffer = new char[bufferSize];
+    final BufferedReader reader = new BufferedReader(new InputStreamReader(
+        inputStream, java.nio.charset.StandardCharsets.UTF_8));
+
+    // A single read may return fewer characters than requested, so keep
+    // reading until the buffer is full or the stream is exhausted.
+    int total = 0;
+    while (total < bufferSize) {
+      final int count = reader.read(buffer, total, bufferSize - total);
+      if (count < 0) {
+        break;
+      }
+      total += count;
+    }
+
+    if (total == bufferSize && reader.read() != -1) {
+      throw new IOException("Exceeded buffer size");
+    }
+    return new String(buffer, 0, total);
+  }
+
+  /**
+   * Creates (or overwrites) {@code path} and writes {@code value} to it.
+   *
+   * @param fs file system to write to
+   * @param path file to create
+   * @param value text to write
+   * @throws IOException if creating or writing the file fails
+   */
+  private void writeString(FileSystem fs, Path path, String value)
+      throws IOException {
+    FSDataOutputStream outputStream = fs.create(path, true);
+    writeString(outputStream, value);
+  }
+
+  /**
+   * Writes {@code value} as UTF-8 text and closes the stream, even when the
+   * write fails.
+   *
+   * @param outputStream stream to write to; closed on return
+   * @param value text to write
+   * @throws IOException if writing or closing fails
+   */
+  private void writeString(FSDataOutputStream outputStream, String value)
+      throws IOException {
+    try (BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(
+        outputStream, java.nio.charset.StandardCharsets.UTF_8))) {
+      writer.write(value);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f044deed/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java
new file mode 100644
index 0000000..c9b99e6
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs;
+
+import java.io.FileNotFoundException;
+import java.util.EnumSet;
+
+import org.junit.Test;
+
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test create operation.
+ */
+public class ITestAzureBlobFileSystemCreate extends DependencyInjectedTest {
+  private static final Path TEST_FILE_PATH = new Path("testfile");
+  private static final Path TEST_FOLDER_PATH = new Path("testFolder");
+  private static final String TEST_CHILD_FILE = "childFile";
+
+  public ITestAzureBlobFileSystemCreate() {
+    super();
+  }
+
+  /**
+   * Creating a file at a path that is already a directory must raise
+   * {@link FileAlreadyExistsException}.
+   */
+  @Test(expected = FileAlreadyExistsException.class)
+  public void testCreateFileWithExistingDir() throws Exception {
+    final AzureBlobFileSystem fs = this.getFileSystem();
+    fs.mkdirs(TEST_FOLDER_PATH);
+    fs.create(TEST_FOLDER_PATH);
+  }
+
+  /**
+   * A file must have a status while its output stream is still open.
+   */
+  @Test
+  public void testEnsureFileCreated() throws Exception {
+    final AzureBlobFileSystem fs = this.getFileSystem();
+    // The stream is only held so that it is closed once the check is done.
+    try (AutoCloseable ignored = fs.create(TEST_FILE_PATH)) {
+      FileStatus fileStatus = fs.getFileStatus(TEST_FILE_PATH);
+      assertNotNull(fileStatus);
+    }
+  }
+
+  /**
+   * {@code createNonRecursive(Path, boolean, ...)} must fail while the parent
+   * directory is missing and succeed once it exists.
+   */
+  @Test
+  @SuppressWarnings("deprecation")
+  public void testCreateNonRecursive() throws Exception {
+    final AzureBlobFileSystem fs = this.getFileSystem();
+    Path testFile = new Path(TEST_FOLDER_PATH, TEST_CHILD_FILE);
+    try {
+      fs.createNonRecursive(testFile, true, 1024, (short) 1, 1024, null);
+      assertTrue("Should've thrown", false);
+    } catch (FileNotFoundException expected) {
+      // Expected: the parent directory does not exist yet.
+    }
+    assertCreatedOnceParentExists(fs, testFile);
+  }
+
+  /**
+   * {@code createNonRecursive(Path, FsPermission, EnumSet, ...)} must fail
+   * while the parent directory is missing.
+   */
+  @Test
+  @SuppressWarnings("deprecation")
+  public void testCreateNonRecursive1() throws Exception {
+    final AzureBlobFileSystem fs = this.getFileSystem();
+    Path testFile = new Path(TEST_FOLDER_PATH, TEST_CHILD_FILE);
+    try {
+      fs.createNonRecursive(testFile, FsPermission.getDefault(), EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE), 1024, (short) 1, 1024, null);
+      assertTrue("Should've thrown", false);
+    } catch (FileNotFoundException expected) {
+      // Expected: the parent directory does not exist yet.
+    }
+    assertCreatedOnceParentExists(fs, testFile);
+  }
+
+  /**
+   * {@code createNonRecursive(Path, FsPermission, boolean, ...)} must fail
+   * while the parent directory is missing.
+   */
+  @Test
+  @SuppressWarnings("deprecation")
+  public void testCreateNonRecursive2() throws Exception {
+    final AzureBlobFileSystem fs = this.getFileSystem();
+
+    Path testFile = new Path(TEST_FOLDER_PATH, TEST_CHILD_FILE);
+    try {
+      fs.createNonRecursive(testFile, FsPermission.getDefault(), false, 1024, (short) 1, 1024, null);
+      assertTrue("Should've thrown", false);
+    } catch (FileNotFoundException expected) {
+      // Expected: the parent directory does not exist yet.
+    }
+    assertCreatedOnceParentExists(fs, testFile);
+  }
+
+  /**
+   * Creates the test folder, creates {@code testFile} non-recursively inside
+   * it and asserts that the file exists.
+   *
+   * @param fs file system under test
+   * @param testFile file expected to be creatable once its parent exists
+   * @throws Exception if any file system operation fails
+   */
+  @SuppressWarnings("deprecation")
+  private void assertCreatedOnceParentExists(AzureBlobFileSystem fs, Path testFile) throws Exception {
+    fs.mkdirs(TEST_FOLDER_PATH);
+    fs.createNonRecursive(testFile, true, 1024, (short) 1, 1024, null)
+        .close();
+    assertTrue(fs.exists(testFile));
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f044deed/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java
new file mode 100644
index 0000000..372a087
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs;
+
+import java.io.FileNotFoundException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.junit.Test;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test delete operation.
+ */
+public class ITestAzureBlobFileSystemDelete extends DependencyInjectedTest {
+  public ITestAzureBlobFileSystemDelete() {
+    super();
+  }
+
+  /**
+   * Recursively deleting the root must leave the root listing empty.
+   */
+  @Test
+  public void testDeleteRoot() throws Exception {
+    final AzureBlobFileSystem fs = this.getFileSystem();
+
+    fs.mkdirs(new Path("/testFolder0"));
+    fs.mkdirs(new Path("/testFolder1"));
+    fs.mkdirs(new Path("/testFolder2"));
+    // Streams are closed immediately; only the files' existence matters.
+    fs.create(new Path("/testFolder1/testfile")).close();
+    fs.create(new Path("/testFolder1/testfile2")).close();
+    fs.create(new Path("/testFolder1/testfile3")).close();
+
+    FileStatus[] ls = fs.listStatus(new Path("/"));
+    assertEquals(4, ls.length); // and user dir
+
+    fs.delete(new Path("/"), true);
+    ls = fs.listStatus(new Path("/"));
+    assertEquals(0, ls.length);
+  }
+
+  /**
+   * Opening a deleted file must raise {@link FileNotFoundException}.
+   */
+  @Test(expected = FileNotFoundException.class)
+  public void testOpenFileAfterDelete() throws Exception {
+    final AzureBlobFileSystem fs = this.getFileSystem();
+    fs.create(new Path("/testFile")).close();
+    fs.delete(new Path("/testFile"), false);
+
+    fs.open(new Path("/testFile"));
+  }
+
+  /**
+   * Querying the status of a deleted file must raise
+   * {@link FileNotFoundException}.
+   */
+  @Test(expected = FileNotFoundException.class)
+  public void testEnsureFileIsDeleted() throws Exception {
+    final AzureBlobFileSystem fs = this.getFileSystem();
+    fs.create(new Path("testfile")).close();
+    fs.delete(new Path("testfile"), false);
+
+    fs.getFileStatus(new Path("testfile"));
+  }
+
+  /**
+   * Recursively deleting a nested directory tree must remove its top-level
+   * directory.
+   */
+  @Test(expected = FileNotFoundException.class)
+  public void testDeleteDirectory() throws Exception {
+    final AzureBlobFileSystem fs = this.getFileSystem();
+    fs.mkdirs(new Path("testfile"));
+    fs.mkdirs(new Path("testfile/test1"));
+    fs.mkdirs(new Path("testfile/test1/test2"));
+
+    fs.delete(new Path("testfile"), true);
+    fs.getFileStatus(new Path("testfile"));
+  }
+
+  /**
+   * Creates 1000 files under {@code /test} from a pool of 10 threads, deletes
+   * the directory recursively and expects it to be gone afterwards.
+   */
+  @Test(expected = FileNotFoundException.class)
+  public void testDeleteFirstLevelDirectory() throws Exception {
+    final AzureBlobFileSystem fs = this.getFileSystem();
+    final List<Future<Void>> tasks = new ArrayList<>();
+
+    ExecutorService es = Executors.newFixedThreadPool(10);
+    try {
+      for (int i = 0; i < 1000; i++) {
+        final Path fileName = new Path("/test/" + i);
+        Callable<Void> callable = new Callable<Void>() {
+          @Override
+          public Void call() throws Exception {
+            fs.create(fileName).close();
+            return null;
+          }
+        };
+
+        tasks.add(es.submit(callable));
+      }
+
+      // Wait for every creation and surface the first failure, if any.
+      for (Future<Void> task : tasks) {
+        task.get();
+      }
+    } finally {
+      // Always release the worker threads, even when a task failed.
+      es.shutdownNow();
+    }
+
+    fs.delete(new Path("/test"), true);
+    fs.getFileStatus(new Path("/test"));
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f044deed/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemE2E.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemE2E.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemE2E.java
new file mode 100644
index 0000000..4985f58
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemE2E.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Random;
+
+
+import org.apache.hadoop.fs.azurebfs.services.AbfsServiceProviderImpl;
+import org.junit.Test;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.contracts.services.ConfigurationService;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertArrayEquals;
+
+/**
+ * Test end to end between ABFS client and ABFS server.
+ */
+public class ITestAzureBlobFileSystemE2E extends DependencyInjectedTest {
+  private static final Path TEST_FILE = new Path("testfile");
+  private static final int TEST_BYTE = 100;
+  private static final int TEST_OFFSET = 100;
+  private static final int TEST_DEFAULT_BUFFER_SIZE = 4 * 1024 * 1024;
+  private static final int TEST_DEFAULT_READ_BUFFER_SIZE = 1023900;
+
+  /**
+   * Sets the read-ahead queue depth to {@code "0"} and registers the updated
+   * configuration with the mock service injector.
+   */
+  public ITestAzureBlobFileSystemE2E() {
+    super();
+    Configuration configuration = this.getConfiguration();
+    configuration.set(ConfigurationKeys.FS_AZURE_READ_AHEAD_QUEUE_DEPTH, "0");
+    this.getMockServiceInjector().replaceInstance(Configuration.class, configuration);
+  }
+
+  /**
+   * Writing a single byte must produce a file of length one.
+   */
+  @Test
+  public void testWriteOneByteToFile() throws Exception {
+    final AzureBlobFileSystem fs = this.getFileSystem();
+    try (FSDataOutputStream stream = fs.create(TEST_FILE)) {
+      stream.write(TEST_BYTE);
+    }
+
+    FileStatus fileStatus = fs.getFileStatus(TEST_FILE);
+    assertEquals(1, fileStatus.getLen());
+  }
+
+  /**
+   * The byte written by {@link #testWriteOneByteToFile()} must be read back
+   * unchanged.
+   */
+  @Test
+  public void testReadWriteBytesToFile() throws Exception {
+    final AzureBlobFileSystem fs = this.getFileSystem();
+    testWriteOneByteToFile();
+
+    final int i;
+    try (FSDataInputStream inputStream = fs.open(TEST_FILE, TEST_DEFAULT_BUFFER_SIZE)) {
+      i = inputStream.read();
+    }
+
+    assertEquals(TEST_BYTE, i);
+  }
+
+  /**
+   * Overwrites a file while a reader is open on it; the test expects an
+   * {@link IOException} to be raised.
+   */
+  @Test (expected = IOException.class)
+  public void testOOBWrites() throws Exception {
+    final AzureBlobFileSystem fs = this.getFileSystem();
+    int readBufferSize = AbfsServiceProviderImpl.instance().get(ConfigurationService.class).getReadBufferSize();
+
+    fs.create(TEST_FILE).close();
+
+    byte[] bytesToRead = new byte[readBufferSize];
+    final byte[] b = new byte[2 * readBufferSize];
+    new Random().nextBytes(b);
+
+    try (FSDataOutputStream writeStream = fs.create(TEST_FILE)) {
+      writeStream.write(b);
+      writeStream.flush();
+    }
+
+    // try-with-resources releases the reader even when a read throws.
+    try (FSDataInputStream readStream = fs.open(TEST_FILE)) {
+      readStream.read(bytesToRead, 0, readBufferSize);
+
+      // Replace the file underneath the open reader.
+      try (FSDataOutputStream writeStream = fs.create(TEST_FILE)) {
+        writeStream.write(b);
+        writeStream.flush();
+      }
+
+      readStream.read(bytesToRead, 0, readBufferSize);
+    }
+  }
+
+  /**
+   * Writes a buffer starting at a non-zero offset and checks that the bytes
+   * read back equal the written slice.
+   */
+  @Test
+  public void testWriteWithBufferOffset() throws Exception {
+    final AzureBlobFileSystem fs = this.getFileSystem();
+
+    final byte[] b = new byte[1024 * 1000];
+    new Random().nextBytes(b);
+    try (FSDataOutputStream stream = fs.create(TEST_FILE)) {
+      stream.write(b, TEST_OFFSET, b.length - TEST_OFFSET);
+    }
+
+    final byte[] r = new byte[TEST_DEFAULT_READ_BUFFER_SIZE];
+    try (FSDataInputStream inputStream = fs.open(TEST_FILE, TEST_DEFAULT_BUFFER_SIZE)) {
+      int result = inputStream.read(r);
+
+      assertNotEquals(-1, result);
+      // JUnit's contract is assertArrayEquals(expected, actual).
+      assertArrayEquals(Arrays.copyOfRange(b, TEST_OFFSET, b.length), r);
+    }
+  }
+
+  /**
+   * Writes a large buffer in one call and reads it back in chunks of at most
+   * {@code TEST_OFFSET} bytes.
+   */
+  @Test
+  public void testReadWriteHeavyBytesToFileWithSmallerChunks() throws Exception {
+    final AzureBlobFileSystem fs = this.getFileSystem();
+
+    final byte[] writeBuffer = new byte[5 * 1000 * 1024];
+    new Random().nextBytes(writeBuffer);
+    try (FSDataOutputStream stream = fs.create(TEST_FILE)) {
+      stream.write(writeBuffer);
+    }
+
+    final byte[] readBuffer = new byte[5 * 1000 * 1024];
+    try (FSDataInputStream inputStream = fs.open(TEST_FILE, TEST_DEFAULT_BUFFER_SIZE)) {
+      int offset = 0;
+      while (offset < readBuffer.length) {
+        // Never request more than the space left in the buffer.
+        final int chunk = Math.min(TEST_OFFSET, readBuffer.length - offset);
+        final int bytesRead = inputStream.read(readBuffer, offset, chunk);
+        if (bytesRead <= 0) {
+          break;
+        }
+        // Advance by what was actually read; a read may return fewer bytes.
+        offset += bytesRead;
+      }
+
+      assertEquals(readBuffer.length, offset);
+      assertArrayEquals(writeBuffer, readBuffer);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f044deed/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemE2EScale.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemE2EScale.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemE2EScale.java
new file mode 100644
index 0000000..616253b
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemE2EScale.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+/**
+ * Test end to end between ABFS client and ABFS server with heavy traffic.
+ */
+public class ITestAzureBlobFileSystemE2EScale extends DependencyInjectedTest {
+ private static final int TEN = 10;
+ private static final int ONE_THOUSAND = 1000;
+ private static final int BASE_SIZE = 1024;
+ private static final int ONE_MB = 1024 * 1024;
+ private static final int DEFAULT_WRITE_TIMES = 100;
+ private static final Path TEST_FILE = new Path("testfile");
+
+ public ITestAzureBlobFileSystemE2EScale() {
+   super();
+ }
+
+ /**
+  * Submits {@code DEFAULT_WRITE_TIMES} writes of the same buffer to one
+  * output stream through a pool of {@code TEN} threads and checks that the
+  * resulting file length equals the total number of bytes written.
+  *
+  * @throws Exception if a filesystem operation or a write task fails
+  */
+ @Test
+ public void testWriteHeavyBytesToFile() throws Exception {
+   final AzureBlobFileSystem fs = this.getFileSystem();
+   final ExecutorService es = Executors.newFixedThreadPool(TEN);
+
+   final int testWriteBufferSize = 2 * TEN * ONE_THOUSAND * BASE_SIZE;
+   final byte[] b = new byte[testWriteBufferSize];
+   new Random().nextBytes(b);
+
+   // The stream is closed first, then the pool is shut down in the finally
+   // block, so neither leaks when a task or the close itself fails.
+   try (FSDataOutputStream stream = fs.create(TEST_FILE)) {
+     // Every task does the same thing, so a single Callable is reused.
+     final Callable<Void> callable = new Callable<Void>() {
+       @Override
+       public Void call() throws Exception {
+         stream.write(b);
+         return null;
+       }
+     };
+
+     final List<Future<Void>> tasks = new ArrayList<>();
+     for (int i = 0; i < DEFAULT_WRITE_TIMES; i++) {
+       tasks.add(es.submit(callable));
+     }
+
+     // get() blocks until the write is done and rethrows any task failure.
+     for (Future<Void> task : tasks) {
+       task.get();
+     }
+   } finally {
+     es.shutdownNow();
+   }
+
+   FileStatus fileStatus = fs.getFileStatus(TEST_FILE);
+   // Multiply as long: the int product (2,048,000,000) is only just below
+   // Integer.MAX_VALUE and would overflow with slightly larger constants.
+   assertEquals((long) testWriteBufferSize * DEFAULT_WRITE_TIMES, fileStatus.getLen());
+ }
+
+ /**
+  * Writes a large random buffer and verifies that reading the file back
+  * returns the same bytes.
+  *
+  * @throws Exception if a filesystem operation fails
+  */
+ @Test
+ public void testReadWriteHeavyBytesToFile() throws Exception {
+   final AzureBlobFileSystem fs = this.getFileSystem();
+
+   final int testBufferSize = 5 * TEN * ONE_THOUSAND * BASE_SIZE;
+   final byte[] b = new byte[testBufferSize];
+   new Random().nextBytes(b);
+
+   try (FSDataOutputStream stream = fs.create(TEST_FILE)) {
+     stream.write(b);
+   }
+
+   final byte[] r = new byte[testBufferSize];
+   try (FSDataInputStream inputStream = fs.open(TEST_FILE, 4 * ONE_MB)) {
+     int result = inputStream.read(r);
+     assertNotEquals(-1, result);
+
+     // read() is allowed to return fewer bytes than requested, so pull in
+     // whatever is still missing before comparing the contents.
+     inputStream.readFully(r, result, r.length - result);
+   }
+
+   // JUnit convention: expected value first, actual value second.
+   assertArrayEquals(b, r);
+ }
+
+ /**
+  * Writes and reads back a large random buffer and checks that the
+  * filesystem statistics report the matching byte counts.
+  *
+  * @throws Exception if a filesystem operation fails
+  */
+ @Test
+ public void testReadWriteHeavyBytesToFileWithStatistics() throws Exception {
+   final AzureBlobFileSystem fs = this.getFileSystem();
+   final FileSystem.Statistics abfsStatistics = fs.getFsStatistics();
+
+   final int testBufferSize = 5 * TEN * ONE_THOUSAND * BASE_SIZE;
+   final byte[] b = new byte[testBufferSize];
+   new Random().nextBytes(b);
+
+   try (FSDataOutputStream stream = fs.create(TEST_FILE)) {
+     // Reset after create(), as before, so only the data transfer is counted.
+     abfsStatistics.reset();
+     stream.write(b);
+   }
+
+   final byte[] r = new byte[testBufferSize];
+   try (FSDataInputStream inputStream = fs.open(TEST_FILE, 4 * ONE_MB)) {
+     // readFully() keeps reading until the buffer is full; a single read()
+     // may stop early and would under-report the bytes-read statistic.
+     inputStream.readFully(r);
+   }
+
+   Assert.assertEquals(r.length, abfsStatistics.getBytesRead());
+   Assert.assertEquals(b.length, abfsStatistics.getBytesWritten());
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f044deed/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFileStatus.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFileStatus.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFileStatus.java
new file mode 100644
index 0000000..bfa662d
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFileStatus.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs;
+
+import org.junit.Test;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test FileStatus.
+ */
+public class ITestAzureBlobFileSystemFileStatus extends DependencyInjectedTest {
+ private static final Path TEST_FILE = new Path("testFile");
+ private static final Path TEST_FOLDER = new Path("testDir");
+
+ public ITestAzureBlobFileSystemFileStatus() {
+   super();
+ }
+
+ /**
+  * Checks that status and listing calls on the root path complete without
+  * throwing.
+  *
+  * @throws Exception if either call fails
+  */
+ @Test
+ public void testEnsureStatusWorksForRoot() throws Exception {
+   final AzureBlobFileSystem fs = this.getFileSystem();
+
+   fs.getFileStatus(new Path("/"));
+   fs.listStatus(new Path("/"));
+ }
+
+ /**
+  * Creates a file and a directory and verifies the permission, owner and
+  * group reported for each of them.
+  *
+  * @throws Exception if a filesystem operation fails
+  */
+ @Test
+ public void testFileStatusPermissionsAndOwnerAndGroup() throws Exception {
+   final AzureBlobFileSystem fs = this.getFileSystem();
+   // Close the stream returned by create() right away so it is not leaked.
+   fs.create(TEST_FILE).close();
+   fs.mkdirs(TEST_FOLDER);
+
+   assertStatus(fs, TEST_FILE);
+   assertStatus(fs, TEST_FOLDER);
+ }
+
+ /**
+  * Asserts that {@code path} reports rwxrwxrwx permissions, the filesystem's
+  * owner user as owner, and that user's primary group as group.
+  *
+  * @param fs filesystem under test
+  * @param path path whose status is checked
+  * @throws Exception if the status cannot be retrieved
+  */
+ private static void assertStatus(final AzureBlobFileSystem fs, final Path path)
+     throws Exception {
+   final FileStatus fileStatus = fs.getFileStatus(path);
+   assertEquals("permission of " + path,
+       new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL), fileStatus.getPermission());
+   // Owner is compared with owner and group with group; the two checks were
+   // previously crossed.
+   assertEquals("owner of " + path, fs.getOwnerUser(), fileStatus.getOwner());
+   assertEquals("group of " + path, fs.getOwnerUserPrimaryGroup(), fileStatus.getGroup());
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f044deed/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFlush.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFlush.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFlush.java
new file mode 100644
index 0000000..8c2e8ce
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFlush.java
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.junit.Test;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+/**
+ * Test flush operation.
+ */
+public class ITestAzureBlobFileSystemFlush extends DependencyInjectedTest {
+ private static final int BASE_SIZE = 1024;
+ private static final int ONE_THOUSAND = 1000;
+ private static final int TEST_BUFFER_SIZE = 5 * ONE_THOUSAND * BASE_SIZE;
+ private static final int ONE_MB = 1024 * 1024;
+ private static final int FLUSH_TIMES = 200;
+ private static final int THREAD_SLEEP_TIME = 6000;
+ private static final int WRITER_THREADS = 10;
+ private static final int FLUSH_PAUSE_MILLIS = 10;
+
+ private static final Path TEST_FILE_PATH = new Path("/testfile");
+
+ public ITestAzureBlobFileSystemFlush() {
+   super();
+ }
+
+ /**
+  * Writes the same buffer twice, calling flush() repeatedly after each
+  * write, and verifies that both copies can be read back unchanged.
+  *
+  * @throws Exception if a filesystem operation fails
+  */
+ @Test
+ public void testAbfsOutputStreamAsyncFlushWithRetainUncommitedData() throws Exception {
+   final AzureBlobFileSystem fs = this.getFileSystem();
+
+   final byte[] b = new byte[TEST_BUFFER_SIZE];
+   new Random().nextBytes(b);
+
+   final int copies = 2;
+   try (FSDataOutputStream stream = fs.create(TEST_FILE_PATH)) {
+     for (int i = 0; i < copies; i++) {
+       stream.write(b);
+
+       for (int j = 0; j < FLUSH_TIMES; j++) {
+         stream.flush();
+         Thread.sleep(FLUSH_PAUSE_MILLIS);
+       }
+     }
+   }
+
+   final byte[] r = new byte[TEST_BUFFER_SIZE];
+   int copiesRead = 0;
+   try (FSDataInputStream inputStream = fs.open(TEST_FILE_PATH, 4 * ONE_MB)) {
+     while (inputStream.available() != 0) {
+       int result = inputStream.read(r);
+       assertNotEquals(-1, result);
+
+       // read() is allowed to return fewer bytes than requested, so pull in
+       // whatever is still missing before comparing the contents.
+       inputStream.readFully(r, result, r.length - result);
+       assertArrayEquals(b, r);
+       copiesRead++;
+     }
+   }
+
+   // Without this check an empty file would pass, since the loop would
+   // never run.
+   assertEquals(copies, copiesRead);
+ }
+
+ /**
+  * Writes a buffer, calls hsync() and hflush() repeatedly, and verifies
+  * that the data reads back unchanged after the stream is closed.
+  *
+  * @throws Exception if a filesystem operation fails
+  */
+ @Test
+ public void testAbfsOutputStreamSyncFlush() throws Exception {
+   final AzureBlobFileSystem fs = this.getFileSystem();
+
+   final byte[] b = new byte[TEST_BUFFER_SIZE];
+   new Random().nextBytes(b);
+
+   try (FSDataOutputStream stream = fs.create(TEST_FILE_PATH)) {
+     stream.write(b);
+
+     for (int i = 0; i < FLUSH_TIMES; i++) {
+       stream.hsync();
+       stream.hflush();
+       Thread.sleep(FLUSH_PAUSE_MILLIS);
+     }
+   }
+
+   final byte[] r = new byte[TEST_BUFFER_SIZE];
+   try (FSDataInputStream inputStream = fs.open(TEST_FILE_PATH, 4 * ONE_MB)) {
+     int result = inputStream.read(r);
+     assertNotEquals(-1, result);
+
+     // read() is allowed to return fewer bytes than requested.
+     inputStream.readFully(r, result, r.length - result);
+   }
+
+   assertArrayEquals(b, r);
+ }
+
+ /**
+  * Runs {@code FLUSH_TIMES} concurrent writes against one stream while the
+  * test thread calls hsync() until all writes finish, then verifies the
+  * file length and the bytes-written statistic.
+  *
+  * @throws Exception if a filesystem operation or a write task fails
+  */
+ @Test
+ public void testWriteHeavyBytesToFileSyncFlush() throws Exception {
+   final AzureBlobFileSystem fs = this.getFileSystem();
+   final FileSystem.Statistics abfsStatistics = fs.getFsStatistics();
+   final ExecutorService es = Executors.newFixedThreadPool(WRITER_THREADS);
+
+   final byte[] b = new byte[TEST_BUFFER_SIZE];
+   new Random().nextBytes(b);
+
+   // The stream is closed first, then the pool is shut down in the finally
+   // block, so neither leaks when a task or the close itself fails.
+   try (FSDataOutputStream stream = fs.create(TEST_FILE_PATH)) {
+     // Reset after create(), as before, so only the data writes are counted.
+     abfsStatistics.reset();
+
+     final List<Future<Void>> tasks = submitWrites(es, stream, b, FLUSH_TIMES);
+
+     boolean shouldStop = false;
+     while (!shouldStop) {
+       shouldStop = true;
+       for (Future<Void> task : tasks) {
+         if (!task.isDone()) {
+           stream.hsync();
+           shouldStop = false;
+           Thread.sleep(THREAD_SLEEP_TIME);
+         }
+       }
+     }
+
+     rethrowFailures(tasks);
+   } finally {
+     es.shutdownNow();
+   }
+
+   FileStatus fileStatus = fs.getFileStatus(TEST_FILE_PATH);
+   assertEquals((long) TEST_BUFFER_SIZE * FLUSH_TIMES, fileStatus.getLen());
+   assertEquals((long) TEST_BUFFER_SIZE * FLUSH_TIMES, abfsStatistics.getBytesWritten());
+ }
+
+ /**
+  * Runs {@code FLUSH_TIMES} concurrent writes against one stream while the
+  * test thread calls flush() until all writes finish, then verifies the
+  * file length.
+  *
+  * @throws Exception if a filesystem operation or a write task fails
+  */
+ @Test
+ public void testWriteHeavyBytesToFileAsyncFlush() throws Exception {
+   final AzureBlobFileSystem fs = this.getFileSystem();
+   final ExecutorService es = Executors.newFixedThreadPool(WRITER_THREADS);
+
+   final byte[] b = new byte[TEST_BUFFER_SIZE];
+   new Random().nextBytes(b);
+
+   // The file is created once; an earlier extra create() call returned a
+   // stream that was never closed.
+   try (FSDataOutputStream stream = fs.create(TEST_FILE_PATH)) {
+     final List<Future<Void>> tasks = submitWrites(es, stream, b, FLUSH_TIMES);
+
+     boolean shouldStop = false;
+     while (!shouldStop) {
+       shouldStop = true;
+       for (Future<Void> task : tasks) {
+         if (!task.isDone()) {
+           stream.flush();
+           shouldStop = false;
+         }
+       }
+     }
+
+     rethrowFailures(tasks);
+     Thread.sleep(THREAD_SLEEP_TIME);
+   } finally {
+     es.shutdownNow();
+   }
+
+   FileStatus fileStatus = fs.getFileStatus(TEST_FILE_PATH);
+   assertEquals((long) TEST_BUFFER_SIZE * FLUSH_TIMES, fileStatus.getLen());
+ }
+
+ /**
+  * Submits {@code count} tasks that each write {@code data} to {@code stream}.
+  *
+  * @param es executor that runs the write tasks
+  * @param stream stream shared by all tasks
+  * @param data bytes written by every task
+  * @param count number of write tasks to submit
+  * @return one future per submitted task
+  */
+ private static List<Future<Void>> submitWrites(final ExecutorService es,
+     final FSDataOutputStream stream, final byte[] data, final int count) {
+   // Every task does the same thing, so a single Callable is reused.
+   final Callable<Void> writeTask = new Callable<Void>() {
+     @Override
+     public Void call() throws Exception {
+       stream.write(data);
+       return null;
+     }
+   };
+
+   final List<Future<Void>> tasks = new ArrayList<>();
+   for (int i = 0; i < count; i++) {
+     tasks.add(es.submit(writeTask));
+   }
+   return tasks;
+ }
+
+ /**
+  * Calls get() on every future so that an exception thrown inside a write
+  * task fails the test; isDone() alone also reports true for failed tasks.
+  *
+  * @param tasks futures to check
+  * @throws Exception if any task failed or the wait was interrupted
+  */
+ private static void rethrowFailures(final List<Future<Void>> tasks) throws Exception {
+   for (Future<Void> task : tasks) {
+     task.get();
+   }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f044deed/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemInitAndCreate.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemInitAndCreate.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemInitAndCreate.java
new file mode 100644
index 0000000..d2ed400
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemInitAndCreate.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs;
+
+import java.io.FileNotFoundException;
+
+import org.junit.Test;
+
+import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys;
+
+/**
+ * Test filesystem initialization and creation.
+ */
+public class ITestAzureBlobFileSystemInitAndCreate extends DependencyInjectedTest {
+ /**
+  * Removes the create-remote-filesystem-during-initialization key from the
+  * test configuration.
+  */
+ public ITestAzureBlobFileSystemInitAndCreate() {
+   getConfiguration().unset(ConfigurationKeys.AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION);
+ }
+
+ /**
+  * No-op override; the test method calls the parent implementation itself.
+  */
+ @Override
+ public void initialize() {
+ }
+
+ /**
+  * No-op override of the parent's cleanup.
+  */
+ @Override
+ public void testCleanup() {
+ }
+
+ /**
+  * Runs the parent initialization and then requests the filesystem,
+  * expecting a {@link FileNotFoundException}.
+  *
+  * @throws Exception the expected {@link FileNotFoundException}, or any
+  *         other failure
+  */
+ @Test(expected = FileNotFoundException.class)
+ public void ensureFilesystemWillNotBeCreatedIfCreationConfigIsNotSet() throws Exception {
+   super.initialize();
+   getFileSystem();
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org