Posted to common-commits@hadoop.apache.org by st...@apache.org on 2022/03/23 20:13:55 UTC

[hadoop] branch branch-3.3 updated (55f4baf -> b749438)

This is an automated email from the ASF dual-hosted git repository.

stevel pushed a change to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git.


    from 55f4baf  HDFS-16501. Print the exception when reporting a bad block (#4062)
     new ebdd877  HADOOP-17851. S3A to support user-specified content encoding (#3498)
     new b749438  HADOOP-14661. Add S3 requester pays bucket support to S3A (#3962)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../java/org/apache/hadoop/fs/s3a/Constants.java   |  15 +++
 .../hadoop/fs/s3a/DefaultS3ClientFactory.java      |   8 ++
 .../org/apache/hadoop/fs/s3a/S3AFileSystem.java    |   5 +
 .../org/apache/hadoop/fs/s3a/S3ClientFactory.java  |   4 +-
 .../apache/hadoop/fs/s3a/api/RequestFactory.java   |   6 ++
 .../hadoop/fs/s3a/impl/HeaderProcessing.java       |   2 +-
 .../hadoop/fs/s3a/impl/RequestFactoryImpl.java     |  72 ++++++++-----
 .../src/site/markdown/tools/hadoop-aws/index.md    |  30 ++++++
 .../src/site/markdown/tools/hadoop-aws/testing.md  |  25 +++++
 .../tools/hadoop-aws/troubleshooting_s3a.md        |  14 +++
 .../hadoop/fs/s3a/ITestS3AContentEncoding.java     |  93 +++++++++++++++++
 .../hadoop/fs/s3a/ITestS3ARequesterPays.java       | 113 +++++++++++++++++++++
 .../org/apache/hadoop/fs/s3a/S3ATestConstants.java |  11 ++
 13 files changed, 371 insertions(+), 27 deletions(-)
 create mode 100644 hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AContentEncoding.java
 create mode 100644 hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARequesterPays.java

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[hadoop] 01/02: HADOOP-17851. S3A to support user-specified content encoding (#3498)

Posted by st...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

stevel pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit ebdd8771c5725668f8d1bff5ad6aa8296e57d168
Author: Steve Loughran <st...@cloudera.com>
AuthorDate: Wed Sep 29 13:42:07 2021 +0100

    HADOOP-17851. S3A to support user-specified content encoding (#3498)
    
    The option fs.s3a.object.content.encoding declares the content encoding to be set on files when they are written; this is served up in the "Content-Encoding" HTTP header when reading objects back in.
    
    This is useful when loading the data into other tools in the AWS ecosystem that do not use file extensions to infer compression type (e.g. serving compressed files from S3 or importing into RDS).
    
    Contributed by: Holden Karau
    
    Change-Id: Ice0da75b516370f51f79e45f391d46c5c7aa4ce4
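
A minimal usage sketch of the new option (assuming an S3A client with credentials already configured; the class name, bucket and key here are illustrative). Note the option only sets the header: S3A does not compress data, so callers should write bytes that already match the declared encoding.

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    import static org.apache.hadoop.fs.s3a.Constants.CONTENT_ENCODING;
    import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.XA_CONTENT_ENCODING;
    import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.decodeBytes;

    public class ContentEncodingExample {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        // declare the Content-Encoding header to set on every object written
        conf.set(CONTENT_ENCODING, "gzip");
        Path path = new Path("s3a://example-bucket/logs/events.json.gz");
        try (FileSystem fs = path.getFileSystem(conf)) {
          try (FSDataOutputStream out = fs.create(path)) {
            out.write(new byte[0]);  // placeholder: write pre-gzipped bytes here
          }
          // the header comes back through the XAttr view of object metadata
          String encoding = decodeBytes(fs.getXAttrs(path).get(XA_CONTENT_ENCODING));
          System.out.println("Content-Encoding of " + path + ": " + encoding);
        }
      }
    }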
---
 .../java/org/apache/hadoop/fs/s3a/Constants.java   |  6 ++
 .../org/apache/hadoop/fs/s3a/S3AFileSystem.java    |  4 +
 .../apache/hadoop/fs/s3a/api/RequestFactory.java   |  6 ++
 .../hadoop/fs/s3a/impl/HeaderProcessing.java       |  2 +-
 .../hadoop/fs/s3a/impl/RequestFactoryImpl.java     | 54 ++++++++++++-
 .../src/site/markdown/tools/hadoop-aws/index.md    | 11 +++
 .../hadoop/fs/s3a/ITestS3AContentEncoding.java     | 93 ++++++++++++++++++++++
 7 files changed, 171 insertions(+), 5 deletions(-)

diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
index e5fccf0..dd7e425 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
@@ -413,6 +413,12 @@ public final class Constants {
   public static final String CANNED_ACL = "fs.s3a.acl.default";
   public static final String DEFAULT_CANNED_ACL = "";
 
+  /**
+   * Content encoding: gzip, deflate, compress, br, etc.
+   * Value {@value}.
+   */
+  public static final String CONTENT_ENCODING = "fs.s3a.object.content.encoding";
+
   // should we try to purge old multipart uploads when starting up
   public static final String PURGE_EXISTING_MULTIPART =
       "fs.s3a.multipart.purge";
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index 565dc82..cbefdf7 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -937,12 +937,16 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
     // request factory.
     initCannedAcls(getConf());
 
+    // optional content encoding to set on created objects; null if unset
+    String contentEncoding = getConf().getTrimmed(CONTENT_ENCODING, null);
+
     return RequestFactoryImpl.builder()
         .withBucket(requireNonNull(bucket))
         .withCannedACL(getCannedACL())
         .withEncryptionSecrets(requireNonNull(encryptionSecrets))
         .withMultipartPartCountLimit(partCountLimit)
         .withRequestPreparer(getAuditManager()::requestCreated)
+        .withContentEncoding(contentEncoding)
         .build();
   }
 
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/api/RequestFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/api/RequestFactory.java
index f3e3e56..97a15d9 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/api/RequestFactory.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/api/RequestFactory.java
@@ -101,6 +101,12 @@ public interface RequestFactory {
   S3AEncryptionMethods getServerSideEncryptionAlgorithm();
 
   /**
+   * Get the content encoding (e.g. gzip) or return null if none.
+   * @return content encoding
+   */
+  String getContentEncoding();
+
+  /**
    * Create a new object metadata instance.
    * Any standard metadata headers are added here, for example:
    * encryption.
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/HeaderProcessing.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/HeaderProcessing.java
index 17394b7..f75066e 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/HeaderProcessing.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/HeaderProcessing.java
@@ -83,7 +83,7 @@ public class HeaderProcessing extends AbstractStoreOperation {
       XA_HEADER_PREFIX + Headers.CONTENT_DISPOSITION;
 
   /**
-   * Standard HTTP header found on some S3 objects: {@value}.
+   * Content encoding; can be configured: {@value}.
    */
   public static final String XA_CONTENT_ENCODING =
       XA_HEADER_PREFIX + Headers.CONTENT_ENCODING;
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java
index daa9076..3c26fe2 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java
@@ -119,6 +119,11 @@ public class RequestFactoryImpl implements RequestFactory {
   private final PrepareRequest requestPreparer;
 
   /**
+   * Content encoding (null for none).
+   */
+  private final String contentEncoding;
+
+  /**
    * Constructor.
    * @param builder builder with all the configuration.
    */
@@ -130,6 +135,7 @@ public class RequestFactoryImpl implements RequestFactory {
     this.multipartPartCountLimit = builder.multipartPartCountLimit;
     this.requesterPays = builder.requesterPays;
     this.requestPreparer = builder.requestPreparer;
+    this.contentEncoding = builder.contentEncoding;
   }
 
   /**
@@ -194,6 +200,15 @@ public class RequestFactoryImpl implements RequestFactory {
   }
 
   /**
+   * Get the content encoding (e.g. gzip) or return null if none.
+   * @return content encoding
+   */
+  @Override
+  public String getContentEncoding() {
+    return contentEncoding;
+  }
+
+  /**
    * Sets server side encryption parameters to the part upload
    * request when encryption is enabled.
    * @param request upload part request
@@ -236,13 +251,18 @@ public class RequestFactoryImpl implements RequestFactory {
   /**
    * Set the optional metadata for an object being created or copied.
    * @param metadata to update.
+   * @param isDirectoryMarker is this for a directory marker?
    */
-  protected void setOptionalObjectMetadata(ObjectMetadata metadata) {
+  protected void setOptionalObjectMetadata(ObjectMetadata metadata,
+      boolean isDirectoryMarker) {
     final S3AEncryptionMethods algorithm
         = getServerSideEncryptionAlgorithm();
     if (S3AEncryptionMethods.SSE_S3 == algorithm) {
       metadata.setSSEAlgorithm(algorithm.getMethod());
     }
+    if (contentEncoding != null && !isDirectoryMarker) {
+      metadata.setContentEncoding(contentEncoding);
+    }
   }
 
   /**
@@ -255,8 +275,21 @@ public class RequestFactoryImpl implements RequestFactory {
    */
   @Override
   public ObjectMetadata newObjectMetadata(long length) {
+    return createObjectMetadata(length, false);
+  }
+
+  /**
+   * Create a new object metadata instance.
+   * Any standard metadata headers are added here, for example:
+   * encryption.
+   *
+   * @param length length of data to set in header; ignored if negative
+   * @param isDirectoryMarker is this for a directory marker?
+   * @return a new metadata instance
+   */
+  private ObjectMetadata createObjectMetadata(long length, boolean isDirectoryMarker) {
     final ObjectMetadata om = new ObjectMetadata();
-    setOptionalObjectMetadata(om);
+    setOptionalObjectMetadata(om, isDirectoryMarker);
     if (length >= 0) {
       om.setContentLength(length);
     }
@@ -271,7 +304,7 @@ public class RequestFactoryImpl implements RequestFactory {
         new CopyObjectRequest(getBucket(), srcKey, getBucket(), dstKey);
     ObjectMetadata dstom = newObjectMetadata(srcom.getContentLength());
     HeaderProcessing.cloneObjectMetadata(srcom, dstom);
-    setOptionalObjectMetadata(dstom);
+    setOptionalObjectMetadata(dstom, false);
     copyEncryptionParameters(srcom, copyObjectRequest);
     copyObjectRequest.setCannedAccessControlList(cannedACL);
     copyObjectRequest.setNewObjectMetadata(dstom);
@@ -371,7 +404,7 @@ public class RequestFactoryImpl implements RequestFactory {
       }
     };
     // preparation happens in here
-    final ObjectMetadata md = newObjectMetadata(0L);
+    final ObjectMetadata md = createObjectMetadata(0L, true);
     md.setContentType(HeaderProcessing.CONTENT_TYPE_X_DIRECTORY);
     PutObjectRequest putObjectRequest =
         newPutObjectRequest(key, md, im);
@@ -585,6 +618,9 @@ public class RequestFactoryImpl implements RequestFactory {
     /** Requester Pays flag. */
     private boolean requesterPays = false;
 
+    /** Content Encoding. */
+    private String contentEncoding;
+
     /**
      * Multipart limit.
      */
@@ -607,6 +643,16 @@ public class RequestFactoryImpl implements RequestFactory {
     }
 
     /**
+     * Content encoding.
+     * @param value new value
+     * @return the builder
+     */
+    public RequestFactoryBuilder withContentEncoding(final String value) {
+      contentEncoding = value;
+      return this;
+    }
+
+    /**
      * Target bucket.
      * @param value new value
      * @return the builder
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
index 02c6d50..f390f1d 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
@@ -1057,6 +1057,17 @@ options are covered in [Testing](./testing.md).
      client has permission to read the bucket.
   </description>
 </property>
+
+<property>
+  <name>fs.s3a.object.content.encoding</name>
+  <value></value>
+  <description>
+    Content encoding: gzip, deflate, compress, br, etc.
+    This will be set in the "Content-Encoding" header of the object,
+    and returned in HTTP HEAD/GET requests.
+  </description>
+</property>
+
 ```
 
 ## <a name="retry_and_recovery"></a>Retry and Recovery
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AContentEncoding.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AContentEncoding.java
new file mode 100644
index 0000000..4a96bf5
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AContentEncoding.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+
+import static org.apache.hadoop.fs.s3a.Constants.CONTENT_ENCODING;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
+import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.XA_CONTENT_ENCODING;
+import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.decodeBytes;
+
+/**
+ * Tests of content encoding object meta data.
+ */
+public class ITestS3AContentEncoding extends AbstractS3ATestBase {
+
+  private static final String GZIP = "gzip";
+
+  @Override
+  protected Configuration createConfiguration() {
+    Configuration conf = super.createConfiguration();
+    removeBaseAndBucketOverrides(conf, CONTENT_ENCODING);
+    conf.set(CONTENT_ENCODING, GZIP);
+
+    return conf;
+  }
+
+  @Test
+  public void testCreatedObjectsHaveEncoding() throws Throwable {
+    S3AFileSystem fs = getFileSystem();
+    Path dir = methodPath();
+    fs.mkdirs(dir);
+    // even with content encoding enabled, directories do not have
+    // encoding.
+    Assertions.assertThat(getEncoding(dir))
+        .describedAs("Encoding of object %s", dir)
+        .isNull();
+    Path path = new Path(dir, "1");
+    ContractTestUtils.touch(fs, path);
+    assertObjectHasEncoding(path);
+    Path path2 = new Path(dir, "2");
+    fs.rename(path, path2);
+    assertObjectHasEncoding(path2);
+  }
+
+  /**
+   * Assert that a given object has gzip encoding specified.
+   * @param path path
+   * @throws Throwable on any failure
+   */
+  private void assertObjectHasEncoding(Path path) throws Throwable {
+    Assertions.assertThat(getEncoding(path))
+        .describedAs("Encoding of object %s", path)
+        .isEqualTo(GZIP);
+  }
+
+  /**
+   * Get the encoding of a path.
+   * @param path path
+   * @return encoding string or null
+   * @throws IOException IO Failure.
+   */
+  private String getEncoding(Path path) throws IOException {
+    S3AFileSystem fs = getFileSystem();
+
+    Map<String, byte[]> xAttrs = fs.getXAttrs(path);
+    return decodeBytes(xAttrs.get(XA_CONTENT_ENCODING));
+  }
+}
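
To run just this new integration test against a configured test bucket (a sketch, assuming the standard hadoop-aws integration test setup described in testing.md), invoke failsafe from hadoop-tools/hadoop-aws:

    mvn clean verify -Dtest=none -Dit.test=ITestS3AContentEncoding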



[hadoop] 02/02: HADOOP-14661. Add S3 requester pays bucket support to S3A (#3962)

Posted by st...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

stevel pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit b749438a8cb100448fff8485a1703c9bdbfa9063
Author: Daniel Carl Jones <da...@danielcarl.info>
AuthorDate: Wed Mar 23 20:00:50 2022 +0000

    HADOOP-14661. Add S3 requester pays bucket support to S3A (#3962)
    
    Adds the option fs.s3a.requester.pays.enabled, which, if set to true, allows
    the client to access S3 buckets where the requester is billed for the IO.
    
    Contributed by Daniel Carl Jones
    
    Change-Id: I51f64d0f9b3be3c4ec493bcf91927fca3b20407a
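
A minimal client-side sketch (assuming credentials are already configured; the class name is illustrative). The object read here is the default used by the new tests:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    import static org.apache.hadoop.fs.s3a.Constants.ALLOW_REQUESTER_PAYS;

    public class RequesterPaysExample {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        // acknowledge that the requester (this client) pays for the IO;
        // without this, every request fails with 403 Forbidden
        conf.setBoolean(ALLOW_REQUESTER_PAYS, true);
        Path path = new Path("s3a://usgs-landsat/collection02/catalog.json");
        try (FileSystem fs = path.getFileSystem(conf);
             FSDataInputStream in = fs.open(path)) {
          System.out.println("first byte: " + in.read());
        }
      }
    }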
---
 .../java/org/apache/hadoop/fs/s3a/Constants.java   |   9 ++
 .../hadoop/fs/s3a/DefaultS3ClientFactory.java      |   8 ++
 .../org/apache/hadoop/fs/s3a/S3AFileSystem.java    |   1 +
 .../org/apache/hadoop/fs/s3a/S3ClientFactory.java  |   4 +-
 .../hadoop/fs/s3a/impl/RequestFactoryImpl.java     |  22 ----
 .../src/site/markdown/tools/hadoop-aws/index.md    |  19 ++++
 .../src/site/markdown/tools/hadoop-aws/testing.md  |  25 +++++
 .../tools/hadoop-aws/troubleshooting_s3a.md        |  14 +++
 .../hadoop/fs/s3a/ITestS3ARequesterPays.java       | 113 +++++++++++++++++++++
 .../org/apache/hadoop/fs/s3a/S3ATestConstants.java |  11 ++
 10 files changed, 202 insertions(+), 24 deletions(-)

diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
index dd7e425..cb3d72e 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
@@ -157,6 +157,15 @@ public final class Constants {
       "fs.s3a.connection.ssl.enabled";
   public static final boolean DEFAULT_SECURE_CONNECTIONS = true;
 
+  /**
+   * Configuration option for S3 Requester Pays feature: {@value}.
+   */
+  public static final String ALLOW_REQUESTER_PAYS = "fs.s3a.requester.pays.enabled";
+  /**
+   * Default configuration for {@value #ALLOW_REQUESTER_PAYS}: {@value}.
+   */
+  public static final boolean DEFAULT_ALLOW_REQUESTER_PAYS = false;
+
   // use OpenSSL or JSEE for secure connections
   public static final String SSL_CHANNEL_MODE =  "fs.s3a.ssl.channel.mode";
   public static final DelegatingSSLSocketFactory.SSLChannelMode
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java
index b9e51ef..f12a7e0 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java
@@ -54,6 +54,7 @@ import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.s3a.statistics.impl.AwsStatisticsCollector;
 import org.apache.hadoop.fs.store.LogExactlyOnce;
 
+import static com.amazonaws.services.s3.Headers.REQUESTER_PAYS_HEADER;
 import static org.apache.hadoop.fs.s3a.Constants.AWS_REGION;
 import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_CENTRAL_REGION;
 import static org.apache.hadoop.fs.s3a.Constants.EXPERIMENTAL_AWS_INTERNAL_THROTTLING;
@@ -75,6 +76,8 @@ public class DefaultS3ClientFactory extends Configured
 
   private static final String S3_SERVICE_NAME = "s3";
 
+  private static final String REQUESTER_PAYS_HEADER_VALUE = "requester";
+
   /**
    * Subclasses refer to this.
    */
@@ -118,6 +121,11 @@ public class DefaultS3ClientFactory extends Configured
     parameters.getHeaders().forEach((h, v) ->
         awsConf.addHeader(h, v));
 
+    if (parameters.isRequesterPays()) {
+      // All calls must acknowledge requester will pay via header.
+      awsConf.addHeader(REQUESTER_PAYS_HEADER, REQUESTER_PAYS_HEADER_VALUE);
+    }
+
     // When EXPERIMENTAL_AWS_INTERNAL_THROTTLING is false
     // throttling is explicitly disabled on the S3 client so that
     // all failures are collected in S3A instrumentation, and its
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index cbefdf7..f20269c 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -841,6 +841,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
         .withMetrics(statisticsContext.newStatisticsFromAwsSdk())
         .withPathStyleAccess(conf.getBoolean(PATH_STYLE_ACCESS, false))
         .withUserAgentSuffix(uaSuffix)
+        .withRequesterPays(conf.getBoolean(ALLOW_REQUESTER_PAYS, DEFAULT_ALLOW_REQUESTER_PAYS))
         .withRequestHandlers(auditManager.createRequestHandlers());
 
     s3 = ReflectionUtils.newInstance(s3ClientFactoryClass, conf)
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java
index 5ef99ed..34674c7 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java
@@ -101,7 +101,7 @@ public interface S3ClientFactory {
     private boolean pathStyleAccess;
 
     /**
-     * This is in the settings awaiting wiring up and testing.
+     * Permit requests to requester pays buckets.
      */
     private boolean requesterPays;
 
@@ -168,7 +168,7 @@ public interface S3ClientFactory {
     }
 
     /**
-     * Requester pays option. Not yet wired up.
+     * Set requester pays option.
      * @param value new value
      * @return the builder
      */
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java
index 3c26fe2..95bdc52 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java
@@ -107,13 +107,6 @@ public class RequestFactoryImpl implements RequestFactory {
   private final long multipartPartCountLimit;
 
   /**
-   * Requester Pays.
-   * This is to be wired up in a PR with its
-   * own tests and docs.
-   */
-  private final boolean requesterPays;
-
-  /**
    * Callback to prepare requests.
    */
   private final PrepareRequest requestPreparer;
@@ -133,7 +126,6 @@ public class RequestFactoryImpl implements RequestFactory {
     this.cannedACL = builder.cannedACL;
     this.encryptionSecrets = builder.encryptionSecrets;
     this.multipartPartCountLimit = builder.multipartPartCountLimit;
-    this.requesterPays = builder.requesterPays;
     this.requestPreparer = builder.requestPreparer;
     this.contentEncoding = builder.contentEncoding;
   }
@@ -615,9 +607,6 @@ public class RequestFactoryImpl implements RequestFactory {
      */
     private CannedAccessControlList cannedACL = null;
 
-    /** Requester Pays flag. */
-    private boolean requesterPays = false;
-
     /** Content Encoding. */
     private String contentEncoding;
 
@@ -685,17 +674,6 @@ public class RequestFactoryImpl implements RequestFactory {
     }
 
     /**
-     * Requester Pays flag.
-     * @param value new value
-     * @return the builder
-     */
-    public RequestFactoryBuilder withRequesterPays(
-        final boolean value) {
-      requesterPays = value;
-      return this;
-    }
-
-    /**
      * Multipart limit.
      * @param value new value
      * @return the builder
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
index f390f1d..d6ad3ed 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
@@ -1630,6 +1630,25 @@ Before using Access Points make sure you're not impacted by the following:
 considering endpoints, if you have any custom signers that use the host endpoint property make
 sure to update them if needed;
 
+## <a name="requester_pays"></a>Requester Pays buckets
+
+S3A supports buckets with
+[Requester Pays](https://docs.aws.amazon.com/AmazonS3/latest/userguide/RequesterPaysBuckets.html)
+enabled. When a bucket is configured with requester pays, the requester must cover
+the per-request cost.
+
+For requests to be successful, the S3 client must acknowledge that they will pay
+for these requests by setting a request flag, usually a header, on each request.
+
+To enable this feature within S3A, configure the `fs.s3a.requester.pays.enabled` property.
+
+```xml
+<property>
+    <name>fs.s3a.requester.pays.enabled</name>
+    <value>true</value>
+</property>
+```
+
 ## <a name="upload"></a>How S3A writes data to S3
 
 The original S3A client implemented file writes by
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/testing.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/testing.md
index 559687a..2641b87 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/testing.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/testing.md
@@ -593,6 +593,31 @@ your `core-site.xml` file, so that trying to use S3 select fails fast with
 a meaningful error ("S3 Select not supported") rather than a generic Bad Request
 exception.
 
+### Testing Requester Pays
+
+By default, the requester pays tests will look for a bucket that exists on Amazon S3
+in us-east-1.
+
+If your endpoint supports requester pays, you can specify an alternative object.
+The test only requires an object of at least a few bytes in order
+to check that lists and basic reads work.
+
+```xml
+<property>
+  <name>test.fs.s3a.requester.pays.file</name>
+  <value>s3a://my-req-pays-enabled-bucket/on-another-endpoint.json</value>
+</property>
+```
+
+If your endpoint does not support requester pays, you can disable the tests by setting
+the test URI to a single space.
+
+```xml
+<property>
+  <name>test.fs.s3a.requester.pays.file</name>
+  <value> </value>
+</property>
+```
 
 ### Testing Session Credentials
 
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md
index 3019b85..96e6e28 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md
@@ -547,6 +547,20 @@ When trying to write or read SEE-KMS-encrypted data, the client gets a
 The caller does not have the permissions to access
 the key with which the data was encrypted.
 
+### <a name="access_denied_requester_pays"></a>`AccessDeniedException` when using a "Requester Pays" enabled bucket
+
+When making cross-account requests to a requester pays enabled bucket, all calls must acknowledge via a header that the requester will be billed.
+
+If you don't enable this acknowledgement within S3A, then you will see a message similar to this:
+
+```
+java.nio.file.AccessDeniedException: s3a://my-bucket/my-object: getFileStatus on s3a://my-bucket/my-object:
+com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden (Service: Amazon S3; Status Code: 403;
+Error Code: 403 Forbidden; Request ID: myshortreqid; S3 Extended Request ID: mylongreqid):403 Forbidden
+```
+
+To enable requester pays, set `fs.s3a.requester.pays.enabled` property to `true`.
+
 ### <a name="no_region_session_credentials"></a> "Unable to find a region via the region provider chain." when using session credentials.
 
 Region must be provided when requesting session credentials, or an exception will be thrown with the message:
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARequesterPays.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARequesterPays.java
new file mode 100644
index 0000000..c2e7684
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARequesterPays.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.nio.file.AccessDeniedException;
+
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.statistics.IOStatisticAssertions;
+import org.apache.hadoop.fs.statistics.StreamStatisticNames;
+
+import static org.apache.hadoop.fs.s3a.Constants.ALLOW_REQUESTER_PAYS;
+import static org.apache.hadoop.fs.s3a.Constants.S3A_BUCKET_PROBE;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+/**
+ * Tests for Requester Pays feature.
+ */
+public class ITestS3ARequesterPays extends AbstractS3ATestBase {
+
+  @Override
+  protected Configuration createConfiguration() {
+    Configuration conf = super.createConfiguration();
+    S3ATestUtils.removeBaseAndBucketOverrides(conf,
+        ALLOW_REQUESTER_PAYS,
+        S3A_BUCKET_PROBE);
+    return conf;
+  }
+
+  @Test
+  public void testRequesterPaysOptionSuccess() throws Throwable {
+    describe("Test requester pays enabled case by reading last then first byte");
+
+    Configuration conf = this.createConfiguration();
+    conf.setBoolean(ALLOW_REQUESTER_PAYS, true);
+    // Enable the bucket existence check: the first failure point people may encounter
+    conf.setInt(S3A_BUCKET_PROBE, 2);
+
+    Path requesterPaysPath = getRequesterPaysPath(conf);
+
+    try (
+        FileSystem fs = requesterPaysPath.getFileSystem(conf);
+        FSDataInputStream inputStream = fs.open(requesterPaysPath);
+    ) {
+      long fileLength = fs.getFileStatus(requesterPaysPath).getLen();
+
+      inputStream.seek(fileLength - 1);
+      inputStream.readByte();
+
+      // Jump back to the start, triggering a new GetObject request.
+      inputStream.seek(0);
+      inputStream.readByte();
+
+      // Verify > 1 call was made, so we're sure it is correctly configured for each request
+      IOStatisticAssertions
+          .assertThatStatisticCounter(inputStream.getIOStatistics(),
+              StreamStatisticNames.STREAM_READ_OPENED)
+          .isGreaterThan(1);
+
+      // Check list calls work without error
+      fs.listFiles(requesterPaysPath.getParent(), false);
+    }
+  }
+
+  @Test
+  public void testRequesterPaysDisabledFails() throws Throwable {
+    describe("Verify expected failure for requester pays buckets when client has it disabled");
+
+    Configuration conf = this.createConfiguration();
+    conf.setBoolean(ALLOW_REQUESTER_PAYS, false);
+    Path requesterPaysPath = getRequesterPaysPath(conf);
+
+    try (FileSystem fs = requesterPaysPath.getFileSystem(conf)) {
+      intercept(
+          AccessDeniedException.class,
+          "403 Forbidden",
+          "Expected requester pays bucket to fail without header set",
+          () -> fs.open(requesterPaysPath).close()
+      );
+    }
+  }
+
+  private Path getRequesterPaysPath(Configuration conf) {
+    String requesterPaysFile =
+        conf.getTrimmed(KEY_REQUESTER_PAYS_FILE, DEFAULT_REQUESTER_PAYS_FILE);
+    S3ATestUtils.assume(
+        "Empty test property: " + KEY_REQUESTER_PAYS_FILE,
+        !requesterPaysFile.isEmpty()
+    );
+    return new Path(requesterPaysFile);
+  }
+
+}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java
index aca622a..742c22a 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java
@@ -98,6 +98,17 @@ public interface S3ATestConstants {
   String DEFAULT_CSVTEST_FILE = LANDSAT_BUCKET + "scene_list.gz";
 
   /**
+   * Configuration key for an existing object in a requester pays bucket: {@value}.
+   * If not set, defaults to {@value #DEFAULT_REQUESTER_PAYS_FILE}.
+   */
+  String KEY_REQUESTER_PAYS_FILE = TEST_FS_S3A + "requester.pays.file";
+
+  /**
+   * Default path for an S3 object inside a requester pays enabled bucket: {@value}.
+   */
+  String DEFAULT_REQUESTER_PAYS_FILE = "s3a://usgs-landsat/collection02/catalog.json";
+
+  /**
    * Name of the property to define the timeout for scale tests: {@value}.
    * Measured in seconds.
    */
