You are viewing a plain-text version of this content; the canonical link to it was not preserved in this copy.
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2021/01/26 19:47:28 UTC

[hadoop] branch branch-3.3 updated: HADOOP-17414. Magic committer files don't have the count of bytes written collected by spark (#2530)

This is an automated email from the ASF dual-hosted git repository.

stevel pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new fb603e8  HADOOP-17414. Magic committer files don't have the count of bytes written collected by spark (#2530)
fb603e8 is described below

commit fb603e81f0fe9c5c92089cbe11af07fd4ac00bba
Author: Steve Loughran <st...@cloudera.com>
AuthorDate: Tue Jan 26 19:30:51 2021 +0000

    HADOOP-17414. Magic committer files don't have the count of bytes written collected by spark (#2530)
    
    This needs SPARK-33739 in the matching Spark branch in order to work.
    
    Contributed by Steve Loughran.
    
    Change-Id: I4fe75b057159e35aacc072da3cb7343467c0c3f1
---
 .../fs/statistics/DurationTrackerFactory.java      |   8 +-
 .../hadoop/fs/statistics/StoreStatisticNames.java  |  18 +
 .../java/org/apache/hadoop/fs/s3a/Constants.java   |   6 +
 .../apache/hadoop/fs/s3a/S3ABlockOutputStream.java |   2 +-
 .../org/apache/hadoop/fs/s3a/S3AFileSystem.java    | 127 +++---
 .../apache/hadoop/fs/s3a/S3AInstrumentation.java   |   2 +-
 .../java/org/apache/hadoop/fs/s3a/Statistic.java   |  18 +
 .../apache/hadoop/fs/s3a/WriteOperationHelper.java |  12 +-
 .../org/apache/hadoop/fs/s3a/WriteOperations.java  |   6 +-
 .../hadoop/fs/s3a/commit/CommitConstants.java      |  14 +
 .../hadoop/fs/s3a/commit/CommitOperations.java     |  26 ++
 .../fs/s3a/commit/magic/MagicCommitTracker.java    |  23 +-
 .../hadoop/fs/s3a/impl/ContextAccessors.java       |  13 +
 .../hadoop/fs/s3a/impl/HeaderProcessing.java       | 500 +++++++++++++++++++++
 .../fs/s3a/statistics/CountersAndGauges.java       |   3 +-
 .../impl/BondedS3AStatisticsContext.java           |   6 +
 .../tools/hadoop-aws/committer_architecture.md     |  10 +
 .../site/markdown/tools/hadoop-aws/committers.md   |   1 +
 .../hadoop/fs/s3a/ITestS3AMiscOperations.java      |   8 +
 .../hadoop/fs/s3a/ITestS3ARemoteFileChanged.java   |   4 +-
 .../fs/s3a/commit/AbstractITCommitProtocol.java    |  13 +-
 .../fs/s3a/commit/ITestCommitOperations.java       |  83 +++-
 .../s3a/commit/magic/ITestMagicCommitProtocol.java |  41 +-
 .../integration/ITestStagingCommitProtocol.java    |  13 +-
 .../apache/hadoop/fs/s3a/impl/ITestXAttrCost.java  | 219 +++++++++
 .../hadoop/fs/s3a/impl/TestHeaderProcessing.java   | 313 +++++++++++++
 .../fs/s3a/impl/TestPartialDeleteFailures.java     |  10 +-
 27 files changed, 1391 insertions(+), 108 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/DurationTrackerFactory.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/DurationTrackerFactory.java
index b1d87c9..641d7e8 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/DurationTrackerFactory.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/DurationTrackerFactory.java
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.fs.statistics;
 
+import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.stubDurationTracker;
+
 /**
  * Interface for a source of duration tracking.
  *
@@ -36,12 +38,16 @@ public interface DurationTrackerFactory {
    * by the given count.
    *
    * The expected use is within a try-with-resources clause.
+   *
+   * The default implementation returns a stub duration tracker.
    * @param key statistic key prefix
    * @param count  #of times to increment the matching counter in this
    * operation.
    * @return an object to close after an operation completes.
    */
-  DurationTracker trackDuration(String key, long count);
+  default DurationTracker trackDuration(String key, long count) {
+    return stubDurationTracker();
+  }
 
   /**
    * Initiate a duration tracking operation by creating/returning
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StoreStatisticNames.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StoreStatisticNames.java
index 4baf37d..0dd6540 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StoreStatisticNames.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StoreStatisticNames.java
@@ -130,6 +130,24 @@ public final class StoreStatisticNames {
   /** {@value}. */
   public static final String OP_TRUNCATE = "op_truncate";
 
+  /* The XAttr API */
+
+  /** Invoke {@code getXAttrs(Path path)}: {@value}. */
+  public static final String OP_XATTR_GET_MAP = "op_xattr_get_map";
+
+  /** Invoke {@code getXAttr(Path, String)}: {@value}. */
+  public static final String OP_XATTR_GET_NAMED = "op_xattr_get_named";
+
+  /**
+   * Invoke {@code getXAttrs(Path path, List<String> names)}: {@value}.
+   */
+  public static final String OP_XATTR_GET_NAMED_MAP =
+      "op_xattr_get_named_map";
+
+  /** Invoke {@code listXAttrs(Path path)}: {@value}. */
+  public static final String OP_XATTR_LIST = "op_xattr_list";
+
+
   /** {@value}. */
   public static final String DELEGATION_TOKENS_ISSUED
       = "delegation_tokens_issued";
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
index fcaec50..5079ed9 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
@@ -1048,4 +1048,10 @@ public final class Constants {
   public static final String STORE_CAPABILITY_DIRECTORY_MARKER_ACTION_DELETE
       = "fs.s3a.capability.directory.marker.action.delete";
 
+  /**
+   * To comply with the XAttr rules, all headers of the object retrieved
+   * through the getXAttr APIs have the prefix: {@value}.
+   */
+  public static final String XA_HEADER_PREFIX = "header.";
+
 }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java
index 0fdad21..5784ab8 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java
@@ -447,7 +447,7 @@ class S3ABlockOutputStream extends OutputStream implements
     final PutObjectRequest putObjectRequest = uploadData.hasFile() ?
         writeOperationHelper.createPutObjectRequest(key, uploadData.getFile())
         : writeOperationHelper.createPutObjectRequest(key,
-            uploadData.getUploadStream(), size);
+            uploadData.getUploadStream(), size, null);
     BlockUploadProgress callback =
         new BlockUploadProgress(
             block, progressListener,  now());
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index 4e1a67f..e6fb8c0 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -108,6 +108,7 @@ import org.apache.hadoop.fs.s3a.impl.CopyOutcome;
 import org.apache.hadoop.fs.s3a.impl.DeleteOperation;
 import org.apache.hadoop.fs.s3a.impl.DirectoryPolicy;
 import org.apache.hadoop.fs.s3a.impl.DirectoryPolicyImpl;
+import org.apache.hadoop.fs.s3a.impl.HeaderProcessing;
 import org.apache.hadoop.fs.s3a.impl.InternalConstants;
 import org.apache.hadoop.fs.s3a.impl.ListingOperationCallbacks;
 import org.apache.hadoop.fs.s3a.impl.MultiObjectDeleteSupport;
@@ -332,6 +333,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
   private DirectoryPolicy directoryPolicy;
 
   /**
+   * Header processing for XAttr.
+   */
+  private HeaderProcessing headerProcessing;
+
+  /**
    * Context accessors for re-use.
    */
   private final ContextAccessors contextAccessors = new ContextAccessorsImpl();
@@ -457,6 +463,8 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
           magicCommitterEnabled ? "is" : "is not");
       committerIntegration = new MagicCommitIntegration(
           this, magicCommitterEnabled);
+      // header processing for rename and magic committer
+      headerProcessing = new HeaderProcessing(createStoreContext());
 
       // instantiate S3 Select support
       selectBinding = new SelectBinding(writeHelper);
@@ -1781,14 +1789,15 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
 
   /**
    * Low-level call to get at the object metadata.
-   * @param path path to the object
+   * @param path path to the object. This will be qualified.
    * @return metadata
    * @throws IOException IO and object access problems.
    */
   @VisibleForTesting
   @Retries.RetryTranslated
   public ObjectMetadata getObjectMetadata(Path path) throws IOException {
-    return getObjectMetadata(path, null, invoker, null);
+    return getObjectMetadata(makeQualified(path), null, invoker,
+        "getObjectMetadata");
   }
 
   /**
@@ -1800,31 +1809,17 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
    * @return metadata
    * @throws IOException IO and object access problems.
    */
-  @VisibleForTesting
   @Retries.RetryTranslated
