You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cl...@apache.org on 2020/11/03 20:04:40 UTC
[hadoop] branch branch-2.10 updated: HADOOP-17336. Backport
HADOOP-16005 NativeAzureFileSystem does not support setXAttr and
HADOOP-16785. Improve wasb and abfs resilience on double close() calls.
followup to abfs close() fix to branch-2.10. Contributed by Sally Zuo.
This is an automated email from the ASF dual-hosted git repository.
cliang pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new ac82334 HADOOP-17336. Backport HADOOP-16005 NativeAzureFileSystem does not support setXAttr and HADOOP-16785. Improve wasb and abfs resilience on double close() calls. followup to abfs close() fix to branch-2.10. Contributed by Sally Zuo.
ac82334 is described below
commit ac82334041d87f3038d863a6eb7510ee30ad5aad
Author: Chen Liang <va...@apache.org>
AuthorDate: Tue Nov 3 12:00:57 2020 -0800
HADOOP-17336. Backport HADOOP-16005 NativeAzureFileSystem does not support setXAttr and HADOOP-16785. Improve wasb and abfs resilience on double close() calls. followup to abfs close() fix to branch-2.10. Contributed by Sally Zuo.
---
.../org/apache/hadoop/test/LambdaTestUtils.java | 2 +-
.../fs/azure/AzureNativeFileSystemStore.java | 67 +++++++++---
.../hadoop/fs/azure/NativeAzureFileSystem.java | 113 ++++++++++++++++++---
.../hadoop/fs/azure/NativeFileSystemStore.java | 4 +
.../hadoop/fs/azurebfs/AzureBlobFileSystem.java | 79 ++++++++++++++
.../fs/azurebfs/AzureBlobFileSystemStore.java | 9 ++
.../fs/azurebfs/services/AbfsOutputStream.java | 6 ++
.../hadoop/fs/azure/AbstractWasbTestBase.java | 2 +-
.../ITestFileSystemOperationExceptionHandling.java | 31 ++++++
.../fs/azure/NativeAzureFileSystemBaseTest.java | 77 +++++++++++++-
.../ITestAzureBlobFileSystemAttributes.java | 113 +++++++++++++++++++++
.../azurebfs/ITestAzureBlobFileSystemCreate.java | 98 ++++++++++++++++++
12 files changed, 572 insertions(+), 29 deletions(-)
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/LambdaTestUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/LambdaTestUtils.java
index 1f906be..1fc6a07 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/LambdaTestUtils.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/LambdaTestUtils.java
@@ -400,7 +400,7 @@ public final class LambdaTestUtils {
throws Exception {
try {
eval.call();
- throw new AssertionError("Expected an exception");
+ throw new AssertionError("Expected an exception of type " + clazz);
} catch (Throwable e) {
if (clazz.isAssignableFrom(e.getClass())) {
return (E)e;
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
index 8ced57a..f03efdc 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
@@ -29,6 +29,8 @@ import java.net.URI;
import java.net.URISyntaxException;
import java.net.URLDecoder;
import java.net.URLEncoder;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
import java.security.InvalidKeyException;
import java.util.Calendar;
import java.util.Date;
@@ -245,6 +247,8 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
private static final int DEFAULT_CONCURRENT_WRITES = 8;
+ private static final Charset METADATA_ENCODING = StandardCharsets.UTF_8;
+
// Concurrent reads of data written out of band are disabled by default.
//
private static final boolean DEFAULT_READ_TOLERATE_CONCURRENT_APPEND = false;
@@ -1604,17 +1608,30 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
removeMetadataAttribute(blob, OLD_IS_FOLDER_METADATA_KEY);
}
- private static void storeLinkAttribute(CloudBlobWrapper blob,
- String linkTarget) throws UnsupportedEncodingException {
- // We have to URL encode the link attribute as the link URI could
+ private static String encodeMetadataAttribute(String value) throws UnsupportedEncodingException {
+ // We have to URL encode the attribute as it could
// have URI special characters which unless encoded will result
// in 403 errors from the server. This is due to metadata properties
// being sent in the HTTP header of the request which is in turn used
// on the server side to authorize the request.
- String encodedLinkTarget = null;
- if (linkTarget != null) {
- encodedLinkTarget = URLEncoder.encode(linkTarget, "UTF-8");
- }
+ return value == null ? null : URLEncoder.encode(value, METADATA_ENCODING.name());
+ }
+
+ private static String decodeMetadataAttribute(String encoded) throws UnsupportedEncodingException {
+ return encoded == null ? null : URLDecoder.decode(encoded, METADATA_ENCODING.name());
+ }
+
+ private static String ensureValidAttributeName(String attribute) {
+ // Attribute names must be valid C# identifiers, so we have to
+ // convert the namespace dots (e.g. "user.something") in the
+ // attribute names. Underscores are used here to be consistent with
+ // the constant metadata keys defined earlier in the file.
+ return attribute.replace('.', '_');
+ }
+
+ private static void storeLinkAttribute(CloudBlobWrapper blob,
+ String linkTarget) throws UnsupportedEncodingException {
+ String encodedLinkTarget = encodeMetadataAttribute(linkTarget);
storeMetadataAttribute(blob,
LINK_BACK_TO_UPLOAD_IN_PROGRESS_METADATA_KEY,
encodedLinkTarget);
@@ -1628,11 +1645,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
String encodedLinkTarget = getMetadataAttribute(blob,
LINK_BACK_TO_UPLOAD_IN_PROGRESS_METADATA_KEY,
OLD_LINK_BACK_TO_UPLOAD_IN_PROGRESS_METADATA_KEY);
- String linkTarget = null;
- if (encodedLinkTarget != null) {
- linkTarget = URLDecoder.decode(encodedLinkTarget, "UTF-8");
- }
- return linkTarget;
+ return decodeMetadataAttribute(encodedLinkTarget);
}
private static boolean retrieveFolderAttribute(CloudBlobWrapper blob) {
@@ -2154,6 +2167,36 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
}
@Override
+ public byte[] retrieveAttribute(String key, String attribute) throws IOException {
+ try {
+ checkContainer(ContainerAccessType.PureRead);
+ CloudBlobWrapper blob = getBlobReference(key);
+ blob.downloadAttributes(getInstrumentedContext());
+
+ String value = getMetadataAttribute(blob, ensureValidAttributeName(attribute));
+ value = decodeMetadataAttribute(value);
+ return value == null ? null : value.getBytes(METADATA_ENCODING);
+ } catch (Exception e) {
+ throw new AzureException(e);
+ }
+ }
+
+ @Override
+ public void storeAttribute(String key, String attribute, byte[] value) throws IOException {
+ try {
+ checkContainer(ContainerAccessType.ReadThenWrite);
+ CloudBlobWrapper blob = getBlobReference(key);
+ blob.downloadAttributes(getInstrumentedContext());
+
+ String encodedValue = encodeMetadataAttribute(new String(value, METADATA_ENCODING));
+ storeMetadataAttribute(blob, ensureValidAttributeName(attribute), encodedValue);
+ blob.uploadMetadata(getInstrumentedContext());
+ } catch (Exception e) {
+ throw new AzureException(e);
+ }
+ }
+
+ @Override
public InputStream retrieve(String key) throws AzureException, IOException {
return retrieve(key, 0);
}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
index db880f2..0588f22 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
@@ -59,6 +59,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.Syncable;
+import org.apache.hadoop.fs.XAttrSetFlag;
import org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation;
import org.apache.hadoop.fs.azure.metrics.AzureFileSystemMetricsSystem;
import org.apache.hadoop.fs.azure.security.Constants;
@@ -184,7 +185,7 @@ public class NativeAzureFileSystem extends FileSystem {
} catch (IOException e) {
this.committed = false;
}
-
+
if (!this.committed) {
LOG.error("Deleting corruped rename pending file {} \n {}",
redoFile, contents);
@@ -507,7 +508,7 @@ public class NativeAzureFileSystem extends FileSystem {
/**
* Recover from a folder rename failure by redoing the intended work,
* as recorded in the -RenamePending.json file.
- *
+ *
* @throws IOException Thrown when fail to redo.
*/
public void redo() throws IOException {
@@ -675,7 +676,7 @@ public class NativeAzureFileSystem extends FileSystem {
/**
* The time span in seconds before which we consider a temp blob to be
* dangling (not being actively uploaded to) and up for reclamation.
- *
+ *
* So e.g. if this is 60, then any temporary blobs more than a minute old
* would be considered dangling.
*/
@@ -1083,12 +1084,13 @@ public class NativeAzureFileSystem extends FileSystem {
* write is that one byte is written to the output stream. The byte to be
* written is the eight low-order bits of the argument b. The 24 high-order
* bits of b are ignored.
- *
+ *
* @param b
* 32-bit integer of block of 4 bytes
*/
@Override
public void write(int b) throws IOException {
+ checkOpen();
try {
out.write(b);
} catch(IOException e) {
@@ -1106,12 +1108,13 @@ public class NativeAzureFileSystem extends FileSystem {
* Writes b.length bytes from the specified byte array to this output
* stream. The general contract for write(b) is that it should have exactly
* the same effect as the call write(b, 0, b.length).
- *
+ *
* @param b
* Block of bytes to be written to the output stream.
*/
@Override
public void write(byte[] b) throws IOException {
+ checkOpen();
try {
out.write(b);
} catch(IOException e) {
@@ -1132,7 +1135,7 @@ public class NativeAzureFileSystem extends FileSystem {
* are written to the output stream in order; element <code>b[off]</code>
* is the first byte written and <code>b[off+len-1]</code> is the last
* byte written by this operation.
- *
+ *
* @param b
* Byte array to be written.
* @param off
@@ -1142,6 +1145,7 @@ public class NativeAzureFileSystem extends FileSystem {
*/
@Override
public void write(byte[] b, int off, int len) throws IOException {
+ checkOpen();
try {
out.write(b, off, len);
} catch(IOException e) {
@@ -1157,7 +1161,7 @@ public class NativeAzureFileSystem extends FileSystem {
/**
* Get the blob name.
- *
+ *
* @return String Blob name.
*/
public String getKey() {
@@ -1166,7 +1170,7 @@ public class NativeAzureFileSystem extends FileSystem {
/**
* Set the blob name.
- *
+ *
* @param key
* Blob name.
*/
@@ -1176,7 +1180,7 @@ public class NativeAzureFileSystem extends FileSystem {
/**
* Get the blob name.
- *
+ *
* @return String Blob name.
*/
public String getEncodedKey() {
@@ -1185,7 +1189,7 @@ public class NativeAzureFileSystem extends FileSystem {
/**
* Set the blob name.
- *
+ *
* @param anEncodedKey
* Blob name.
*/
@@ -1204,6 +1208,17 @@ public class NativeAzureFileSystem extends FileSystem {
private void restoreKey() throws IOException {
store.rename(getEncodedKey(), getKey());
}
+
+ /**
+ * Check for the stream being open.
+ * @throws IOException if the stream is closed.
+ */
+ private void checkOpen() throws IOException {
+ if (out == null) {
+ throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
+ }
+ }
+
}
private URI uri;
@@ -3556,6 +3571,76 @@ public class NativeAzureFileSystem extends FileSystem {
}
/**
+ * Set the value of an attribute for a path.
+ *
+ * @param path The path on which to set the attribute
+ * @param xAttrName The attribute to set
+ * @param value The byte value of the attribute to set (encoded in utf-8)
+ * @param flag The mode in which to set the attribute
+ * @throws IOException If there was an issue setting the attribute on Azure
+ */
+ @Override
+ public void setXAttr(Path path, String xAttrName, byte[] value, EnumSet<XAttrSetFlag> flag) throws IOException {
+ Path absolutePath = makeAbsolute(path);
+ performAuthCheck(absolutePath, WasbAuthorizationOperations.WRITE, "setXAttr", absolutePath);
+
+ String key = pathToKey(absolutePath);
+ FileMetadata metadata;
+ try {
+ metadata = store.retrieveMetadata(key);
+ } catch (IOException ex) {
+ Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
+ if (innerException instanceof StorageException
+ && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
+ throw new FileNotFoundException("File " + path + " doesn't exists.");
+ }
+ throw ex;
+ }
+
+ if (metadata == null) {
+ throw new FileNotFoundException("File doesn't exist: " + path);
+ }
+
+ boolean xAttrExists = store.retrieveAttribute(key, xAttrName) != null;
+ XAttrSetFlag.validate(xAttrName, xAttrExists, flag);
+ store.storeAttribute(key, xAttrName, value);
+ }
+
+ /**
+ * Get the value of an attribute for a path.
+ *
+ * @param path The path on which to get the attribute
+ * @param xAttrName The attribute to get
+ * @return The bytes of the attribute's value (encoded in utf-8)
+ * or null if the attribute does not exist
+ * @throws IOException If there was an issue getting the attribute from Azure
+ */
+ @Override
+ public byte[] getXAttr(Path path, String xAttrName) throws IOException {
+ Path absolutePath = makeAbsolute(path);
+ performAuthCheck(absolutePath, WasbAuthorizationOperations.READ, "getXAttr", absolutePath);
+
+ String key = pathToKey(absolutePath);
+ FileMetadata metadata;
+ try {
+ metadata = store.retrieveMetadata(key);
+ } catch (IOException ex) {
+ Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
+ if (innerException instanceof StorageException
+ && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
+ throw new FileNotFoundException("File " + path + " doesn't exists.");
+ }
+ throw ex;
+ }
+
+ if (metadata == null) {
+ throw new FileNotFoundException("File doesn't exist: " + path);
+ }
+
+ return store.retrieveAttribute(key, xAttrName);
+ }
+
+ /**
* Is the user allowed?
* <ol>
* <li>No user: false</li>
@@ -3728,7 +3813,7 @@ public class NativeAzureFileSystem extends FileSystem {
/**
* Implements recover and delete (-move and -delete) behaviors for handling
* dangling files (blobs whose upload was interrupted).
- *
+ *
* @param root
* The root path to check from.
* @param handler
@@ -3770,7 +3855,7 @@ public class NativeAzureFileSystem extends FileSystem {
* the data to a temporary blob, but for some reason we crashed in the middle
* of the upload and left them there. If any are found, we move them to the
* destination given.
- *
+ *
* @param root
* The root path to consider.
* @param destination
@@ -3790,7 +3875,7 @@ public class NativeAzureFileSystem extends FileSystem {
* meaning that they are place-holder blobs that we created while we upload
* the data to a temporary blob, but for some reason we crashed in the middle
* of the upload and left them there. If any are found, we delete them.
- *
+ *
* @param root
* The root path to consider.
* @throws IOException Thrown when fail to delete.
@@ -3812,7 +3897,7 @@ public class NativeAzureFileSystem extends FileSystem {
* Encode the key with a random prefix for load balancing in Azure storage.
* Upload data to a random temporary file then do storage side renaming to
* recover the original key.
- *
+ *
* @param aKey a key to be encoded.
* @return Encoded version of the original key.
*/
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java
index 36e3819..414a011 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java
@@ -76,6 +76,10 @@ interface NativeFileSystemStore {
void changePermissionStatus(String key, PermissionStatus newPermission)
throws AzureException;
+ byte[] retrieveAttribute(String key, String attribute) throws IOException;
+
+ void storeAttribute(String key, String attribute, byte[] value) throws IOException;
+
/**
* API to delete a blob in the back end azure storage.
* @param key - key to the blob being deleted.
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
index c3791fb..7180a47 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
@@ -26,6 +26,7 @@ import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URI;
import java.net.URISyntaxException;
+import java.util.Hashtable;
import java.util.List;
import java.util.ArrayList;
import java.util.Arrays;
@@ -55,6 +56,7 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathIOException;
+import org.apache.hadoop.fs.XAttrSetFlag;
import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
import org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations;
import org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes;
@@ -626,6 +628,83 @@ public class AzureBlobFileSystem extends FileSystem {
}
/**
+ * Set the value of an attribute for a path.
+ *
+ * @param path The path on which to set the attribute
+ * @param name The attribute to set
+ * @param value The byte value of the attribute to set (encoded in latin-1)
+ * @param flag The mode in which to set the attribute
+ * @throws IOException If there was an issue setting the attribute on Azure
+ * @throws IllegalArgumentException If name is null or empty or if value is null
+ */
+ @Override
+ public void setXAttr(final Path path, final String name, final byte[] value, final EnumSet<XAttrSetFlag> flag)
+ throws IOException {
+ LOG.debug("AzureBlobFileSystem.setXAttr path: {}", path);
+
+ if (name == null || name.isEmpty() || value == null) {
+ throw new IllegalArgumentException("A valid name and value must be specified.");
+ }
+
+ Path qualifiedPath = makeQualified(path);
+ performAbfsAuthCheck(FsAction.READ_WRITE, qualifiedPath);
+
+ try {
+ Hashtable<String, String> properties = abfsStore.getPathStatus(path);
+ String xAttrName = ensureValidAttributeName(name);
+ boolean xAttrExists = properties.containsKey(xAttrName);
+ XAttrSetFlag.validate(name, xAttrExists, flag);
+
+ String xAttrValue = abfsStore.decodeAttribute(value);
+ properties.put(xAttrName, xAttrValue);
+ abfsStore.setPathProperties(path, properties);
+ } catch (AzureBlobFileSystemException ex) {
+ checkException(path, ex);
+ }
+ }
+
+ /**
+ * Get the value of an attribute for a path.
+ *
+ * @param path The path on which to get the attribute
+ * @param name The attribute to get
+ * @return The bytes of the attribute's value (encoded in latin-1)
+ * or null if the attribute does not exist
+ * @throws IOException If there was an issue getting the attribute from Azure
+ * @throws IllegalArgumentException If name is null or empty
+ */
+ @Override
+ public byte[] getXAttr(final Path path, final String name)
+ throws IOException {
+ LOG.debug("AzureBlobFileSystem.getXAttr path: {}", path);
+
+ if (name == null || name.isEmpty()) {
+ throw new IllegalArgumentException("A valid name must be specified.");
+ }
+
+ Path qualifiedPath = makeQualified(path);
+ performAbfsAuthCheck(FsAction.READ, qualifiedPath);
+
+ byte[] value = null;
+ try {
+ Hashtable<String, String> properties = abfsStore.getPathStatus(path);
+ String xAttrName = ensureValidAttributeName(name);
+ if (properties.containsKey(xAttrName)) {
+ String xAttrValue = properties.get(xAttrName);
+ value = abfsStore.encodeAttribute(xAttrValue);
+ }
+ } catch (AzureBlobFileSystemException ex) {
+ checkException(path, ex);
+ }
+ return value;
+ }
+
+ private static String ensureValidAttributeName(String attribute) {
+ // Replace dots with underscores to avoid an HTTP 400 Bad Request (InvalidPropertyName) response.
+ return attribute.replace('.', '_');
+ }
+
+ /**
* Set permission of a path.
*
* @param path The path
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
index c3dfb11..0b2b3b1 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.fs.azurebfs;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
+import java.io.UnsupportedEncodingException;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URI;
@@ -177,6 +178,14 @@ public class AzureBlobFileSystemStore {
return this.primaryUserGroup;
}
+ byte[] encodeAttribute(String value) throws UnsupportedEncodingException {
+ return value.getBytes(XMS_PROPERTIES_ENCODING);
+ }
+
+ String decodeAttribute(byte[] value) throws UnsupportedEncodingException {
+ return new String(value, XMS_PROPERTIES_ENCODING);
+ }
+
private String[] authorityParts(URI uri) throws InvalidUriAuthorityException, InvalidUriException {
final String authority = uri.getRawAuthority();
if (null == authority) {
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
index be2ab67..e68400f 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
@@ -246,6 +246,12 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
try {
flushInternal(true);
threadExecutor.shutdown();
+ } catch (IOException e) {
+ // Problems surface in try-with-resources clauses if the exception
+ // thrown by close() is the same instance as the one already thrown,
+ // so we wrap any exception with a new one.
+ // See HADOOP-16785
+ throw new IOException(e);
} finally {
lastError = new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
buffer = null;
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/AbstractWasbTestBase.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/AbstractWasbTestBase.java
index 0d3a06c..d5c1dce 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/AbstractWasbTestBase.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/AbstractWasbTestBase.java
@@ -52,7 +52,7 @@ public abstract class AbstractWasbTestBase extends AbstractWasbTestWithTimeout
@Before
public void setUp() throws Exception {
AzureBlobStorageTestAccount account = createTestAccount();
- assumeNotNull(account);
+ assumeNotNull("test account", account);
bindToTestAccount(account);
}
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestFileSystemOperationExceptionHandling.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestFileSystemOperationExceptionHandling.java
index a45dae4..bcb9b2f 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestFileSystemOperationExceptionHandling.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestFileSystemOperationExceptionHandling.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.fs.azure;
import java.io.FileNotFoundException;
+import java.io.IOException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
@@ -27,10 +28,13 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.test.LambdaTestUtils;
import org.junit.After;
import org.junit.Test;
+import static org.apache.hadoop.fs.FSExceptionMessages.STREAM_IS_CLOSED;
import static org.apache.hadoop.fs.azure.ExceptionHandlingTestHelper.*;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
/**
* Single threaded exception handling.
@@ -265,6 +269,33 @@ public class ITestFileSystemOperationExceptionHandling
inputStream = fs.open(testPath);
}
+ /**
+ * Attempts to write to the azure stream after it is closed will raise
+ * an IOException.
+ */
+ @Test
+ public void testWriteAfterClose() throws Throwable {
+ final FSDataOutputStream out = fs.create(testPath);
+ out.close();
+ intercept(IOException.class, STREAM_IS_CLOSED,
+ new LambdaTestUtils.VoidCallable() {
+ @Override
+ public void call() throws Exception {
+ out.write('a');
+ }
+ });
+ intercept(IOException.class, STREAM_IS_CLOSED,
+ new LambdaTestUtils.VoidCallable() {
+ @Override
+ public void call() throws Exception {
+ out.write(new byte[]{'a'});
+ }
+ });
+ out.hsync();
+ out.flush();
+ out.close();
+ }
+
@After
public void tearDown() throws Exception {
if (inputStream != null) {
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemBaseTest.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemBaseTest.java
index 19d370e..db8b018 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemBaseTest.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemBaseTest.java
@@ -22,10 +22,12 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Date;
+import java.util.EnumSet;
import java.util.TimeZone;
import org.apache.commons.logging.Log;
@@ -37,10 +39,12 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.XAttrSetFlag;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.UserGroupInformation;
import org.junit.Test;
+import org.apache.hadoop.test.LambdaTestUtils;
import org.apache.hadoop.fs.azure.NativeAzureFileSystem.FolderRenamePending;
import com.microsoft.azure.storage.AccessCondition;
@@ -51,12 +55,13 @@ import static org.apache.hadoop.fs.azure.integration.AzureTestUtils.readStringFr
import static org.apache.hadoop.fs.azure.integration.AzureTestUtils.writeStringToFile;
import static org.apache.hadoop.fs.azure.integration.AzureTestUtils.writeStringToStream;
import static org.apache.hadoop.test.GenericTestUtils.*;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
/*
* Tests the Native Azure file system (WASB) against an actual blob store if
* provided in the environment.
* Subclasses implement createTestAccount() to hit local&mock storage with the same test code.
- *
+ *
* For hand-testing: remove "abstract" keyword and copy in an implementation of createTestAccount
* from one of the subclasses
*/
@@ -65,6 +70,10 @@ public abstract class NativeAzureFileSystemBaseTest
private final long modifiedTimeErrorMargin = 5 * 1000; // Give it +/-5 seconds
+ private static final short READ_WRITE_PERMISSIONS = 644;
+ private static final EnumSet<XAttrSetFlag> CREATE_FLAG = EnumSet.of(XAttrSetFlag.CREATE);
+ private static final EnumSet<XAttrSetFlag> REPLACE_FLAG = EnumSet.of(XAttrSetFlag.REPLACE);
+
public static final Log LOG = LogFactory.getLog(NativeAzureFileSystemBaseTest.class);
protected NativeAzureFileSystem fs;
@@ -118,6 +127,72 @@ public abstract class NativeAzureFileSystemBaseTest
}
@Test
+ public void testSetGetXAttr() throws Exception {
+ byte[] attributeValue1 = "hi".getBytes(StandardCharsets.UTF_8);
+ byte[] attributeValue2 = "你好".getBytes(StandardCharsets.UTF_8);
+ String attributeName1 = "user.asciiAttribute";
+ String attributeName2 = "user.unicodeAttribute";
+ Path testFile = methodPath();
+
+ // after creating a file, the xAttr should not be present
+ createEmptyFile(testFile, FsPermission.createImmutable(READ_WRITE_PERMISSIONS));
+ assertNull(fs.getXAttr(testFile, attributeName1));
+
+ // after setting the xAttr on the file, the value should be retrievable
+ fs.setXAttr(testFile, attributeName1, attributeValue1);
+ assertArrayEquals(attributeValue1, fs.getXAttr(testFile, attributeName1));
+
+ // after setting a second xAttr on the file, the first xAttr values should not be overwritten
+ fs.setXAttr(testFile, attributeName2, attributeValue2);
+ assertArrayEquals(attributeValue1, fs.getXAttr(testFile, attributeName1));
+ assertArrayEquals(attributeValue2, fs.getXAttr(testFile, attributeName2));
+ }
+
+ @Test
+ public void testSetGetXAttrCreateReplace() throws Exception {
+ final byte[] attributeValue = "one".getBytes(StandardCharsets.UTF_8);
+ final String attributeName = "user.someAttribute";
+ final Path testFile = methodPath();
+
+ // after creating a file, it must be possible to create a new xAttr
+ createEmptyFile(testFile, FsPermission.createImmutable(READ_WRITE_PERMISSIONS));
+ fs.setXAttr(testFile, attributeName, attributeValue, CREATE_FLAG);
+ assertArrayEquals(attributeValue, fs.getXAttr(testFile, attributeName));
+
+ // however after the xAttr is created, creating it again must fail
+ intercept(IOException.class,
+ new LambdaTestUtils.VoidCallable() {
+ @Override
+ public void call() throws Exception {
+ fs.setXAttr(testFile, attributeName, attributeValue, CREATE_FLAG);
+ }
+ });
+ }
+
+ @Test
+ public void testSetGetXAttrReplace() throws Exception {
+ final byte[] attributeValue1 = "one".getBytes(StandardCharsets.UTF_8);
+ byte[] attributeValue2 = "two".getBytes(StandardCharsets.UTF_8);
+ final String attributeName = "user.someAttribute";
+ final Path testFile = methodPath();
+
+ // after creating a file, it must not be possible to replace an xAttr
+ createEmptyFile(testFile, FsPermission.createImmutable(READ_WRITE_PERMISSIONS));
+ intercept(IOException.class,
+ new LambdaTestUtils.VoidCallable() {
+ @Override
+ public void call() throws Exception {
+ fs.setXAttr(testFile, attributeName, attributeValue1, REPLACE_FLAG);
+ }
+ });
+
+ // however after the xAttr is created, replacing it must succeed
+ fs.setXAttr(testFile, attributeName, attributeValue1, CREATE_FLAG);
+ fs.setXAttr(testFile, attributeName, attributeValue2, REPLACE_FLAG);
+ assertArrayEquals(attributeValue2, fs.getXAttr(testFile, attributeName));
+ }
+
+ @Test
public void testStoreDeleteFolder() throws Exception {
Path testFolder = methodPath();
assertFalse(fs.exists(testFolder));
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAttributes.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAttributes.java
new file mode 100644
index 0000000..1b10b52
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAttributes.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs;
+
+import java.io.IOException;
+import java.util.EnumSet;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.XAttrSetFlag;
+import org.junit.Assume;
+import org.junit.Test;
+import org.apache.hadoop.test.LambdaTestUtils;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+/**
+ * Test attribute operations.
+ */
+public class ITestAzureBlobFileSystemAttributes extends AbstractAbfsIntegrationTest {
+ private static final short READ_WRITE_PERMISSIONS = 644;
+ private static final EnumSet<XAttrSetFlag> CREATE_FLAG = EnumSet.of(XAttrSetFlag.CREATE);
+ private static final EnumSet<XAttrSetFlag> REPLACE_FLAG = EnumSet.of(XAttrSetFlag.REPLACE);
+
+ public ITestAzureBlobFileSystemAttributes() throws Exception {
+ super();
+ }
+
+ @Test
+ public void testSetGetXAttr() throws Exception {
+ AzureBlobFileSystem fs = getFileSystem();
+ Assume.assumeTrue(fs.getIsNamespaceEnabled());
+
+ byte[] attributeValue1 = fs.getAbfsStore().encodeAttribute("hi");
+ byte[] attributeValue2 = fs.getAbfsStore().encodeAttribute("你好");
+ String attributeName1 = "user.asciiAttribute";
+ String attributeName2 = "user.unicodeAttribute";
+ Path testFile = path("setGetXAttr");
+
+ // after creating a file, the xAttr should not be present
+ touch(testFile);
+ assertNull(fs.getXAttr(testFile, attributeName1));
+
+ // after setting the xAttr on the file, the value should be retrievable
+ fs.setXAttr(testFile, attributeName1, attributeValue1);
+ assertArrayEquals(attributeValue1, fs.getXAttr(testFile, attributeName1));
+
+ // after setting a second xAttr on the file, the first xAttr values should not be overwritten
+ fs.setXAttr(testFile, attributeName2, attributeValue2);
+ assertArrayEquals(attributeValue1, fs.getXAttr(testFile, attributeName1));
+ assertArrayEquals(attributeValue2, fs.getXAttr(testFile, attributeName2));
+ }
+
+ @Test
+ public void testSetGetXAttrCreateReplace() throws Exception {
+ final AzureBlobFileSystem fs = getFileSystem();
+ Assume.assumeTrue(fs.getIsNamespaceEnabled());
+ final byte[] attributeValue = fs.getAbfsStore().encodeAttribute("one");
+ final String attributeName = "user.someAttribute";
+ final Path testFile = path("createReplaceXAttr");
+
+ // after creating a file, it must be possible to create a new xAttr
+ touch(testFile);
+ fs.setXAttr(testFile, attributeName, attributeValue, CREATE_FLAG);
+ assertArrayEquals(attributeValue, fs.getXAttr(testFile, attributeName));
+
+ // however after the xAttr is created, creating it again must fail
+ intercept(IOException.class,
+ new LambdaTestUtils.VoidCallable() {
+ @Override
+ public void call() throws Exception {
+ fs.setXAttr(testFile, attributeName, attributeValue, CREATE_FLAG);
+ }
+ });
+ }
+
+ @Test
+ public void testSetGetXAttrReplace() throws Exception {
+ final AzureBlobFileSystem fs = getFileSystem();
+ Assume.assumeTrue(fs.getIsNamespaceEnabled());
+ final byte[] attributeValue1 = fs.getAbfsStore().encodeAttribute("one");
+ byte[] attributeValue2 = fs.getAbfsStore().encodeAttribute("two");
+ final String attributeName = "user.someAttribute";
+ final Path testFile = path("replaceXAttr");
+
+ // after creating a file, it must not be possible to replace an xAttr
+ intercept(IOException.class,
+ new LambdaTestUtils.VoidCallable() {
+ @Override
+ public void call() throws Exception {
+ fs.setXAttr(testFile, attributeName, attributeValue1, REPLACE_FLAG);
+ }
+ });
+
+ // however after the xAttr is created, replacing it must succeed
+ fs.setXAttr(testFile, attributeName, attributeValue1, CREATE_FLAG);
+ fs.setXAttr(testFile, attributeName, attributeValue2, REPLACE_FLAG);
+ assertArrayEquals(attributeValue2, fs.getXAttr(testFile, attributeName));
+ }
+}
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java
index ab01166..e315ad2 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java
@@ -19,16 +19,21 @@
package org.apache.hadoop.fs.azurebfs;
import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.FilterOutputStream;
import java.util.EnumSet;
+import org.apache.hadoop.test.LambdaTestUtils;
import org.junit.Test;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.test.GenericTestUtils;
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertIsFile;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
/**
* Test create operation.
@@ -104,4 +109,97 @@ public class ITestAzureBlobFileSystemCreate extends
.close();
assertIsFile(fs, testFile);
}
+
+ /**
+  * Verifies how an ABFS output stream behaves once it has been closed:
+  * a single-byte write and a byte-array write must each raise an
+  * IOException, while flush() and a second close() must complete without
+  * throwing.
+  */
+ @Test
+ public void testWriteAfterClose() throws Throwable {
+   final AzureBlobFileSystem fs = getFileSystem();
+   final FSDataOutputStream out =
+       fs.create(new Path(TEST_FOLDER_PATH, TEST_CHILD_FILE));
+   out.close();
+
+   final LambdaTestUtils.VoidCallable singleByteWrite =
+       new LambdaTestUtils.VoidCallable() {
+         @Override
+         public void call() throws Exception {
+           out.write('a');
+         }
+       };
+   intercept(IOException.class, singleByteWrite);
+
+   final LambdaTestUtils.VoidCallable byteArrayWrite =
+       new LambdaTestUtils.VoidCallable() {
+         @Override
+         public void call() throws Exception {
+           out.write(new byte[]{'a'});
+         }
+       };
+   intercept(IOException.class, byteArrayWrite);
+
+   // hsync() is intentionally not called here: it is not ignored on a
+   // closed stream
+   out.flush();
+   out.close();
+ }
+
+ /**
+  * Attempts to double close an ABFS output stream from within a
+  * FilterOutputStream; that class handles a double failure on close badly
+  * if the second exception rethrows the first.
+  * <p>
+  * The file is deleted while the stream is still open and the test expects
+  * a FileNotFoundException to escape the try-with-resources block. It then
+  * requires exactly one suppressed exception on that failure, which must be
+  * an IOException containing the primary exception's message.
+  */
+ @Test
+ public void testTryWithResources() throws Throwable {
+   final AzureBlobFileSystem fs = getFileSystem();
+   final Path target = new Path(TEST_FOLDER_PATH, TEST_CHILD_FILE);
+   try (FSDataOutputStream stream = fs.create(target)) {
+     stream.write('1');
+     stream.hsync();
+     // removing the file underneath the open stream is meant to make the
+     // next write/hsync fail
+     fs.delete(target, false);
+     stream.write('2');
+     stream.hsync();
+     fail("Expected a failure");
+   } catch (FileNotFoundException primary) {
+     // the exception raised in close() must be recorded as the single
+     // suppressed exception of the one that was caught
+     final Throwable[] closeFailures = primary.getSuppressed();
+     assertEquals("suppressed count", 1, closeFailures.length);
+     final Throwable closeFailure = closeFailures[0];
+     if (closeFailure instanceof IOException) {
+       GenericTestUtils.assertExceptionContains(primary.getMessage(),
+           closeFailure);
+     } else {
+       // anything that is not an IOException is unexpected: rethrow it as is
+       throw closeFailure;
+     }
+   }
+ }
+
+ /**
+  * Wraps the ABFS output stream in a FilterOutputStream, deletes the file
+  * while the stream is still open, and expects a FileNotFoundException
+  * twice: first from the write/hsync that follows the delete, and then from
+  * the enclosing callable as a whole, which ends with the implicit close of
+  * the FilterOutputStream (presumably the source of the second failure).
+  */
+ @Test
+ public void testFilterFSWriteAfterClose() throws Throwable {
+   final AzureBlobFileSystem fs = getFileSystem();
+   final Path testPath = new Path(TEST_FOLDER_PATH, TEST_CHILD_FILE);
+   final FSDataOutputStream out = fs.create(testPath);
+
+   final LambdaTestUtils.VoidCallable writeDeleteAndClose =
+       new LambdaTestUtils.VoidCallable() {
+         @Override
+         public void call() throws Exception {
+           try (FilterOutputStream fos = new FilterOutputStream(out)) {
+             fos.write('a');
+             fos.flush();
+             out.hsync();
+             fs.delete(testPath, false);
+
+             // trigger the first failure
+             final LambdaTestUtils.VoidCallable writeToDeletedFile =
+                 new LambdaTestUtils.VoidCallable() {
+                   @Override
+                   public void call() throws Exception {
+                     fos.write('b');
+                     out.hsync();
+                     // only reached if neither call above failed
+                     throw new Exception("hsync didn't raise an IOE");
+                   }
+                 };
+             intercept(FileNotFoundException.class, writeToDeletedFile);
+           }
+         }
+       };
+   intercept(FileNotFoundException.class, writeDeleteAndClose);
+ }
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org