You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cl...@apache.org on 2020/11/03 20:04:40 UTC

[hadoop] branch branch-2.10 updated: HADOOP-17336. Backport HADOOP-16005 NativeAzureFileSystem does not support setXAttr and HADOOP-16785. Improve wasb and abfs resilience on double close() calls. followup to abfs close() fix to branch-2.10. Contributed by Sally Zuo.

This is an automated email from the ASF dual-hosted git repository.

cliang pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new ac82334  HADOOP-17336. Backport HADOOP-16005 NativeAzureFileSystem does not support setXAttr and HADOOP-16785. Improve wasb and abfs resilience on double close() calls. followup to abfs close() fix to branch-2.10. Contributed by Sally Zuo.
ac82334 is described below

commit ac82334041d87f3038d863a6eb7510ee30ad5aad
Author: Chen Liang <va...@apache.org>
AuthorDate: Tue Nov 3 12:00:57 2020 -0800

    HADOOP-17336. Backport HADOOP-16005 NativeAzureFileSystem does not support setXAttr and HADOOP-16785. Improve wasb and abfs resilience on double close() calls. followup to abfs close() fix to branch-2.10. Contributed by Sally Zuo.
---
 .../org/apache/hadoop/test/LambdaTestUtils.java    |   2 +-
 .../fs/azure/AzureNativeFileSystemStore.java       |  67 +++++++++---
 .../hadoop/fs/azure/NativeAzureFileSystem.java     | 113 ++++++++++++++++++---
 .../hadoop/fs/azure/NativeFileSystemStore.java     |   4 +
 .../hadoop/fs/azurebfs/AzureBlobFileSystem.java    |  79 ++++++++++++++
 .../fs/azurebfs/AzureBlobFileSystemStore.java      |   9 ++
 .../fs/azurebfs/services/AbfsOutputStream.java     |   6 ++
 .../hadoop/fs/azure/AbstractWasbTestBase.java      |   2 +-
 .../ITestFileSystemOperationExceptionHandling.java |  31 ++++++
 .../fs/azure/NativeAzureFileSystemBaseTest.java    |  77 +++++++++++++-
 .../ITestAzureBlobFileSystemAttributes.java        | 113 +++++++++++++++++++++
 .../azurebfs/ITestAzureBlobFileSystemCreate.java   |  98 ++++++++++++++++++
 12 files changed, 572 insertions(+), 29 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/LambdaTestUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/LambdaTestUtils.java
