You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2021/01/26 19:47:28 UTC
[hadoop] branch branch-3.3 updated: HADOOP-17414. Magic committer
files don't have the count of bytes written collected by spark (#2530)
This is an automated email from the ASF dual-hosted git repository.
stevel pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new fb603e8 HADOOP-17414. Magic committer files don't have the count of bytes written collected by spark (#2530)
fb603e8 is described below
commit fb603e81f0fe9c5c92089cbe11af07fd4ac00bba
Author: Steve Loughran <st...@cloudera.com>
AuthorDate: Tue Jan 26 19:30:51 2021 +0000
HADOOP-17414. Magic committer files don't have the count of bytes written collected by spark (#2530)
This needs SPARK-33739 in the matching spark branch in order to work
Contributed by Steve Loughran.
Change-Id: I4fe75b057159e35aacc072da3cb7343467c0c3f1
---
.../fs/statistics/DurationTrackerFactory.java | 8 +-
.../hadoop/fs/statistics/StoreStatisticNames.java | 18 +
.../java/org/apache/hadoop/fs/s3a/Constants.java | 6 +
.../apache/hadoop/fs/s3a/S3ABlockOutputStream.java | 2 +-
.../org/apache/hadoop/fs/s3a/S3AFileSystem.java | 127 +++---
.../apache/hadoop/fs/s3a/S3AInstrumentation.java | 2 +-
.../java/org/apache/hadoop/fs/s3a/Statistic.java | 18 +
.../apache/hadoop/fs/s3a/WriteOperationHelper.java | 12 +-
.../org/apache/hadoop/fs/s3a/WriteOperations.java | 6 +-
.../hadoop/fs/s3a/commit/CommitConstants.java | 14 +
.../hadoop/fs/s3a/commit/CommitOperations.java | 26 ++
.../fs/s3a/commit/magic/MagicCommitTracker.java | 23 +-
.../hadoop/fs/s3a/impl/ContextAccessors.java | 13 +
.../hadoop/fs/s3a/impl/HeaderProcessing.java | 500 +++++++++++++++++++++
.../fs/s3a/statistics/CountersAndGauges.java | 3 +-
.../impl/BondedS3AStatisticsContext.java | 6 +
.../tools/hadoop-aws/committer_architecture.md | 10 +
.../site/markdown/tools/hadoop-aws/committers.md | 1 +
.../hadoop/fs/s3a/ITestS3AMiscOperations.java | 8 +
.../hadoop/fs/s3a/ITestS3ARemoteFileChanged.java | 4 +-
.../fs/s3a/commit/AbstractITCommitProtocol.java | 13 +-
.../fs/s3a/commit/ITestCommitOperations.java | 83 +++-
.../s3a/commit/magic/ITestMagicCommitProtocol.java | 41 +-
.../integration/ITestStagingCommitProtocol.java | 13 +-
.../apache/hadoop/fs/s3a/impl/ITestXAttrCost.java | 219 +++++++++
.../hadoop/fs/s3a/impl/TestHeaderProcessing.java | 313 +++++++++++++
.../fs/s3a/impl/TestPartialDeleteFailures.java | 10 +-
27 files changed, 1391 insertions(+), 108 deletions(-)
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/DurationTrackerFactory.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/DurationTrackerFactory.java
index b1d87c9..641d7e8 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/DurationTrackerFactory.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/DurationTrackerFactory.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.fs.statistics;
+import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.stubDurationTracker;
+
/**
* Interface for a source of duration tracking.
*
@@ -36,12 +38,16 @@ public interface DurationTrackerFactory {
* by the given count.
*
* The expected use is within a try-with-resources clause.
+ *
+ * The default implementation returns a stub duration tracker.
* @param key statistic key prefix
* @param count #of times to increment the matching counter in this
* operation.
* @return an object to close after an operation completes.
*/
- DurationTracker trackDuration(String key, long count);
+ default DurationTracker trackDuration(String key, long count) {
+ return stubDurationTracker();
+ }
/**
* Initiate a duration tracking operation by creating/returning
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StoreStatisticNames.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StoreStatisticNames.java
index 4baf37d..0dd6540 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StoreStatisticNames.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StoreStatisticNames.java
@@ -130,6 +130,24 @@ public final class StoreStatisticNames {
/** {@value}. */
public static final String OP_TRUNCATE = "op_truncate";
+ /* The XAttr API */
+
+ /** Invoke {@code getXAttrs(Path path)}: {@value}. */
+ public static final String OP_XATTR_GET_MAP = "op_xattr_get_map";
+
+ /** Invoke {@code getXAttr(Path, String)}: {@value}. */
+ public static final String OP_XATTR_GET_NAMED = "op_xattr_get_named";
+
+ /**
+ * Invoke {@code getXAttrs(Path path, List<String> names)}: {@value}.
+ */
+ public static final String OP_XATTR_GET_NAMED_MAP =
+ "op_xattr_get_named_map";
+
+ /** Invoke {@code listXAttrs(Path path)}: {@value}. */
+ public static final String OP_XATTR_LIST = "op_xattr_list";
+
+
/** {@value}. */
public static final String DELEGATION_TOKENS_ISSUED
= "delegation_tokens_issued";
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
index fcaec50..5079ed9 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
@@ -1048,4 +1048,10 @@ public final class Constants {
public static final String STORE_CAPABILITY_DIRECTORY_MARKER_ACTION_DELETE
= "fs.s3a.capability.directory.marker.action.delete";
+ /**
+ * To comply with the XAttr rules, all headers of the object retrieved
+ * through the getXAttr APIs have the prefix: {@value}.
+ */
+ public static final String XA_HEADER_PREFIX = "header.";
+
}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java
index 0fdad21..5784ab8 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java
@@ -447,7 +447,7 @@ class S3ABlockOutputStream extends OutputStream implements
final PutObjectRequest putObjectRequest = uploadData.hasFile() ?
writeOperationHelper.createPutObjectRequest(key, uploadData.getFile())
: writeOperationHelper.createPutObjectRequest(key,
- uploadData.getUploadStream(), size);
+ uploadData.getUploadStream(), size, null);
BlockUploadProgress callback =
new BlockUploadProgress(
block, progressListener, now());
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index 4e1a67f..e6fb8c0 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -108,6 +108,7 @@ import org.apache.hadoop.fs.s3a.impl.CopyOutcome;
import org.apache.hadoop.fs.s3a.impl.DeleteOperation;
import org.apache.hadoop.fs.s3a.impl.DirectoryPolicy;
import org.apache.hadoop.fs.s3a.impl.DirectoryPolicyImpl;
+import org.apache.hadoop.fs.s3a.impl.HeaderProcessing;
import org.apache.hadoop.fs.s3a.impl.InternalConstants;
import org.apache.hadoop.fs.s3a.impl.ListingOperationCallbacks;
import org.apache.hadoop.fs.s3a.impl.MultiObjectDeleteSupport;
@@ -332,6 +333,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
private DirectoryPolicy directoryPolicy;
/**
+ * Header processing for XAttr.
+ */
+ private HeaderProcessing headerProcessing;
+
+ /**
* Context accessors for re-use.
*/
private final ContextAccessors contextAccessors = new ContextAccessorsImpl();
@@ -457,6 +463,8 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
magicCommitterEnabled ? "is" : "is not");
committerIntegration = new MagicCommitIntegration(
this, magicCommitterEnabled);
+ // header processing for rename and magic committer
+ headerProcessing = new HeaderProcessing(createStoreContext());
// instantiate S3 Select support
selectBinding = new SelectBinding(writeHelper);
@@ -1781,14 +1789,15 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
/**
* Low-level call to get at the object metadata.
- * @param path path to the object
+ * @param path path to the object. This will be qualified.
* @return metadata
* @throws IOException IO and object access problems.
*/
@VisibleForTesting
@Retries.RetryTranslated
public ObjectMetadata getObjectMetadata(Path path) throws IOException {
- return getObjectMetadata(path, null, invoker, null);
+ return getObjectMetadata(makeQualified(path), null, invoker,
+ "getObjectMetadata");
}
/**
@@ -1800,31 +1809,17 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
* @return metadata
* @throws IOException IO and object access problems.
*/
- @VisibleForTesting
@Retries.RetryTranslated
- public ObjectMetadata getObjectMetadata(Path path,
+ private ObjectMetadata getObjectMetadata(Path path,
ChangeTracker changeTracker, Invoker changeInvoker, String operation)
throws IOException {
checkNotClosed();
- return once("getObjectMetadata", path.toString(),
+ String key = pathToKey(path);
+ return once(operation, path.toString(),
() ->
// this always does a full HEAD to the object
getObjectMetadata(
- pathToKey(path), changeTracker, changeInvoker, operation));
- }
-
- /**
- * Get all the headers of the object of a path, if the object exists.
- * @param path path to probe
- * @return an immutable map of object headers.
- * @throws IOException failure of the query
- */
- @Retries.RetryTranslated
- public Map<String, Object> getObjectHeaders(Path path) throws IOException {
- LOG.debug("getObjectHeaders({})", path);
- checkNotClosed();
- incrementReadOperations();
- return getObjectMetadata(path).getRawMetadata();
+ key, changeTracker, changeInvoker, operation));
}
/**
@@ -2021,7 +2016,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
@Retries.RetryRaw
@VisibleForTesting
ObjectMetadata getObjectMetadata(String key) throws IOException {
- return getObjectMetadata(key, null, invoker,null);
+ return getObjectMetadata(key, null, invoker, "getObjectMetadata");
}
/**
@@ -4099,59 +4094,8 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
* @return a copy of {@link ObjectMetadata} with only relevant attributes
*/
private ObjectMetadata cloneObjectMetadata(ObjectMetadata source) {
- // This approach may be too brittle, especially if
- // in future there are new attributes added to ObjectMetadata
- // that we do not explicitly call to set here
ObjectMetadata ret = newObjectMetadata(source.getContentLength());
-
- // Possibly null attributes
- // Allowing nulls to pass breaks it during later use
- if (source.getCacheControl() != null) {
- ret.setCacheControl(source.getCacheControl());
- }
- if (source.getContentDisposition() != null) {
- ret.setContentDisposition(source.getContentDisposition());
- }
- if (source.getContentEncoding() != null) {
- ret.setContentEncoding(source.getContentEncoding());
- }
- if (source.getContentMD5() != null) {
- ret.setContentMD5(source.getContentMD5());
- }
- if (source.getContentType() != null) {
- ret.setContentType(source.getContentType());
- }
- if (source.getExpirationTime() != null) {
- ret.setExpirationTime(source.getExpirationTime());
- }
- if (source.getExpirationTimeRuleId() != null) {
- ret.setExpirationTimeRuleId(source.getExpirationTimeRuleId());
- }
- if (source.getHttpExpiresDate() != null) {
- ret.setHttpExpiresDate(source.getHttpExpiresDate());
- }
- if (source.getLastModified() != null) {
- ret.setLastModified(source.getLastModified());
- }
- if (source.getOngoingRestore() != null) {
- ret.setOngoingRestore(source.getOngoingRestore());
- }
- if (source.getRestoreExpirationTime() != null) {
- ret.setRestoreExpirationTime(source.getRestoreExpirationTime());
- }
- if (source.getSSEAlgorithm() != null) {
- ret.setSSEAlgorithm(source.getSSEAlgorithm());
- }
- if (source.getSSECustomerAlgorithm() != null) {
- ret.setSSECustomerAlgorithm(source.getSSECustomerAlgorithm());
- }
- if (source.getSSECustomerKeyMd5() != null) {
- ret.setSSECustomerKeyMd5(source.getSSECustomerKeyMd5());
- }
-
- for (Map.Entry<String, String> e : source.getUserMetadata().entrySet()) {
- ret.addUserMetadata(e.getKey(), e.getValue());
- }
+ getHeaderProcessing().cloneObjectMetadata(source, ret);
return ret;
}
@@ -4383,6 +4327,37 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
}
/**
+ * Get header processing support.
+ * @return the header processing of this instance.
+ */
+ private HeaderProcessing getHeaderProcessing() {
+ return headerProcessing;
+ }
+
+ @Override
+ public byte[] getXAttr(final Path path, final String name)
+ throws IOException {
+ return getHeaderProcessing().getXAttr(path, name);
+ }
+
+ @Override
+ public Map<String, byte[]> getXAttrs(final Path path) throws IOException {
+ return getHeaderProcessing().getXAttrs(path);
+ }
+
+ @Override
+ public Map<String, byte[]> getXAttrs(final Path path,
+ final List<String> names)
+ throws IOException {
+ return getHeaderProcessing().getXAttrs(path, names);
+ }
+
+ @Override
+ public List<String> listXAttrs(final Path path) throws IOException {
+ return getHeaderProcessing().listXAttrs(path);
+ }
+
+ /**
* {@inheritDoc}.
*
* This implementation is optimized for S3, which can do a bulk listing
@@ -5088,5 +5063,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
return S3AFileSystem.this.makeQualified(path);
}
+ @Override
+ public ObjectMetadata getObjectMetadata(final String key)
+ throws IOException {
+ return once("getObjectMetadata", key, () ->
+ S3AFileSystem.this.getObjectMetadata(key));
+ }
}
}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
index c25e3b3..5fcc157 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
@@ -120,7 +120,7 @@ import static org.apache.hadoop.fs.s3a.Statistic.*;
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class S3AInstrumentation implements Closeable, MetricsSource,
- CountersAndGauges, IOStatisticsSource, DurationTrackerFactory {
+ CountersAndGauges, IOStatisticsSource {
private static final Logger LOG = LoggerFactory.getLogger(
S3AInstrumentation.class);
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
index 6709382..0bd2a62 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
@@ -157,6 +157,24 @@ public enum Statistic {
"Calls of rename()",
TYPE_COUNTER),
+ /* The XAttr API metrics are all durations */
+ INVOCATION_XATTR_GET_MAP(
+ StoreStatisticNames.OP_XATTR_GET_MAP,
+ "Calls of getXAttrs(Path path)",
+ TYPE_DURATION),
+ INVOCATION_XATTR_GET_NAMED(
+ StoreStatisticNames.OP_XATTR_GET_NAMED,
+ "Calls of getXAttr(Path, String)",
+ TYPE_DURATION),
+ INVOCATION_XATTR_GET_NAMED_MAP(
+ StoreStatisticNames.OP_XATTR_GET_NAMED_MAP,
+ "Calls of getXAttrs(Path path, List<String> names)",
+ TYPE_DURATION),
+ INVOCATION_OP_XATTR_LIST(
+ StoreStatisticNames.OP_XATTR_LIST,
+ "Calls of listXAttrs(Path path)",
+ TYPE_DURATION),
+
/* Object IO */
OBJECT_COPY_REQUESTS(StoreStatisticNames.OBJECT_COPY_REQUESTS,
"Object copy requests",
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java
index e75c09c..49a5eb2 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java
@@ -25,6 +25,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import com.amazonaws.services.s3.model.AmazonS3Exception;
@@ -172,12 +173,19 @@ public class WriteOperationHelper implements WriteOperations {
* @param destKey destination key
* @param inputStream source data.
* @param length size, if known. Use -1 for not known
+ * @param headers optional map of custom headers.
* @return the request
*/
public PutObjectRequest createPutObjectRequest(String destKey,
- InputStream inputStream, long length) {
+ InputStream inputStream,
+ long length,
+ final Map<String, String> headers) {
+ ObjectMetadata objectMetadata = newObjectMetadata(length);
+ if (headers != null) {
+ objectMetadata.setUserMetadata(headers);
+ }
return owner.newPutObjectRequest(destKey,
- newObjectMetadata(length),
+ objectMetadata,
inputStream);
}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperations.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperations.java
index 0b33614..2636ed7 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperations.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperations.java
@@ -24,6 +24,7 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
@@ -77,10 +78,13 @@ public interface WriteOperations {
* @param destKey destination key
* @param inputStream source data.
* @param length size, if known. Use -1 for not known
+ * @param headers optional map of custom headers.
* @return the request
*/
PutObjectRequest createPutObjectRequest(String destKey,
- InputStream inputStream, long length);
+ InputStream inputStream,
+ long length,
+ @Nullable Map<String, String> headers);
/**
* Create a {@link PutObjectRequest} request to upload a file.
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java
index 3224a5a..6093996 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.fs.s3a.commit;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import static org.apache.hadoop.fs.s3a.Constants.XA_HEADER_PREFIX;
import static org.apache.hadoop.mapreduce.lib.output.PathOutputCommitterFactory.COMMITTER_FACTORY_SCHEME_PATTERN;
/**
@@ -316,4 +317,17 @@ public final class CommitConstants {
public static final boolean DEFAULT_S3A_COMMITTER_GENERATE_UUID =
false;
+ /**
+ * Magic marker header declaring the final file length; it is set on
+ * the marker objects of magic uploads: {@value}.
+ */
+ public static final String X_HEADER_MAGIC_MARKER =
+ "x-hadoop-s3a-magic-data-length";
+
+ /**
+ * XAttr name of magic marker, with "header." prefix: {@value}.
+ */
+ public static final String XA_MAGIC_MARKER = XA_HEADER_PREFIX
+ + X_HEADER_MAGIC_MARKER;
+
}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitOperations.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitOperations.java
index c9fb380..4562e0f 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitOperations.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitOperations.java
@@ -26,6 +26,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -39,6 +40,7 @@ import org.slf4j.LoggerFactory;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
@@ -50,6 +52,7 @@ import org.apache.hadoop.fs.s3a.WriteOperationHelper;
import org.apache.hadoop.fs.s3a.commit.files.PendingSet;
import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
import org.apache.hadoop.fs.s3a.commit.files.SuccessData;
+import org.apache.hadoop.fs.s3a.impl.HeaderProcessing;
import org.apache.hadoop.fs.s3a.impl.InternalConstants;
import org.apache.hadoop.fs.s3a.s3guard.BulkOperationState;
import org.apache.hadoop.fs.s3a.statistics.CommitterStatistics;
@@ -607,6 +610,29 @@ public class CommitOperations implements IOStatisticsSource {
}
/**
+ * Get the magic file length of a file.
+ * If the FS doesn't support the API, the attribute is missing or
+ * the parse to long fails, then Optional.empty() is returned.
+ * Static for some easier testability.
+ * @param fs filesystem
+ * @param path path
+ * @return either a length or None.
+ * @throws IOException on error
+ * */
+ public static Optional<Long> extractMagicFileLength(FileSystem fs, Path path)
+ throws IOException {
+ byte[] bytes;
+ try {
+ bytes = fs.getXAttr(path, XA_MAGIC_MARKER);
+ } catch (UnsupportedOperationException e) {
+ // FS doesn't support xattr.
+ LOG.debug("Filesystem {} doesn't support XAttr API", fs);
+ return Optional.empty();
+ }
+ return HeaderProcessing.extractXAttrLongValue(bytes);
+ }
+
+ /**
* Commit context.
*
* It is used to manage the final commit sequence where files become
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicCommitTracker.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicCommitTracker.java
index 0f1a0a6..ddaee19 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicCommitTracker.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicCommitTracker.java
@@ -20,7 +20,9 @@ package org.apache.hadoop.fs.s3a.commit.magic;
import java.io.ByteArrayInputStream;
import java.io.IOException;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import com.amazonaws.services.s3.model.PartETag;
import com.amazonaws.services.s3.model.PutObjectRequest;
@@ -37,6 +39,8 @@ import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot;
+import static org.apache.hadoop.fs.s3a.commit.CommitConstants.X_HEADER_MAGIC_MARKER;
+
/**
* Put tracker for Magic commits.
* <p>Important</p>: must not directly or indirectly import a class which
@@ -122,13 +126,6 @@ public class MagicCommitTracker extends PutTracker {
Preconditions.checkArgument(!parts.isEmpty(),
"No uploaded parts to save");
- // put a 0-byte file with the name of the original under-magic path
- PutObjectRequest originalDestPut = writer.createPutObjectRequest(
- originalDestKey,
- new ByteArrayInputStream(EMPTY),
- 0);
- writer.uploadObject(originalDestPut);
-
// build the commit summary
SinglePendingCommit commitData = new SinglePendingCommit();
commitData.touch(System.currentTimeMillis());
@@ -150,9 +147,19 @@ public class MagicCommitTracker extends PutTracker {
PutObjectRequest put = writer.createPutObjectRequest(
pendingPartKey,
new ByteArrayInputStream(bytes),
- bytes.length);
+ bytes.length, null);
writer.uploadObject(put);
+ // Add the final file length as a header
+ Map<String, String> headers = new HashMap<>();
+ headers.put(X_HEADER_MAGIC_MARKER, Long.toString(bytesWritten));
+ // now put a 0-byte file with the name of the original under-magic path
+ PutObjectRequest originalDestPut = writer.createPutObjectRequest(
+ originalDestKey,
+ new ByteArrayInputStream(EMPTY),
+ 0,
+ headers);
+ writer.uploadObject(originalDestPut);
return false;
}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ContextAccessors.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ContextAccessors.java
index d39c649..27ac7de 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ContextAccessors.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ContextAccessors.java
@@ -22,6 +22,8 @@ import java.io.File;
import java.io.IOException;
import java.nio.file.AccessDeniedException;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.Retries;
@@ -81,4 +83,15 @@ public interface ContextAccessors {
* @return possibly new path.
*/
Path makeQualified(Path path);
+
+ /**
+ * Retrieve the object metadata.
+ *
+ * @param key key to retrieve.
+ * @return metadata
+ * @throws IOException IO and object access problems.
+ */
+ @Retries.RetryTranslated
+ ObjectMetadata getObjectMetadata(String key) throws IOException;
+
}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/HeaderProcessing.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/HeaderProcessing.java
new file mode 100644
index 0000000..5efec2b
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/HeaderProcessing.java
@@ -0,0 +1,500 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import javax.annotation.Nullable;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.TreeMap;
+
+import com.amazonaws.services.s3.Headers;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.Statistic;
+import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext;
+
+import static org.apache.hadoop.fs.s3a.Constants.XA_HEADER_PREFIX;
+import static org.apache.hadoop.fs.s3a.Statistic.INVOCATION_OP_XATTR_LIST;
+import static org.apache.hadoop.fs.s3a.Statistic.INVOCATION_XATTR_GET_MAP;
+import static org.apache.hadoop.fs.s3a.Statistic.INVOCATION_XATTR_GET_NAMED;
+import static org.apache.hadoop.fs.s3a.Statistic.INVOCATION_XATTR_GET_NAMED_MAP;
+import static org.apache.hadoop.fs.s3a.commit.CommitConstants.X_HEADER_MAGIC_MARKER;
+import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration;
+
+/**
+ * Part of the S3A FS where object headers are
+ * processed.
+ * Implements all the various XAttr read operations.
+ * Those APIs all expect byte arrays back.
+ * Metadata cloning is also implemented here, so as
+ * to stay in sync with custom header logic.
+ *
+ * The standard header names are extracted from the AWS SDK.
+ * The S3A connector does not (currently) support setting them,
+ * though it would be possible to do so through the createFile()
+ * builder API.
+ */
+public class HeaderProcessing extends AbstractStoreOperation {
+
+ private static final Logger LOG = LoggerFactory.getLogger(
+ HeaderProcessing.class);
+
+ /**
+ * An empty buffer.
+ */
+ private static final byte[] EMPTY = new byte[0];
+
+
+ /**
+ * Standard HTTP header found on some S3 objects: {@value}.
+ */
+ public static final String XA_CACHE_CONTROL =
+ XA_HEADER_PREFIX + Headers.CACHE_CONTROL;
+ /**
+ * Standard HTTP header found on some S3 objects: {@value}.
+ */
+ public static final String XA_CONTENT_DISPOSITION =
+ XA_HEADER_PREFIX + Headers.CONTENT_DISPOSITION;
+
+ /**
+ * Standard HTTP header found on some S3 objects: {@value}.
+ */
+ public static final String XA_CONTENT_ENCODING =
+ XA_HEADER_PREFIX + Headers.CONTENT_ENCODING;
+
+ /**
+ * Standard HTTP header found on some S3 objects: {@value}.
+ */
+ public static final String XA_CONTENT_LANGUAGE =
+ XA_HEADER_PREFIX + Headers.CONTENT_LANGUAGE;
+
+ /**
+ * Length XAttr: {@value}.
+ */
+ public static final String XA_CONTENT_LENGTH =
+ XA_HEADER_PREFIX + Headers.CONTENT_LENGTH;
+
+ /**
+ * Standard HTTP header found on some S3 objects: {@value}.
+ */
+ public static final String XA_CONTENT_MD5 =
+ XA_HEADER_PREFIX + Headers.CONTENT_MD5;
+
+ /**
+ * Content range: {@value}.
+ * This is returned on GET requests with ranges.
+ */
+ public static final String XA_CONTENT_RANGE =
+ XA_HEADER_PREFIX + Headers.CONTENT_RANGE;
+
+ /**
+ * Content type: may be set when uploading.
+ * {@value}.
+ */
+ public static final String XA_CONTENT_TYPE =
+ XA_HEADER_PREFIX + Headers.CONTENT_TYPE;
+
+ /**
+ * Etag Header {@value}.
+ * Also accessible via {@code ObjectMetadata.getETag()}, where
+ * it can be retrieved via {@code getFileChecksum(path)} if
+ * the S3A connector is enabled.
+ */
+ public static final String XA_ETAG = XA_HEADER_PREFIX + Headers.ETAG;
+
+
+ /**
+ * last modified XAttr: {@value}.
+ */
+ public static final String XA_LAST_MODIFIED =
+ XA_HEADER_PREFIX + Headers.LAST_MODIFIED;
+
+ /* AWS Specific Headers. May not be found on other S3 endpoints. */
+
+ /**
+ * object archive status; empty if not on S3 Glacier
+ * (i.e. all normal files should be non-archived as
+ * S3A and applications don't handle archived data)
+ * Value {@value}.
+ */
+ public static final String XA_ARCHIVE_STATUS =
+ XA_HEADER_PREFIX + Headers.ARCHIVE_STATUS;
+
+ /**
+ * Object legal hold status. {@value}.
+ */
+ public static final String XA_OBJECT_LOCK_LEGAL_HOLD_STATUS =
+ XA_HEADER_PREFIX + Headers.OBJECT_LOCK_LEGAL_HOLD_STATUS;
+
+ /**
+ * Object lock mode. {@value}.
+ */
+ public static final String XA_OBJECT_LOCK_MODE =
+ XA_HEADER_PREFIX + Headers.OBJECT_LOCK_MODE;
+
+ /**
+ * ISO8601 expiry date of object lock hold. {@value}.
+ */
+ public static final String XA_OBJECT_LOCK_RETAIN_UNTIL_DATE =
+ XA_HEADER_PREFIX + Headers.OBJECT_LOCK_RETAIN_UNTIL_DATE;
+
+ /**
+ * Replication status for cross-region replicated objects. {@value}.
+ */
+ public static final String XA_OBJECT_REPLICATION_STATUS =
+ XA_HEADER_PREFIX + Headers.OBJECT_REPLICATION_STATUS;
+
+ /**
+ * Version ID; empty for non-versioned buckets/data. {@value}.
+ */
+ public static final String XA_S3_VERSION_ID =
+ XA_HEADER_PREFIX + Headers.S3_VERSION_ID;
+
+ /**
+ * The server-side encryption algorithm to use
+ * with AWS-managed keys: {@value}.
+ */
+ public static final String XA_SERVER_SIDE_ENCRYPTION =
+ XA_HEADER_PREFIX + Headers.SERVER_SIDE_ENCRYPTION;
+
+ /**
+ * Storage Class XAttr: {@value}.
+ */
+ public static final String XA_STORAGE_CLASS =
+ XA_HEADER_PREFIX + Headers.STORAGE_CLASS;
+
+ /**
+ * Standard headers which are retrieved from HEAD Requests
+ * and set as XAttrs if the response included the relevant header.
+ */
+ public static final String[] XA_STANDARD_HEADERS = {
+ /* HTTP standard headers */
+ XA_CACHE_CONTROL,
+ XA_CONTENT_DISPOSITION,
+ XA_CONTENT_ENCODING,
+ XA_CONTENT_LANGUAGE,
+ XA_CONTENT_LENGTH,
+ XA_CONTENT_MD5,
+ XA_CONTENT_RANGE,
+ XA_CONTENT_TYPE,
+ XA_ETAG,
+ XA_LAST_MODIFIED,
+ /* aws headers */
+ XA_ARCHIVE_STATUS,
+ XA_OBJECT_LOCK_LEGAL_HOLD_STATUS,
+ XA_OBJECT_LOCK_MODE,
+ XA_OBJECT_LOCK_RETAIN_UNTIL_DATE,
+ XA_OBJECT_REPLICATION_STATUS,
+ XA_S3_VERSION_ID,
+ XA_SERVER_SIDE_ENCRYPTION,
+ XA_STORAGE_CLASS,
+ };
+
+ /**
+ * Content type of generic binary objects.
+ * This is the default for uploaded objects.
+ */
+ public static final String CONTENT_TYPE_OCTET_STREAM =
+ "application/octet-stream";
+
+ /**
+ * XML content type : {@value}.
+ * This is application/xml, not text/xml, and is
+ * what a HEAD of / returns as the type of a root path.
+ */
+ public static final String CONTENT_TYPE_APPLICATION_XML =
+ "application/xml";
+
+ /**
+ * Construct, handing the store context to the superclass.
+ * @param storeContext store context.
+ */
+ public HeaderProcessing(final StoreContext storeContext) {
+ super(storeContext);
+ }
+
+ /**
+  * Query the store, get all the headers into a map. Each Header
+  * has the "header." prefix.
+  * Caller must have read access.
+  * The value of each header is the string value of the object
+  * UTF-8 encoded.
+  * If the probe of the object key raises a FileNotFoundException,
+  * the key with a trailing "/" is probed before giving up.
+  * @param path path of object.
+  * @param statistic statistic to use for duration tracking.
+  * @return the headers
+  * @throws IOException failure, including file not found.
+  */
+ private Map<String, byte[]> retrieveHeaders(
+     final Path path,
+     final Statistic statistic) throws IOException {
+   StoreContext context = getStoreContext();
+   ContextAccessors accessors = context.getContextAccessors();
+   String objectKey = accessors.pathToKey(path);
+   ObjectMetadata md;
+   String symbol = statistic.getSymbol();
+   S3AStatisticsContext instrumentation = context.getInstrumentation();
+   try {
+     md = trackDuration(instrumentation, symbol, () ->
+         accessors.getObjectMetadata(objectKey));
+   } catch (FileNotFoundException e) {
+     // no entry. It could be a directory, so try again.
+     md = trackDuration(instrumentation, symbol, () ->
+         accessors.getObjectMetadata(objectKey + "/"));
+   }
+   // all user metadata
+   Map<String, String> rawHeaders = md.getUserMetadata();
+   Map<String, byte[]> headers = new TreeMap<>();
+   rawHeaders.forEach((key, value) ->
+       headers.put(XA_HEADER_PREFIX + key, encodeBytes(value)));
+
+   // and add the usual content length &c, if set
+   maybeSetHeader(headers, XA_CACHE_CONTROL,
+       md.getCacheControl());
+   maybeSetHeader(headers, XA_CONTENT_DISPOSITION,
+       md.getContentDisposition());
+   maybeSetHeader(headers, XA_CONTENT_ENCODING,
+       md.getContentEncoding());
+   maybeSetHeader(headers, XA_CONTENT_LANGUAGE,
+       md.getContentLanguage());
+   maybeSetHeader(headers, XA_CONTENT_LENGTH,
+       md.getContentLength());
+   maybeSetHeader(headers, XA_CONTENT_MD5,
+       md.getContentMD5());
+   maybeSetHeader(headers, XA_CONTENT_RANGE,
+       md.getContentRange());
+   maybeSetHeader(headers, XA_CONTENT_TYPE,
+       md.getContentType());
+   maybeSetHeader(headers, XA_ETAG,
+       md.getETag());
+   maybeSetHeader(headers, XA_LAST_MODIFIED,
+       md.getLastModified());
+
+   // AWS custom headers
+   maybeSetHeader(headers, XA_ARCHIVE_STATUS,
+       md.getArchiveStatus());
+   maybeSetHeader(headers, XA_OBJECT_LOCK_LEGAL_HOLD_STATUS,
+       md.getObjectLockLegalHoldStatus());
+   maybeSetHeader(headers, XA_OBJECT_LOCK_MODE,
+       md.getObjectLockMode());
+   maybeSetHeader(headers, XA_OBJECT_LOCK_RETAIN_UNTIL_DATE,
+       md.getObjectLockRetainUntilDate());
+   maybeSetHeader(headers, XA_OBJECT_REPLICATION_STATUS,
+       md.getReplicationStatus());
+   maybeSetHeader(headers, XA_S3_VERSION_ID,
+       md.getVersionId());
+   maybeSetHeader(headers, XA_SERVER_SIDE_ENCRYPTION,
+       md.getSSEAlgorithm());
+   maybeSetHeader(headers, XA_STORAGE_CLASS,
+       md.getStorageClass());
+   return headers;
+ }
+
+ /**
+  * Add a header to the map unless its value is null.
+  * Values are converted to bytes by {@code encodeBytes()}.
+  * @param headers header map
+  * @param name header name
+  * @param value value to encode; skipped when null.
+  */
+ private void maybeSetHeader(
+     final Map<String, byte[]> headers, final String name,
+     final Object value) {
+   if (value == null) {
+     return;
+   }
+   headers.put(name, encodeBytes(value));
+ }
+
+ /**
+  * Convert an object to the UTF-8 bytes of its {@code toString()} value.
+  * @param s source; may be null.
+  * @return the encoded bytes, or the empty buffer if the source is null.
+  */
+ public static byte[] encodeBytes(@Nullable Object s) {
+   if (s == null) {
+     return EMPTY;
+   }
+   return s.toString().getBytes(StandardCharsets.UTF_8);
+ }
+
+ /**
+  * Decode a byte array as a UTF-8 string.
+  * @param bytes source bytes; may be null.
+  * @return the decoded string, or null if the source was null.
+  */
+ public static String decodeBytes(byte[] bytes) {
+   if (bytes == null) {
+     return null;
+   }
+   return new String(bytes, StandardCharsets.UTF_8);
+ }
+
+ /**
+  * Get the value of a single named XAttr of a file or directory.
+  * All headers of the object are retrieved and the named one looked up.
+  * @param path Path to get extended attribute
+  * @param name XAttr name.
+  * @return byte[] XAttr value or null
+  * @throws IOException IO failure
+  */
+ public byte[] getXAttr(Path path, String name) throws IOException {
+   final Map<String, byte[]> attributes =
+       retrieveHeaders(path, INVOCATION_XATTR_GET_NAMED);
+   return attributes.get(name);
+ }
+
+ /**
+  * See {@code FileSystem.getXAttrs(path)}.
+  *
+  * @param path Path to get extended attributes
+  * @return Map describing the XAttrs of the file or directory
+  * @throws IOException IO failure
+  */
+ public Map<String, byte[]> getXAttrs(Path path) throws IOException {
+   final Map<String, byte[]> attributes =
+       retrieveHeaders(path, INVOCATION_XATTR_GET_MAP);
+   return attributes;
+ }
+
+ /**
+  * See {@code FileSystem.listXAttrs(path)}.
+  * The names are the keys of the header map retrieved for the path.
+  * @param path Path to get extended attributes
+  * @return List of supported XAttrs
+  * @throws IOException IO failure
+  */
+ public List<String> listXAttrs(final Path path) throws IOException {
+   final Map<String, byte[]> attributes =
+       retrieveHeaders(path, INVOCATION_OP_XATTR_LIST);
+   return new ArrayList<>(attributes.keySet());
+ }
+
+ /**
+  * See {@code FileSystem.getXAttrs(path, names)}.
+  * All headers are retrieved; only those whose name is in the
+  * supplied list are returned.
+  * @param path Path to get extended attributes
+  * @param names XAttr names.
+  * @return Map describing the XAttrs of the file or directory
+  * @throws IOException IO failure
+  */
+ public Map<String, byte[]> getXAttrs(Path path, List<String> names)
+     throws IOException {
+   final Map<String, byte[]> all = retrieveHeaders(path,
+       INVOCATION_XATTR_GET_NAMED_MAP);
+   final Map<String, byte[]> selected = new TreeMap<>();
+   for (Map.Entry<String, byte[]> attribute : all.entrySet()) {
+     if (names.contains(attribute.getKey())) {
+       selected.put(attribute.getKey(), attribute.getValue());
+     }
+   }
+   return selected;
+ }
+
+ /**
+  * Parse an XAttr byte array as a non-negative long.
+  * @param data data to parse; may be null.
+  * @return the value, or empty if the data is null or empty,
+  * is not a number, or is negative.
+  */
+ public static Optional<Long> extractXAttrLongValue(byte[] data) {
+   String text = HeaderProcessing.decodeBytes(data);
+   if (!StringUtils.isNotEmpty(text)) {
+     // missing/empty header.
+     return Optional.empty();
+   }
+   long parsed;
+   try {
+     parsed = Long.parseLong(text);
+   } catch (NumberFormatException ex) {
+     // parse failure.
+     LOG.warn("Not a number: {}", text, ex);
+     return Optional.empty();
+   }
+   if (parsed < 0) {
+     return Optional.empty();
+   }
+   return Optional.of(parsed);
+ }
+
+ /**
+ * Creates a copy of the passed {@link ObjectMetadata}.
+ * Does so without using the {@link ObjectMetadata#clone()} method,
+ * to avoid copying unnecessary headers.
+ * This operation does not copy the {@code X_HEADER_MAGIC_MARKER}
+ * header to avoid confusion. If a marker file is renamed,
+ * it loses information about any remapped file.
+ * If new fields are added to ObjectMetadata which are not
+ * present in the user metadata headers, they will not be picked
+ * up or cloned unless this operation is updated.
+ * @param source the {@link ObjectMetadata} to copy
+ * @param dest the metadata to update in place; nothing is returned.
+ */
+ public void cloneObjectMetadata(ObjectMetadata source,
+ ObjectMetadata dest) {
+
+ // Possibly null attributes
+ // Allowing nulls to pass breaks it during later use
+ if (source.getCacheControl() != null) {
+ dest.setCacheControl(source.getCacheControl());
+ }
+ if (source.getContentDisposition() != null) {
+ dest.setContentDisposition(source.getContentDisposition());
+ }
+ if (source.getContentEncoding() != null) {
+ dest.setContentEncoding(source.getContentEncoding());
+ }
+ if (source.getContentMD5() != null) {
+ dest.setContentMD5(source.getContentMD5());
+ }
+ if (source.getContentType() != null) {
+ dest.setContentType(source.getContentType());
+ }
+ if (source.getExpirationTime() != null) {
+ dest.setExpirationTime(source.getExpirationTime());
+ }
+ if (source.getExpirationTimeRuleId() != null) {
+ dest.setExpirationTimeRuleId(source.getExpirationTimeRuleId());
+ }
+ if (source.getHttpExpiresDate() != null) {
+ dest.setHttpExpiresDate(source.getHttpExpiresDate());
+ }
+ if (source.getLastModified() != null) {
+ dest.setLastModified(source.getLastModified());
+ }
+ if (source.getOngoingRestore() != null) {
+ dest.setOngoingRestore(source.getOngoingRestore());
+ }
+ if (source.getRestoreExpirationTime() != null) {
+ dest.setRestoreExpirationTime(source.getRestoreExpirationTime());
+ }
+ if (source.getSSEAlgorithm() != null) {
+ dest.setSSEAlgorithm(source.getSSEAlgorithm());
+ }
+ if (source.getSSECustomerAlgorithm() != null) {
+ dest.setSSECustomerAlgorithm(source.getSSECustomerAlgorithm());
+ }
+ if (source.getSSECustomerKeyMd5() != null) {
+ dest.setSSECustomerKeyMd5(source.getSSECustomerKeyMd5());
+ }
+
+ // copy user metadata except the magic marker header.
+ source.getUserMetadata().entrySet().stream()
+ .filter(e -> !e.getKey().equals(X_HEADER_MAGIC_MARKER))
+ .forEach(e -> dest.addUserMetadata(e.getKey(), e.getValue()));
+ }
+
+}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/CountersAndGauges.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/CountersAndGauges.java
index 61cc033..f9093ff 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/CountersAndGauges.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/CountersAndGauges.java
@@ -21,11 +21,12 @@ package org.apache.hadoop.fs.s3a.statistics;
import java.time.Duration;
import org.apache.hadoop.fs.s3a.Statistic;
+import org.apache.hadoop.fs.statistics.DurationTrackerFactory;
/**
* This is the foundational API for collecting S3A statistics.
*/
-public interface CountersAndGauges {
+public interface CountersAndGauges extends DurationTrackerFactory {
/**
* Increment a specific counter.
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/BondedS3AStatisticsContext.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/BondedS3AStatisticsContext.java
index 006eb24..51bb4af 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/BondedS3AStatisticsContext.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/BondedS3AStatisticsContext.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
import org.apache.hadoop.fs.s3a.statistics.S3AMultipartUploaderStatistics;
import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext;
import org.apache.hadoop.fs.s3a.statistics.StatisticsFromAwsSdk;
+import org.apache.hadoop.fs.statistics.DurationTracker;
/**
* An S3A statistics context which is bonded to a
@@ -210,6 +211,11 @@ public class BondedS3AStatisticsContext implements S3AStatisticsContext {
return new S3AMultipartUploaderStatisticsImpl(this::incrementCounter);
}
+ @Override
+ public DurationTracker trackDuration(final String key, final long count) {
+ return getInstrumentation().trackDuration(key, count);
+ }
+
/**
* This is the interface which an integration source must implement
* for the integration.
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committer_architecture.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committer_architecture.md
index 48d75dc..048f08c 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committer_architecture.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committer_architecture.md
@@ -1337,6 +1337,16 @@ On `close()`, summary data would be written to the file
`/results/latest/__magic/job400_1/task_01_01/latest.orc.lzo.pending`.
This would contain the upload ID and all the parts and etags of uploaded data.
+A marker file is also created, so that code which verifies that a newly created file
+exists does not fail.
+1. These marker files are zero bytes long.
+1. They declare the full length of the final file in the HTTP header
+ `x-hadoop-s3a-magic-data-length`.
+1. A call to `getXAttr("header.x-hadoop-s3a-magic-data-length")` will return a
+ string containing the number of bytes in the data uploaded.
+
+This is needed so that the Spark write-tracking code can report how much data
+has been created.
#### Task commit
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committers.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committers.md
index 0e86f52..d4292df 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committers.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committers.md
@@ -360,6 +360,7 @@ However, it has extra requirements of the filesystem
1. The S3A client must be configured to recognize interactions
with the magic directories and treat them specially.
+Now that Amazon S3 is consistent, the magic committer is enabled by default.
It's also not been field tested to the extent of Netflix's committer; consider
it the least mature of the committers.
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMiscOperations.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMiscOperations.java
index 2f0599d..e6ebfba 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMiscOperations.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMiscOperations.java
@@ -29,6 +29,7 @@ import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.GetBucketEncryptionResult;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PutObjectRequest;
+import org.assertj.core.api.Assertions;
import org.junit.Assume;
import org.junit.Test;
@@ -47,6 +48,7 @@ import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM;
import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_KEY;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
+import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.XA_ETAG;
import static org.hamcrest.Matchers.nullValue;
/**
@@ -171,6 +173,9 @@ public class ITestS3AMiscOperations extends AbstractS3ATestBase {
assertNotEquals("file 1 checksum", 0, checksum1.getLength());
assertEquals("checksums of empty files", checksum1,
fs.getFileChecksum(touchFile("file2"), 0));
+ Assertions.assertThat(fs.getXAttr(file1, XA_ETAG))
+ .describedAs("etag from xattr")
+ .isEqualTo(checksum1.getBytes());
}
/**
@@ -222,6 +227,9 @@ public class ITestS3AMiscOperations extends AbstractS3ATestBase {
createFile(fs, file4, true,
"hello, world".getBytes(StandardCharsets.UTF_8));
assertNotEquals(checksum2, fs.getFileChecksum(file4, 0));
+ Assertions.assertThat(fs.getXAttr(file3, XA_ETAG))
+ .describedAs("etag from xattr")
+ .isEqualTo(checksum1.getBytes());
}
/**
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARemoteFileChanged.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARemoteFileChanged.java
index 66ec8ff..adcf578 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARemoteFileChanged.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARemoteFileChanged.java
@@ -272,7 +272,9 @@ public class ITestS3ARemoteFileChanged extends AbstractS3ATestBase {
@Override
public void teardown() throws Exception {
// restore the s3 client so there's no mocking interfering with the teardown
- originalS3Client.ifPresent(fs::setAmazonS3Client);
+ if (fs != null) {
+ originalS3Client.ifPresent(fs::setAmazonS3Client);
+ }
super.teardown();
}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java
index 1108194..14207e8 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java
@@ -1335,11 +1335,12 @@ public abstract class AbstractITCommitProtocol extends AbstractCommitITest {
= outputFormat.getRecordWriter(tContext);
IntWritable iw = new IntWritable(1);
recordWriter.write(iw, iw);
+ long expectedLength = 4;
Path dest = recordWriter.getDest();
- validateTaskAttemptPathDuringWrite(dest);
+ validateTaskAttemptPathDuringWrite(dest, expectedLength);
recordWriter.close(tContext);
// at this point
- validateTaskAttemptPathAfterWrite(dest);
+ validateTaskAttemptPathAfterWrite(dest, expectedLength);
assertTrue("Committer does not have data to commit " + committer,
committer.needsTaskCommit(tContext));
commitTask(committer, tContext);
@@ -1750,9 +1751,11 @@ public abstract class AbstractITCommitProtocol extends AbstractCommitITest {
* Validate the path of a file being written to during the write
* itself.
* @param p path
+ * @param expectedLength expected length of the data written
* @throws IOException IO failure
*/
- protected void validateTaskAttemptPathDuringWrite(Path p) throws IOException {
+ protected void validateTaskAttemptPathDuringWrite(Path p,
+ final long expectedLength) throws IOException {
}
@@ -1760,9 +1763,11 @@ public abstract class AbstractITCommitProtocol extends AbstractCommitITest {
* Validate the path of a file being written to after the write
* operation has completed.
* @param p path
+ * @param expectedLength expected length of the data written
* @throws IOException IO failure
*/
- protected void validateTaskAttemptPathAfterWrite(Path p) throws IOException {
+ protected void validateTaskAttemptPathAfterWrite(Path p,
+ final long expectedLength) throws IOException {
}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperations.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperations.java
index 978f08c..b025f6f 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperations.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperations.java
@@ -26,6 +26,7 @@ import java.util.List;
import com.amazonaws.services.s3.model.PartETag;
import org.apache.hadoop.thirdparty.com.google.common.collect.Lists;
+import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,6 +53,7 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import static org.apache.hadoop.fs.contract.ContractTestUtils.*;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
+import static org.apache.hadoop.fs.s3a.commit.CommitOperations.extractMagicFileLength;
import static org.apache.hadoop.fs.s3a.commit.CommitUtils.*;
import static org.apache.hadoop.fs.s3a.commit.MagicCommitPaths.*;
import static org.apache.hadoop.fs.s3a.Constants.*;
@@ -216,13 +218,13 @@ public class ITestCommitOperations extends AbstractCommitITest {
@Test
public void testCommitEmptyFile() throws Throwable {
- describe("create then commit an empty file");
+ describe("create then commit an empty magic file");
createCommitAndVerify("empty-commit.txt", new byte[0]);
}
@Test
public void testCommitSmallFile() throws Throwable {
- describe("create then commit an empty file");
+ describe("create then commit a small magic file");
createCommitAndVerify("small-commit.txt", DATASET);
}
@@ -288,6 +290,64 @@ public class ITestCommitOperations extends AbstractCommitITest {
commit("child.txt", pendingChildPath, expectedDestPath, 0, 0);
}
+ /**
+ * Verify that when a marker file is renamed, its
+ * magic marker attribute is lost.
+ */
+ @Test
+ public void testMarkerFileRename()
+ throws Exception {
+ S3AFileSystem fs = getFileSystem();
+ Path destFile = methodPath();
+ Path destDir = destFile.getParent();
+ fs.delete(destDir, true);
+ Path magicDest = makeMagic(destFile);
+ Path magicDir = magicDest.getParent();
+ fs.mkdirs(magicDir);
+
+ // use the builder API to verify it works exactly the
+ // same.
+ try (FSDataOutputStream stream = fs.createFile(magicDest)
+ .overwrite(true)
+ .recursive()
+ .build()) {
+ assertIsMagicStream(stream);
+ stream.write(DATASET);
+ }
+ Path magic2 = new Path(magicDir, "magic2");
+ // rename the marker
+ fs.rename(magicDest, magic2);
+
+ // the renamed file has no header
+ Assertions.assertThat(extractMagicFileLength(fs, magic2))
+ .describedAs("XAttribute " + XA_MAGIC_MARKER + " of " + magic2)
+ .isEmpty();
+ // abort the upload, which is driven by the .pending files
+ // there must be 1 deleted file; during test debugging with aborted
+ // runs there may be more.
+ Assertions.assertThat(newCommitOperations()
+ .abortPendingUploadsUnderPath(destDir))
+ .describedAs("Aborting all pending uploads under %s", destDir)
+ .isGreaterThanOrEqualTo(1);
+ }
+
+ /**
+ * Assert that an output stream is magic.
+ * @param stream stream to probe.
+ */
+ protected void assertIsMagicStream(final FSDataOutputStream stream) {
+ Assertions.assertThat(stream.hasCapability(STREAM_CAPABILITY_MAGIC_OUTPUT))
+ .describedAs("Stream capability %s in stream %s",
+ STREAM_CAPABILITY_MAGIC_OUTPUT, stream)
+ .isTrue();
+ }
+
+ /**
+ * Create a file through the magic commit mechanism.
+ * @param filename file to create (with __magic path.)
+ * @param data data to write
+ * @throws Exception failure
+ */
private void createCommitAndVerify(String filename, byte[] data)
throws Exception {
S3AFileSystem fs = getFileSystem();
@@ -295,19 +355,30 @@ public class ITestCommitOperations extends AbstractCommitITest {
fs.delete(destFile.getParent(), true);
Path magicDest = makeMagic(destFile);
assertPathDoesNotExist("Magic file should not exist", magicDest);
+ long dataSize = data != null ? data.length : 0;
try(FSDataOutputStream stream = fs.create(magicDest, true)) {
- assertTrue(stream.hasCapability(STREAM_CAPABILITY_MAGIC_OUTPUT));
- if (data != null && data.length > 0) {
+ assertIsMagicStream(stream);
+ if (dataSize > 0) {
stream.write(data);
}
stream.close();
}
FileStatus status = getFileStatusEventually(fs, magicDest,
CONSISTENCY_WAIT);
- assertEquals("Non empty marker file: " + status, 0, status.getLen());
-
+ assertEquals("Magic marker file is not zero bytes: " + status,
+ 0, status.getLen());
+ Assertions.assertThat(extractMagicFileLength(fs,
+ magicDest))
+ .describedAs("XAttribute " + XA_MAGIC_MARKER + " of " + magicDest)
+ .isNotEmpty()
+ .hasValue(dataSize);
commit(filename, destFile, HIGH_THROTTLE, 0);
verifyFileContents(fs, destFile, data);
+ // the destination file doesn't have the attribute
+ Assertions.assertThat(extractMagicFileLength(fs,
+ destFile))
+ .describedAs("XAttribute " + XA_MAGIC_MARKER + " of " + destFile)
+ .isEmpty();
}
/**
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocol.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocol.java
index f6d6307..7ee1833 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocol.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocol.java
@@ -20,17 +20,21 @@ package org.apache.hadoop.fs.s3a.commit.magic;
import java.io.IOException;
import java.net.URI;
+import java.util.List;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.commit.AbstractITCommitProtocol;
import org.apache.hadoop.fs.s3a.commit.AbstractS3ACommitter;
import org.apache.hadoop.fs.s3a.commit.CommitConstants;
+import org.apache.hadoop.fs.s3a.commit.CommitOperations;
import org.apache.hadoop.fs.s3a.commit.CommitUtils;
import org.apache.hadoop.fs.s3a.commit.CommitterFaultInjection;
import org.apache.hadoop.fs.s3a.commit.CommitterFaultInjectionImpl;
@@ -39,6 +43,7 @@ import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import static org.apache.hadoop.fs.s3a.S3AUtils.listAndFilter;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
import static org.hamcrest.CoreMatchers.containsString;
@@ -107,18 +112,44 @@ public class ITestMagicCommitProtocol extends AbstractITCommitProtocol {
return new CommitterWithFailedThenSucceed(getOutDir(), tContext);
}
- protected void validateTaskAttemptPathDuringWrite(Path p) throws IOException {
+ protected void validateTaskAttemptPathDuringWrite(Path p,
+ final long expectedLength) throws IOException {
String pathStr = p.toString();
assertTrue("not magic " + pathStr,
pathStr.contains(MAGIC));
assertPathDoesNotExist("task attempt visible", p);
}
- protected void validateTaskAttemptPathAfterWrite(Path p) throws IOException {
- FileStatus st = getFileSystem().getFileStatus(p);
- assertEquals("file length in " + st, 0, st.getLen());
- Path pendingFile = new Path(p.toString() + PENDING_SUFFIX);
+ protected void validateTaskAttemptPathAfterWrite(Path marker,
+ final long expectedLength) throws IOException {
+ // the pending file exists
+ Path pendingFile = new Path(marker.toString() + PENDING_SUFFIX);
assertPathExists("pending file", pendingFile);
+ S3AFileSystem fs = getFileSystem();
+
+ // THIS SEQUENCE MUST BE RUN IN ORDER ON A S3GUARDED
+ // STORE
+ // if you list the parent dir and find the marker, it
+ // is really 0 bytes long
+ String name = marker.getName();
+ List<LocatedFileStatus> filtered = listAndFilter(fs,
+ marker.getParent(), false,
+ (path) -> path.getName().equals(name));
+ Assertions.assertThat(filtered)
+ .hasSize(1);
+ Assertions.assertThat(filtered.get(0))
+ .matches(lst -> lst.getLen() == 0,
+ "Listing should return 0 byte length");
+
+ // marker file is empty
+ FileStatus st = fs.getFileStatus(marker);
+ assertEquals("file length in " + st, 0, st.getLen());
+ // xattr header
+ Assertions.assertThat(CommitOperations.extractMagicFileLength(fs,
+ marker))
+ .describedAs("XAttribute " + XA_MAGIC_MARKER)
+ .isNotEmpty()
+ .hasValue(expectedLength);
}
/**
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitProtocol.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitProtocol.java
index a4dface..826c3cd 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitProtocol.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitProtocol.java
@@ -23,6 +23,7 @@ import java.util.UUID;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.ContractTestUtils;
@@ -113,14 +114,20 @@ public class ITestStagingCommitProtocol extends AbstractITCommitProtocol {
IOException.class);
}
- protected void validateTaskAttemptPathDuringWrite(Path p) throws IOException {
+ protected void validateTaskAttemptPathDuringWrite(Path p,
+ final long expectedLength) throws IOException {
// this is expected to be local FS
ContractTestUtils.assertPathExists(getLocalFS(), "task attempt", p);
}
- protected void validateTaskAttemptPathAfterWrite(Path p) throws IOException {
+ protected void validateTaskAttemptPathAfterWrite(Path p,
+ final long expectedLength) throws IOException {
// this is expected to be local FS
- ContractTestUtils.assertPathExists(getLocalFS(), "task attempt", p);
+ // the file must exist there and be of the expected length
+ FileSystem localFS = getLocalFS();
+ ContractTestUtils.assertPathExists(localFS, "task attempt", p);
+ FileStatus st = localFS.getFileStatus(p);
+ assertEquals("file length in " + st, expectedLength, st.getLen());
}
protected FileSystem getLocalFS() throws IOException {
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestXAttrCost.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestXAttrCost.java
new file mode 100644
index 0000000..aa3ceca
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestXAttrCost.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.io.FileNotFoundException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import org.assertj.core.api.AbstractStringAssert;
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest;
+
+import static org.apache.hadoop.fs.s3a.Statistic.INVOCATION_OP_XATTR_LIST;
+import static org.apache.hadoop.fs.s3a.Statistic.INVOCATION_XATTR_GET_MAP;
+import static org.apache.hadoop.fs.s3a.Statistic.INVOCATION_XATTR_GET_NAMED;
+import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.CONTENT_TYPE_OCTET_STREAM;
+import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.CONTENT_TYPE_APPLICATION_XML;
+import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.XA_CONTENT_LENGTH;
+import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.XA_CONTENT_TYPE;
+import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.XA_STANDARD_HEADERS;
+import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.decodeBytes;
+import static org.apache.hadoop.fs.s3a.performance.OperationCost.CREATE_FILE_OVERWRITE;
+
+/**
+ * Invoke XAttr API calls against objects in S3 and validate header
+ * extraction.
+ */
+public class ITestXAttrCost extends AbstractS3ACostTest {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ITestXAttrCost.class);
+
+ private static final int GET_METADATA_ON_OBJECT = 1;
+ private static final int GET_METADATA_ON_DIR = GET_METADATA_ON_OBJECT * 2;
+
+ public ITestXAttrCost() {
+ // no parameterization here
+ super(false, true, false);
+ }
+
+ @Test
+ public void testXAttrRoot() throws Throwable {
+ describe("Test xattr on root");
+ Path root = new Path("/");
+ S3AFileSystem fs = getFileSystem();
+ Map<String, byte[]> xAttrs = verifyMetrics(
+ () -> fs.getXAttrs(root),
+ with(INVOCATION_XATTR_GET_MAP, GET_METADATA_ON_OBJECT));
+ logXAttrs(xAttrs);
+ List<String> headerList = verifyMetrics(() ->
+ fs.listXAttrs(root),
+ with(INVOCATION_OP_XATTR_LIST, GET_METADATA_ON_OBJECT));
+
+ // verify this contains only the content length and type headers,
+ // and so not the magic marker header
+ Assertions.assertThat(headerList)
+ .describedAs("Headers on root object")
+ .containsOnly(
+ XA_CONTENT_LENGTH,
+ XA_CONTENT_TYPE);
+ assertHeaderEntry(xAttrs, XA_CONTENT_TYPE)
+ .isEqualTo(CONTENT_TYPE_APPLICATION_XML);
+ }
+
+ /**
+ * Log the attributes as strings.
+ * @param xAttrs map of attributes
+ */
+ private void logXAttrs(final Map<String, byte[]> xAttrs) {
+ xAttrs.forEach((k, v) ->
+ LOG.info("{} has bytes[{}] => \"{}\"",
+ k, v.length, decodeBytes(v)));
+ }
+
+ @Test
+ public void testXAttrFile() throws Throwable {
+ describe("Test xattr on a file");
+ Path testFile = methodPath();
+ create(testFile, true, CREATE_FILE_OVERWRITE);
+ S3AFileSystem fs = getFileSystem();
+ Map<String, byte[]> xAttrs = verifyMetrics(() ->
+ fs.getXAttrs(testFile),
+ with(INVOCATION_XATTR_GET_MAP, GET_METADATA_ON_OBJECT));
+ logXAttrs(xAttrs);
+ assertHeaderEntry(xAttrs, XA_CONTENT_LENGTH)
+ .isEqualTo("0");
+
+ // get the list of supported headers
+ List<String> headerList = verifyMetrics(
+ () -> fs.listXAttrs(testFile),
+ with(INVOCATION_OP_XATTR_LIST, GET_METADATA_ON_OBJECT));
+ // verify this contains at least one of the standard headers;
+ // the magic marker header is not checked for here
+ Assertions.assertThat(headerList)
+ .describedAs("Supported headers")
+ .containsAnyElementsOf(Arrays.asList(XA_STANDARD_HEADERS));
+
+ // ask for one header and validate its value
+ byte[] bytes = verifyMetrics(() ->
+ fs.getXAttr(testFile, XA_CONTENT_LENGTH),
+ with(INVOCATION_XATTR_GET_NAMED, GET_METADATA_ON_OBJECT));
+ assertHeader(XA_CONTENT_LENGTH, bytes)
+ .isEqualTo("0");
+ assertHeaderEntry(xAttrs, XA_CONTENT_TYPE)
+ .isEqualTo(CONTENT_TYPE_OCTET_STREAM);
+ }
+
+ /**
+ * Directory attributes can be retrieved, but they take two HEAD requests.
+ * @throws Throwable
+ */
+ @Test
+ public void testXAttrDir() throws Throwable {
+ describe("Test xattr on a dir");
+
+ S3AFileSystem fs = getFileSystem();
+ Path dir = methodPath();
+ fs.mkdirs(dir);
+ Map<String, byte[]> xAttrs = verifyMetrics(() ->
+ fs.getXAttrs(dir),
+ with(INVOCATION_XATTR_GET_MAP, GET_METADATA_ON_DIR));
+ logXAttrs(xAttrs);
+ assertHeaderEntry(xAttrs, XA_CONTENT_LENGTH)
+ .isEqualTo("0");
+
+ // get the list of supported headers
+ List<String> headerList = verifyMetrics(
+ () -> fs.listXAttrs(dir),
+ with(INVOCATION_OP_XATTR_LIST, GET_METADATA_ON_DIR));
+ // verify this contains at least one of the standard headers;
+ // the magic marker header is not checked for here
+ Assertions.assertThat(headerList)
+ .describedAs("Supported headers")
+ .containsAnyElementsOf(Arrays.asList(XA_STANDARD_HEADERS));
+
+ // ask for one header and validate its value
+ byte[] bytes = verifyMetrics(() ->
+ fs.getXAttr(dir, XA_CONTENT_LENGTH),
+ with(INVOCATION_XATTR_GET_NAMED, GET_METADATA_ON_DIR));
+ assertHeader(XA_CONTENT_LENGTH, bytes)
+ .isEqualTo("0");
+ assertHeaderEntry(xAttrs, XA_CONTENT_TYPE)
+ .isEqualTo(CONTENT_TYPE_OCTET_STREAM);
+ }
+
+ /**
+ * When the operations are called on a missing path, FNFE is
+ * raised and only one attempt is made to retry the operation.
+ */
+ @Test
+ public void testXAttrMissingFile() throws Throwable {
+ describe("Test xattr on a missing path");
+ Path testFile = methodPath();
+ S3AFileSystem fs = getFileSystem();
+ int getMetadataOnMissingFile = GET_METADATA_ON_DIR;
+ verifyMetricsIntercepting(FileNotFoundException.class, "", () ->
+ fs.getXAttrs(testFile),
+ with(INVOCATION_XATTR_GET_MAP, getMetadataOnMissingFile));
+ verifyMetricsIntercepting(FileNotFoundException.class, "", () ->
+ fs.getXAttr(testFile, XA_CONTENT_LENGTH),
+ with(INVOCATION_XATTR_GET_NAMED, getMetadataOnMissingFile));
+ verifyMetricsIntercepting(FileNotFoundException.class, "", () ->
+ fs.listXAttrs(testFile),
+ with(INVOCATION_OP_XATTR_LIST, getMetadataOnMissingFile));
+ }
+
+ /**
+ * Generate an assert on a named header in the map.
+ * @param xAttrs attribute map
+ * @param key header key
+ * @return the assertion
+ */
+ private AbstractStringAssert<?> assertHeaderEntry(
+ Map<String, byte[]> xAttrs, String key) {
+
+ return assertHeader(key, xAttrs.get(key));
+ }
+
+ /**
+ * Create an assertion on the header: decode the bytes, check that
+ * the decoded string is non-null and non-empty, then return it
+ * as a string assert.
+ * @param key header key (for error)
+ * @param bytes value
+ * @return the assertion
+ */
+ private AbstractStringAssert<?> assertHeader(final String key,
+ final byte[] bytes) {
+
+ String decoded = decodeBytes(bytes);
+ return Assertions.assertThat(decoded)
+ .describedAs("xattr %s decoded to: %s", key, decoded)
+ .isNotNull()
+ .isNotEmpty();
+ }
+}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestHeaderProcessing.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestHeaderProcessing.java
new file mode 100644
index 0000000..e0c6fee
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestHeaderProcessing.java
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import org.assertj.core.api.Assertions;
+import org.assertj.core.util.Lists;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.S3ATestUtils;
+import org.apache.hadoop.fs.s3a.test.OperationTrackingStore;
+import org.apache.hadoop.test.HadoopTestBase;
+
+import static java.lang.System.currentTimeMillis;
+import static org.apache.hadoop.fs.s3a.commit.CommitConstants.XA_MAGIC_MARKER;
+import static org.apache.hadoop.fs.s3a.commit.CommitConstants.X_HEADER_MAGIC_MARKER;
+import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.XA_LAST_MODIFIED;
+import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.XA_CONTENT_LENGTH;
+import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.decodeBytes;
+import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.encodeBytes;
+import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.extractXAttrLongValue;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+/**
+ * Unit tests of header processing logic in {@link HeaderProcessing}.
+ * Builds up a context accessor where the path
+ * defined in {@link #MAGIC_PATH} exists and returns object metadata.
+ *
+ */
+public class TestHeaderProcessing extends HadoopTestBase {
+
+ private static final XAttrContextAccessor CONTEXT_ACCESSORS
+ = new XAttrContextAccessor();
+
+ public static final String VALUE = "abcdeFGHIJ123!@##&82;";
+
+ public static final long FILE_LENGTH = 1024;
+
+ private static final String FINAL_FILE = "s3a://bucket/dest/output.csv";
+
+ private StoreContext context;
+
+ private HeaderProcessing headerProcessing;
+
+ private static final String MAGIC_KEY
+ = "dest/__magic/job1/ta1/__base/output.csv";
+ private static final String MAGIC_FILE
+ = "s3a://bucket/" + MAGIC_KEY;
+
+ private static final Path MAGIC_PATH =
+ new Path(MAGIC_FILE);
+
+ public static final long MAGIC_LEN = 4096L;
+
+ /**
+ * All the XAttrs which are built up.
+ */
+ private static final String[] RETRIEVED_XATTRS = {
+ XA_MAGIC_MARKER,
+ XA_CONTENT_LENGTH,
+ XA_LAST_MODIFIED
+ };
+
+ @Before
+ public void setup() throws Exception {
+ CONTEXT_ACCESSORS.len = FILE_LENGTH;
+ CONTEXT_ACCESSORS.userHeaders.put(
+ X_HEADER_MAGIC_MARKER,
+ Long.toString(MAGIC_LEN));
+ context = S3ATestUtils.createMockStoreContext(true,
+ new OperationTrackingStore(), CONTEXT_ACCESSORS);
+ headerProcessing = new HeaderProcessing(context);
+ }
+
+ @Test
+ public void testByteRoundTrip() throws Throwable {
+ Assertions.assertThat(decodeBytes(encodeBytes(VALUE)))
+ .describedAs("encoding of " + VALUE)
+ .isEqualTo(VALUE);
+ }
+
+ @Test
+ public void testGetMarkerXAttr() throws Throwable {
+ assertAttributeHasValue(XA_MAGIC_MARKER, MAGIC_LEN);
+ }
+
+ @Test
+ public void testGetLengthXAttr() throws Throwable {
+ assertAttributeHasValue(XA_CONTENT_LENGTH, FILE_LENGTH);
+ }
+
+ /**
+ * Last modified makes it through.
+ */
+ @Test
+ public void testGetDateXAttr() throws Throwable {
+ Assertions.assertThat(
+ decodeBytes(headerProcessing.getXAttr(MAGIC_PATH,
+ XA_LAST_MODIFIED)))
+ .describedAs("XAttribute " + XA_LAST_MODIFIED)
+ .isEqualTo(CONTEXT_ACCESSORS.date.toString());
+ }
+
+ /**
+ * The API calls on unknown paths raise 404s.
+ */
+ @Test
+ public void test404() throws Throwable {
+ intercept(FileNotFoundException.class, () ->
+ headerProcessing.getXAttr(new Path(FINAL_FILE), XA_MAGIC_MARKER));
+ }
+
+ /**
+ * This call returns all the attributes which aren't null, including
+ * all the standard HTTP headers.
+ */
+ @Test
+ public void testGetAllXAttrs() throws Throwable {
+ Map<String, byte[]> xAttrs = headerProcessing.getXAttrs(MAGIC_PATH);
+ Assertions.assertThat(xAttrs.keySet())
+ .describedAs("Attribute keys")
+ .contains(RETRIEVED_XATTRS);
+ }
+
+ /**
+ * This call returns all the attributes which aren't null, including
+ * all the standard HTTP headers.
+ */
+ @Test
+ public void testListXAttrKeys() throws Throwable {
+ List<String> xAttrs = headerProcessing.listXAttrs(MAGIC_PATH);
+ Assertions.assertThat(xAttrs)
+ .describedAs("Attribute keys")
+ .contains(RETRIEVED_XATTRS);
+ }
+
+ /**
+ * Filtering is on attribute key, not header.
+ */
+ @Test
+ public void testGetFilteredXAttrs() throws Throwable {
+ Map<String, byte[]> xAttrs = headerProcessing.getXAttrs(MAGIC_PATH,
+ Lists.list(XA_MAGIC_MARKER, XA_CONTENT_LENGTH, "unknown"));
+ Assertions.assertThat(xAttrs.keySet())
+ .describedAs("Attribute keys")
+ .containsExactlyInAnyOrder(XA_MAGIC_MARKER, XA_CONTENT_LENGTH);
+ // and the values are good
+ assertLongAttributeValue(
+ XA_MAGIC_MARKER,
+ xAttrs.get(XA_MAGIC_MARKER),
+ MAGIC_LEN);
+ assertLongAttributeValue(
+ XA_CONTENT_LENGTH,
+ xAttrs.get(XA_CONTENT_LENGTH),
+ FILE_LENGTH);
+ }
+
+ /**
+ * An empty list of keys results in empty results.
+ */
+ @Test
+ public void testFilterEmptyXAttrs() throws Throwable {
+ Map<String, byte[]> xAttrs = headerProcessing.getXAttrs(MAGIC_PATH,
+ Lists.list());
+ Assertions.assertThat(xAttrs.keySet())
+ .describedAs("Attribute keys")
+ .isEmpty();
+ }
+
+ /**
+ * Add two headers to the metadata, then verify that
+ * the magic marker header is not copied, but the other header is.
+ */
+ @Test
+ public void testMetadataCopySkipsMagicAttribute() throws Throwable {
+ // the magic marker header is already set in setup(); add a second one
+ final String owner = "x-header-owner";
+ final String root = "root";
+ CONTEXT_ACCESSORS.userHeaders.put(owner, root);
+ final ObjectMetadata source = context.getContextAccessors()
+ .getObjectMetadata(MAGIC_KEY);
+ final Map<String, String> sourceUserMD = source.getUserMetadata();
+ Assertions.assertThat(sourceUserMD.get(owner))
+ .describedAs("owner header in source MD")
+ .isEqualTo(root);
+
+ ObjectMetadata dest = new ObjectMetadata();
+ headerProcessing.cloneObjectMetadata(source, dest);
+ // only the non-magic header should be present in the clone
+ Assertions.assertThat(dest.getUserMetadata().get(X_HEADER_MAGIC_MARKER))
+ .describedAs("Magic marker header in copied MD")
+ .isNull();
+ Assertions.assertThat(dest.getUserMetadata().get(owner))
+ .describedAs("owner header in copied MD")
+ .isEqualTo(root);
+ }
+
+ /**
+ * Assert that an XAttr has a specific long value.
+ * @param key attribute key
+ * @param bytes bytes of the attribute.
+ * @param expected expected numeric value.
+ */
+ private void assertLongAttributeValue(
+ final String key,
+ final byte[] bytes,
+ final long expected) {
+ Assertions.assertThat(extractXAttrLongValue(bytes))
+ .describedAs("XAttribute " + key)
+ .isNotEmpty()
+ .hasValue(expected);
+ }
+
+ /**
+ * Assert that a retrieved XAttr has a specific long value.
+ * @param key attribute key
+ * @param expected expected numeric value.
+ */
+ protected void assertAttributeHasValue(final String key,
+ final long expected)
+ throws IOException {
+ assertLongAttributeValue(
+ key,
+ headerProcessing.getXAttr(MAGIC_PATH, key),
+ expected);
+ }
+
+ /**
+ * Context accessor with XAttrs returned for the {@link #MAGIC_PATH}
+ * path.
+ */
+ private static final class XAttrContextAccessor
+ implements ContextAccessors {
+
+ private final Map<String, String> userHeaders = new HashMap<>();
+
+ private long len;
+ private Date date = new Date(currentTimeMillis());
+
+ @Override
+ public Path keyToPath(final String key) {
+ return new Path("s3a://bucket/" + key);
+ }
+
+ @Override
+ public String pathToKey(final Path path) {
+ // key is path with leading / stripped.
+ String key = path.toUri().getPath();
+ return key.length() > 1 ? key.substring(1) : key;
+ }
+
+ @Override
+ public File createTempFile(final String prefix, final long size)
+ throws IOException { // temp files are not supported by this accessor
+ throw new UnsupportedOperationException("unsupported");
+ }
+
+ @Override
+ public String getBucketLocation() throws IOException {
+ return null;
+ }
+
+ @Override
+ public Path makeQualified(final Path path) {
+ return path;
+ }
+
+ @Override
+ public ObjectMetadata getObjectMetadata(final String key)
+ throws IOException {
+ if (MAGIC_KEY.equals(key)) {
+ ObjectMetadata omd = new ObjectMetadata();
+ omd.setUserMetadata(userHeaders);
+ omd.setContentLength(len);
+ omd.setLastModified(date);
+ return omd;
+ } else {
+ throw new FileNotFoundException(key);
+ }
+ }
+
+ public void setHeader(String key, String val) {
+ userHeaders.put(key, val);
+ }
+ }
+
+}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestPartialDeleteFailures.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestPartialDeleteFailures.java
index a2e7031..42714cb 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestPartialDeleteFailures.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestPartialDeleteFailures.java
@@ -29,6 +29,7 @@ import java.util.stream.Collectors;
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
import com.amazonaws.services.s3.model.MultiObjectDeleteException;
import org.apache.hadoop.thirdparty.com.google.common.collect.Lists;
+import com.amazonaws.services.s3.model.ObjectMetadata;
import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Test;
@@ -226,7 +227,8 @@ public class TestPartialDeleteFailures {
}
- private static class MinimalContextAccessor implements ContextAccessors {
+ private static final class MinimalContextAccessor
+ implements ContextAccessors {
@Override
public Path keyToPath(final String key) {
@@ -253,6 +255,12 @@ public class TestPartialDeleteFailures {
public Path makeQualified(final Path path) {
return path;
}
+
+ @Override
+ public ObjectMetadata getObjectMetadata(final String key)
+ throws IOException {
+ return new ObjectMetadata();
+ }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org