-  public ObjectMetadata getObjectMetadata(Path path,
+  private ObjectMetadata getObjectMetadata(Path path,
       ChangeTracker changeTracker, Invoker changeInvoker, String operation)
       throws IOException {
     checkNotClosed();
-    return once("getObjectMetadata", path.toString(),
+    String key = pathToKey(path);
+    return once(operation, path.toString(),
         () ->
             // this always does a full HEAD to the object
             getObjectMetadata(
-                pathToKey(path), changeTracker, changeInvoker, operation));
-  }
-
-  /**
-   * Get all the headers of the object of a path, if the object exists.
-   * @param path path to probe
-   * @return an immutable map of object headers.
-   * @throws IOException failure of the query
-   */
-  @Retries.RetryTranslated
-  public Map<String, Object> getObjectHeaders(Path path) throws IOException {
-    LOG.debug("getObjectHeaders({})", path);
-    checkNotClosed();
-    incrementReadOperations();
-    return getObjectMetadata(path).getRawMetadata();
+                key, changeTracker, changeInvoker, operation));
   }
 
   /**
@@ -2021,7 +2016,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
   @Retries.RetryRaw
   @VisibleForTesting
   ObjectMetadata getObjectMetadata(String key) throws IOException {
-    return getObjectMetadata(key, null, invoker,null);
+    return getObjectMetadata(key, null, invoker, "getObjectMetadata");
   }
 
   /**
@@ -4099,59 +4094,8 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
    * @return a copy of {@link ObjectMetadata} with only relevant attributes
    */
   private ObjectMetadata cloneObjectMetadata(ObjectMetadata source) {
-    // This approach may be too brittle, especially if
-    // in future there are new attributes added to ObjectMetadata
-    // that we do not explicitly call to set here
     ObjectMetadata ret = newObjectMetadata(source.getContentLength());
-
-    // Possibly null attributes
-    // Allowing nulls to pass breaks it during later use
-    if (source.getCacheControl() != null) {
-      ret.setCacheControl(source.getCacheControl());
-    }
-    if (source.getContentDisposition() != null) {
-      ret.setContentDisposition(source.getContentDisposition());
-    }
-    if (source.getContentEncoding() != null) {
-      ret.setContentEncoding(source.getContentEncoding());
-    }
-    if (source.getContentMD5() != null) {
-      ret.setContentMD5(source.getContentMD5());
-    }
-    if (source.getContentType() != null) {
-      ret.setContentType(source.getContentType());
-    }
-    if (source.getExpirationTime() != null) {
-      ret.setExpirationTime(source.getExpirationTime());
-    }
-    if (source.getExpirationTimeRuleId() != null) {
-      ret.setExpirationTimeRuleId(source.getExpirationTimeRuleId());
-    }
-    if (source.getHttpExpiresDate() != null) {
-      ret.setHttpExpiresDate(source.getHttpExpiresDate());
-    }
-    if (source.getLastModified() != null) {
-      ret.setLastModified(source.getLastModified());
-    }
-    if (source.getOngoingRestore() != null) {
-      ret.setOngoingRestore(source.getOngoingRestore());
-    }
-    if (source.getRestoreExpirationTime() != null) {
-      ret.setRestoreExpirationTime(source.getRestoreExpirationTime());
-    }
-    if (source.getSSEAlgorithm() != null) {
-      ret.setSSEAlgorithm(source.getSSEAlgorithm());
-    }
-    if (source.getSSECustomerAlgorithm() != null) {
-      ret.setSSECustomerAlgorithm(source.getSSECustomerAlgorithm());
-    }
-    if (source.getSSECustomerKeyMd5() != null) {
-      ret.setSSECustomerKeyMd5(source.getSSECustomerKeyMd5());
-    }
-
-    for (Map.Entry<String, String> e : source.getUserMetadata().entrySet()) {
-      ret.addUserMetadata(e.getKey(), e.getValue());
-    }
+    getHeaderProcessing().cloneObjectMetadata(source, ret);
     return ret;
   }
 
@@ -4383,6 +4327,37 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
   }
 
   /**
+   * Get header processing support.
+   * @return the header processing of this instance.
+   */
+  private HeaderProcessing getHeaderProcessing() {
+    return headerProcessing;
+  }
+
+  @Override
+  public byte[] getXAttr(final Path path, final String name)
+      throws IOException {
+    return getHeaderProcessing().getXAttr(path, name);
+  }
+
+  @Override
+  public Map<String, byte[]> getXAttrs(final Path path) throws IOException {
+    return getHeaderProcessing().getXAttrs(path);
+  }
+
+  @Override
+  public Map<String, byte[]> getXAttrs(final Path path,
+      final List<String> names)
+      throws IOException {
+    return getHeaderProcessing().getXAttrs(path, names);
+  }
+
+  @Override
+  public List<String> listXAttrs(final Path path) throws IOException {
+    return getHeaderProcessing().listXAttrs(path);
+  }
+
+  /**
    * {@inheritDoc}.
    *
    * This implementation is optimized for S3, which can do a bulk listing
@@ -5088,5 +5063,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
       return S3AFileSystem.this.makeQualified(path);
     }
 
+    @Override
+    public ObjectMetadata getObjectMetadata(final String key)
+        throws IOException {
+      return once("getObjectMetadata", key, () ->
+          S3AFileSystem.this.getObjectMetadata(key));
+    }
   }
 }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
index c25e3b3..5fcc157 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
@@ -120,7 +120,7 @@ import static org.apache.hadoop.fs.s3a.Statistic.*;
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
 public class S3AInstrumentation implements Closeable, MetricsSource,
-    CountersAndGauges, IOStatisticsSource, DurationTrackerFactory {
+    CountersAndGauges, IOStatisticsSource {
   private static final Logger LOG = LoggerFactory.getLogger(
       S3AInstrumentation.class);
 
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
index 6709382..0bd2a62 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
@@ -157,6 +157,24 @@ public enum Statistic {
       "Calls of rename()",
       TYPE_COUNTER),
 
+  /* The XAttr API metrics are all durations */
+  INVOCATION_XATTR_GET_MAP(
+      StoreStatisticNames.OP_XATTR_GET_MAP,
+      "Calls of getXAttrs(Path path)",
+      TYPE_DURATION),
+  INVOCATION_XATTR_GET_NAMED(
+      StoreStatisticNames.OP_XATTR_GET_NAMED,
+      "Calls of getXAttr(Path, String)",
+      TYPE_DURATION),
+  INVOCATION_XATTR_GET_NAMED_MAP(
+      StoreStatisticNames.OP_XATTR_GET_NAMED_MAP,
+      "Calls of getXAttrs(Path path, List<String> names)",
+      TYPE_DURATION),
+  INVOCATION_OP_XATTR_LIST(
+      StoreStatisticNames.OP_XATTR_LIST,
+      "Calls of listXAttrs(Path path)",
+      TYPE_DURATION),
+
   /* Object IO */
   OBJECT_COPY_REQUESTS(StoreStatisticNames.OBJECT_COPY_REQUESTS,
       "Object copy requests",
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java
index e75c09c..49a5eb2 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java
@@ -25,6 +25,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import com.amazonaws.services.s3.model.AmazonS3Exception;
@@ -172,12 +173,19 @@ public class WriteOperationHelper implements WriteOperations {
    * @param destKey destination key
    * @param inputStream source data.
    * @param length size, if known. Use -1 for not known
+   * @param headers optional map of custom headers.
    * @return the request
    */
   public PutObjectRequest createPutObjectRequest(String destKey,
-      InputStream inputStream, long length) {
+      InputStream inputStream,
+      long length,
+      final Map<String, String> headers) {
+    ObjectMetadata objectMetadata = newObjectMetadata(length);
+    if (headers != null) {
+      objectMetadata.setUserMetadata(headers);
+    }
     return owner.newPutObjectRequest(destKey,
-        newObjectMetadata(length),
+        objectMetadata,
         inputStream);
   }
 
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperations.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperations.java
index 0b33614..2636ed7 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperations.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperations.java
@@ -24,6 +24,7 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
@@ -77,10 +78,13 @@ public interface WriteOperations {
    * @param destKey destination key
    * @param inputStream source data.
    * @param length size, if known. Use -1 for not known
+   * @param headers optional map of custom headers.
    * @return the request
    */
   PutObjectRequest createPutObjectRequest(String destKey,
-      InputStream inputStream, long length);
+      InputStream inputStream,
+      long length,
+      @Nullable Map<String, String> headers);
 
   /**
    * Create a {@link PutObjectRequest} request to upload a file.
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java
index 3224a5a..6093996 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.fs.s3a.commit;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 
+import static org.apache.hadoop.fs.s3a.Constants.XA_HEADER_PREFIX;
 import static org.apache.hadoop.mapreduce.lib.output.PathOutputCommitterFactory.COMMITTER_FACTORY_SCHEME_PATTERN;
 
 /**
@@ -316,4 +317,17 @@ public final class CommitConstants {
   public static final boolean DEFAULT_S3A_COMMITTER_GENERATE_UUID =
       false;
 
+  /**
+   * Magic Marker header to declare final file length on magic uploads
+   * marker objects: {@value}.
+   */
+  public static final String X_HEADER_MAGIC_MARKER =
+      "x-hadoop-s3a-magic-data-length";
+
+  /**
+   * XAttr name of magic marker, with "header." prefix: {@value}.
+   */
+  public static final String XA_MAGIC_MARKER = XA_HEADER_PREFIX
+      + X_HEADER_MAGIC_MARKER;
+
 }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitOperations.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitOperations.java
index c9fb380..4562e0f 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitOperations.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitOperations.java
@@ -26,6 +26,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
@@ -39,6 +40,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
@@ -50,6 +52,7 @@ import org.apache.hadoop.fs.s3a.WriteOperationHelper;
 import org.apache.hadoop.fs.s3a.commit.files.PendingSet;
 import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
 import org.apache.hadoop.fs.s3a.commit.files.SuccessData;
+import org.apache.hadoop.fs.s3a.impl.HeaderProcessing;
 import org.apache.hadoop.fs.s3a.impl.InternalConstants;
 import org.apache.hadoop.fs.s3a.s3guard.BulkOperationState;
 import org.apache.hadoop.fs.s3a.statistics.CommitterStatistics;
@@ -607,6 +610,29 @@ public class CommitOperations implements IOStatisticsSource {
   }
 
   /**
+   * Get the magic file length of a file.
+   * If the FS doesn't support the API, the attribute is missing or
+   * the parse to long fails, then Optional.empty() is returned.
+   * Static for some easier testability.
+   * @param fs filesystem
+   * @param path path
+   * @return either the length, or Optional.empty().
+   * @throws IOException if the getXAttr() call itself fails
+   */
+  public static Optional<Long> extractMagicFileLength(FileSystem fs, Path path)
+      throws IOException {
+    byte[] bytes;
+    try {
+      bytes = fs.getXAttr(path, XA_MAGIC_MARKER);
+    } catch (UnsupportedOperationException e) {
+      // FS doesn't support xattr: treat as "length unknown", not failure.
+      LOG.debug("Filesystem {} doesn't support XAttr API", fs);
+      return Optional.empty();
+    }
+    return HeaderProcessing.extractXAttrLongValue(bytes);
+  }
+
+  /**
    * Commit context.
    *
    * It is used to manage the final commit sequence where files become
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicCommitTracker.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicCommitTracker.java
index 0f1a0a6..ddaee19 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicCommitTracker.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicCommitTracker.java
@@ -20,7 +20,9 @@ package org.apache.hadoop.fs.s3a.commit.magic;
 
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import com.amazonaws.services.s3.model.PartETag;
 import com.amazonaws.services.s3.model.PutObjectRequest;
@@ -37,6 +39,8 @@ import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
 import org.apache.hadoop.fs.statistics.IOStatistics;
 import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot;
 
+import static org.apache.hadoop.fs.s3a.commit.CommitConstants.X_HEADER_MAGIC_MARKER;
+
 /**
  * Put tracker for Magic commits.
  * <p>Important</p>: must not directly or indirectly import a class which
@@ -122,13 +126,6 @@ public class MagicCommitTracker extends PutTracker {
     Preconditions.checkArgument(!parts.isEmpty(),
         "No uploaded parts to save");
 
-    // put a 0-byte file with the name of the original under-magic path
-    PutObjectRequest originalDestPut = writer.createPutObjectRequest(
-        originalDestKey,
-        new ByteArrayInputStream(EMPTY),
-        0);
-    writer.uploadObject(originalDestPut);
-
     // build the commit summary
     SinglePendingCommit commitData = new SinglePendingCommit();
     commitData.touch(System.currentTimeMillis());
@@ -150,9 +147,19 @@ public class MagicCommitTracker extends PutTracker {
     PutObjectRequest put = writer.createPutObjectRequest(
         pendingPartKey,
         new ByteArrayInputStream(bytes),
-        bytes.length);
+        bytes.length, null);
     writer.uploadObject(put);
 
+    // Add the final file length as a header
+    Map<String, String> headers = new HashMap<>();
+    headers.put(X_HEADER_MAGIC_MARKER, Long.toString(bytesWritten));
+    // now put a 0-byte file with the name of the original under-magic path
+    PutObjectRequest originalDestPut = writer.createPutObjectRequest(
+        originalDestKey,
+        new ByteArrayInputStream(EMPTY),
+        0,
+        headers);
+    writer.uploadObject(originalDestPut);
     return false;
   }
 
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ContextAccessors.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ContextAccessors.java
index d39c649..27ac7de 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ContextAccessors.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ContextAccessors.java
@@ -22,6 +22,8 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.file.AccessDeniedException;
 
+import com.amazonaws.services.s3.model.ObjectMetadata;
+
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.s3a.Retries;
 
@@ -81,4 +83,15 @@ public interface ContextAccessors {
    * @return possibly new path.
    */
   Path makeQualified(Path path);
+
+  /**
+   * Retrieve the object metadata.
+   *
+   * @param key key to retrieve.
+   * @return metadata
+   * @throws IOException IO and object access problems.
+   */
+  @Retries.RetryTranslated
+  ObjectMetadata getObjectMetadata(String key) throws IOException;
+
 }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/HeaderProcessing.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/HeaderProcessing.java