index 1f906be..1fc6a07 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/LambdaTestUtils.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/LambdaTestUtils.java
@@ -400,7 +400,7 @@ public final class LambdaTestUtils {
       throws Exception {
     try {
       eval.call();
-      throw new AssertionError("Expected an exception");
+      throw new AssertionError("Expected an exception of type " + clazz);
     } catch (Throwable e) {
       if (clazz.isAssignableFrom(e.getClass())) {
         return (E)e;
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
index 8ced57a..f03efdc 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
@@ -29,6 +29,8 @@ import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.URLDecoder;
 import java.net.URLEncoder;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
 import java.security.InvalidKeyException;
 import java.util.Calendar;
 import java.util.Date;
@@ -245,6 +247,8 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
 
   private static final int DEFAULT_CONCURRENT_WRITES = 8;
 
+  private static final Charset METADATA_ENCODING = StandardCharsets.UTF_8;
+
   // Concurrent reads reads of data written out of band are disable by default.
   //
   private static final boolean DEFAULT_READ_TOLERATE_CONCURRENT_APPEND = false;
@@ -1604,17 +1608,30 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
     removeMetadataAttribute(blob, OLD_IS_FOLDER_METADATA_KEY);
   }
 
-  private static void storeLinkAttribute(CloudBlobWrapper blob,
-      String linkTarget) throws UnsupportedEncodingException {
-    // We have to URL encode the link attribute as the link URI could
+  private static String encodeMetadataAttribute(String value) throws UnsupportedEncodingException {
+    // We have to URL encode the attribute as it could
     // have URI special characters which unless encoded will result
     // in 403 errors from the server. This is due to metadata properties
     // being sent in the HTTP header of the request which is in turn used
     // on the server side to authorize the request.
-    String encodedLinkTarget = null;
-    if (linkTarget != null) {
-      encodedLinkTarget = URLEncoder.encode(linkTarget, "UTF-8");
-    }
+    return value == null ? null : URLEncoder.encode(value, METADATA_ENCODING.name());
+  }
+
+  private static String decodeMetadataAttribute(String encoded) throws UnsupportedEncodingException {
+    return encoded == null ? null : URLDecoder.decode(encoded, METADATA_ENCODING.name());
+  }
+
+  private static String ensureValidAttributeName(String attribute) {
+    // Attribute names must be valid C# identifiers so we have to
+    // convert the namespace dots (e.g. "user.something") in the
+    // attribute names. Using underscores here to be consistent with
+    // the constant metadata keys defined earlier in the file
+    return attribute.replace('.', '_');
+  }
+
+  private static void storeLinkAttribute(CloudBlobWrapper blob,
+      String linkTarget) throws UnsupportedEncodingException {
+    String encodedLinkTarget = encodeMetadataAttribute(linkTarget);
     storeMetadataAttribute(blob,
         LINK_BACK_TO_UPLOAD_IN_PROGRESS_METADATA_KEY,
         encodedLinkTarget);
@@ -1628,11 +1645,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
     String encodedLinkTarget = getMetadataAttribute(blob,
         LINK_BACK_TO_UPLOAD_IN_PROGRESS_METADATA_KEY,
         OLD_LINK_BACK_TO_UPLOAD_IN_PROGRESS_METADATA_KEY);
-    String linkTarget = null;
-    if (encodedLinkTarget != null) {
-      linkTarget = URLDecoder.decode(encodedLinkTarget, "UTF-8");
-    }
-    return linkTarget;
+    return decodeMetadataAttribute(encodedLinkTarget);
   }
 
   private static boolean retrieveFolderAttribute(CloudBlobWrapper blob) {
@@ -2154,6 +2167,36 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
   }
 
   @Override
+  public byte[] retrieveAttribute(String key, String attribute) throws IOException {
+    try {
+      checkContainer(ContainerAccessType.PureRead);
+      CloudBlobWrapper blob = getBlobReference(key);
+      blob.downloadAttributes(getInstrumentedContext());
+
+      String value = getMetadataAttribute(blob, ensureValidAttributeName(attribute));
+      value = decodeMetadataAttribute(value);
+      return value == null ? null : value.getBytes(METADATA_ENCODING);
+    } catch (Exception e) {
+      throw new AzureException(e);
+    }
+  }
+
+  @Override
+  public void storeAttribute(String key, String attribute, byte[] value) throws IOException {
+    try {
+      checkContainer(ContainerAccessType.ReadThenWrite);
+      CloudBlobWrapper blob = getBlobReference(key);
+      blob.downloadAttributes(getInstrumentedContext());
+
+      String encodedValue = encodeMetadataAttribute(new String(value, METADATA_ENCODING));
+      storeMetadataAttribute(blob, ensureValidAttributeName(attribute), encodedValue);
+      blob.uploadMetadata(getInstrumentedContext());
+    } catch (Exception e) {
+      throw new AzureException(e);
+    }
+  }
+
+  @Override
   public InputStream retrieve(String key) throws AzureException, IOException {
     return retrieve(key, 0);
   }
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
index db880f2..0588f22 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
@@ -59,6 +59,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.Seekable;
 import org.apache.hadoop.fs.StreamCapabilities;
 import org.apache.hadoop.fs.Syncable;
+import org.apache.hadoop.fs.XAttrSetFlag;
 import org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation;
 import org.apache.hadoop.fs.azure.metrics.AzureFileSystemMetricsSystem;
 import org.apache.hadoop.fs.azure.security.Constants;
@@ -184,7 +185,7 @@ public class NativeAzureFileSystem extends FileSystem {
       } catch (IOException e) {
         this.committed = false;
       }
-      
+
       if (!this.committed) {
         LOG.error("Deleting corruped rename pending file {} \n {}",
             redoFile, contents);
@@ -507,7 +508,7 @@ public class NativeAzureFileSystem extends FileSystem {
     /**
      * Recover from a folder rename failure by redoing the intended work,
      * as recorded in the -RenamePending.json file.
-     * 
+     *
      * @throws IOException Thrown when fail to redo.
      */
     public void redo() throws IOException {
@@ -675,7 +676,7 @@ public class NativeAzureFileSystem extends FileSystem {
   /**
    * The time span in seconds before which we consider a temp blob to be
    * dangling (not being actively uploaded to) and up for reclamation.
-   * 
+   *
    * So e.g. if this is 60, then any temporary blobs more than a minute old
    * would be considered dangling.
    */
@@ -1083,12 +1084,13 @@ public class NativeAzureFileSystem extends FileSystem {
      * write is that one byte is written to the output stream. The byte to be
      * written is the eight low-order bits of the argument b. The 24 high-order
      * bits of b are ignored.
-     * 
+     *
      * @param b
      *          32-bit integer of block of 4 bytes
      */
     @Override
     public void write(int b) throws IOException {
+      checkOpen();
       try {
         out.write(b);
       } catch(IOException e) {
@@ -1106,12 +1108,13 @@ public class NativeAzureFileSystem extends FileSystem {
      * Writes b.length bytes from the specified byte array to this output
      * stream. The general contract for write(b) is that it should have exactly
      * the same effect as the call write(b, 0, b.length).
-     * 
+     *
      * @param b
      *          Block of bytes to be written to the output stream.
      */
     @Override
     public void write(byte[] b) throws IOException {
+      checkOpen();
       try {
         out.write(b);
       } catch(IOException e) {
@@ -1132,7 +1135,7 @@ public class NativeAzureFileSystem extends FileSystem {
      * are written to the output stream in order; element <code>b[off]</code>
      * is the first byte written and <code>b[off+len-1]</code> is the last
      * byte written by this operation.
-     * 
+     *
      * @param b
      *          Byte array to be written.
      * @param off
@@ -1142,6 +1145,7 @@ public class NativeAzureFileSystem extends FileSystem {
      */
     @Override
     public void write(byte[] b, int off, int len) throws IOException {
+      checkOpen();
       try {
         out.write(b, off, len);
       } catch(IOException e) {
@@ -1157,7 +1161,7 @@ public class NativeAzureFileSystem extends FileSystem {
 
     /**
      * Get the blob name.
-     * 
+     *
      * @return String Blob name.
      */
     public String getKey() {
@@ -1166,7 +1170,7 @@ public class NativeAzureFileSystem extends FileSystem {
 
     /**
      * Set the blob name.
-     * 
+     *
      * @param key
      *          Blob name.
      */
@@ -1176,7 +1180,7 @@ public class NativeAzureFileSystem extends FileSystem {
 
     /**
      * Get the blob name.
-     * 
+     *
      * @return String Blob name.
      */
     public String getEncodedKey() {
@@ -1185,7 +1189,7 @@ public class NativeAzureFileSystem extends FileSystem {
 
     /**
      * Set the blob name.
-     * 
+     *
      * @param anEncodedKey
      *          Blob name.
      */
@@ -1204,6 +1208,17 @@ public class NativeAzureFileSystem extends FileSystem {
     private void restoreKey() throws IOException {
       store.rename(getEncodedKey(), getKey());
     }
+
+    /**
+     * Check for the stream being open.
+     * @throws IOException if the stream is closed.
+     */
+    private void checkOpen() throws IOException {
+      if (out == null) {
+        throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
+      }
+    }
+
   }
 
   private URI uri;
@@ -3556,6 +3571,76 @@ public class NativeAzureFileSystem extends FileSystem {
   }
 
   /**
+   * Set the value of an attribute for a path.
+   *
+   * @param path The path on which to set the attribute
+   * @param xAttrName The attribute to set
+   * @param value The byte value of the attribute to set (encoded in utf-8)
+   * @param flag The mode in which to set the attribute
+   * @throws IOException If there was an issue setting the attribute on Azure
+   */
+  @Override
+  public void setXAttr(Path path, String xAttrName, byte[] value, EnumSet<XAttrSetFlag> flag) throws IOException {
+    Path absolutePath = makeAbsolute(path);
+    performAuthCheck(absolutePath, WasbAuthorizationOperations.WRITE, "setXAttr", absolutePath);
+
+    String key = pathToKey(absolutePath);
+    FileMetadata metadata;
+    try {
+      metadata = store.retrieveMetadata(key);
+    } catch (IOException ex) {
+      Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
+      if (innerException instanceof StorageException
+          && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
+        throw new FileNotFoundException("File " + path + " doesn't exists.");
+      }
+      throw ex;
+    }
+
+    if (metadata == null) {
+      throw new FileNotFoundException("File doesn't exist: " + path);
+    }
+
+    boolean xAttrExists = store.retrieveAttribute(key, xAttrName) != null;
+    XAttrSetFlag.validate(xAttrName, xAttrExists, flag);
+    store.storeAttribute(key, xAttrName, value);
+  }
+
+  /**
+   * Get the value of an attribute for a path.
+   *
+   * @param path The path on which to get the attribute
+   * @param xAttrName The attribute to get
+   * @return The bytes of the attribute's value (encoded in utf-8)
+   *         or null if the attribute does not exist
+   * @throws IOException If there was an issue getting the attribute from Azure
+   */
+  @Override
+  public byte[] getXAttr(Path path, String xAttrName) throws IOException {
+    Path absolutePath = makeAbsolute(path);
+    performAuthCheck(absolutePath, WasbAuthorizationOperations.READ, "getXAttr", absolutePath);
+
+    String key = pathToKey(absolutePath);
+    FileMetadata metadata;
+    try {
+      metadata = store.retrieveMetadata(key);
+    } catch (IOException ex) {
+      Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
+      if (innerException instanceof StorageException
+              && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
+        throw new FileNotFoundException("File " + path + " doesn't exists.");
+      }
+      throw ex;
+    }
+
+    if (metadata == null) {
+      throw new FileNotFoundException("File doesn't exist: " + path);
+    }
+
+    return store.retrieveAttribute(key, xAttrName);
+  }
+
+  /**
    * Is the user allowed?
    * <ol>
    *   <li>No user: false</li>
@@ -3728,7 +3813,7 @@ public class NativeAzureFileSystem extends FileSystem {
   /**
    * Implements recover and delete (-move and -delete) behaviors for handling
    * dangling files (blobs whose upload was interrupted).
-   * 
+   *
    * @param root
    *          The root path to check from.
    * @param handler
@@ -3770,7 +3855,7 @@ public class NativeAzureFileSystem extends FileSystem {
    * the data to a temporary blob, but for some reason we crashed in the middle
    * of the upload and left them there. If any are found, we move them to the
    * destination given.
-   * 
+   *
    * @param root
    *          The root path to consider.
    * @param destination
@@ -3790,7 +3875,7 @@ public class NativeAzureFileSystem extends FileSystem {
    * meaning that they are place-holder blobs that we created while we upload
    * the data to a temporary blob, but for some reason we crashed in the middle
    * of the upload and left them there. If any are found, we delete them.
-   * 
+   *
    * @param root
    *          The root path to consider.
    * @throws IOException Thrown when fail to delete.
@@ -3812,7 +3897,7 @@ public class NativeAzureFileSystem extends FileSystem {
    * Encode the key with a random prefix for load balancing in Azure storage.
    * Upload data to a random temporary file then do storage side renaming to
    * recover the original key.
-   * 
+   *
    * @param aKey a key to be encoded.
    * @return Encoded version of the original key.
    */
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java
index 36e3819..414a011 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java
@@ -76,6 +76,10 @@ interface NativeFileSystemStore {
   void changePermissionStatus(String key, PermissionStatus newPermission)
       throws AzureException;
 
+  byte[] retrieveAttribute(String key, String attribute) throws IOException;
+
+  void storeAttribute(String key, String attribute, byte[] value) throws IOException;
+
   /**
    * API to delete a blob in the back end azure storage.
    * @param key - key to the blob being deleted.
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
index c3791fb..7180a47 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
@@ -26,6 +26,7 @@ import java.io.OutputStream;
 import java.net.HttpURLConnection;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.Hashtable;
 import java.util.List;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -55,6 +56,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathIOException;
+import org.apache.hadoop.fs.XAttrSetFlag;
 import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
 import org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations;
 import org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes;
@@ -626,6 +628,83 @@ public class AzureBlobFileSystem extends FileSystem {
   }
 
   /**
+   * Set the value of an attribute for a path.
+   *
+   * @param path The path on which to set the attribute
+   * @param name The attribute to set
+   * @param value The byte value of the attribute to set (encoded in latin-1)
+   * @param flag The mode in which to set the attribute
+   * @throws IOException If there was an issue setting the attribute on Azure
+   * @throws IllegalArgumentException If name is null or empty or if value is null
+   */
+  @Override
+  public void setXAttr(final Path path, final String name, final byte[] value, final EnumSet<XAttrSetFlag> flag)
+      throws IOException {
+    LOG.debug("AzureBlobFileSystem.setXAttr path: {}", path);
+
+    if (name == null || name.isEmpty() || value == null) {
+      throw new IllegalArgumentException("A valid name and value must be specified.");
+    }
+
+    Path qualifiedPath = makeQualified(path);
+    performAbfsAuthCheck(FsAction.READ_WRITE, qualifiedPath);
+
+    try {
+      Hashtable<String, String> properties = abfsStore.getPathStatus(path);
+      String xAttrName = ensureValidAttributeName(name);
+      boolean xAttrExists = properties.containsKey(xAttrName);
+      XAttrSetFlag.validate(name, xAttrExists, flag);
+
+      String xAttrValue = abfsStore.decodeAttribute(value);
+      properties.put(xAttrName, xAttrValue);
+      abfsStore.setPathProperties(path, properties);
+    } catch (AzureBlobFileSystemException ex) {
+      checkException(path, ex);
+    }
+  }
+
+  /**
+   * Get the value of an attribute for a path.
+   *
+   * @param path The path on which to get the attribute
+   * @param name The attribute to get
+   * @return The bytes of the attribute's value (encoded in latin-1)
+   *         or null if the attribute does not exist
+   * @throws IOException If there was an issue getting the attribute from Azure
+   * @throws IllegalArgumentException If name is null or empty
+   */
+  @Override
+  public byte[] getXAttr(final Path path, final String name)
+      throws IOException {
+    LOG.debug("AzureBlobFileSystem.getXAttr path: {}", path);
+
+    if (name == null || name.isEmpty()) {
+      throw new IllegalArgumentException("A valid name must be specified.");
+    }
+
+    Path qualifiedPath = makeQualified(path);
+    performAbfsAuthCheck(FsAction.READ, qualifiedPath);
+
+    byte[] value = null;
+    try {
+      Hashtable<String, String> properties = abfsStore.getPathStatus(path);
+      String xAttrName = ensureValidAttributeName(name);
+      if (properties.containsKey(xAttrName)) {
+        String xAttrValue = properties.get(xAttrName);
+        value = abfsStore.encodeAttribute(xAttrValue);
+      }
+    } catch (AzureBlobFileSystemException ex) {
+      checkException(path, ex);
+    }
+    return value;
+  }
+
+  private static String ensureValidAttributeName(String attribute) {
+    // replace dots with underscores to avoid HTTP 400 Bad Request, InvalidPropertyName
+    return attribute.replace('.', '_');
+  }
+
+  /**
    * Set permission of a path.
    *
    * @param path       The path
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
index c3dfb11..0b2b3b1 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.fs.azurebfs;
 import java.io.File;
 import java.io.IOException;
 import java.io.OutputStream;
+import java.io.UnsupportedEncodingException;
 import java.net.HttpURLConnection;
 import java.net.MalformedURLException;
 import java.net.URI;
@@ -177,6 +178,14 @@ public class AzureBlobFileSystemStore {
     return this.primaryUserGroup;
   }
 
+  byte[] encodeAttribute(String value) throws UnsupportedEncodingException {
+    return value.getBytes(XMS_PROPERTIES_ENCODING);
+  }
+
+  String decodeAttribute(byte[] value) throws UnsupportedEncodingException {
+    return new String(value, XMS_PROPERTIES_ENCODING);
+  }
+
   private String[] authorityParts(URI uri) throws InvalidUriAuthorityException, InvalidUriException {
     final String authority = uri.getRawAuthority();
     if (null == authority) {
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
index be2ab67..e68400f 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
@@ -246,6 +246,12 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
     try {
       flushInternal(true);
       threadExecutor.shutdown();
+    } catch (IOException e) {
+      // try-with-resources fails with "Self-suppression not permitted" if
+      // the exception thrown by close() is the same instance as the one
+      // already thrown from the body, so wrap it in a new IOException.
+      // See HADOOP-16785
+      throw new IOException(e);
     } finally {
       lastError = new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
       buffer = null;
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/AbstractWasbTestBase.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/AbstractWasbTestBase.java
index 0d3a06c..d5c1dce 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/AbstractWasbTestBase.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/AbstractWasbTestBase.java
@@ -52,7 +52,7 @@ public abstract class AbstractWasbTestBase extends AbstractWasbTestWithTimeout
   @Before
   public void setUp() throws Exception {
     AzureBlobStorageTestAccount account = createTestAccount();
-    assumeNotNull(account);
+    assumeNotNull("test account", account);
     bindToTestAccount(account);
   }
 
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestFileSystemOperationExceptionHandling.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestFileSystemOperationExceptionHandling.java
index a45dae4..bcb9b2f 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestFileSystemOperationExceptionHandling.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestFileSystemOperationExceptionHandling.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.fs.azure;
 
 import java.io.FileNotFoundException;
+import java.io.IOException;
 
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -27,10 +28,13 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.contract.ContractTestUtils;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.test.LambdaTestUtils;
 import org.junit.After;
 import org.junit.Test;
 
+import static org.apache.hadoop.fs.FSExceptionMessages.STREAM_IS_CLOSED;
 import static org.apache.hadoop.fs.azure.ExceptionHandlingTestHelper.*;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
 
 /**
  * Single threaded exception handling.
@@ -265,6 +269,33 @@ public class ITestFileSystemOperationExceptionHandling
     inputStream = fs.open(testPath);
   }
 
+  /**
+   * Attempts to write to the azure stream after it is closed will raise
+   * an IOException.
+   */
+  @Test
+  public void testWriteAfterClose() throws Throwable {
+    final FSDataOutputStream out = fs.create(testPath);
+    out.close();
+    intercept(IOException.class, STREAM_IS_CLOSED,
+        new LambdaTestUtils.VoidCallable() {
+          @Override
+          public void call() throws Exception {
+            out.write('a');
+          }
+        });
+    intercept(IOException.class, STREAM_IS_CLOSED,
+        new LambdaTestUtils.VoidCallable() {
+          @Override
+          public void call() throws Exception {
+            out.write(new byte[]{'a'});
+          }
+        });
+    out.hsync();
+    out.flush();
+    out.close();
+  }
+
   @After
   public void tearDown() throws Exception {
     if (inputStream != null) {
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemBaseTest.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemBaseTest.java
index 19d370e..db8b018 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemBaseTest.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemBaseTest.java
@@ -22,10 +22,12 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Calendar;
 import java.util.Date;
+import java.util.EnumSet;
 import java.util.TimeZone;
 
 import org.apache.commons.logging.Log;
@@ -37,10 +39,12 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.XAttrSetFlag;
 import org.apache.hadoop.fs.contract.ContractTestUtils;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.junit.Test;
+import org.apache.hadoop.test.LambdaTestUtils;
 import org.apache.hadoop.fs.azure.NativeAzureFileSystem.FolderRenamePending;
 
 import com.microsoft.azure.storage.AccessCondition;
@@ -51,12 +55,13 @@ import static org.apache.hadoop.fs.azure.integration.AzureTestUtils.readStringFr
 import static org.apache.hadoop.fs.azure.integration.AzureTestUtils.writeStringToFile;
 import static org.apache.hadoop.fs.azure.integration.AzureTestUtils.writeStringToStream;
 import static org.apache.hadoop.test.GenericTestUtils.*;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
 
 /*
  * Tests the Native Azure file system (WASB) against an actual blob store if
  * provided in the environment.
  * Subclasses implement createTestAccount() to hit local&mock storage with the same test code.
- * 
+ *
  * For hand-testing: remove "abstract" keyword and copy in an implementation of createTestAccount
  * from one of the subclasses
  */
@@ -65,6 +70,10 @@ public abstract class NativeAzureFileSystemBaseTest
 
   private final long modifiedTimeErrorMargin = 5 * 1000; // Give it +/-5 seconds
 
+  private static final short READ_WRITE_PERMISSIONS = 644;
+  private static final EnumSet<XAttrSetFlag> CREATE_FLAG = EnumSet.of(XAttrSetFlag.CREATE);
+  private static final EnumSet<XAttrSetFlag> REPLACE_FLAG = EnumSet.of(XAttrSetFlag.REPLACE);
+
   public static final Log LOG = LogFactory.getLog(NativeAzureFileSystemBaseTest.class);
   protected NativeAzureFileSystem fs;
 
@@ -118,6 +127,72 @@ public abstract class NativeAzureFileSystemBaseTest
   }
 
   @Test
+  public void testSetGetXAttr() throws Exception {
+    final Path target = methodPath();
+    final String asciiName = "user.asciiAttribute";
+    final String unicodeName = "user.unicodeAttribute";
+    final byte[] asciiValue = "hi".getBytes(StandardCharsets.UTF_8);
+    final byte[] unicodeValue = "你好".getBytes(StandardCharsets.UTF_8);
+
+    // a freshly created file carries no xAttr yet
+    createEmptyFile(target, FsPermission.createImmutable(READ_WRITE_PERMISSIONS));
+    assertNull(fs.getXAttr(target, asciiName));
+
+    // once set, the xAttr value can be read back unchanged
+    fs.setXAttr(target, asciiName, asciiValue);
+    assertArrayEquals(asciiValue, fs.getXAttr(target, asciiName));
+
+    // adding a second xAttr must leave the first one intact
+    fs.setXAttr(target, unicodeName, unicodeValue);
+    assertArrayEquals(asciiValue, fs.getXAttr(target, asciiName));
+    assertArrayEquals(unicodeValue, fs.getXAttr(target, unicodeName));
+  }
+
+  @Test
+  public void testSetGetXAttrCreateReplace() throws Exception {
+    final byte[] attributeValue = "one".getBytes(StandardCharsets.UTF_8);
+    final String attributeName = "user.someAttribute";
+    final Path testFile = methodPath();
+
+    // after creating a file, it must be possible to create a new xAttr
+    createEmptyFile(testFile, FsPermission.createImmutable(READ_WRITE_PERMISSIONS));
+    fs.setXAttr(testFile, attributeName, attributeValue, CREATE_FLAG);
+    assertArrayEquals(attributeValue, fs.getXAttr(testFile, attributeName));
+
+    // however after the xAttr is created, creating it again must fail
+    intercept(IOException.class,
+        new LambdaTestUtils.VoidCallable() {
+          @Override
+          public void call() throws Exception {
+            fs.setXAttr(testFile, attributeName, attributeValue, CREATE_FLAG);
+          }
+        });
+  }
+
+  @Test
+  public void testSetGetXAttrReplace() throws Exception {
+    final byte[] attributeValue1 = "one".getBytes(StandardCharsets.UTF_8);
+    byte[] attributeValue2 = "two".getBytes(StandardCharsets.UTF_8);
+    final String attributeName = "user.someAttribute";
+    final Path testFile = methodPath();
+
+    // after creating a file, it must not be possible to replace an xAttr
+    createEmptyFile(testFile, FsPermission.createImmutable(READ_WRITE_PERMISSIONS));
+    intercept(IOException.class,
+        new LambdaTestUtils.VoidCallable() {
+          @Override
+          public void call() throws Exception {
+            fs.setXAttr(testFile, attributeName, attributeValue1, REPLACE_FLAG);
+          }
+        });
+
+    // however after the xAttr is created, replacing it must succeed
+    fs.setXAttr(testFile, attributeName, attributeValue1, CREATE_FLAG);
+    fs.setXAttr(testFile, attributeName, attributeValue2, REPLACE_FLAG);
+    assertArrayEquals(attributeValue2, fs.getXAttr(testFile, attributeName));
+  }
+
+  @Test
   public void testStoreDeleteFolder() throws Exception {
     Path testFolder = methodPath();
     assertFalse(fs.exists(testFolder));
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAttributes.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAttributes.java
new file mode 100644
index 0000000..1b10b52
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAttributes.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs;
+
+import java.io.IOException;
+import java.util.EnumSet;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.XAttrSetFlag;
+import org.junit.Assume;
+import org.junit.Test;
+import org.apache.hadoop.test.LambdaTestUtils;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+/**
+ * Test attribute operations.
+ */
+public class ITestAzureBlobFileSystemAttributes extends AbstractAbfsIntegrationTest {
+  private static final EnumSet<XAttrSetFlag> CREATE_FLAG = EnumSet.of(XAttrSetFlag.CREATE);
+  private static final EnumSet<XAttrSetFlag> REPLACE_FLAG = EnumSet.of(XAttrSetFlag.REPLACE);
+
+  public ITestAzureBlobFileSystemAttributes() throws Exception {
+    super();
+  }
+
+  /**
+   * Sets two xAttrs on a new file and verifies that each reads back
+   * unchanged. Skipped unless fs.getIsNamespaceEnabled() returns true.
+   */
+  @Test
+  public void testSetGetXAttr() throws Exception {
+    AzureBlobFileSystem fs = getFileSystem();
+    Assume.assumeTrue(fs.getIsNamespaceEnabled());
+
+    byte[] attributeValue1 = fs.getAbfsStore().encodeAttribute("hi");
+    byte[] attributeValue2 = fs.getAbfsStore().encodeAttribute("你好");
+    String attributeName1 = "user.asciiAttribute";
+    String attributeName2 = "user.unicodeAttribute";
+    Path testFile = path("setGetXAttr");
+
+    // after creating a file, the xAttr should not be present
+    touch(testFile);
+    assertNull(fs.getXAttr(testFile, attributeName1));
+
+    // after setting the xAttr on the file, the value should be retrievable
+    fs.setXAttr(testFile, attributeName1, attributeValue1);
+    assertArrayEquals(attributeValue1, fs.getXAttr(testFile, attributeName1));
+
+    // after setting a second xAttr on the file, the first xAttr values should not be overwritten
+    fs.setXAttr(testFile, attributeName2, attributeValue2);
+    assertArrayEquals(attributeValue1, fs.getXAttr(testFile, attributeName1));
+    assertArrayEquals(attributeValue2, fs.getXAttr(testFile, attributeName2));
+  }
+
+  /**
+   * Creating an xAttr with the CREATE flag must succeed once and raise an
+   * IOException when repeated. Skipped unless the namespace is enabled.
+   */
+  @Test
+  public void testSetGetXAttrCreateReplace() throws Exception {
+    final AzureBlobFileSystem fs = getFileSystem();
+    Assume.assumeTrue(fs.getIsNamespaceEnabled());
+    final byte[] attributeValue = fs.getAbfsStore().encodeAttribute("one");
+    final String attributeName = "user.someAttribute";
+    final Path testFile = path("createReplaceXAttr");
+
+    // after creating a file, it must be possible to create a new xAttr
+    touch(testFile);
+    fs.setXAttr(testFile, attributeName, attributeValue, CREATE_FLAG);
+    assertArrayEquals(attributeValue, fs.getXAttr(testFile, attributeName));
+
+    // however after the xAttr is created, creating it again must fail
+    intercept(IOException.class,
+        new LambdaTestUtils.VoidCallable() {
+          @Override
+          public void call() throws Exception {
+            fs.setXAttr(testFile, attributeName, attributeValue, CREATE_FLAG);
+          }
+        });
+  }
+
+  /**
+   * Replacing an xAttr with the REPLACE flag must raise an IOException while
+   * the xAttr is absent and succeed once it has been created.
+   */
+  @Test
+  public void testSetGetXAttrReplace() throws Exception {
+    final AzureBlobFileSystem fs = getFileSystem();
+    Assume.assumeTrue(fs.getIsNamespaceEnabled());
+    final byte[] attributeValue1 = fs.getAbfsStore().encodeAttribute("one");
+    byte[] attributeValue2 = fs.getAbfsStore().encodeAttribute("two");
+    final String attributeName = "user.someAttribute";
+    final Path testFile = path("replaceXAttr");
+
+    // create the file up front, as the other tests here do, so the REPLACE
+    // check exercises an existing file and the later CREATE has a file to act on
+    touch(testFile);
+
+    // after creating a file, it must not be possible to replace an xAttr
+    intercept(IOException.class,
+        new LambdaTestUtils.VoidCallable() {
+          @Override
+          public void call() throws Exception {
+            fs.setXAttr(testFile, attributeName, attributeValue1, REPLACE_FLAG);
+          }
+        });
+
+    // however after the xAttr is created, replacing it must succeed
+    fs.setXAttr(testFile, attributeName, attributeValue1, CREATE_FLAG);
+    fs.setXAttr(testFile, attributeName, attributeValue2, REPLACE_FLAG);
+    assertArrayEquals(attributeValue2, fs.getXAttr(testFile, attributeName));
+  }
+}
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java
index ab01166..e315ad2 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java
@@ -19,16 +19,21 @@
 package org.apache.hadoop.fs.azurebfs;
 
 import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.FilterOutputStream;
 import java.util.EnumSet;
 
+import org.apache.hadoop.test.LambdaTestUtils;
 import org.junit.Test;
 
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.test.GenericTestUtils;
 
 import static org.apache.hadoop.fs.contract.ContractTestUtils.assertIsFile;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
 
 /**
  * Test create operation.
@@ -104,4 +109,97 @@ public class ITestAzureBlobFileSystemCreate extends
         .close();
     assertIsFile(fs, testFile);
   }
+
+  /**
+   * Writes to a closed ABFS stream must fail; flush() and close() must not.
+   */
+  @Test
+  public void testWriteAfterClose() throws Throwable {
+    final AzureBlobFileSystem fs = getFileSystem();
+    final Path target = new Path(TEST_FOLDER_PATH, TEST_CHILD_FILE);
+    final FSDataOutputStream stream = fs.create(target);
+    stream.close();
+    LambdaTestUtils.VoidCallable writeByte = new LambdaTestUtils.VoidCallable() {
+      @Override
+      public void call() throws Exception {
+        stream.write('a');
+      }
+    };
+    LambdaTestUtils.VoidCallable writeArray = new LambdaTestUtils.VoidCallable() {
+      @Override
+      public void call() throws Exception {
+        stream.write(new byte[]{'a'});
+      }
+    };
+    intercept(IOException.class, writeByte);
+    intercept(IOException.class, writeArray);
+    // hsync is not ignored on a closed stream
+    //stream.hsync();
+    stream.flush();
+    stream.close();
+  }
+
+  /**
+   * Attempts to double close an ABFS output stream from within a
+   * FilterOutputStream.
+   * That class handles a double failure on close badly if the second
+   * exception rethrows the first.
+   */
+  @Test
+  public void testTryWithResources() throws Throwable {
+    final AzureBlobFileSystem fs = getFileSystem();
+    Path testPath = new Path(TEST_FOLDER_PATH, TEST_CHILD_FILE);
+    try (FSDataOutputStream out = fs.create(testPath)) {
+      out.write('1');
+      out.hsync();
+      // this will cause the next write to fail
+      fs.delete(testPath, false);
+      out.write('2');
+      out.hsync();
+      fail("Expected a failure");
+    } catch (FileNotFoundException fnfe) {
+      // the exception raised in close() must be in the caught exception's
+      // suppressed list
+      Throwable[] suppressed = fnfe.getSuppressed();
+      assertEquals("suppressed count", 1, suppressed.length);
+      Throwable inner = suppressed[0];
+      if (!(inner instanceof IOException)) {
+        throw inner; // not an IOException: rethrow it unchanged
+      }
+      GenericTestUtils.assertExceptionContains(fnfe.getMessage(), inner);
+    }
+  }
+
+  /**
+   * After the file is deleted, write+hsync through a FilterOutputStream must
+   * raise a FileNotFoundException, as must the enclosing try-with-resources.
+   */
+  @Test
+  public void testFilterFSWriteAfterClose() throws Throwable {
+    final AzureBlobFileSystem fs = getFileSystem();
+    final Path testPath = new Path(TEST_FOLDER_PATH, TEST_CHILD_FILE);
+    final FSDataOutputStream out = fs.create(testPath);
+    intercept(FileNotFoundException.class, new LambdaTestUtils.VoidCallable() {
+      @Override
+      public void call() throws Exception {
+        try (FilterOutputStream fos = new FilterOutputStream(out)) {
+          fos.write('a');
+          fos.flush();
+          out.hsync();
+          fs.delete(testPath, false);
+          // trigger the first failure; the outer intercept expects another on exit
+          intercept(FileNotFoundException.class,
+              new LambdaTestUtils.VoidCallable() {
+                @Override
+                public void call() throws Exception {
+                  fos.write('b');
+                  out.hsync();
+                  throw new Exception("hsync didn't raise an IOE");
+                }
+              });
+        }
+      }
+    });
+  }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org