You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by ja...@apache.org on 2022/09/18 00:32:31 UTC
[iceberg] branch master updated: AWS: Refactor util methods for applying AWS clients configurations (#5684)
This is an automated email from the ASF dual-hosted git repository.
jackye pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new a7e67d09a7 AWS: Refactor util methods for applying AWS clients configurations (#5684)
a7e67d09a7 is described below
commit a7e67d09a7f05821cafdafba740e9efbdbdadf9c
Author: Rushan Jiang <ru...@andrew.cmu.edu>
AuthorDate: Sat Sep 17 20:32:26 2022 -0400
AWS: Refactor util methods for applying AWS clients configurations (#5684)
---
.../iceberg/aws/AssumeRoleAwsClientFactory.java | 127 +++-------
.../org/apache/iceberg/aws/AwsClientFactories.java | 107 ++++----
.../java/org/apache/iceberg/aws/AwsProperties.java | 278 +++++++++++++++++++--
.../LakeFormationAwsClientFactory.java | 17 +-
.../apache/iceberg/aws/TestAwsClientFactories.java | 22 --
.../org/apache/iceberg/aws/TestAwsProperties.java | 83 +++++-
6 files changed, 436 insertions(+), 198 deletions(-)
diff --git a/aws/src/main/java/org/apache/iceberg/aws/AssumeRoleAwsClientFactory.java b/aws/src/main/java/org/apache/iceberg/aws/AssumeRoleAwsClientFactory.java
index b9cb925346..214b710e20 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/AssumeRoleAwsClientFactory.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/AssumeRoleAwsClientFactory.java
@@ -19,11 +19,8 @@
package org.apache.iceberg.aws;
import java.util.Map;
-import java.util.Set;
import java.util.UUID;
-import java.util.stream.Collectors;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.util.PropertyUtil;
import software.amazon.awssdk.awscore.client.builder.AwsClientBuilder;
import software.amazon.awssdk.awscore.client.builder.AwsSyncClientBuilder;
import software.amazon.awssdk.regions.Region;
@@ -34,133 +31,91 @@ import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.sts.StsClient;
import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider;
import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;
-import software.amazon.awssdk.services.sts.model.Tag;
public class AssumeRoleAwsClientFactory implements AwsClientFactory {
-
- private String roleArn;
- private String externalId;
- private Set<Tag> tags;
- private int timeout;
- private String region;
- private String s3Endpoint;
- private boolean s3UseArnRegionEnabled;
- private String dynamoDbEndpoint;
- private String httpClientType;
+ private AwsProperties awsProperties;
+ private AssumeRoleRequest assumeRoleRequest;
@Override
public S3Client s3() {
return S3Client.builder()
- .applyMutation(this::configure)
- .applyMutation(builder -> AwsClientFactories.configureEndpoint(builder, s3Endpoint))
- .serviceConfiguration(s -> s.useArnRegionEnabled(s3UseArnRegionEnabled).build())
+ .applyMutation(this::applyAssumeRoleConfigurations)
+ .applyMutation(awsProperties::applyHttpClientConfigurations)
+ .applyMutation(awsProperties::applyS3EndpointConfigurations)
+ .applyMutation(awsProperties::applyS3ServiceConfigurations)
.build();
}
@Override
public GlueClient glue() {
- return GlueClient.builder().applyMutation(this::configure).build();
+ return GlueClient.builder()
+ .applyMutation(this::applyAssumeRoleConfigurations)
+ .applyMutation(awsProperties::applyHttpClientConfigurations)
+ .build();
}
@Override
public KmsClient kms() {
- return KmsClient.builder().applyMutation(this::configure).build();
+ return KmsClient.builder()
+ .applyMutation(this::applyAssumeRoleConfigurations)
+ .applyMutation(awsProperties::applyHttpClientConfigurations)
+ .build();
}
@Override
public DynamoDbClient dynamo() {
return DynamoDbClient.builder()
- .applyMutation(this::configure)
- .applyMutation(builder -> AwsClientFactories.configureEndpoint(builder, dynamoDbEndpoint))
+ .applyMutation(this::applyAssumeRoleConfigurations)
+ .applyMutation(awsProperties::applyHttpClientConfigurations)
+ .applyMutation(awsProperties::applyDynamoDbEndpointConfigurations)
.build();
}
@Override
public void initialize(Map<String, String> properties) {
- this.roleArn = properties.get(AwsProperties.CLIENT_ASSUME_ROLE_ARN);
+ this.awsProperties = new AwsProperties(properties);
Preconditions.checkNotNull(
- roleArn, "Cannot initialize AssumeRoleClientConfigFactory with null role ARN");
- this.timeout =
- PropertyUtil.propertyAsInt(
- properties,
- AwsProperties.CLIENT_ASSUME_ROLE_TIMEOUT_SEC,
- AwsProperties.CLIENT_ASSUME_ROLE_TIMEOUT_SEC_DEFAULT);
- this.externalId = properties.get(AwsProperties.CLIENT_ASSUME_ROLE_EXTERNAL_ID);
-
- this.region = properties.get(AwsProperties.CLIENT_ASSUME_ROLE_REGION);
+ awsProperties.clientAssumeRoleArn(),
+ "Cannot initialize AssumeRoleClientConfigFactory with null role ARN");
Preconditions.checkNotNull(
- region, "Cannot initialize AssumeRoleClientConfigFactory with null region");
+ awsProperties.clientAssumeRoleRegion(),
+ "Cannot initialize AssumeRoleClientConfigFactory with null region");
- this.s3Endpoint = properties.get(AwsProperties.S3FILEIO_ENDPOINT);
- this.tags = toTags(properties);
- this.s3UseArnRegionEnabled =
- PropertyUtil.propertyAsBoolean(
- properties,
- AwsProperties.S3_USE_ARN_REGION_ENABLED,
- AwsProperties.S3_USE_ARN_REGION_ENABLED_DEFAULT);
- this.dynamoDbEndpoint = properties.get(AwsProperties.DYNAMODB_ENDPOINT);
- this.httpClientType =
- PropertyUtil.propertyAsString(
- properties, AwsProperties.HTTP_CLIENT_TYPE, AwsProperties.HTTP_CLIENT_TYPE_DEFAULT);
- }
-
- protected <T extends AwsClientBuilder & AwsSyncClientBuilder> T configure(T clientBuilder) {
- AssumeRoleRequest request =
+ this.assumeRoleRequest =
AssumeRoleRequest.builder()
- .roleArn(roleArn)
+ .roleArn(awsProperties.clientAssumeRoleArn())
.roleSessionName(genSessionName())
- .durationSeconds(timeout)
- .externalId(externalId)
- .tags(tags)
+ .durationSeconds(awsProperties.clientAssumeRoleTimeoutSec())
+ .externalId(awsProperties.clientAssumeRoleExternalId())
+ .tags(awsProperties.stsClientAssumeRoleTags())
.build();
-
- clientBuilder.credentialsProvider(
- StsAssumeRoleCredentialsProvider.builder()
- .stsClient(sts())
- .refreshRequest(request)
- .build());
-
- clientBuilder.region(Region.of(region));
- clientBuilder.httpClientBuilder(AwsClientFactories.configureHttpClientBuilder(httpClientType));
-
- return clientBuilder;
}
- protected Set<Tag> tags() {
- return tags;
+ protected <T extends AwsClientBuilder & AwsSyncClientBuilder> T applyAssumeRoleConfigurations(
+ T clientBuilder) {
+ clientBuilder
+ .credentialsProvider(
+ StsAssumeRoleCredentialsProvider.builder()
+ .stsClient(sts())
+ .refreshRequest(assumeRoleRequest)
+ .build())
+ .region(Region.of(awsProperties.clientAssumeRoleRegion()));
+ return clientBuilder;
}
protected String region() {
- return region;
+ return awsProperties.clientAssumeRoleRegion();
}
- protected String s3Endpoint() {
- return s3Endpoint;
- }
-
- protected String httpClientType() {
- return httpClientType;
- }
-
- protected boolean s3UseArnRegionEnabled() {
- return s3UseArnRegionEnabled;
+ protected AwsProperties awsProperties() {
+ return awsProperties;
}
private StsClient sts() {
- return StsClient.builder()
- .httpClientBuilder(AwsClientFactories.configureHttpClientBuilder(httpClientType))
- .build();
+ return StsClient.builder().applyMutation(awsProperties::applyHttpClientConfigurations).build();
}
private String genSessionName() {
return String.format("iceberg-aws-%s", UUID.randomUUID());
}
-
- private static Set<Tag> toTags(Map<String, String> properties) {
- return PropertyUtil.propertiesWithPrefix(
- properties, AwsProperties.CLIENT_ASSUME_ROLE_TAGS_PREFIX)
- .entrySet().stream()
- .map(e -> Tag.builder().key(e.getKey()).value(e.getValue()).build())
- .collect(Collectors.toSet());
- }
}
diff --git a/aws/src/main/java/org/apache/iceberg/aws/AwsClientFactories.java b/aws/src/main/java/org/apache/iceberg/aws/AwsClientFactories.java
index a2f7cc4459..39b8ea4e46 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/AwsClientFactories.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/AwsClientFactories.java
@@ -21,7 +21,6 @@ package org.apache.iceberg.aws;
import java.net.URI;
import java.util.Map;
import org.apache.iceberg.common.DynConstructors;
-import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.relocated.com.google.common.base.Strings;
import org.apache.iceberg.util.PropertyUtil;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
@@ -29,14 +28,18 @@ import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.awscore.client.builder.AwsSyncClientBuilder;
import software.amazon.awssdk.core.client.builder.SdkClientBuilder;
import software.amazon.awssdk.http.SdkHttpClient;
import software.amazon.awssdk.http.apache.ApacheHttpClient;
import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
+import software.amazon.awssdk.services.dynamodb.DynamoDbClientBuilder;
import software.amazon.awssdk.services.glue.GlueClient;
+import software.amazon.awssdk.services.glue.GlueClientBuilder;
import software.amazon.awssdk.services.kms.KmsClient;
import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.S3ClientBuilder;
import software.amazon.awssdk.services.s3.S3Configuration;
public class AwsClientFactories {
@@ -86,99 +89,59 @@ public class AwsClientFactories {
}
static class DefaultAwsClientFactory implements AwsClientFactory {
+ private AwsProperties awsProperties;
- private String glueEndpoint;
- private String s3Endpoint;
- private String s3AccessKeyId;
- private String s3SecretAccessKey;
- private String s3SessionToken;
- private Boolean s3PathStyleAccess;
- private Boolean s3UseArnRegionEnabled;
- private Boolean s3AccelerationEnabled;
- private String dynamoDbEndpoint;
- private String httpClientType;
- private Boolean s3DualStackEnabled;
-
- DefaultAwsClientFactory() {}
+ DefaultAwsClientFactory() {
+ awsProperties = new AwsProperties();
+ }
@Override
public S3Client s3() {
return S3Client.builder()
- .httpClientBuilder(configureHttpClientBuilder(httpClientType))
- .applyMutation(builder -> configureEndpoint(builder, s3Endpoint))
- .dualstackEnabled(s3DualStackEnabled)
- .serviceConfiguration(
- S3Configuration.builder()
- .pathStyleAccessEnabled(s3PathStyleAccess)
- .useArnRegionEnabled(s3UseArnRegionEnabled)
- .accelerateModeEnabled(s3AccelerationEnabled)
- .build())
- .credentialsProvider(
- credentialsProvider(s3AccessKeyId, s3SecretAccessKey, s3SessionToken))
+ .applyMutation(awsProperties::applyHttpClientConfigurations)
+ .applyMutation(awsProperties::applyS3EndpointConfigurations)
+ .applyMutation(awsProperties::applyS3ServiceConfigurations)
+ .applyMutation(awsProperties::applyS3CredentialConfigurations)
.build();
}
@Override
public GlueClient glue() {
return GlueClient.builder()
- .httpClientBuilder(configureHttpClientBuilder(httpClientType))
- .applyMutation(builder -> configureEndpoint(builder, glueEndpoint))
+ .applyMutation(awsProperties::applyHttpClientConfigurations)
+ .applyMutation(awsProperties::applyGlueEndpointConfigurations)
.build();
}
@Override
public KmsClient kms() {
return KmsClient.builder()
- .httpClientBuilder(configureHttpClientBuilder(httpClientType))
+ .applyMutation(awsProperties::applyHttpClientConfigurations)
.build();
}
@Override
public DynamoDbClient dynamo() {
return DynamoDbClient.builder()
- .httpClientBuilder(configureHttpClientBuilder(httpClientType))
- .applyMutation(builder -> configureEndpoint(builder, dynamoDbEndpoint))
+ .applyMutation(awsProperties::applyHttpClientConfigurations)
+ .applyMutation(awsProperties::applyDynamoDbEndpointConfigurations)
.build();
}
@Override
public void initialize(Map<String, String> properties) {
- this.glueEndpoint = properties.get(AwsProperties.GLUE_CATALOG_ENDPOINT);
- this.s3Endpoint = properties.get(AwsProperties.S3FILEIO_ENDPOINT);
- this.s3AccessKeyId = properties.get(AwsProperties.S3FILEIO_ACCESS_KEY_ID);
- this.s3SecretAccessKey = properties.get(AwsProperties.S3FILEIO_SECRET_ACCESS_KEY);
- this.s3SessionToken = properties.get(AwsProperties.S3FILEIO_SESSION_TOKEN);
- this.s3PathStyleAccess =
- PropertyUtil.propertyAsBoolean(
- properties,
- AwsProperties.S3FILEIO_PATH_STYLE_ACCESS,
- AwsProperties.S3FILEIO_PATH_STYLE_ACCESS_DEFAULT);
- this.s3UseArnRegionEnabled =
- PropertyUtil.propertyAsBoolean(
- properties,
- AwsProperties.S3_USE_ARN_REGION_ENABLED,
- AwsProperties.S3_USE_ARN_REGION_ENABLED_DEFAULT);
- this.s3AccelerationEnabled =
- PropertyUtil.propertyAsBoolean(
- properties,
- AwsProperties.S3_ACCELERATION_ENABLED,
- AwsProperties.S3_ACCELERATION_ENABLED_DEFAULT);
- this.s3DualStackEnabled =
- PropertyUtil.propertyAsBoolean(
- properties,
- AwsProperties.S3_DUALSTACK_ENABLED,
- AwsProperties.S3_DUALSTACK_ENABLED_DEFAULT);
-
- ValidationException.check(
- (s3AccessKeyId == null) == (s3SecretAccessKey == null),
- "S3 client access key ID and secret access key must be set at the same time");
- this.dynamoDbEndpoint = properties.get(AwsProperties.DYNAMODB_ENDPOINT);
- this.httpClientType =
- PropertyUtil.propertyAsString(
- properties, AwsProperties.HTTP_CLIENT_TYPE, AwsProperties.HTTP_CLIENT_TYPE_DEFAULT);
+ this.awsProperties = new AwsProperties(properties);
}
}
+ /**
+ * Build a httpClientBuilder object
+ *
+ * @deprecated Not for public use. To configure the httpClient for a client, please use {@link
+ * AwsProperties#applyHttpClientConfigurations(AwsSyncClientBuilder)}. It will be removed in
+ * 2.0.0
+ */
+ @Deprecated
public static SdkHttpClient.Builder configureHttpClientBuilder(String httpClientType) {
String clientType = httpClientType;
if (Strings.isNullOrEmpty(clientType)) {
@@ -194,6 +157,16 @@ public class AwsClientFactories {
}
}
+ /**
+ * Configure the endpoint setting for a client
+ *
+ * @deprecated Not for public use. To configure the endpoint for a client, please use {@link
+ * AwsProperties#applyS3EndpointConfigurations(S3ClientBuilder)}, {@link
+ * AwsProperties#applyGlueEndpointConfigurations(GlueClientBuilder)}, or {@link
+ * AwsProperties#applyDynamoDbEndpointConfigurations(DynamoDbClientBuilder)} accordingly. It
+ * will be removed in 2.0.0
+ */
+ @Deprecated
public static <T extends SdkClientBuilder> void configureEndpoint(T builder, String endpoint) {
if (endpoint != null) {
builder.endpointOverride(URI.create(endpoint));
@@ -215,6 +188,14 @@ public class AwsClientFactories {
.build();
}
+ /**
+ * Build an AwsCredentialsProvider object
+ *
+ * @deprecated Not for public use. To configure the credentials for an S3 client, please use {@link
+ * AwsProperties#applyS3CredentialConfigurations(S3ClientBuilder)} in AwsProperties. It will
+ * be removed in 2.0.0.
+ */
+ @Deprecated
static AwsCredentialsProvider credentialsProvider(
String accessKeyId, String secretAccessKey, String sessionToken) {
if (accessKeyId != null) {
diff --git a/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java b/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java
index 3b492cf49c..0566988163 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java
@@ -19,16 +19,32 @@
package org.apache.iceberg.aws;
import java.io.Serializable;
+import java.net.URI;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.iceberg.aws.dynamodb.DynamoDbCatalog;
import org.apache.iceberg.aws.lakeformation.LakeFormationAwsClientFactory;
import org.apache.iceberg.aws.s3.S3FileIO;
+import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.base.Strings;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.PropertyUtil;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
+import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.awscore.client.builder.AwsSyncClientBuilder;
+import software.amazon.awssdk.core.client.builder.SdkClientBuilder;
+import software.amazon.awssdk.http.apache.ApacheHttpClient;
+import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient;
+import software.amazon.awssdk.services.dynamodb.DynamoDbClientBuilder;
+import software.amazon.awssdk.services.glue.GlueClientBuilder;
+import software.amazon.awssdk.services.s3.S3ClientBuilder;
+import software.amazon.awssdk.services.s3.S3Configuration;
import software.amazon.awssdk.services.s3.model.ObjectCannedACL;
import software.amazon.awssdk.services.s3.model.Tag;
@@ -425,9 +441,20 @@ public class AwsProperties implements Serializable {
*/
public static final String LAKE_FORMATION_DB_NAME = "lakeformation.db-name";
+ private String httpClientType;
+ private final Set<software.amazon.awssdk.services.sts.model.Tag> stsClientAssumeRoleTags;
+
+ private String clientAssumeRoleArn;
+ private String clientAssumeRoleExternalId;
+ private int clientAssumeRoleTimeoutSec;
+ private String clientAssumeRoleRegion;
+
private String s3FileIoSseType;
private String s3FileIoSseKey;
private String s3FileIoSseMd5;
+ private String s3AccessKeyId;
+ private String s3SecretAccessKey;
+ private String s3SessionToken;
private int s3FileIoMultipartUploadThreads;
private int s3FileIoMultiPartSize;
private int s3FileIoDeleteBatchSize;
@@ -441,19 +468,38 @@ public class AwsProperties implements Serializable {
private boolean isS3DeleteEnabled;
private final Map<String, String> s3BucketToAccessPointMapping;
private boolean s3PreloadClientEnabled;
+ private boolean s3DualStackEnabled;
+ private boolean s3PathStyleAccess;
+ private boolean s3UseArnRegionEnabled;
+ private boolean s3AccelerationEnabled;
+ private String s3Endpoint;
+ private String glueEndpoint;
private String glueCatalogId;
private boolean glueCatalogSkipArchive;
private boolean glueCatalogSkipNameValidation;
private boolean glueLakeFormationEnabled;
private String dynamoDbTableName;
+ private String dynamoDbEndpoint;
public AwsProperties() {
+ this.httpClientType = HTTP_CLIENT_TYPE_DEFAULT;
+ this.stsClientAssumeRoleTags = Sets.newHashSet();
+
+ this.clientAssumeRoleArn = null;
+ this.clientAssumeRoleTimeoutSec = CLIENT_ASSUME_ROLE_TIMEOUT_SEC_DEFAULT;
+ this.clientAssumeRoleExternalId = null;
+ this.clientAssumeRoleRegion = null;
+
this.s3FileIoSseType = S3FILEIO_SSE_TYPE_NONE;
this.s3FileIoSseKey = null;
this.s3FileIoSseMd5 = null;
+ this.s3AccessKeyId = null;
+ this.s3SecretAccessKey = null;
+ this.s3SessionToken = null;
this.s3FileIoAcl = null;
+ this.s3Endpoint = null;
this.s3FileIoMultipartUploadThreads = Runtime.getRuntime().availableProcessors();
this.s3FileIoMultiPartSize = S3FILEIO_MULTIPART_SIZE_DEFAULT;
@@ -467,39 +513,61 @@ public class AwsProperties implements Serializable {
this.isS3DeleteEnabled = S3_DELETE_ENABLED_DEFAULT;
this.s3BucketToAccessPointMapping = ImmutableMap.of();
this.s3PreloadClientEnabled = S3_PRELOAD_CLIENT_ENABLED_DEFAULT;
+ this.s3DualStackEnabled = S3_DUALSTACK_ENABLED_DEFAULT;
+ this.s3PathStyleAccess = S3FILEIO_PATH_STYLE_ACCESS_DEFAULT;
+ this.s3UseArnRegionEnabled = S3_USE_ARN_REGION_ENABLED_DEFAULT;
+ this.s3AccelerationEnabled = S3_ACCELERATION_ENABLED_DEFAULT;
this.glueCatalogId = null;
+ this.glueEndpoint = null;
this.glueCatalogSkipArchive = GLUE_CATALOG_SKIP_ARCHIVE_DEFAULT;
this.glueCatalogSkipNameValidation = GLUE_CATALOG_SKIP_NAME_VALIDATION_DEFAULT;
this.glueLakeFormationEnabled = GLUE_LAKEFORMATION_ENABLED_DEFAULT;
+ this.dynamoDbEndpoint = null;
this.dynamoDbTableName = DYNAMODB_TABLE_NAME_DEFAULT;
+
+ ValidationException.check(
+ s3KeyIdAccessKeyBothConfigured(),
+ "S3 client access key ID and secret access key must be set at the same time");
}
public AwsProperties(Map<String, String> properties) {
- this.s3FileIoSseType =
- properties.getOrDefault(
- AwsProperties.S3FILEIO_SSE_TYPE, AwsProperties.S3FILEIO_SSE_TYPE_NONE);
- this.s3FileIoSseKey = properties.get(AwsProperties.S3FILEIO_SSE_KEY);
- this.s3FileIoSseMd5 = properties.get(AwsProperties.S3FILEIO_SSE_MD5);
- if (AwsProperties.S3FILEIO_SSE_TYPE_CUSTOM.equals(s3FileIoSseType)) {
+ this.httpClientType =
+ PropertyUtil.propertyAsString(properties, HTTP_CLIENT_TYPE, HTTP_CLIENT_TYPE_DEFAULT);
+ this.stsClientAssumeRoleTags = toStsTags(properties, CLIENT_ASSUME_ROLE_TAGS_PREFIX);
+
+ this.clientAssumeRoleArn = properties.get(CLIENT_ASSUME_ROLE_ARN);
+ this.clientAssumeRoleTimeoutSec =
+ PropertyUtil.propertyAsInt(
+ properties, CLIENT_ASSUME_ROLE_TIMEOUT_SEC, CLIENT_ASSUME_ROLE_TIMEOUT_SEC_DEFAULT);
+ this.clientAssumeRoleExternalId = properties.get(CLIENT_ASSUME_ROLE_EXTERNAL_ID);
+ this.clientAssumeRoleRegion = properties.get(CLIENT_ASSUME_ROLE_REGION);
+
+ this.s3FileIoSseType = properties.getOrDefault(S3FILEIO_SSE_TYPE, S3FILEIO_SSE_TYPE_NONE);
+ this.s3FileIoSseKey = properties.get(S3FILEIO_SSE_KEY);
+ this.s3FileIoSseMd5 = properties.get(S3FILEIO_SSE_MD5);
+ this.s3AccessKeyId = properties.get(S3FILEIO_ACCESS_KEY_ID);
+ this.s3SecretAccessKey = properties.get(S3FILEIO_SECRET_ACCESS_KEY);
+ this.s3SessionToken = properties.get(S3FILEIO_SESSION_TOKEN);
+ if (S3FILEIO_SSE_TYPE_CUSTOM.equals(s3FileIoSseType)) {
Preconditions.checkNotNull(
s3FileIoSseKey, "Cannot initialize SSE-C S3FileIO with null encryption key");
Preconditions.checkNotNull(
s3FileIoSseMd5, "Cannot initialize SSE-C S3FileIO with null encryption key MD5");
}
+ this.s3Endpoint = properties.get(S3FILEIO_ENDPOINT);
+ this.glueEndpoint = properties.get(GLUE_CATALOG_ENDPOINT);
this.glueCatalogId = properties.get(GLUE_CATALOG_ID);
this.glueCatalogSkipArchive =
PropertyUtil.propertyAsBoolean(
- properties,
- AwsProperties.GLUE_CATALOG_SKIP_ARCHIVE,
- AwsProperties.GLUE_CATALOG_SKIP_ARCHIVE_DEFAULT);
+ properties, GLUE_CATALOG_SKIP_ARCHIVE, GLUE_CATALOG_SKIP_ARCHIVE_DEFAULT);
this.glueCatalogSkipNameValidation =
PropertyUtil.propertyAsBoolean(
properties,
- AwsProperties.GLUE_CATALOG_SKIP_NAME_VALIDATION,
- AwsProperties.GLUE_CATALOG_SKIP_NAME_VALIDATION_DEFAULT);
+ GLUE_CATALOG_SKIP_NAME_VALIDATION,
+ GLUE_CATALOG_SKIP_NAME_VALIDATION_DEFAULT);
this.glueLakeFormationEnabled =
PropertyUtil.propertyAsBoolean(
properties, GLUE_LAKEFORMATION_ENABLED, GLUE_LAKEFORMATION_ENABLED_DEFAULT);
@@ -509,6 +577,18 @@ public class AwsProperties implements Serializable {
properties,
S3FILEIO_MULTIPART_UPLOAD_THREADS,
Runtime.getRuntime().availableProcessors());
+ this.s3PathStyleAccess =
+ PropertyUtil.propertyAsBoolean(
+ properties, S3FILEIO_PATH_STYLE_ACCESS, S3FILEIO_PATH_STYLE_ACCESS_DEFAULT);
+ this.s3UseArnRegionEnabled =
+ PropertyUtil.propertyAsBoolean(
+ properties, S3_USE_ARN_REGION_ENABLED, S3_USE_ARN_REGION_ENABLED_DEFAULT);
+ this.s3AccelerationEnabled =
+ PropertyUtil.propertyAsBoolean(
+ properties, S3_ACCELERATION_ENABLED, S3_ACCELERATION_ENABLED_DEFAULT);
+ this.s3DualStackEnabled =
+ PropertyUtil.propertyAsBoolean(
+ properties, S3_DUALSTACK_ENABLED, S3_DUALSTACK_ENABLED_DEFAULT);
try {
this.s3FileIoMultiPartSize =
@@ -555,8 +635,8 @@ public class AwsProperties implements Serializable {
String.format(
"Deletion batch size must be between 1 and %s", S3FILEIO_DELETE_BATCH_SIZE_MAX));
- this.s3WriteTags = toTags(properties, S3_WRITE_TAGS_PREFIX);
- this.s3DeleteTags = toTags(properties, S3_DELETE_TAGS_PREFIX);
+ this.s3WriteTags = toS3Tags(properties, S3_WRITE_TAGS_PREFIX);
+ this.s3DeleteTags = toS3Tags(properties, S3_DELETE_TAGS_PREFIX);
this.s3FileIoDeleteThreads =
PropertyUtil.propertyAsInt(
properties, S3FILEIO_DELETE_THREADS, Runtime.getRuntime().availableProcessors());
@@ -566,12 +646,35 @@ public class AwsProperties implements Serializable {
PropertyUtil.propertiesWithPrefix(properties, S3_ACCESS_POINTS_PREFIX);
this.s3PreloadClientEnabled =
PropertyUtil.propertyAsBoolean(
- properties,
- AwsProperties.S3_PRELOAD_CLIENT_ENABLED,
- AwsProperties.S3_PRELOAD_CLIENT_ENABLED_DEFAULT);
+ properties, S3_PRELOAD_CLIENT_ENABLED, S3_PRELOAD_CLIENT_ENABLED_DEFAULT);
+ this.dynamoDbEndpoint = properties.get(DYNAMODB_ENDPOINT);
this.dynamoDbTableName =
PropertyUtil.propertyAsString(properties, DYNAMODB_TABLE_NAME, DYNAMODB_TABLE_NAME_DEFAULT);
+
+ ValidationException.check(
+ s3KeyIdAccessKeyBothConfigured(),
+ "S3 client access key ID and secret access key must be set at the same time");
+ }
+
+ public Set<software.amazon.awssdk.services.sts.model.Tag> stsClientAssumeRoleTags() {
+ return stsClientAssumeRoleTags;
+ }
+
+ public String clientAssumeRoleArn() {
+ return clientAssumeRoleArn;
+ }
+
+ public int clientAssumeRoleTimeoutSec() {
+ return clientAssumeRoleTimeoutSec;
+ }
+
+ public String clientAssumeRoleExternalId() {
+ return clientAssumeRoleExternalId;
+ }
+
+ public String clientAssumeRoleRegion() {
+ return clientAssumeRoleRegion;
}
public String s3FileIoSseType() {
@@ -726,13 +829,150 @@ public class AwsProperties implements Serializable {
this.isS3DeleteEnabled = s3DeleteEnabled;
}
- private Set<Tag> toTags(Map<String, String> properties, String prefix) {
+ public Map<String, String> s3BucketToAccessPointMapping() {
+ return s3BucketToAccessPointMapping;
+ }
+
+ /**
+ * Configure the credentials for an S3 client.
+ *
+ * <p>Sample usage:
+ *
+ * <pre>
+ * S3Client.builder().applyMutation(awsProperties::applyS3CredentialConfigurations)
+ * </pre>
+ */
+ public <T extends S3ClientBuilder> void applyS3CredentialConfigurations(T builder) {
+ builder.credentialsProvider(
+ credentialsProvider(s3AccessKeyId, s3SecretAccessKey, s3SessionToken));
+ }
+
+ /**
+ * Configure services settings for an S3 client. The settings include: s3DualStack,
+ * s3UseArnRegion, s3PathStyleAccess, and s3Acceleration
+ *
+ * <p>Sample usage:
+ *
+ * <pre>
+ * S3Client.builder().applyMutation(awsProperties::applyS3ServiceConfigurations)
+ * </pre>
+ */
+ public <T extends S3ClientBuilder> void applyS3ServiceConfigurations(T builder) {
+ builder
+ .dualstackEnabled(s3DualStackEnabled)
+ .serviceConfiguration(
+ S3Configuration.builder()
+ .pathStyleAccessEnabled(s3PathStyleAccess)
+ .useArnRegionEnabled(s3UseArnRegionEnabled)
+ .accelerateModeEnabled(s3AccelerationEnabled)
+ .build());
+ }
+
+ /**
+ * Configure the httpClient for a client according to the HttpClientType. The two supported
+ * HttpClientTypes are urlconnection and apache
+ *
+ * <p>Sample usage:
+ *
+ * <pre>
+ * S3Client.builder().applyMutation(awsProperties::applyHttpClientConfigurations)
+ * </pre>
+ */
+ public <T extends AwsSyncClientBuilder> void applyHttpClientConfigurations(T builder) {
+ if (Strings.isNullOrEmpty(httpClientType)) {
+ httpClientType = HTTP_CLIENT_TYPE_DEFAULT;
+ }
+ switch (httpClientType) {
+ case HTTP_CLIENT_TYPE_URLCONNECTION:
+ builder.httpClientBuilder(UrlConnectionHttpClient.builder());
+ break;
+ case HTTP_CLIENT_TYPE_APACHE:
+ builder.httpClientBuilder(ApacheHttpClient.builder());
+ break;
+ default:
+ throw new IllegalArgumentException("Unrecognized HTTP client type " + httpClientType);
+ }
+ }
+
+ /**
+ * Override the endpoint for an S3 client.
+ *
+ * <p>Sample usage:
+ *
+ * <pre>
+ * S3Client.builder().applyMutation(awsProperties::applyS3EndpointConfigurations)
+ * </pre>
+ */
+ public <T extends S3ClientBuilder> void applyS3EndpointConfigurations(T builder) {
+ configureEndpoint(builder, s3Endpoint);
+ }
+
+ /**
+ * Override the endpoint for a glue client.
+ *
+ * <p>Sample usage:
+ *
+ * <pre>
+ * GlueClient.builder().applyMutation(awsProperties::applyGlueEndpointConfigurations)
+ * </pre>
+ */
+ public <T extends GlueClientBuilder> void applyGlueEndpointConfigurations(T builder) {
+ configureEndpoint(builder, glueEndpoint);
+ }
+
+ /**
+ * Override the endpoint for a dynamoDb client.
+ *
+ * <p>Sample usage:
+ *
+ * <pre>
+ * DynamoDbClient.builder().applyMutation(awsProperties::applyDynamoDbEndpointConfigurations)
+ * </pre>
+ */
+ public <T extends DynamoDbClientBuilder> void applyDynamoDbEndpointConfigurations(T builder) {
+ configureEndpoint(builder, dynamoDbEndpoint);
+ }
+
+ private Set<Tag> toS3Tags(Map<String, String> properties, String prefix) {
return PropertyUtil.propertiesWithPrefix(properties, prefix).entrySet().stream()
.map(e -> Tag.builder().key(e.getKey()).value(e.getValue()).build())
.collect(Collectors.toSet());
}
- public Map<String, String> s3BucketToAccessPointMapping() {
- return s3BucketToAccessPointMapping;
+ private Set<software.amazon.awssdk.services.sts.model.Tag> toStsTags(
+ Map<String, String> properties, String prefix) {
+ return PropertyUtil.propertiesWithPrefix(properties, prefix).entrySet().stream()
+ .map(
+ e ->
+ software.amazon.awssdk.services.sts.model.Tag.builder()
+ .key(e.getKey())
+ .value(e.getValue())
+ .build())
+ .collect(Collectors.toSet());
+ }
+
+ private boolean s3KeyIdAccessKeyBothConfigured() {
+ return (s3AccessKeyId == null) == (s3SecretAccessKey == null);
+ }
+
+ private AwsCredentialsProvider credentialsProvider(
+ String accessKeyId, String secretAccessKey, String sessionToken) {
+ if (accessKeyId != null) {
+ if (sessionToken == null) {
+ return StaticCredentialsProvider.create(
+ AwsBasicCredentials.create(accessKeyId, secretAccessKey));
+ } else {
+ return StaticCredentialsProvider.create(
+ AwsSessionCredentials.create(accessKeyId, secretAccessKey, sessionToken));
+ }
+ } else {
+ return DefaultCredentialsProvider.create();
+ }
+ }
+
+ private <T extends SdkClientBuilder> void configureEndpoint(T builder, String endpoint) {
+ if (endpoint != null) {
+ builder.endpointOverride(URI.create(endpoint));
+ }
}
}
diff --git a/aws/src/main/java/org/apache/iceberg/aws/lakeformation/LakeFormationAwsClientFactory.java b/aws/src/main/java/org/apache/iceberg/aws/lakeformation/LakeFormationAwsClientFactory.java
index bb3c4459fe..7bd19f5a85 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/lakeformation/LakeFormationAwsClientFactory.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/lakeformation/LakeFormationAwsClientFactory.java
@@ -20,7 +20,6 @@ package org.apache.iceberg.aws.lakeformation;
import java.util.Map;
import org.apache.iceberg.aws.AssumeRoleAwsClientFactory;
-import org.apache.iceberg.aws.AwsClientFactories;
import org.apache.iceberg.aws.AwsProperties;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import software.amazon.awssdk.auth.credentials.AwsCredentials;
@@ -64,7 +63,8 @@ public class LakeFormationAwsClientFactory extends AssumeRoleAwsClientFactory {
public void initialize(Map<String, String> catalogProperties) {
super.initialize(catalogProperties);
Preconditions.checkArgument(
- tags().stream().anyMatch(t -> t.key().equals(LF_AUTHORIZED_CALLER)),
+ awsProperties().stsClientAssumeRoleTags().stream()
+ .anyMatch(t -> LF_AUTHORIZED_CALLER.equals(t.key())),
"STS assume role session tag %s must be set using %s to use LakeFormation client factory",
LF_AUTHORIZED_CALLER,
AwsProperties.CLIENT_ASSUME_ROLE_TAGS_PREFIX);
@@ -78,11 +78,11 @@ public class LakeFormationAwsClientFactory extends AssumeRoleAwsClientFactory {
public S3Client s3() {
if (isTableRegisteredWithLakeFormation()) {
return S3Client.builder()
- .httpClientBuilder(AwsClientFactories.configureHttpClientBuilder(httpClientType()))
- .applyMutation(builder -> AwsClientFactories.configureEndpoint(builder, s3Endpoint()))
+ .applyMutation(awsProperties()::applyHttpClientConfigurations)
+ .applyMutation(awsProperties()::applyS3EndpointConfigurations)
+ .applyMutation(awsProperties()::applyS3ServiceConfigurations)
.credentialsProvider(
new LakeFormationCredentialsProvider(lakeFormation(), buildTableArn()))
- .serviceConfiguration(s -> s.useArnRegionEnabled(s3UseArnRegionEnabled()).build())
.region(Region.of(region()))
.build();
} else {
@@ -94,7 +94,7 @@ public class LakeFormationAwsClientFactory extends AssumeRoleAwsClientFactory {
public KmsClient kms() {
if (isTableRegisteredWithLakeFormation()) {
return KmsClient.builder()
- .httpClientBuilder(AwsClientFactories.configureHttpClientBuilder(httpClientType()))
+ .applyMutation(awsProperties()::applyHttpClientConfigurations)
.credentialsProvider(
new LakeFormationCredentialsProvider(lakeFormation(), buildTableArn()))
.region(Region.of(region()))
@@ -132,7 +132,10 @@ public class LakeFormationAwsClientFactory extends AssumeRoleAwsClientFactory {
}
private LakeFormationClient lakeFormation() {
- return LakeFormationClient.builder().applyMutation(this::configure).build();
+ return LakeFormationClient.builder()
+ .applyMutation(this::applyAssumeRoleConfigurations)
+ .applyMutation(awsProperties()::applyHttpClientConfigurations)
+ .build();
}
static class LakeFormationCredentialsProvider implements AwsCredentialsProvider {
diff --git a/aws/src/test/java/org/apache/iceberg/aws/TestAwsClientFactories.java b/aws/src/test/java/org/apache/iceberg/aws/TestAwsClientFactories.java
index 19098b056b..ab85da667b 100644
--- a/aws/src/test/java/org/apache/iceberg/aws/TestAwsClientFactories.java
+++ b/aws/src/test/java/org/apache/iceberg/aws/TestAwsClientFactories.java
@@ -24,10 +24,6 @@ import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.junit.Assert;
import org.junit.Test;
-import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
-import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
-import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
-import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.glue.GlueClient;
import software.amazon.awssdk.services.kms.KmsClient;
@@ -56,24 +52,6 @@ public class TestAwsClientFactories {
"should load custom class", AwsClientFactories.from(properties) instanceof CustomFactory);
}
- @Test
- public void testS3FileIoCredentialsProviders() {
- AwsCredentialsProvider basicCredentials =
- AwsClientFactories.credentialsProvider("key", "secret", null);
- Assert.assertTrue(
- "Should use basic credentials if access key ID and secret access key are set",
- basicCredentials.resolveCredentials() instanceof AwsBasicCredentials);
- AwsCredentialsProvider sessionCredentials =
- AwsClientFactories.credentialsProvider("key", "secret", "token");
- Assert.assertTrue(
- "Should use session credentials if session token is set",
- sessionCredentials.resolveCredentials() instanceof AwsSessionCredentials);
- Assert.assertTrue(
- "Should use default credentials if nothing is set",
- AwsClientFactories.credentialsProvider(null, null, null)
- instanceof DefaultCredentialsProvider);
- }
-
@Test
public void testS3FileIoCredentialsVerification() {
Map<String, String> properties = Maps.newHashMap();
diff --git a/aws/src/test/java/org/apache/iceberg/aws/TestAwsProperties.java b/aws/src/test/java/org/apache/iceberg/aws/TestAwsProperties.java
index ab9a3b165c..1cb705c37f 100644
--- a/aws/src/test/java/org/apache/iceberg/aws/TestAwsProperties.java
+++ b/aws/src/test/java/org/apache/iceberg/aws/TestAwsProperties.java
@@ -23,10 +23,16 @@ import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.junit.Assert;
import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
+import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
+import software.amazon.awssdk.services.s3.S3ClientBuilder;
import software.amazon.awssdk.services.s3.model.ObjectCannedACL;
public class TestAwsProperties {
-
@Test
public void testS3FileIoSseCustom_mustHaveCustomKey() {
Map<String, String> map = Maps.newHashMap();
@@ -123,4 +129,79 @@ public class TestAwsProperties {
"Deletion batch size must be between 1 and 1000",
() -> new AwsProperties(map));
}
+
+ @Test
+ public void testS3FileIoDefaultCredentialsConfiguration() {
+ // set nothing
+ Map<String, String> properties = Maps.newHashMap();
+ AwsProperties awsProperties = new AwsProperties(properties);
+ S3ClientBuilder mockS3ClientBuilder = Mockito.mock(S3ClientBuilder.class);
+ ArgumentCaptor<AwsCredentialsProvider> awsCredentialsProviderCaptor =
+ ArgumentCaptor.forClass(AwsCredentialsProvider.class);
+
+ awsProperties.applyS3CredentialConfigurations(mockS3ClientBuilder);
+ Mockito.verify(mockS3ClientBuilder).credentialsProvider(awsCredentialsProviderCaptor.capture());
+ AwsCredentialsProvider capturedAwsCredentialsProvider = awsCredentialsProviderCaptor.getValue();
+
+ Assert.assertTrue(
+ "Should use default credentials if nothing is set",
+ capturedAwsCredentialsProvider instanceof DefaultCredentialsProvider);
+ }
+
+ @Test
+ public void testS3FileIoBasicCredentialsConfiguration() {
+ // set access key id and secret access key
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put(AwsProperties.S3FILEIO_ACCESS_KEY_ID, "key");
+ properties.put(AwsProperties.S3FILEIO_SECRET_ACCESS_KEY, "secret");
+ AwsProperties awsPropertiesTwoSet = new AwsProperties(properties);
+ S3ClientBuilder mockS3ClientBuilder = Mockito.mock(S3ClientBuilder.class);
+ ArgumentCaptor<AwsCredentialsProvider> awsCredentialsProviderCaptor =
+ ArgumentCaptor.forClass(AwsCredentialsProvider.class);
+
+ awsPropertiesTwoSet.applyS3CredentialConfigurations(mockS3ClientBuilder);
+ Mockito.verify(mockS3ClientBuilder).credentialsProvider(awsCredentialsProviderCaptor.capture());
+ AwsCredentialsProvider capturedAwsCredentialsProvider = awsCredentialsProviderCaptor.getValue();
+
+ Assert.assertTrue(
+ "Should use basic credentials if access key ID and secret access key are set",
+ capturedAwsCredentialsProvider.resolveCredentials() instanceof AwsBasicCredentials);
+ Assert.assertEquals(
+ "The access key id should be the same as the one set by tag S3FILEIO_ACCESS_KEY_ID",
+ "key",
+ capturedAwsCredentialsProvider.resolveCredentials().accessKeyId());
+ Assert.assertEquals(
+ "The secret access key should be the same as the one set by tag S3FILEIO_SECRET_ACCESS_KEY",
+ "secret",
+ capturedAwsCredentialsProvider.resolveCredentials().secretAccessKey());
+ }
+
+ @Test
+ public void testS3FileIoSessionCredentialsConfiguration() {
+ // set access key id, secret access key, and session token
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put(AwsProperties.S3FILEIO_ACCESS_KEY_ID, "key");
+ properties.put(AwsProperties.S3FILEIO_SECRET_ACCESS_KEY, "secret");
+ properties.put(AwsProperties.S3FILEIO_SESSION_TOKEN, "token");
+ AwsProperties awsProperties = new AwsProperties(properties);
+ S3ClientBuilder mockS3ClientBuilder = Mockito.mock(S3ClientBuilder.class);
+ ArgumentCaptor<AwsCredentialsProvider> awsCredentialsProviderCaptor =
+ ArgumentCaptor.forClass(AwsCredentialsProvider.class);
+
+ awsProperties.applyS3CredentialConfigurations(mockS3ClientBuilder);
+ Mockito.verify(mockS3ClientBuilder).credentialsProvider(awsCredentialsProviderCaptor.capture());
+ AwsCredentialsProvider capturedAwsCredentialsProvider = awsCredentialsProviderCaptor.getValue();
+
+ Assert.assertTrue(
+ "Should use session credentials if session token is set",
+ capturedAwsCredentialsProvider.resolveCredentials() instanceof AwsSessionCredentials);
+ Assert.assertEquals(
+ "The access key id should be the same as the one set by tag S3FILEIO_ACCESS_KEY_ID",
+ "key",
+ capturedAwsCredentialsProvider.resolveCredentials().accessKeyId());
+ Assert.assertEquals(
+ "The secret access key should be the same as the one set by tag S3FILEIO_SECRET_ACCESS_KEY",
+ "secret",
+ capturedAwsCredentialsProvider.resolveCredentials().secretAccessKey());
+ }
}