new file mode 100644
index 0000000..5efec2b
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/HeaderProcessing.java
@@ -0,0 +1,500 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import javax.annotation.Nullable;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.TreeMap;
+
+import com.amazonaws.services.s3.Headers;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.Statistic;
+import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext;
+
+import static org.apache.hadoop.fs.s3a.Constants.XA_HEADER_PREFIX;
+import static org.apache.hadoop.fs.s3a.Statistic.INVOCATION_OP_XATTR_LIST;
+import static org.apache.hadoop.fs.s3a.Statistic.INVOCATION_XATTR_GET_MAP;
+import static org.apache.hadoop.fs.s3a.Statistic.INVOCATION_XATTR_GET_NAMED;
+import static org.apache.hadoop.fs.s3a.Statistic.INVOCATION_XATTR_GET_NAMED_MAP;
+import static org.apache.hadoop.fs.s3a.commit.CommitConstants.X_HEADER_MAGIC_MARKER;
+import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration;
+
+/**
+ * Part of the S3A FS where object headers are
+ * processed.
+ * Implements all the various XAttr read operations.
+ * Those APIs all expect byte arrays back.
+ * Metadata cloning is also implemented here, so as
+ * to stay in sync with custom header logic.
+ *
+ * The standard header names are extracted from the AWS SDK.
+ * The S3A connector does not (currently) support setting them,
+ * though it would be possible to do so through the createFile()
+ * builder API.
+ */
+public class HeaderProcessing extends AbstractStoreOperation {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      HeaderProcessing.class);
+
+  /**
+   * An empty buffer.
+   */
+  private static final byte[] EMPTY = new byte[0];
+
+
+  /**
+   * Standard HTTP header found on some S3 objects: {@value}.
+   */
+  public static final String XA_CACHE_CONTROL =
+      XA_HEADER_PREFIX + Headers.CACHE_CONTROL;
+  /**
+   * Standard HTTP header found on some S3 objects: {@value}.
+   */
+  public static final String XA_CONTENT_DISPOSITION =
+      XA_HEADER_PREFIX + Headers.CONTENT_DISPOSITION;
+
+  /**
+   * Standard HTTP header found on some S3 objects: {@value}.
+   */
+  public static final String XA_CONTENT_ENCODING =
+      XA_HEADER_PREFIX + Headers.CONTENT_ENCODING;
+
+  /**
+   * Standard HTTP header found on some S3 objects: {@value}.
+   */
+  public static final String XA_CONTENT_LANGUAGE =
+      XA_HEADER_PREFIX + Headers.CONTENT_LANGUAGE;
+
+  /**
+   * Length XAttr: {@value}.
+   */
+  public static final String XA_CONTENT_LENGTH =
+      XA_HEADER_PREFIX + Headers.CONTENT_LENGTH;
+
+  /**
+   * Standard HTTP header found on some S3 objects: {@value}.
+   */
+  public static final String XA_CONTENT_MD5 =
+      XA_HEADER_PREFIX + Headers.CONTENT_MD5;
+
+  /**
+   * Content range: {@value}.
+   * This is returned on GET requests with ranges.
+   */
+  public static final String XA_CONTENT_RANGE =
+      XA_HEADER_PREFIX + Headers.CONTENT_RANGE;
+
+  /**
+   * Content type: may be set when uploading.
+   * {@value}.
+   */
+  public static final String XA_CONTENT_TYPE =
+      XA_HEADER_PREFIX + Headers.CONTENT_TYPE;
+
+  /**
+   * Etag header: {@value}.
+   * Also accessible via {@code ObjectMetadata.getETag()}, where
+   * it can be retrieved via {@code getFileChecksum(path)} if
+   * the S3A connector is enabled.
+   */
+  public static final String XA_ETAG = XA_HEADER_PREFIX + Headers.ETAG;
+
+
+  /**
+   * Last modified XAttr: {@value}.
+   */
+  public static final String XA_LAST_MODIFIED =
+      XA_HEADER_PREFIX + Headers.LAST_MODIFIED;
+
+  /* AWS Specific Headers. May not be found on other S3 endpoints. */
+
+  /**
+   * Object archive status; empty if not on S3 Glacier
+   * (i.e. all normal files should be non-archived as
+   * S3A and applications don't handle archived data).
+   * Value: {@value}.
+   */
+  public static final String XA_ARCHIVE_STATUS =
+      XA_HEADER_PREFIX + Headers.ARCHIVE_STATUS;
+
+  /**
+   * Object legal hold status: {@value}.
+   */
+  public static final String XA_OBJECT_LOCK_LEGAL_HOLD_STATUS =
+      XA_HEADER_PREFIX + Headers.OBJECT_LOCK_LEGAL_HOLD_STATUS;
+
+  /**
+   * Object lock mode: {@value}.
+   */
+  public static final String XA_OBJECT_LOCK_MODE =
+      XA_HEADER_PREFIX + Headers.OBJECT_LOCK_MODE;
+
+  /**
+   * ISO8601 expiry date of object lock hold: {@value}.
+   */
+  public static final String XA_OBJECT_LOCK_RETAIN_UNTIL_DATE =
+      XA_HEADER_PREFIX + Headers.OBJECT_LOCK_RETAIN_UNTIL_DATE;
+
+  /**
+   * Replication status for cross-region replicated objects: {@value}.
+   */
+  public static final String XA_OBJECT_REPLICATION_STATUS =
+      XA_HEADER_PREFIX + Headers.OBJECT_REPLICATION_STATUS;
+
+  /**
+   * Version ID; empty for non-versioned buckets/data: {@value}.
+   */
+  public static final String XA_S3_VERSION_ID =
+      XA_HEADER_PREFIX + Headers.S3_VERSION_ID;
+
+  /**
+   * The server-side encryption algorithm to use
+   * with AWS-managed keys: {@value}.
+   */
+  public static final String XA_SERVER_SIDE_ENCRYPTION =
+      XA_HEADER_PREFIX + Headers.SERVER_SIDE_ENCRYPTION;
+
+  /**
+   * Storage Class XAttr: {@value}.
+   */
+  public static final String XA_STORAGE_CLASS =
+      XA_HEADER_PREFIX + Headers.STORAGE_CLASS;
+
+  /**
+   * Standard headers which are retrieved from HEAD Requests
+   * and set as XAttrs if the response included the relevant header.
+   */
+  public static final String[] XA_STANDARD_HEADERS = {
+      /* HTTP standard headers */
+      XA_CACHE_CONTROL,
+      XA_CONTENT_DISPOSITION,
+      XA_CONTENT_ENCODING,
+      XA_CONTENT_LANGUAGE,
+      XA_CONTENT_LENGTH,
+      XA_CONTENT_MD5,
+      XA_CONTENT_RANGE,
+      XA_CONTENT_TYPE,
+      XA_ETAG,
+      XA_LAST_MODIFIED,
+      /* aws headers */
+      XA_ARCHIVE_STATUS,
+      XA_OBJECT_LOCK_LEGAL_HOLD_STATUS,
+      XA_OBJECT_LOCK_MODE,
+      XA_OBJECT_LOCK_RETAIN_UNTIL_DATE,
+      XA_OBJECT_REPLICATION_STATUS,
+      XA_S3_VERSION_ID,
+      XA_SERVER_SIDE_ENCRYPTION,
+      XA_STORAGE_CLASS,
+  };
+
+  /**
+   * Content type of generic binary objects: {@value}.
+   * This is the default for uploaded objects.
+   */
+  public static final String CONTENT_TYPE_OCTET_STREAM =
+      "application/octet-stream";
+
+  /**
+   * XML content type : {@value}.
+   * This is application/xml, not text/xml, and is
+   * what a HEAD of / returns as the type of a root path.
+   */
+  public static final String CONTENT_TYPE_APPLICATION_XML =
+      "application/xml";
+
+  /**
+   * Construct.
+   * @param storeContext store context.
+   */
+  public HeaderProcessing(final StoreContext storeContext) {
+    super(storeContext);
+  }
+
+  /**
+   * Query the store, get all the headers into a map. Each Header
+   * has the "header." prefix.
+   * Caller must have read access.
+   * The value of each header is the string value of the object
+   * UTF-8 encoded.
+   * If there is no object at the path's key, the key with a trailing
+   * "/" is probed, so that directory markers are also resolved.
+   * @param path path of object.
+   * @param statistic statistic to use for duration tracking.
+   * @return the headers, sorted by name.
+   * @throws IOException failure, including file not found.
+   */
+  private Map<String, byte[]> retrieveHeaders(
+      final Path path,
+      final Statistic statistic) throws IOException {
+    StoreContext context = getStoreContext();
+    ContextAccessors accessors = context.getContextAccessors();
+    String objectKey = accessors.pathToKey(path);
+    ObjectMetadata md;
+    String symbol = statistic.getSymbol();
+    S3AStatisticsContext instrumentation = context.getInstrumentation();
+    try {
+      md = trackDuration(instrumentation, symbol, () ->
+          accessors.getObjectMetadata(objectKey));
+    } catch (FileNotFoundException e) {
+      // no entry. It could be a directory, so try again.
+      md = trackDuration(instrumentation, symbol, () ->
+          accessors.getObjectMetadata(objectKey + "/"));
+    }
+    // all user metadata
+    Map<String, String> rawHeaders = md.getUserMetadata();
+    Map<String, byte[]> headers = new TreeMap<>();
+    rawHeaders.forEach((key, value) ->
+        headers.put(XA_HEADER_PREFIX + key, encodeBytes(value)));
+
+    // and add the usual content length &c, if set
+    maybeSetHeader(headers, XA_CACHE_CONTROL,
+        md.getCacheControl());
+    maybeSetHeader(headers, XA_CONTENT_DISPOSITION,
+        md.getContentDisposition());
+    maybeSetHeader(headers, XA_CONTENT_ENCODING,
+        md.getContentEncoding());
+    maybeSetHeader(headers, XA_CONTENT_LANGUAGE,
+        md.getContentLanguage());
+    maybeSetHeader(headers, XA_CONTENT_LENGTH,
+        md.getContentLength());
+    maybeSetHeader(headers, XA_CONTENT_MD5,
+        md.getContentMD5());
+    maybeSetHeader(headers, XA_CONTENT_RANGE,
+        md.getContentRange());
+    maybeSetHeader(headers, XA_CONTENT_TYPE,
+        md.getContentType());
+    maybeSetHeader(headers, XA_ETAG,
+        md.getETag());
+    maybeSetHeader(headers, XA_LAST_MODIFIED,
+        md.getLastModified());
+
+    // AWS custom headers
+    maybeSetHeader(headers, XA_ARCHIVE_STATUS,
+        md.getArchiveStatus());
+    maybeSetHeader(headers, XA_OBJECT_LOCK_LEGAL_HOLD_STATUS,
+        md.getObjectLockLegalHoldStatus());
+    maybeSetHeader(headers, XA_OBJECT_LOCK_MODE,
+        md.getObjectLockMode());
+    maybeSetHeader(headers, XA_OBJECT_LOCK_RETAIN_UNTIL_DATE,
+        md.getObjectLockRetainUntilDate());
+    maybeSetHeader(headers, XA_OBJECT_REPLICATION_STATUS,
+        md.getReplicationStatus());
+    maybeSetHeader(headers, XA_S3_VERSION_ID,
+        md.getVersionId());
+    maybeSetHeader(headers, XA_SERVER_SIDE_ENCRYPTION,
+        md.getSSEAlgorithm());
+    maybeSetHeader(headers, XA_STORAGE_CLASS,
+        md.getStorageClass());
+    return headers;
+  }
+
+  /**
+   * Set a header if the value is non null.
+   *
+   * @param headers header map
+   * @param name header name
+   * @param value value to encode.
+   */
+  private void maybeSetHeader(
+      final Map<String, byte[]> headers,
+      final String name,
+      final Object value) {
+    if (value != null) {
+      headers.put(name, encodeBytes(value));
+    }
+  }
+
+  /**
+   * Stringify an object and return its bytes in UTF-8 encoding.
+   * @param s source
+   * @return encoded object or an empty buffer
+   */
+  public static byte[] encodeBytes(@Nullable Object s) {
+    return s == null
+        ? EMPTY
+        : s.toString().getBytes(StandardCharsets.UTF_8);
+  }
+
+  /**
+   * Get the string value from the bytes.
+   * if null : return null, otherwise the UTF-8 decoded
+   * bytes.
+   * @param bytes source bytes
+   * @return decoded value
+   */
+  public static String decodeBytes(byte[] bytes) {
+    return bytes == null
+        ? null
+        : new String(bytes, StandardCharsets.UTF_8);
+  }
+
+  /**
+   * Get the value of a named XAttr of a file or directory.
+   * @param path Path to get extended attribute
+   * @param name XAttr name.
+   * @return byte[] XAttr value or null
+   * @throws IOException IO failure
+   */
+  public byte[] getXAttr(Path path, String name) throws IOException {
+    return retrieveHeaders(path, INVOCATION_XATTR_GET_NAMED).get(name);
+  }
+
+  /**
+   * See {@code FileSystem.getXAttrs(path)}.
+   *
+   * @param path Path to get extended attributes
+   * @return Map describing the XAttrs of the file or directory
+   * @throws IOException IO failure
+   */
+  public Map<String, byte[]> getXAttrs(Path path) throws IOException {
+    return retrieveHeaders(path, INVOCATION_XATTR_GET_MAP);
+  }
+
+  /**
+   * See {@code FileSystem.listXAttrs(path)}.
+   * @param path Path to get extended attributes
+   * @return List of the names of the XAttrs present on the object
+   * @throws IOException IO failure
+   */
+  public List<String> listXAttrs(final Path path) throws IOException {
+    return new ArrayList<>(retrieveHeaders(path, INVOCATION_OP_XATTR_LIST)
+        .keySet());
+  }
+
+  /**
+   * See {@code FileSystem.getXAttrs(path, names)}.
+   * @param path Path to get extended attributes
+   * @param names XAttr names.
+   * @return Map describing the XAttrs of the file or directory
+   * @throws IOException IO failure
+   */
+  public Map<String, byte[]> getXAttrs(Path path, List<String> names)
+      throws IOException {
+    Map<String, byte[]> headers = retrieveHeaders(path,
+        INVOCATION_XATTR_GET_NAMED_MAP);
+    Map<String, byte[]> result = new TreeMap<>();
+    headers.entrySet().stream()
+        .filter(entry -> names.contains(entry.getKey()))
+        .forEach(entry -> result.put(entry.getKey(), entry.getValue()));
+    return result;
+  }
+
+  /**
+   * Convert an XAttr byte array to a long.
+   * Negative or unparseable values are treated as absent.
+   * @param data data to parse
+   * @return either a length or none
+   */
+  public static Optional<Long> extractXAttrLongValue(byte[] data) {
+    String xAttr;
+    xAttr = HeaderProcessing.decodeBytes(data);
+    if (StringUtils.isNotEmpty(xAttr)) {
+      try {
+        long l = Long.parseLong(xAttr);
+        if (l >= 0) {
+          return Optional.of(l);
+        }
+      } catch (NumberFormatException ex) {
+        LOG.warn("Not a number: {}", xAttr, ex);
+      }
+    }
+    // missing/empty header or parse failure.
+    return Optional.empty();
+  }
+
+  /**
+   * Creates a copy of the passed {@link ObjectMetadata}.
+   * Does so without using the {@link ObjectMetadata#clone()} method,
+   * to avoid copying unnecessary headers.
+   * This operation does not copy the {@code X_HEADER_MAGIC_MARKER}
+   * header to avoid confusion. If a marker file is renamed,
+   * it loses information about any remapped file.
+   * If new fields are added to ObjectMetadata which are not
+   * present in the user metadata headers, they will not be picked
+   * up or cloned unless this operation is updated.
+   * @param source the {@link ObjectMetadata} to copy
+   * @param dest the metadata to update in place.
+   */
+  public void cloneObjectMetadata(ObjectMetadata source,
+      ObjectMetadata dest) {
+
+    // Possibly null attributes
+    // Allowing nulls to pass breaks it during later use
+    if (source.getCacheControl() != null) {
+      dest.setCacheControl(source.getCacheControl());
+    }
+    if (source.getContentDisposition() != null) {
+      dest.setContentDisposition(source.getContentDisposition());
+    }
+    if (source.getContentEncoding() != null) {
+      dest.setContentEncoding(source.getContentEncoding());
+    }
+    if (source.getContentMD5() != null) {
+      dest.setContentMD5(source.getContentMD5());
+    }
+    if (source.getContentType() != null) {
+      dest.setContentType(source.getContentType());
+    }
+    if (source.getExpirationTime() != null) {
+      dest.setExpirationTime(source.getExpirationTime());
+    }
+    if (source.getExpirationTimeRuleId() != null) {
+      dest.setExpirationTimeRuleId(source.getExpirationTimeRuleId());
+    }
+    if (source.getHttpExpiresDate() != null) {
+      dest.setHttpExpiresDate(source.getHttpExpiresDate());
+    }
+    if (source.getLastModified() != null) {
+      dest.setLastModified(source.getLastModified());
+    }
+    if (source.getOngoingRestore() != null) {
+      dest.setOngoingRestore(source.getOngoingRestore());
+    }
+    if (source.getRestoreExpirationTime() != null) {
+      dest.setRestoreExpirationTime(source.getRestoreExpirationTime());
+    }
+    if (source.getSSEAlgorithm() != null) {
+      dest.setSSEAlgorithm(source.getSSEAlgorithm());
+    }
+    if (source.getSSECustomerAlgorithm() != null) {
+      dest.setSSECustomerAlgorithm(source.getSSECustomerAlgorithm());
+    }
+    if (source.getSSECustomerKeyMd5() != null) {
+      dest.setSSECustomerKeyMd5(source.getSSECustomerKeyMd5());
+    }
+
+    // copy user metadata except the magic marker header.
+    source.getUserMetadata().entrySet().stream()
+        .filter(e -> !e.getKey().equals(X_HEADER_MAGIC_MARKER))
+        .forEach(e -> dest.addUserMetadata(e.getKey(), e.getValue()));
+  }
+
+}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/CountersAndGauges.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/CountersAndGauges.java
index 61cc033..f9093ff 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/CountersAndGauges.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/CountersAndGauges.java
@@ -21,11 +21,12 @@ package org.apache.hadoop.fs.s3a.statistics;
 import java.time.Duration;
 
 import org.apache.hadoop.fs.s3a.Statistic;
