You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by dw...@apache.org on 2023/04/05 23:36:29 UTC

[iceberg] 02/02: AWS: Support S3 Credentials provider with DefaultAwsClientFactory #7063 (#7066)

This is an automated email from the ASF dual-hosted git repository.

dweeks pushed a commit to branch 1.2.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git

commit bcb4f35e15aeaf8f113c7fe8fac03f551a51a35e
Author: Pani Dhakshnamurthy <dp...@users.noreply.github.com>
AuthorDate: Thu Mar 23 13:13:48 2023 -0500

    AWS: Support S3 Credentials provider with DefaultAwsClientFactory #7063 (#7066)
    
    Allows dynamically loading a Credential Provider with the DefaultAwsClientFactory.
    
    Co-authored-by: pani <dd...@apple.com>
---
 .../org/apache/iceberg/aws/AwsClientFactories.java |   7 +
 .../java/org/apache/iceberg/aws/AwsProperties.java | 140 +++++++++++++++++--
 .../apache/iceberg/aws/TestAwsClientFactories.java | 153 +++++++++++++++++++++
 3 files changed, 292 insertions(+), 8 deletions(-)

diff --git a/aws/src/main/java/org/apache/iceberg/aws/AwsClientFactories.java b/aws/src/main/java/org/apache/iceberg/aws/AwsClientFactories.java
index 1780253428..d62b01ed93 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/AwsClientFactories.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/AwsClientFactories.java
@@ -98,6 +98,7 @@ public class AwsClientFactories {
     @Override
     public S3Client s3() {
       return S3Client.builder()
+          .applyMutation(awsProperties::applyClientRegionConfiguration)
           .applyMutation(awsProperties::applyHttpClientConfigurations)
           .applyMutation(awsProperties::applyS3EndpointConfigurations)
           .applyMutation(awsProperties::applyS3ServiceConfigurations)
@@ -109,22 +110,28 @@ public class AwsClientFactories {
     @Override
     public GlueClient glue() {
       return GlueClient.builder()
+          .applyMutation(awsProperties::applyClientRegionConfiguration)
           .applyMutation(awsProperties::applyHttpClientConfigurations)
           .applyMutation(awsProperties::applyGlueEndpointConfigurations)
+          .applyMutation(awsProperties::applyClientCredentialConfigurations)
           .build();
     }
 
     @Override
     public KmsClient kms() {
       return KmsClient.builder()
+          .applyMutation(awsProperties::applyClientRegionConfiguration)
           .applyMutation(awsProperties::applyHttpClientConfigurations)
+          .applyMutation(awsProperties::applyClientCredentialConfigurations)
           .build();
     }
 
     @Override
     public DynamoDbClient dynamo() {
       return DynamoDbClient.builder()
+          .applyMutation(awsProperties::applyClientRegionConfiguration)
           .applyMutation(awsProperties::applyHttpClientConfigurations)
+          .applyMutation(awsProperties::applyClientCredentialConfigurations)
           .applyMutation(awsProperties::applyDynamoDbEndpointConfigurations)
           .build();
     }
diff --git a/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java b/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java
index b1ab6c0d5d..86188cc550 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java
@@ -29,6 +29,7 @@ import org.apache.iceberg.aws.glue.GlueCatalog;
 import org.apache.iceberg.aws.lakeformation.LakeFormationAwsClientFactory;
 import org.apache.iceberg.aws.s3.S3FileIO;
 import org.apache.iceberg.aws.s3.signer.S3V4RestSignerClient;
+import org.apache.iceberg.common.DynClasses;
 import org.apache.iceberg.common.DynMethods;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -45,6 +46,7 @@ import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
 import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
 import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
 import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.awscore.client.builder.AwsClientBuilder;
 import software.amazon.awssdk.awscore.client.builder.AwsSyncClientBuilder;
 import software.amazon.awssdk.core.client.builder.SdkClientBuilder;
 import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption;
@@ -351,6 +353,38 @@ public class AwsProperties implements Serializable {
    */
   public static final String CLIENT_ASSUME_ROLE_SESSION_NAME = "client.assume-role.session-name";
 
+  /**
+   * Configure the AWS credentials provider used to create AWS clients. A fully qualified concrete
+   * class with package that implements the {@link AwsCredentialsProvider} interface is required.
+   *
+   * <p>The implementation class must also declare a static create() or create(Map) factory method
+   * that returns an instance of the class; create(Map) is preferred when both are present.
+   *
+   * <p>Example:
+   * client.credentials-provider=software.amazon.awssdk.auth.credentials.SystemPropertyCredentialsProvider
+   *
+   * <p>When set, the default client factory {@link
+   * org.apache.iceberg.aws.AwsClientFactories.DefaultAwsClientFactory} and other client
+   * factory classes will use this provider to obtain AWS credentials instead of the default
+   * credential provider chain.
+   */
+  public static final String CLIENT_CREDENTIALS_PROVIDER = "client.credentials-provider";
+
+  /**
+   * Prefix for provider-specific properties passed to the configured client.credentials-provider
+   * class by {@link org.apache.iceberg.aws.AwsClientFactories.DefaultAwsClientFactory} and other
+   * client factory classes. Matching keys are passed, with this prefix removed, to the provider's
+   * create(Map) method.
+   */
+  private static final String CLIENT_CREDENTIAL_PROVIDER_PREFIX = "client.credentials-provider.";
+
+  /**
+   * Used by {@link org.apache.iceberg.aws.AwsClientFactories.DefaultAwsClientFactory} and also
+   * other client factory classes. If set, all AWS clients except STS client will use the given
+   * region instead of the default region chain.
+   */
+  public static final String CLIENT_REGION = "client.region";
+
   /**
    * The type of {@link software.amazon.awssdk.http.SdkHttpClient} implementation used by {@link
    * AwsClientFactory} If set, all AWS clients will use this specified HTTP client. If not set,
@@ -677,6 +711,9 @@ public class AwsProperties implements Serializable {
   private int clientAssumeRoleTimeoutSec;
   private String clientAssumeRoleRegion;
   private String clientAssumeRoleSessionName;
+  private String clientRegion;
+  private String clientCredentialsProvider;
+  private final Map<String, String> clientCredentialsProviderProperties;
 
   private String s3FileIoSseType;
   private String s3FileIoSseKey;
@@ -733,6 +770,9 @@ public class AwsProperties implements Serializable {
     this.clientAssumeRoleExternalId = null;
     this.clientAssumeRoleRegion = null;
     this.clientAssumeRoleSessionName = null;
+    this.clientRegion = null;
+    this.clientCredentialsProvider = null;
+    this.clientCredentialsProviderProperties = null;
 
     this.s3FileIoSseType = S3FILEIO_SSE_TYPE_NONE;
     this.s3FileIoSseKey = null;
@@ -794,6 +834,10 @@ public class AwsProperties implements Serializable {
     this.clientAssumeRoleExternalId = properties.get(CLIENT_ASSUME_ROLE_EXTERNAL_ID);
     this.clientAssumeRoleRegion = properties.get(CLIENT_ASSUME_ROLE_REGION);
     this.clientAssumeRoleSessionName = properties.get(CLIENT_ASSUME_ROLE_SESSION_NAME);
+    this.clientRegion = properties.get(CLIENT_REGION);
+    this.clientCredentialsProvider = properties.get(CLIENT_CREDENTIALS_PROVIDER);
+    this.clientCredentialsProviderProperties =
+        PropertyUtil.propertiesWithPrefix(properties, CLIENT_CREDENTIAL_PROVIDER_PREFIX);
 
     this.s3FileIoSseType = properties.getOrDefault(S3FILEIO_SSE_TYPE, S3FILEIO_SSE_TYPE_NONE);
     this.s3FileIoSseKey = properties.get(S3FILEIO_SSE_KEY);
@@ -1116,6 +1160,14 @@ public class AwsProperties implements Serializable {
     return httpClientProperties;
   }
 
+  public String clientRegion() {
+    return clientRegion;
+  }
+
+  public void setClientRegion(String clientRegion) {
+    this.clientRegion = clientRegion;
+  }
+
   /**
    * Configure the credentials for an S3 client.
    *
@@ -1132,6 +1184,36 @@ public class AwsProperties implements Serializable {
             : credentialsProvider(s3AccessKeyId, s3SecretAccessKey, s3SessionToken));
   }
 
+  /**
+   * Configure a client AWS region.
+   *
+   * <p>Sample usage:
+   *
+   * <pre>
+   *     S3Client.builder().applyMutation(awsProperties::applyClientRegionConfiguration)
+   * </pre>
+   */
+  public <T extends AwsClientBuilder> void applyClientRegionConfiguration(T builder) {
+    if (clientRegion != null) {
+      builder.region(Region.of(clientRegion));
+    }
+  }
+
+  /**
+   * Configure the credential provider for AWS clients.
+   *
+   * <p>Sample usage:
+   *
+   * <pre>
+   *     DynamoDbClient.builder().applyMutation(awsProperties::applyClientCredentialConfigurations)
+   * </pre>
+   */
+  public <T extends AwsClientBuilder> void applyClientCredentialConfigurations(T builder) {
+    if (!Strings.isNullOrEmpty(this.clientCredentialsProvider)) {
+      builder.credentialsProvider(credentialsProvider(this.clientCredentialsProvider));
+    }
+  }
+
   /**
    * Configure services settings for an S3 client. The settings include: s3DualStack,
    * s3UseArnRegion, s3PathStyleAccess, and s3Acceleration
@@ -1188,14 +1270,12 @@ public class AwsProperties implements Serializable {
     switch (httpClientType) {
       case HTTP_CLIENT_TYPE_URLCONNECTION:
         UrlConnectionHttpClientConfigurations urlConnectionHttpClientConfigurations =
-            (UrlConnectionHttpClientConfigurations)
-                loadHttpClientConfigurations(UrlConnectionHttpClientConfigurations.class.getName());
+            loadHttpClientConfigurations(UrlConnectionHttpClientConfigurations.class.getName());
         urlConnectionHttpClientConfigurations.configureHttpClientBuilder(builder);
         break;
       case HTTP_CLIENT_TYPE_APACHE:
         ApacheHttpClientConfigurations apacheHttpClientConfigurations =
-            (ApacheHttpClientConfigurations)
-                loadHttpClientConfigurations(ApacheHttpClientConfigurations.class.getName());
+            loadHttpClientConfigurations(ApacheHttpClientConfigurations.class.getName());
         apacheHttpClientConfigurations.configureHttpClientBuilder(builder);
         break;
       default:
@@ -1291,8 +1371,52 @@ public class AwsProperties implements Serializable {
         return StaticCredentialsProvider.create(
             AwsSessionCredentials.create(accessKeyId, secretAccessKey, sessionToken));
       }
-    } else {
-      return DefaultCredentialsProvider.create();
+    }
+
+    if (!Strings.isNullOrEmpty(this.clientCredentialsProvider)) {
+      return credentialsProvider(this.clientCredentialsProvider);
+    }
+
+    return DefaultCredentialsProvider.create();
+  }
+
+  private AwsCredentialsProvider credentialsProvider(String credentialsProviderClass) {
+    Class<?> providerClass;
+    try {
+      providerClass = DynClasses.builder().impl(credentialsProviderClass).buildChecked();
+    } catch (ClassNotFoundException e) {
+      throw new IllegalArgumentException(
+          String.format(
+              "Cannot load class %s, it does not exist in the classpath", credentialsProviderClass),
+          e);
+    }
+
+    Preconditions.checkArgument(
+        AwsCredentialsProvider.class.isAssignableFrom(providerClass),
+        String.format(
+            "Cannot initialize %s, it does not implement %s.",
+            credentialsProviderClass, AwsCredentialsProvider.class.getName()));
+
+    AwsCredentialsProvider provider;
+    try {
+      try {
+        provider =
+            DynMethods.builder("create")
+                .hiddenImpl(providerClass, Map.class)
+                .buildStaticChecked()
+                .invoke(clientCredentialsProviderProperties);
+      } catch (NoSuchMethodException e) {
+        provider =
+            DynMethods.builder("create").hiddenImpl(providerClass).buildStaticChecked().invoke();
+      }
+
+      return provider;
+    } catch (NoSuchMethodException e) {
+      throw new IllegalArgumentException(
+          String.format(
+              "Cannot create an instance of %s, it does not contain a static 'create' or 'create(Map<String, String>)' method",
+              credentialsProviderClass),
+          e);
     }
   }
 
@@ -1308,7 +1432,7 @@ public class AwsProperties implements Serializable {
    * software.amazon.awssdk.http.apache.ApacheHttpClient}, since including both will cause error
    * described in <a href="https://github.com/apache/iceberg/issues/6715">issue#6715</a>
    */
-  private Object loadHttpClientConfigurations(String impl) {
+  private <T> T loadHttpClientConfigurations(String impl) {
     Object httpClientConfigurations;
     try {
       httpClientConfigurations =
@@ -1316,7 +1440,7 @@ public class AwsProperties implements Serializable {
               .hiddenImpl(impl, Map.class)
               .buildStaticChecked()
               .invoke(httpClientProperties);
-      return httpClientConfigurations;
+      return (T) httpClientConfigurations;
     } catch (NoSuchMethodException e) {
       throw new IllegalArgumentException(
           String.format("Cannot create %s to generate and configure the http client builder", impl),
diff --git a/aws/src/test/java/org/apache/iceberg/aws/TestAwsClientFactories.java b/aws/src/test/java/org/apache/iceberg/aws/TestAwsClientFactories.java
index 3efefb986a..38afae130c 100644
--- a/aws/src/test/java/org/apache/iceberg/aws/TestAwsClientFactories.java
+++ b/aws/src/test/java/org/apache/iceberg/aws/TestAwsClientFactories.java
@@ -24,13 +24,20 @@ import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.TestHelpers;
 import org.apache.iceberg.aws.lakeformation.LakeFormationAwsClientFactory;
 import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.util.SerializationUtil;
 import org.assertj.core.api.Assertions;
+import org.assertj.core.api.ThrowableAssert;
 import org.junit.Assert;
 import org.junit.Test;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.regions.Region;
 import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
 import software.amazon.awssdk.services.glue.GlueClient;
+import software.amazon.awssdk.services.glue.model.GetTablesRequest;
 import software.amazon.awssdk.services.kms.KmsClient;
 import software.amazon.awssdk.services.s3.S3Client;
 
@@ -132,6 +139,152 @@ public class TestAwsClientFactories {
         .isInstanceOf(LakeFormationAwsClientFactory.class);
   }
 
+  @Test
+  public void testWithDummyValidCredentialsProvider() {
+    AwsClientFactory defaultAwsClientFactory =
+        getAwsClientFactoryByCredentialsProvider(DummyValidProvider.class.getName());
+    assertDefaultAwsClientFactory(defaultAwsClientFactory);
+    assertClientObjectsNotNull(defaultAwsClientFactory);
+    // DummyValidProvider resolves (fake) credentials, so the request reaches S3 and fails with an
+    // S3Exception rather than a credentials-resolution error from the provider
+    Assertions.assertThatThrownBy(() -> defaultAwsClientFactory.s3().listBuckets())
+        .isInstanceOf(software.amazon.awssdk.services.s3.model.S3Exception.class)
+        .hasMessageContaining("The AWS Access Key Id you provided does not exist in our records");
+  }
+
+  @Test
+  public void testWithNoCreateMethodCredentialsProvider() {
+    String providerClassName = NoCreateMethod.class.getName();
+    String containsMessage =
+        "it does not contain a static 'create' or 'create(Map<String, String>)' method";
+    testProviderAndAssertThrownBy(providerClassName, containsMessage);
+  }
+
+  @Test
+  public void testWithNoArgCreateMethodCredentialsProvider() {
+    String providerClassName = CreateMethod.class.getName();
+    String containsMessage = "Unable to load credentials from " + providerClassName;
+    testProviderAndAssertThrownBy(providerClassName, containsMessage);
+  }
+
+  @Test
+  public void testWithMapArgCreateMethodCredentialsProvider() {
+    String providerClassName = CreateMapMethod.class.getName();
+    String containsMessage = "Unable to load credentials from " + providerClassName;
+    testProviderAndAssertThrownBy(providerClassName, containsMessage);
+  }
+
+  @Test
+  public void testWithClassDoesNotExistsCredentialsProvider() {
+    String providerClassName = "invalidClassName";
+    String containsMessage = "it does not exist in the classpath";
+    testProviderAndAssertThrownBy(providerClassName, containsMessage);
+  }
+
+  @Test
+  public void testWithClassDoesNotImplementCredentialsProvider() {
+    String providerClassName = NoInterface.class.getName();
+    String containsMessage =
+        "it does not implement software.amazon.awssdk.auth.credentials.AwsCredentialsProvider";
+    testProviderAndAssertThrownBy(providerClassName, containsMessage);
+  }
+
+  private void testProviderAndAssertThrownBy(String providerClassName, String containsMessage) {
+    AwsClientFactory defaultAwsClientFactory =
+        getAwsClientFactoryByCredentialsProvider(providerClassName);
+    assertDefaultAwsClientFactory(defaultAwsClientFactory);
+    assertAllClientObjectsThrownBy(defaultAwsClientFactory, containsMessage);
+  }
+
+  public void assertAllClientObjectsThrownBy(
+      AwsClientFactory defaultAwsClientFactory, String containsMessage) {
+    // Invoke an SDK API on each client so that resolveCredentials() is actually called
+    assertThatThrownBy(() -> defaultAwsClientFactory.s3().listBuckets(), containsMessage);
+    assertThatThrownBy(
+        () -> defaultAwsClientFactory.glue().getTables(GetTablesRequest.builder().build()),
+        containsMessage);
+    assertThatThrownBy(() -> defaultAwsClientFactory.dynamo().listTables(), containsMessage);
+    assertThatThrownBy(() -> defaultAwsClientFactory.kms().listAliases(), containsMessage);
+  }
+
+  private void assertClientObjectsNotNull(AwsClientFactory defaultAwsClientFactory) {
+    Assertions.assertThat(defaultAwsClientFactory.s3()).isNotNull();
+    Assertions.assertThat(defaultAwsClientFactory.dynamo()).isNotNull();
+    Assertions.assertThat(defaultAwsClientFactory.glue()).isNotNull();
+    Assertions.assertThat(defaultAwsClientFactory.kms()).isNotNull();
+  }
+
+  private void assertThatThrownBy(
+      ThrowableAssert.ThrowingCallable shouldRaiseThrowable, String containsMessage) {
+    Assertions.assertThatThrownBy(shouldRaiseThrowable)
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining(containsMessage);
+  }
+
+  private void assertDefaultAwsClientFactory(AwsClientFactory awsClientFactory) {
+    Assertions.assertThat(awsClientFactory)
+        .isInstanceOf(AwsClientFactories.DefaultAwsClientFactory.class);
+  }
+
+  private AwsClientFactory getAwsClientFactoryByCredentialsProvider(String providerClass) {
+    Map<String, String> properties = getDefaultClientFactoryProperties(providerClass);
+    AwsClientFactory defaultAwsClientFactory = AwsClientFactories.from(properties);
+    return defaultAwsClientFactory;
+  }
+
+  private Map<String, String> getDefaultClientFactoryProperties(String providerClass) {
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put(AwsProperties.CLIENT_CREDENTIALS_PROVIDER + ".param1", "value1");
+    properties.put(AwsProperties.CLIENT_REGION, Region.AWS_GLOBAL.toString());
+    properties.put(AwsProperties.CLIENT_CREDENTIALS_PROVIDER, providerClass);
+    return properties;
+  }
+
+  private static class NoInterface {}
+
+  private static class DummyValidProvider implements AwsCredentialsProvider {
+
+    public static DummyValidProvider create() {
+      return new DummyValidProvider();
+    }
+
+    @Override
+    public AwsCredentials resolveCredentials() {
+      return AwsBasicCredentials.create("test-accessKeyId", "test-secretAccessKey");
+    }
+  }
+
+  private abstract static class ProviderTestBase implements AwsCredentialsProvider {
+
+    @Override
+    public AwsCredentials resolveCredentials() {
+      throw new IllegalArgumentException(
+          "Unable to load credentials from " + this.getClass().getName());
+    }
+  }
+
+  private static class NoCreateMethod extends ProviderTestBase {}
+
+  private static class CreateMethod extends ProviderTestBase {
+    public static CreateMethod create() {
+      return new CreateMethod();
+    }
+  }
+
+  private static class CreateMapMethod extends ProviderTestBase {
+
+    private final Map<String, String> properties;
+
+    CreateMapMethod(Map<String, String> properties) {
+      this.properties = Preconditions.checkNotNull(properties, "properties cannot be null");
+      Preconditions.checkArgument(properties.get("param1") != null, "param1 value cannot be null");
+    }
+
+    public static CreateMapMethod create(Map<String, String> properties) {
+      return new CreateMapMethod(properties);
+    }
+  }
+
   public static class CustomFactory implements AwsClientFactory {
 
     public CustomFactory() {}