You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2022/03/23 20:01:43 UTC

[hadoop] branch trunk updated: HADOOP-14661. Add S3 requester pays bucket support to S3A (#3962)

This is an automated email from the ASF dual-hosted git repository.

stevel pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9edfe30  HADOOP-14661. Add S3 requester pays bucket support to S3A (#3962)
9edfe30 is described below

commit 9edfe30a60f4c43405301927a8f4014d1dd3842d
Author: Daniel Carl Jones <da...@danielcarl.info>
AuthorDate: Wed Mar 23 20:00:50 2022 +0000

    HADOOP-14661. Add S3 requester pays bucket support to S3A (#3962)
    
    
    Adds the option fs.s3a.requester.pays.enabled, which, if set to true, allows
    the client to access S3 buckets where the requester is billed for the IO.
    
    Contributed by Daniel Carl Jones
---
 .../java/org/apache/hadoop/fs/s3a/Constants.java   |   9 ++
 .../hadoop/fs/s3a/DefaultS3ClientFactory.java      |   8 ++
 .../org/apache/hadoop/fs/s3a/S3AFileSystem.java    |   1 +
 .../org/apache/hadoop/fs/s3a/S3ClientFactory.java  |   4 +-
 .../hadoop/fs/s3a/impl/RequestFactoryImpl.java     |  22 ----
 .../src/site/markdown/tools/hadoop-aws/index.md    |  19 ++++
 .../src/site/markdown/tools/hadoop-aws/testing.md  |  25 +++++
 .../tools/hadoop-aws/troubleshooting_s3a.md        |  14 +++
 .../hadoop/fs/s3a/ITestS3ARequesterPays.java       | 113 +++++++++++++++++++++
 .../org/apache/hadoop/fs/s3a/S3ATestConstants.java |  11 ++
 10 files changed, 202 insertions(+), 24 deletions(-)

diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
index dd7e425..cb3d72e 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
@@ -157,6 +157,15 @@ public final class Constants {
       "fs.s3a.connection.ssl.enabled";
   public static final boolean DEFAULT_SECURE_CONNECTIONS = true;
 
+  /**
+   * Configuration option for S3 Requester Pays feature: {@value}.
+   */
+  public static final String ALLOW_REQUESTER_PAYS = "fs.s3a.requester.pays.enabled";
+  /**
+   * Default configuration for {@value #ALLOW_REQUESTER_PAYS}: {@value}.
+   */
+  public static final boolean DEFAULT_ALLOW_REQUESTER_PAYS = false;
+
   // use OpenSSL or JSEE for secure connections
   public static final String SSL_CHANNEL_MODE =  "fs.s3a.ssl.channel.mode";
   public static final DelegatingSSLSocketFactory.SSLChannelMode
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java
index c14558a..c374ef7 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java
@@ -54,6 +54,7 @@ import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.s3a.statistics.impl.AwsStatisticsCollector;
 import org.apache.hadoop.fs.store.LogExactlyOnce;
 
+import static com.amazonaws.services.s3.Headers.REQUESTER_PAYS_HEADER;
 import static org.apache.hadoop.fs.s3a.Constants.AWS_REGION;
 import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_CENTRAL_REGION;
 import static org.apache.hadoop.fs.s3a.Constants.EXPERIMENTAL_AWS_INTERNAL_THROTTLING;
@@ -75,6 +76,8 @@ public class DefaultS3ClientFactory extends Configured
 
   private static final String S3_SERVICE_NAME = "s3";
 
+  private static final String REQUESTER_PAYS_HEADER_VALUE = "requester";
+
   /**
    * Subclasses refer to this.
    */
@@ -118,6 +121,11 @@ public class DefaultS3ClientFactory extends Configured
     parameters.getHeaders().forEach((h, v) ->
         awsConf.addHeader(h, v));
 
+    if (parameters.isRequesterPays()) {
+      // All calls must acknowledge requester will pay via header.
+      awsConf.addHeader(REQUESTER_PAYS_HEADER, REQUESTER_PAYS_HEADER_VALUE);
+    }
+
     // When EXPERIMENTAL_AWS_INTERNAL_THROTTLING is false
     // throttling is explicitly disabled on the S3 client so that
     // all failures are collected in S3A instrumentation, and its
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index 4b450c4..83c3a74 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -841,6 +841,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
         .withMetrics(statisticsContext.newStatisticsFromAwsSdk())
         .withPathStyleAccess(conf.getBoolean(PATH_STYLE_ACCESS, false))
         .withUserAgentSuffix(uaSuffix)
+        .withRequesterPays(conf.getBoolean(ALLOW_REQUESTER_PAYS, DEFAULT_ALLOW_REQUESTER_PAYS))
         .withRequestHandlers(auditManager.createRequestHandlers());
 
     s3 = ReflectionUtils.newInstance(s3ClientFactoryClass, conf)
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java
index 5ef99ed..34674c7 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java
@@ -101,7 +101,7 @@ public interface S3ClientFactory {
     private boolean pathStyleAccess;
 
     /**
-     * This is in the settings awaiting wiring up and testing.
+     * Permit requests to requester pays buckets.
      */
     private boolean requesterPays;
 
@@ -168,7 +168,7 @@ public interface S3ClientFactory {
     }
 
     /**
-     * Requester pays option. Not yet wired up.
+     * Set requester pays option.
      * @param value new value
      * @return the builder
      */
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java
index fa58323..a73e719 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java
@@ -107,13 +107,6 @@ public class RequestFactoryImpl implements RequestFactory {
   private final long multipartPartCountLimit;
 
   /**
-   * Requester Pays.
-   * This is to be wired up in a PR with its
-   * own tests and docs.
-   */
-  private final boolean requesterPays;
-
-  /**
    * Callback to prepare requests.
    */
   private final PrepareRequest requestPreparer;
@@ -133,7 +126,6 @@ public class RequestFactoryImpl implements RequestFactory {
     this.cannedACL = builder.cannedACL;
     this.encryptionSecrets = builder.encryptionSecrets;
     this.multipartPartCountLimit = builder.multipartPartCountLimit;
-    this.requesterPays = builder.requesterPays;
     this.requestPreparer = builder.requestPreparer;
     this.contentEncoding = builder.contentEncoding;
   }
@@ -615,9 +607,6 @@ public class RequestFactoryImpl implements RequestFactory {
      */
     private CannedAccessControlList cannedACL = null;
 
-    /** Requester Pays flag. */
-    private boolean requesterPays = false;
-
     /** Content Encoding. */
     private String contentEncoding;
 
@@ -685,17 +674,6 @@ public class RequestFactoryImpl implements RequestFactory {
     }
 
     /**
-     * Requester Pays flag.
-     * @param value new value
-     * @return the builder
-     */
-    public RequestFactoryBuilder withRequesterPays(
-        final boolean value) {
-      requesterPays = value;
-      return this;
-    }
-
-    /**
      * Multipart limit.
      * @param value new value
      * @return the builder
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
index aa4e13f..df08a96 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
@@ -1633,6 +1633,25 @@ Before using Access Points make sure you're not impacted by the following:
 considering endpoints, if you have any custom signers that use the host endpoint property make
 sure to update them if needed;
 
+## <a name="requester_pays"></a>Requester Pays buckets
+
+S3A supports buckets with
+[Requester Pays](https://docs.aws.amazon.com/AmazonS3/latest/userguide/RequesterPaysBuckets.html)
+enabled. When a bucket is configured with requester pays, the requester must cover
+the per-request cost.
+
+For requests to be successful, the S3 client must acknowledge that the requester will
+pay for these requests by setting a request flag, usually a header, on each request.
+
+To enable this feature within S3A, set the `fs.s3a.requester.pays.enabled` property to `true`.
+
+```xml
+<property>
+    <name>fs.s3a.requester.pays.enabled</name>
+    <value>true</value>
+</property>
+```
+
 ## <a name="upload"></a>How S3A writes data to S3
 
 The original S3A client implemented file writes by
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/testing.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/testing.md
index 559687a..2641b87 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/testing.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/testing.md
@@ -593,6 +593,31 @@ your `core-site.xml` file, so that trying to use S3 select fails fast with
 a meaningful error ("S3 Select not supported") rather than a generic Bad Request
 exception.
 
+### Testing Requester Pays
+
+By default, the requester pays tests will look for a bucket that exists on Amazon S3
+in us-east-1.
+
+If the endpoint does support requester pays, you can specify an alternative object.
+The test only requires an object of at least a few bytes in order
+to check that lists and basic reads work.
+
+```xml
+<property>
+  <name>test.fs.s3a.requester.pays.file</name>
+  <value>s3a://my-req-pays-enabled-bucket/on-another-endpoint.json</value>
+</property>
+```
+
+If the endpoint does not support requester pays, you can also disable the tests by configuring
+the test URI as a single space.
+
+```xml
+<property>
+  <name>test.fs.s3a.requester.pays.file</name>
+  <value> </value>
+</property>
+```
 
 ### Testing Session Credentials
 
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md
index 3019b85..96e6e28 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md
@@ -547,6 +547,20 @@ When trying to write or read SEE-KMS-encrypted data, the client gets a
 The caller does not have the permissions to access
 the key with which the data was encrypted.
 
+### <a name="access_denied_requester_pays"></a>`AccessDeniedException` when using a "Requester Pays" enabled bucket
+
+When making cross-account requests to a requester pays enabled bucket, all calls must acknowledge via a header that the requester will be billed.
+
+If you don't enable this acknowledgement within S3A, then you will see a message similar to this:
+
+```
+java.nio.file.AccessDeniedException: s3a://my-bucket/my-object: getFileStatus on s3a://my-bucket/my-object:
+com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden (Service: Amazon S3; Status Code: 403;
+Error Code: 403 Forbidden; Request ID: myshortreqid; S3 Extended Request ID: mylongreqid):403 Forbidden
+```
+
+To enable requester pays, set the `fs.s3a.requester.pays.enabled` property to `true`.
+
 ### <a name="no_region_session_credentials"></a> "Unable to find a region via the region provider chain." when using session credentials.
 
 Region must be provided when requesting session credentials, or an exception will be thrown with the message:
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARequesterPays.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARequesterPays.java
new file mode 100644
index 0000000..c2e7684
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARequesterPays.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.nio.file.AccessDeniedException;
+
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.statistics.IOStatisticAssertions;
+import org.apache.hadoop.fs.statistics.StreamStatisticNames;
+
+import static org.apache.hadoop.fs.s3a.Constants.ALLOW_REQUESTER_PAYS;
+import static org.apache.hadoop.fs.s3a.Constants.S3A_BUCKET_PROBE;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+/**
+ * Tests for Requester Pays feature.
+ */
+public class ITestS3ARequesterPays extends AbstractS3ATestBase {
+
+  @Override
+  protected Configuration createConfiguration() {
+    Configuration conf = super.createConfiguration();
+    S3ATestUtils.removeBaseAndBucketOverrides(conf,
+        ALLOW_REQUESTER_PAYS,
+        S3A_BUCKET_PROBE);
+    return conf;
+  }
+
+  @Test
+  public void testRequesterPaysOptionSuccess() throws Throwable {
+    describe("Test requester pays enabled case by reading last then first byte");
+
+    Configuration conf = this.createConfiguration();
+    conf.setBoolean(ALLOW_REQUESTER_PAYS, true);
+    // Enable bucket exists check, the first failure point people may encounter
+    conf.setInt(S3A_BUCKET_PROBE, 2);
+
+    Path requesterPaysPath = getRequesterPaysPath(conf);
+
+    try (
+        FileSystem fs = requesterPaysPath.getFileSystem(conf);
+        FSDataInputStream inputStream = fs.open(requesterPaysPath);
+    ) {
+      long fileLength = fs.getFileStatus(requesterPaysPath).getLen();
+
+      inputStream.seek(fileLength - 1);
+      inputStream.readByte();
+
+      // Jump back to the start, triggering a new GetObject request.
+      inputStream.seek(0);
+      inputStream.readByte();
+
+      // Verify > 1 call was made, so we're sure it is correctly configured for each request
+      IOStatisticAssertions
+          .assertThatStatisticCounter(inputStream.getIOStatistics(),
+              StreamStatisticNames.STREAM_READ_OPENED)
+          .isGreaterThan(1);
+
+      // Check list calls work without error
+      fs.listFiles(requesterPaysPath.getParent(), false);
+    }
+  }
+
+  @Test
+  public void testRequesterPaysDisabledFails() throws Throwable {
+    describe("Verify expected failure for requester pays buckets when client has it disabled");
+
+    Configuration conf = this.createConfiguration();
+    conf.setBoolean(ALLOW_REQUESTER_PAYS, false);
+    Path requesterPaysPath = getRequesterPaysPath(conf);
+
+    try (FileSystem fs = requesterPaysPath.getFileSystem(conf)) {
+      intercept(
+          AccessDeniedException.class,
+          "403 Forbidden",
+          "Expected requester pays bucket to fail without header set",
+          () -> fs.open(requesterPaysPath).close()
+      );
+    }
+  }
+
+  private Path getRequesterPaysPath(Configuration conf) {
+    String requesterPaysFile =
+        conf.getTrimmed(KEY_REQUESTER_PAYS_FILE, DEFAULT_REQUESTER_PAYS_FILE);
+    S3ATestUtils.assume(
+        "Empty test property: " + KEY_REQUESTER_PAYS_FILE,
+        !requesterPaysFile.isEmpty()
+    );
+    return new Path(requesterPaysFile);
+  }
+
+}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java
index aca622a..742c22a 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java
@@ -98,6 +98,17 @@ public interface S3ATestConstants {
   String DEFAULT_CSVTEST_FILE = LANDSAT_BUCKET + "scene_list.gz";
 
   /**
+   * Configuration key for an existing object in a requester pays bucket: {@value}.
+   * If not set, defaults to {@value #DEFAULT_REQUESTER_PAYS_FILE}.
+   */
+  String KEY_REQUESTER_PAYS_FILE = TEST_FS_S3A + "requester.pays.file";
+
+  /**
+   * Default path for an S3 object inside a requester pays enabled bucket: {@value}.
+   */
+  String DEFAULT_REQUESTER_PAYS_FILE = "s3a://usgs-landsat/collection02/catalog.json";
+
+  /**
    * Name of the property to define the timeout for scale tests: {@value}.
    * Measured in seconds.
    */

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org