+import org.apache.hadoop.fs.statistics.DurationTrackerFactory;
 
 /**
  * This is the foundational API for collecting S3A statistics.
  */
-public interface CountersAndGauges {
+public interface CountersAndGauges extends DurationTrackerFactory {
 
   /**
    * Increment a specific counter.
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/BondedS3AStatisticsContext.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/BondedS3AStatisticsContext.java
index 006eb24..51bb4af 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/BondedS3AStatisticsContext.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/BondedS3AStatisticsContext.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
 import org.apache.hadoop.fs.s3a.statistics.S3AMultipartUploaderStatistics;
 import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext;
 import org.apache.hadoop.fs.s3a.statistics.StatisticsFromAwsSdk;
+import org.apache.hadoop.fs.statistics.DurationTracker;
 
 /**
  * An S3A statistics context which is bonded to a
@@ -210,6 +211,11 @@ public class BondedS3AStatisticsContext implements S3AStatisticsContext {
     return new S3AMultipartUploaderStatisticsImpl(this::incrementCounter);
   }
 
+  @Override // delegates to the tracker of getInstrumentation()
+  public DurationTracker trackDuration(final String key, final long count) {
+    return getInstrumentation().trackDuration(key, count);
+  }
+
   /**
    * This is the interface which an integration source must implement
    * for the integration.
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committer_architecture.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committer_architecture.md
index 48d75dc..048f08c 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committer_architecture.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committer_architecture.md
@@ -1337,6 +1337,16 @@ On `close()`, summary data would be written to the file
 `/results/latest/__magic/job400_1/task_01_01/latest.orc.lzo.pending`.
 This would contain the upload ID and all the parts and etags of uploaded data.
 
+A marker file is also created at the path under `__magic` which the task wrote to,
+so that code which verifies that a newly created file exists does not fail.
+1. These marker files are zero bytes long.
+1. They declare the full length of the final file in the HTTP header
+   `x-hadoop-s3a-magic-data-length`.
+1. A call to `getXAttr("header.x-hadoop-s3a-magic-data-length")` will return a
+   string containing the number of bytes in the data uploaded.
+
+This is needed so that the Spark write-tracking code can report how much data
+has been created.
 
 #### Task commit
 
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committers.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committers.md
index 0e86f52..d4292df 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committers.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committers.md
@@ -360,6 +360,7 @@ However, it has extra requirements of the filesystem
 1. The S3A client must be configured to recognize interactions
 with the magic directories and treat them specially.
 
+Now that Amazon S3 is consistent, the magic committer is enabled by default.
 
 It's also not been field tested to the extent of Netflix's committer; consider
 it the least mature of the committers.
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMiscOperations.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMiscOperations.java
index 2f0599d..e6ebfba 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMiscOperations.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMiscOperations.java
@@ -29,6 +29,7 @@ import com.amazonaws.services.s3.AmazonS3;
 import com.amazonaws.services.s3.model.GetBucketEncryptionResult;
 import com.amazonaws.services.s3.model.ObjectMetadata;
 import com.amazonaws.services.s3.model.PutObjectRequest;
+import org.assertj.core.api.Assertions;
 import org.junit.Assume;
 import org.junit.Test;
 
@@ -47,6 +48,7 @@ import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
 import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM;
 import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_KEY;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
+import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.XA_ETAG;
 import static org.hamcrest.Matchers.nullValue;
 
 /**
@@ -171,6 +173,9 @@ public class ITestS3AMiscOperations extends AbstractS3ATestBase {
     assertNotEquals("file 1 checksum", 0, checksum1.getLength());
     assertEquals("checksums of empty files", checksum1,
         fs.getFileChecksum(touchFile("file2"), 0));
+    Assertions.assertThat(fs.getXAttr(file1, XA_ETAG))
+        .describedAs("etag from xattr")
+        .isEqualTo(checksum1.getBytes());
   }
 
   /**
@@ -222,6 +227,9 @@ public class ITestS3AMiscOperations extends AbstractS3ATestBase {
     createFile(fs, file4, true,
         "hello, world".getBytes(StandardCharsets.UTF_8));
     assertNotEquals(checksum2, fs.getFileChecksum(file4, 0));
+    Assertions.assertThat(fs.getXAttr(file3, XA_ETAG))
+        .describedAs("etag from xattr")
+        .isEqualTo(checksum1.getBytes());
   }
 
   /**
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARemoteFileChanged.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARemoteFileChanged.java
index 66ec8ff..adcf578 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARemoteFileChanged.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARemoteFileChanged.java
@@ -272,7 +272,9 @@ public class ITestS3ARemoteFileChanged extends AbstractS3ATestBase {
   @Override
   public void teardown() throws Exception {
     // restore the s3 client so there's no mocking interfering with the teardown
-    originalS3Client.ifPresent(fs::setAmazonS3Client);
+    if (fs != null) { // fs may be unset, e.g. if setup did not complete
+      originalS3Client.ifPresent(fs::setAmazonS3Client);
+    }
     super.teardown();
   }
 
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java
index 1108194..14207e8 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java
@@ -1335,11 +1335,12 @@ public abstract class AbstractITCommitProtocol extends AbstractCommitITest {
         = outputFormat.getRecordWriter(tContext);
     IntWritable iw = new IntWritable(1);
     recordWriter.write(iw, iw);
+    long expectedLength = 4;
     Path dest = recordWriter.getDest();
-    validateTaskAttemptPathDuringWrite(dest);
+    validateTaskAttemptPathDuringWrite(dest, expectedLength);
     recordWriter.close(tContext);
     // at this point
-    validateTaskAttemptPathAfterWrite(dest);
+    validateTaskAttemptPathAfterWrite(dest, expectedLength);
     assertTrue("Committer does not have data to commit " + committer,
         committer.needsTaskCommit(tContext));
     commitTask(committer, tContext);
@@ -1750,9 +1751,11 @@ public abstract class AbstractITCommitProtocol extends AbstractCommitITest {
    * Validate the path of a file being written to during the write
    * itself.
    * @param p path
+   * @param expectedLength expected length of the data written to the file.
    * @throws IOException IO failure
    */
-  protected void validateTaskAttemptPathDuringWrite(Path p) throws IOException {
+  protected void validateTaskAttemptPathDuringWrite(Path p,
+      final long expectedLength) throws IOException {
 
   }
 
@@ -1760,9 +1763,11 @@ public abstract class AbstractITCommitProtocol extends AbstractCommitITest {
    * Validate the path of a file being written to after the write
    * operation has completed.
    * @param p path
+   * @param expectedLength expected length of the data written to the file.
    * @throws IOException IO failure
    */
-  protected void validateTaskAttemptPathAfterWrite(Path p) throws IOException {
+  protected void validateTaskAttemptPathAfterWrite(Path p,
+      final long expectedLength) throws IOException {
 
   }
 
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperations.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperations.java
index 978f08c..b025f6f 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperations.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperations.java
@@ -26,6 +26,7 @@ import java.util.List;
 
 import com.amazonaws.services.s3.model.PartETag;
 import org.apache.hadoop.thirdparty.com.google.common.collect.Lists;
+import org.assertj.core.api.Assertions;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -52,6 +53,7 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.*;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
 import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
+import static org.apache.hadoop.fs.s3a.commit.CommitOperations.extractMagicFileLength;
 import static org.apache.hadoop.fs.s3a.commit.CommitUtils.*;
 import static org.apache.hadoop.fs.s3a.commit.MagicCommitPaths.*;
 import static org.apache.hadoop.fs.s3a.Constants.*;
@@ -216,13 +218,13 @@ public class ITestCommitOperations extends AbstractCommitITest {
 
   @Test
   public void testCommitEmptyFile() throws Throwable {
-    describe("create then commit an empty file");
+    describe("create then commit an empty magic file");
     createCommitAndVerify("empty-commit.txt", new byte[0]);
   }
 
   @Test
   public void testCommitSmallFile() throws Throwable {
-    describe("create then commit an empty file");
+    describe("create then commit a small magic file");
     createCommitAndVerify("small-commit.txt", DATASET);
   }
 
@@ -288,6 +290,64 @@ public class ITestCommitOperations extends AbstractCommitITest {
     commit("child.txt", pendingChildPath, expectedDestPath, 0, 0);
   }
 
+  /**
+   * Verify that when a marker file is renamed, its
+   * magic marker attribute is lost.
+   */
+  @Test
+  public void testMarkerFileRename()
+      throws Exception {
+    S3AFileSystem fs = getFileSystem();
+    Path destFile = methodPath();
+    Path destDir = destFile.getParent();
+    fs.delete(destDir, true);
+    Path magicDest = makeMagic(destFile);
+    Path magicDir = magicDest.getParent();
+    fs.mkdirs(magicDir);
+
+    // use the builder API to verify it works exactly the
+    // same.
+    try (FSDataOutputStream stream = fs.createFile(magicDest)
+        .overwrite(true)
+        .recursive()
+        .build()) {
+      assertIsMagicStream(stream);
+      stream.write(DATASET);
+    }
+    Path magic2 = new Path(magicDir, "magic2");
+    assertTrue("rename of marker " + magicDest + " failed",
+        fs.rename(magicDest, magic2));
+
+    // the renamed file has no header
+    Assertions.assertThat(extractMagicFileLength(fs, magic2))
+        .describedAs("XAttribute " + XA_MAGIC_MARKER + " of " + magic2)
+        .isEmpty();
+    // abort the upload, which is driven by the .pending files
+    // there must be at least 1 aborted upload; during test debugging
+    // with aborted runs there may be more.
+    Assertions.assertThat(newCommitOperations()
+        .abortPendingUploadsUnderPath(destDir))
+        .describedAs("Aborting all pending uploads under %s", destDir)
+        .isGreaterThanOrEqualTo(1);
+  }
+
+  /**
+   * Assert that an output stream is magic.
+   * @param stream stream to probe.
+   */
+  protected void assertIsMagicStream(final FSDataOutputStream stream) {
+    Assertions.assertThat(stream.hasCapability(STREAM_CAPABILITY_MAGIC_OUTPUT))
+        .describedAs("Stream capability %s in stream %s",
+            STREAM_CAPABILITY_MAGIC_OUTPUT, stream)
+        .isTrue();
+  }
+
+  /**
+   * Create a file through the magic committer; commit and verify it.
+   * @param filename name of the final file (not a __magic path).
+   * @param data data to write
+   * @throws Exception failure
+   */
   private void createCommitAndVerify(String filename, byte[] data)
       throws Exception {
     S3AFileSystem fs = getFileSystem();
@@ -295,19 +355,30 @@ public class ITestCommitOperations extends AbstractCommitITest {
     fs.delete(destFile.getParent(), true);
     Path magicDest = makeMagic(destFile);
     assertPathDoesNotExist("Magic file should not exist", magicDest);
+    long dataSize = data != null ? data.length : 0;
     try(FSDataOutputStream stream = fs.create(magicDest, true)) {
-      assertTrue(stream.hasCapability(STREAM_CAPABILITY_MAGIC_OUTPUT));
-      if (data != null && data.length > 0) {
+      assertIsMagicStream(stream);
+      if (dataSize > 0) {
         stream.write(data);
       }
       stream.close();
     }
     FileStatus status = getFileStatusEventually(fs, magicDest,
         CONSISTENCY_WAIT);
-    assertEquals("Non empty marker file: " + status, 0, status.getLen());
-
+    assertEquals("Magic marker file is not zero bytes: " + status,
+        0, status.getLen());
+    Assertions.assertThat(extractMagicFileLength(fs,
+        magicDest))
+        .describedAs("XAttribute " + XA_MAGIC_MARKER + " of " + magicDest)
+        .isNotEmpty()
+        .hasValue(dataSize);
     commit(filename, destFile, HIGH_THROTTLE, 0);
     verifyFileContents(fs, destFile, data);
+    // the destination file doesn't have the attribute
+    Assertions.assertThat(extractMagicFileLength(fs,
+        destFile))
+        .describedAs("XAttribute " + XA_MAGIC_MARKER + " of " + destFile)
+        .isEmpty();
   }
 
   /**
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocol.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocol.java
index f6d6307..7ee1833 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocol.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocol.java
@@ -20,17 +20,21 @@ package org.apache.hadoop.fs.s3a.commit.magic;
 
 import java.io.IOException;
 import java.net.URI;
+import java.util.List;
 
 import org.assertj.core.api.Assertions;
 import org.junit.Test;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
 import org.apache.hadoop.fs.s3a.commit.AbstractITCommitProtocol;
 import org.apache.hadoop.fs.s3a.commit.AbstractS3ACommitter;
 import org.apache.hadoop.fs.s3a.commit.CommitConstants;
+import org.apache.hadoop.fs.s3a.commit.CommitOperations;
 import org.apache.hadoop.fs.s3a.commit.CommitUtils;
 import org.apache.hadoop.fs.s3a.commit.CommitterFaultInjection;
 import org.apache.hadoop.fs.s3a.commit.CommitterFaultInjectionImpl;
@@ -39,6 +43,7 @@ import org.apache.hadoop.mapreduce.JobStatus;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 
+import static org.apache.hadoop.fs.s3a.S3AUtils.listAndFilter;
 import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
 import static org.hamcrest.CoreMatchers.containsString;
 
@@ -107,18 +112,60 @@ public class ITestMagicCommitProtocol extends AbstractITCommitProtocol {
     return new CommitterWithFailedThenSucceed(getOutDir(), tContext);
   }
 
-  protected void validateTaskAttemptPathDuringWrite(Path p) throws IOException {
+  /**
+   * While the task is writing, the path must be under a magic directory
+   * and must not yet be visible in the store.
+   * @param p path being written to
+   * @param expectedLength expected length; unused here
+   * @throws IOException failure
+   */
+  protected void validateTaskAttemptPathDuringWrite(Path p,
+      final long expectedLength) throws IOException {
     String pathStr = p.toString();
     assertTrue("not magic " + pathStr,
         pathStr.contains(MAGIC));
     assertPathDoesNotExist("task attempt visible", p);
   }
 
-  protected void validateTaskAttemptPathAfterWrite(Path p) throws IOException {
-    FileStatus st = getFileSystem().getFileStatus(p);
-    assertEquals("file length in " + st, 0, st.getLen());
-    Path pendingFile = new Path(p.toString() + PENDING_SUFFIX);
+  /**
+   * After the write, the pending file must exist, the marker file must be
+   * zero bytes long both in a listing and in getFileStatus(), and the
+   * magic marker XAttr must hold {@code expectedLength}.
+   * @param marker path of the magic marker file
+   * @param expectedLength expected number of bytes written
+   * @throws IOException failure
+   */
+  protected void validateTaskAttemptPathAfterWrite(Path marker,
+      final long expectedLength) throws IOException {
+    // the pending file exists
+    Path pendingFile = new Path(marker.toString() + PENDING_SUFFIX);
     assertPathExists("pending file", pendingFile);
+    S3AFileSystem fs = getFileSystem();
+
+    // THIS SEQUENCE MUST BE RUN IN ORDER ON A S3GUARDED
+    // STORE
+    // if you list the parent dir and find the marker, it
+    // is really 0 bytes long
+    String name = marker.getName();
+    List<LocatedFileStatus> filtered = listAndFilter(fs,
+        marker.getParent(), false,
+        (path) -> path.getName().equals(name));
+    Assertions.assertThat(filtered)
+        .describedAs("Listing of %s filtered to %s", marker.getParent(), name)
+        .hasSize(1);
+    Assertions.assertThat(filtered.get(0))
+        .matches(lst -> lst.getLen() == 0,
+            "Listing should return 0 byte length");
+
+    // marker file is empty
+    FileStatus st = fs.getFileStatus(marker);
+    assertEquals("file length in " + st, 0, st.getLen());
+    // xattr header
+    Assertions.assertThat(CommitOperations.extractMagicFileLength(fs,
+        marker))
+        .describedAs("XAttribute " + XA_MAGIC_MARKER + " of " + marker)
+        .isNotEmpty()
+        .hasValue(expectedLength);
   }
 
   /**
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitProtocol.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitProtocol.java
index a4dface..826c3cd 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitProtocol.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitProtocol.java
@@ -23,6 +23,7 @@ import java.util.UUID;
 
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.contract.ContractTestUtils;
@@ -113,14 +114,33 @@ public class ITestStagingCommitProtocol extends AbstractITCommitProtocol {
         IOException.class);
   }
 
-  protected void validateTaskAttemptPathDuringWrite(Path p) throws IOException {
+  /**
+   * While the task is writing, the task attempt file must exist
+   * in the local filesystem.
+   * @param p path being written to
+   * @param expectedLength expected length; not checked here
+   * @throws IOException failure
+   */
+  protected void validateTaskAttemptPathDuringWrite(Path p,
+      final long expectedLength) throws IOException {
     // this is expected to be local FS
     ContractTestUtils.assertPathExists(getLocalFS(), "task attempt", p);
   }
 
-  protected void validateTaskAttemptPathAfterWrite(Path p) throws IOException {
+  /**
+   * After the write, the task attempt file must exist in the local
+   * filesystem and be exactly {@code expectedLength} bytes long.
+   * @param p path written to
+   * @param expectedLength expected number of bytes written
+   * @throws IOException failure
+   */
+  protected void validateTaskAttemptPathAfterWrite(Path p,
+      final long expectedLength) throws IOException {
     // this is expected to be local FS
-    ContractTestUtils.assertPathExists(getLocalFS(), "task attempt", p);
+    FileSystem localFS = getLocalFS();
+    ContractTestUtils.assertPathExists(localFS, "task attempt", p);
+    FileStatus st = localFS.getFileStatus(p);
+    assertEquals("file length in " + st, expectedLength, st.getLen());
   }
 
   protected FileSystem getLocalFS() throws IOException {
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestXAttrCost.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestXAttrCost.java
new file mode 100644
index 0000000..aa3ceca
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestXAttrCost.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.io.FileNotFoundException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import org.assertj.core.api.AbstractStringAssert;
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest;
+
+import static org.apache.hadoop.fs.s3a.Statistic.INVOCATION_OP_XATTR_LIST;
+import static org.apache.hadoop.fs.s3a.Statistic.INVOCATION_XATTR_GET_MAP;
+import static org.apache.hadoop.fs.s3a.Statistic.INVOCATION_XATTR_GET_NAMED;
+import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.CONTENT_TYPE_OCTET_STREAM;
+import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.CONTENT_TYPE_APPLICATION_XML;
+import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.XA_CONTENT_LENGTH;
+import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.XA_CONTENT_TYPE;
+import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.XA_STANDARD_HEADERS;
+import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.decodeBytes;
+import static org.apache.hadoop.fs.s3a.performance.OperationCost.CREATE_FILE_OVERWRITE;
+
+/**
+ * Invoke XAttr API calls against objects in S3, validating both the
+ * headers extracted and the statistics collected for each call.
+ */
+public class ITestXAttrCost extends AbstractS3ACostTest {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ITestXAttrCost.class);
+
+  private static final int GET_METADATA_ON_OBJECT = 1;
+  private static final int GET_METADATA_ON_DIR = GET_METADATA_ON_OBJECT * 2;
+
+  public ITestXAttrCost() {
+    // no parameterization here
+    super(false, true, false);
+  }
+
+  @Test
+  public void testXAttrRoot() throws Throwable {
+    describe("Test xattr on root");
+    Path root = new Path("/");
+    S3AFileSystem fs = getFileSystem();
+    Map<String, byte[]> xAttrs = verifyMetrics(
+        () -> fs.getXAttrs(root),
+        with(INVOCATION_XATTR_GET_MAP, GET_METADATA_ON_OBJECT));
+    logXAttrs(xAttrs);
+    List<String> headerList = verifyMetrics(() ->
+            fs.listXAttrs(root),
+        with(INVOCATION_OP_XATTR_LIST, GET_METADATA_ON_OBJECT));
+
+    // the root only has the length and type headers: in particular,
+    // there is no magic marker header
+    Assertions.assertThat(headerList)
+        .describedAs("Headers on root object")
+        .containsOnly(
+            XA_CONTENT_LENGTH,
+            XA_CONTENT_TYPE);
+    assertHeaderEntry(xAttrs, XA_CONTENT_TYPE)
+        .isEqualTo(CONTENT_TYPE_APPLICATION_XML);
+  }
+
+  /**
+   * Log the attributes as strings.
+   * @param xAttrs map of attributes
+   */
+  private void logXAttrs(final Map<String, byte[]> xAttrs) {
+    xAttrs.forEach((k, v) ->
+        LOG.info("{} has bytes[{}] => \"{}\"",
+            k, v.length, decodeBytes(v)));
+  }
+
+  @Test
+  public void testXAttrFile() throws Throwable {
+    describe("Test xattr on a file");
+    Path testFile = methodPath();
+    create(testFile, true, CREATE_FILE_OVERWRITE);
+    S3AFileSystem fs = getFileSystem();
+    Map<String, byte[]> xAttrs = verifyMetrics(() ->
+            fs.getXAttrs(testFile),
+        with(INVOCATION_XATTR_GET_MAP, GET_METADATA_ON_OBJECT));
+    logXAttrs(xAttrs);
+    assertHeaderEntry(xAttrs, XA_CONTENT_LENGTH)
+        .isEqualTo("0");
+
+    // get the list of supported headers
+    List<String> headerList = verifyMetrics(
+        () -> fs.listXAttrs(testFile),
+        with(INVOCATION_OP_XATTR_LIST, GET_METADATA_ON_OBJECT));
+    // verify this contains at least one of the standard headers;
+    // the absence of the magic marker header is not checked here
+    Assertions.assertThat(headerList)
+        .describedAs("Supported headers")
+        .containsAnyElementsOf(Arrays.asList(XA_STANDARD_HEADERS));
+
+    // ask for one header and validate its value
+    byte[] bytes = verifyMetrics(() ->
+            fs.getXAttr(testFile, XA_CONTENT_LENGTH),
+        with(INVOCATION_XATTR_GET_NAMED, GET_METADATA_ON_OBJECT));
+    assertHeader(XA_CONTENT_LENGTH, bytes)
+        .isEqualTo("0");
+    assertHeaderEntry(xAttrs, XA_CONTENT_TYPE)
+        .isEqualTo(CONTENT_TYPE_OCTET_STREAM);
+  }
+
+  /**
+   * Directory attributes can be retrieved, but they take two HEAD requests.
+   * @throws Throwable failure
+   */
+  @Test
+  public void testXAttrDir() throws Throwable {
+    describe("Test xattr on a dir");
+
+    S3AFileSystem fs = getFileSystem();
+    Path dir = methodPath();
+    fs.mkdirs(dir);
+    Map<String, byte[]> xAttrs = verifyMetrics(() ->
+            fs.getXAttrs(dir),
+        with(INVOCATION_XATTR_GET_MAP, GET_METADATA_ON_DIR));
+    logXAttrs(xAttrs);
+    assertHeaderEntry(xAttrs, XA_CONTENT_LENGTH)
+        .isEqualTo("0");
+
+    // get the list of supported headers
+    List<String> headerList = verifyMetrics(
+        () -> fs.listXAttrs(dir),
+        with(INVOCATION_OP_XATTR_LIST, GET_METADATA_ON_DIR));
+    // verify this contains at least one of the standard headers;
+    // the absence of the magic marker header is not checked here
+    Assertions.assertThat(headerList)
+        .describedAs("Supported headers")
+        .containsAnyElementsOf(Arrays.asList(XA_STANDARD_HEADERS));
+
+    // ask for one header and validate its value
+    byte[] bytes = verifyMetrics(() ->
+            fs.getXAttr(dir, XA_CONTENT_LENGTH),
+        with(INVOCATION_XATTR_GET_NAMED, GET_METADATA_ON_DIR));
+    assertHeader(XA_CONTENT_LENGTH, bytes)
+        .isEqualTo("0");
+    assertHeaderEntry(xAttrs, XA_CONTENT_TYPE)
+        .isEqualTo(CONTENT_TYPE_OCTET_STREAM);
+  }
+
+  /**
+   * When the operations are called on a missing path, FNFE is
+   * raised, with the same metadata cost as a directory lookup.
+   */
+  @Test
+  public void testXAttrMissingFile() throws Throwable {
+    describe("Test xattr on a missing path");
+    Path testFile = methodPath();
+    S3AFileSystem fs = getFileSystem();
+    int getMetadataOnMissingFile = GET_METADATA_ON_DIR;
+    verifyMetricsIntercepting(FileNotFoundException.class, "", () ->
+            fs.getXAttrs(testFile),
+        with(INVOCATION_XATTR_GET_MAP, getMetadataOnMissingFile));
+    verifyMetricsIntercepting(FileNotFoundException.class, "", () ->
+            fs.getXAttr(testFile, XA_CONTENT_LENGTH),
+        with(INVOCATION_XATTR_GET_NAMED, getMetadataOnMissingFile));
+    verifyMetricsIntercepting(FileNotFoundException.class, "", () ->
+            fs.listXAttrs(testFile),
+        with(INVOCATION_OP_XATTR_LIST, getMetadataOnMissingFile));
+  }
+
+  /**
+   * Generate an assert on a named header in the map.
+   * @param xAttrs attribute map
+   * @param key header key
+   * @return the assertion
+   */
+  private AbstractStringAssert<?> assertHeaderEntry(
+      Map<String, byte[]> xAttrs, String key) {
+
+    return assertHeader(key, xAttrs.get(key));
+  }
+
+  /**
+   * Create an assertion on the header: decode the bytes, check
+   * the decoded value is non-null and non-empty, and return it
+   * as a string assert for further checks.
+   * @param key header key (for error)
+   * @param bytes value
+   * @return the assertion
+   */
+  private AbstractStringAssert<?> assertHeader(final String key,
+      final byte[] bytes) {
+
+    String decoded = decodeBytes(bytes);
+    return Assertions.assertThat(decoded)
+        .describedAs("xattr %s decoded to: %s", key, decoded)
+        .isNotNull()
+        .isNotEmpty();
+  }
+}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestHeaderProcessing.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestHeaderProcessing.java
new file mode 100644
index 0000000..e0c6fee
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestHeaderProcessing.java
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import org.assertj.core.api.Assertions;
+import org.assertj.core.util.Lists;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.S3ATestUtils;
+import org.apache.hadoop.fs.s3a.test.OperationTrackingStore;
+import org.apache.hadoop.test.HadoopTestBase;
+
+import static java.lang.System.currentTimeMillis;
+import static org.apache.hadoop.fs.s3a.commit.CommitConstants.XA_MAGIC_MARKER;
+import static org.apache.hadoop.fs.s3a.commit.CommitConstants.X_HEADER_MAGIC_MARKER;
+import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.XA_LAST_MODIFIED;
+import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.XA_CONTENT_LENGTH;
+import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.decodeBytes;
+import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.encodeBytes;
+import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.extractXAttrLongValue;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+/**
+ * Unit tests of header processing logic in {@link HeaderProcessing}.
+ * Builds up a context accessor where the path
+ * defined in {@link #MAGIC_PATH} exists and returns object metadata.
+ * The accessor is shared across test cases and reset in {@link #setup()}.
+ */
+public class TestHeaderProcessing extends HadoopTestBase {
+
+  private static final XAttrContextAccessor CONTEXT_ACCESSORS
+      = new XAttrContextAccessor();
+
+  public static final String VALUE = "abcdeFGHIJ123!@##&82;";
+
+  public static final long FILE_LENGTH = 1024;
+
+  private static final String FINAL_FILE = "s3a://bucket/dest/output.csv";
+
+  private StoreContext context;
+
+  private HeaderProcessing headerProcessing;
+
+  private static final String MAGIC_KEY
+      = "dest/__magic/job1/ta1/__base/output.csv";
+  private static final String MAGIC_FILE
+      = "s3a://bucket/" + MAGIC_KEY;
+
+  private static final Path MAGIC_PATH =
+      new Path(MAGIC_FILE);
+
+  public static final long MAGIC_LEN = 4096L;
+
+  /**
+   * All the XAttrs which are built up.
+   */
+  private static final String[] RETRIEVED_XATTRS = {
+      XA_MAGIC_MARKER,
+      XA_CONTENT_LENGTH,
+      XA_LAST_MODIFIED
+  };
+
+  @Before
+  public void setup() throws Exception {
+    CONTEXT_ACCESSORS.len = FILE_LENGTH;
+    // the accessor is static: drop any headers added by earlier tests
+    CONTEXT_ACCESSORS.userHeaders.clear();
+    CONTEXT_ACCESSORS.userHeaders.put(X_HEADER_MAGIC_MARKER,
+        Long.toString(MAGIC_LEN));
+    context = S3ATestUtils.createMockStoreContext(true,
+        new OperationTrackingStore(), CONTEXT_ACCESSORS);
+    headerProcessing = new HeaderProcessing(context);
+  }
+
+  @Test
+  public void testByteRoundTrip() throws Throwable {
+    Assertions.assertThat(decodeBytes(encodeBytes(VALUE)))
+        .describedAs("encoding of " + VALUE)
+        .isEqualTo(VALUE);
+  }
+
+  @Test
+  public void testGetMarkerXAttr() throws Throwable {
+    assertAttributeHasValue(XA_MAGIC_MARKER, MAGIC_LEN);
+  }
+
+  @Test
+  public void testGetLengthXAttr() throws Throwable {
+    assertAttributeHasValue(XA_CONTENT_LENGTH, FILE_LENGTH);
+  }
+
+  /**
+   * Last modified makes it through.
+   */
+  @Test
+  public void testGetDateXAttr() throws Throwable {
+    Assertions.assertThat(
+        decodeBytes(headerProcessing.getXAttr(MAGIC_PATH,
+            XA_LAST_MODIFIED)))
+        .describedAs("XAttribute " + XA_LAST_MODIFIED)
+        .isEqualTo(CONTEXT_ACCESSORS.date.toString());
+  }
+
+  /**
+   * The API calls on unknown paths raise 404s.
+   */
+  @Test
+  public void test404() throws Throwable {
+    intercept(FileNotFoundException.class, () ->
+        headerProcessing.getXAttr(new Path(FINAL_FILE), XA_MAGIC_MARKER));
+  }
+
+  /**
+   * This call returns all the attributes which aren't null, including
+   * all the standard HTTP headers.
+   */
+  @Test
+  public void testGetAllXAttrs() throws Throwable {
+    Map<String, byte[]> xAttrs = headerProcessing.getXAttrs(MAGIC_PATH);
+    Assertions.assertThat(xAttrs.keySet())
+        .describedAs("Attribute keys")
+        .contains(RETRIEVED_XATTRS);
+  }
+
+  /**
+   * This call lists the names of all the attributes which aren't null,
+   * including those of the standard HTTP headers.
+   */
+  @Test
+  public void testListXAttrKeys() throws Throwable {
+    List<String> xAttrs = headerProcessing.listXAttrs(MAGIC_PATH);
+    Assertions.assertThat(xAttrs)
+        .describedAs("Attribute keys")
+        .contains(RETRIEVED_XATTRS);
+  }
+
+  /**
+   * Filtering is on attribute key, not header.
+   */
+  @Test
+  public void testGetFilteredXAttrs() throws Throwable {
+    Map<String, byte[]> xAttrs = headerProcessing.getXAttrs(MAGIC_PATH,
+        Lists.list(XA_MAGIC_MARKER, XA_CONTENT_LENGTH, "unknown"));
+    Assertions.assertThat(xAttrs.keySet())
+        .describedAs("Attribute keys")
+        .containsExactlyInAnyOrder(XA_MAGIC_MARKER, XA_CONTENT_LENGTH);
+    // and the values are good
+    assertLongAttributeValue(
+        XA_MAGIC_MARKER,
+        xAttrs.get(XA_MAGIC_MARKER),
+        MAGIC_LEN);
+    assertLongAttributeValue(
+        XA_CONTENT_LENGTH,
+        xAttrs.get(XA_CONTENT_LENGTH),
+        FILE_LENGTH);
+  }
+
+  /**
+   * An empty list of keys results in empty results.
+   */
+  @Test
+  public void testFilterEmptyXAttrs() throws Throwable {
+    Map<String, byte[]> xAttrs = headerProcessing.getXAttrs(MAGIC_PATH,
+        Lists.list());
+    Assertions.assertThat(xAttrs.keySet())
+        .describedAs("Attribute keys")
+        .isEmpty();
+  }
+
+  /**
+   * Add an owner header alongside the magic marker header, then verify
+   * that the owner header is copied, but not the magic marker header.
+   */
+  @Test
+  public void testMetadataCopySkipsMagicAttribute() throws Throwable {
+    final String owner = "x-header-owner";
+    final String root = "root";
+    CONTEXT_ACCESSORS.userHeaders.put(owner, root);
+    final ObjectMetadata source = context.getContextAccessors()
+        .getObjectMetadata(MAGIC_KEY);
+    final Map<String, String> sourceUserMD = source.getUserMetadata();
+    Assertions.assertThat(sourceUserMD.get(owner))
+        .describedAs("owner header in source MD")
+        .isEqualTo(root);
+
+    ObjectMetadata dest = new ObjectMetadata();
+    headerProcessing.cloneObjectMetadata(source, dest);
+
+    Assertions.assertThat(dest.getUserMetadata().get(X_HEADER_MAGIC_MARKER))
+        .describedAs("Magic marker header in copied MD")
+        .isNull();
+    Assertions.assertThat(dest.getUserMetadata().get(owner))
+        .describedAs("owner header in copied MD")
+        .isEqualTo(root);
+  }
+
+  /**
+   * Assert that an XAttr has a specific long value.
+   * @param key attribute key
+   * @param bytes bytes of the attribute.
+   * @param expected expected numeric value.
+   */
+  private void assertLongAttributeValue(
+      final String key,
+      final byte[] bytes,
+      final long expected) {
+    Assertions.assertThat(extractXAttrLongValue(bytes))
+        .describedAs("XAttribute " + key)
+        .isNotEmpty()
+        .hasValue(expected);
+  }
+
+  /**
+   * Assert that a retrieved XAttr has a specific long value.
+   * @param key attribute key
+   * @param expected expected numeric value.
+   */
+  protected void assertAttributeHasValue(final String key,
+      final long expected)
+      throws IOException {
+    assertLongAttributeValue(
+        key,
+        headerProcessing.getXAttr(MAGIC_PATH, key),
+        expected);
+  }
+
+  /**
+   * Context accessor with XAttrs returned for the {@link #MAGIC_PATH}
+   * path.
+   */
+  private static final class XAttrContextAccessor
+      implements ContextAccessors {
+
+    private final Map<String, String> userHeaders = new HashMap<>();
+
+    private long len;
+    private final Date date = new Date(currentTimeMillis());
+
+    @Override
+    public Path keyToPath(final String key) {
+      return new Path("s3a://bucket/" + key);
+    }
+
+    @Override
+    public String pathToKey(final Path path) {
+      // key is path with leading / stripped.
+      String key = path.toUri().getPath();
+      return key.length() > 1 ? key.substring(1) : key;
+    }
+
+    @Override
+    public File createTempFile(final String prefix, final long size)
+        throws IOException {
+      throw new UnsupportedOperationException("unsupported");
+    }
+
+    @Override
+    public String getBucketLocation() throws IOException {
+      return null;
+    }
+
+    @Override
+    public Path makeQualified(final Path path) {
+      return path;
+    }
+
+    @Override
+    public ObjectMetadata getObjectMetadata(final String key)
+        throws IOException {
+      if (MAGIC_KEY.equals(key)) {
+        ObjectMetadata omd = new ObjectMetadata();
+        omd.setUserMetadata(userHeaders);
+        omd.setContentLength(len);
+        omd.setLastModified(date);
+        return omd;
+      } else {
+        throw new FileNotFoundException(key);
+      }
+    }
+
+    public void setHeader(String key, String val) {
+      userHeaders.put(key, val);
+    }
+  }
+
+}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestPartialDeleteFailures.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestPartialDeleteFailures.java
index a2e7031..42714cb 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestPartialDeleteFailures.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestPartialDeleteFailures.java
@@ -29,6 +29,7 @@ import java.util.stream.Collectors;
 import com.amazonaws.services.s3.model.DeleteObjectsRequest;
 import com.amazonaws.services.s3.model.MultiObjectDeleteException;
 import org.apache.hadoop.thirdparty.com.google.common.collect.Lists;
+import com.amazonaws.services.s3.model.ObjectMetadata;
 import org.assertj.core.api.Assertions;
 import org.junit.Before;
 import org.junit.Test;
@@ -226,7 +227,11 @@ public class TestPartialDeleteFailures {
   }
 
 
-  private static class MinimalContextAccessor implements ContextAccessors {
+  /**
+   * Minimal {@link ContextAccessors} implementation for these tests.
+   */
+  private static final class MinimalContextAccessor
+      implements ContextAccessors {
 
     @Override
     public Path keyToPath(final String key) {
@@ -253,6 +258,18 @@ public class TestPartialDeleteFailures {
     public Path makeQualified(final Path path) {
       return path;
     }
+
+    /**
+     * Return metadata for an object; here it is always empty.
+     * @param key object key; ignored
+     * @return a new, empty {@link ObjectMetadata}
+     * @throws IOException never raised by this implementation
+     */
+    @Override
+    public ObjectMetadata getObjectMetadata(final String key)
+        throws IOException {
+      return new ObjectMetadata();
+    }
   }
 
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org