You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@iceberg.apache.org by ja...@apache.org on 2022/10/11 15:52:07 UTC
[iceberg] branch master updated: AWS: Fix NotSerializableException when using AssumeRoleAwsClientFactory in Spark (#5939)
This is an automated email from the ASF dual-hosted git repository.
jackye pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new b71c37de34 AWS: Fix NotSerializableException when using AssumeRoleAwsClientFactory in Spark (#5939)
b71c37de34 is described below
commit b71c37de343ea46e191987f75ab19d88edca49c3
Author: Rushan Jiang <ru...@andrew.cmu.edu>
AuthorDate: Tue Oct 11 11:51:59 2022 -0400
AWS: Fix NotSerializableException when using AssumeRoleAwsClientFactory in Spark (#5939)
---
.../iceberg/aws/AssumeRoleAwsClientFactory.java | 14 ++--
.../apache/iceberg/aws/TestAwsClientFactories.java | 78 ++++++++++++++++++++++
2 files changed, 85 insertions(+), 7 deletions(-)
diff --git a/aws/src/main/java/org/apache/iceberg/aws/AssumeRoleAwsClientFactory.java b/aws/src/main/java/org/apache/iceberg/aws/AssumeRoleAwsClientFactory.java
index e26f39a833..8c1b7f23ad 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/AssumeRoleAwsClientFactory.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/AssumeRoleAwsClientFactory.java
@@ -34,7 +34,7 @@ import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;
public class AssumeRoleAwsClientFactory implements AwsClientFactory {
private AwsProperties awsProperties;
- private AssumeRoleRequest assumeRoleRequest;
+ private String roleSessionName;
@Override
public S3Client s3() {
@@ -74,25 +74,25 @@ public class AssumeRoleAwsClientFactory implements AwsClientFactory {
@Override
public void initialize(Map<String, String> properties) {
this.awsProperties = new AwsProperties(properties);
+ this.roleSessionName = genSessionName();
Preconditions.checkNotNull(
awsProperties.clientAssumeRoleArn(),
"Cannot initialize AssumeRoleClientConfigFactory with null role ARN");
Preconditions.checkNotNull(
awsProperties.clientAssumeRoleRegion(),
"Cannot initialize AssumeRoleClientConfigFactory with null region");
+ }
- this.assumeRoleRequest =
+ protected <T extends AwsClientBuilder & AwsSyncClientBuilder> T applyAssumeRoleConfigurations(
+ T clientBuilder) {
+ AssumeRoleRequest assumeRoleRequest =
AssumeRoleRequest.builder()
.roleArn(awsProperties.clientAssumeRoleArn())
- .roleSessionName(genSessionName())
+ .roleSessionName(roleSessionName)
.durationSeconds(awsProperties.clientAssumeRoleTimeoutSec())
.externalId(awsProperties.clientAssumeRoleExternalId())
.tags(awsProperties.stsClientAssumeRoleTags())
.build();
- }
-
- protected <T extends AwsClientBuilder & AwsSyncClientBuilder> T applyAssumeRoleConfigurations(
- T clientBuilder) {
clientBuilder
.credentialsProvider(
StsAssumeRoleCredentialsProvider.builder()
diff --git a/aws/src/test/java/org/apache/iceberg/aws/TestAwsClientFactories.java b/aws/src/test/java/org/apache/iceberg/aws/TestAwsClientFactories.java
index ab85da667b..bc2d85c6b6 100644
--- a/aws/src/test/java/org/apache/iceberg/aws/TestAwsClientFactories.java
+++ b/aws/src/test/java/org/apache/iceberg/aws/TestAwsClientFactories.java
@@ -18,10 +18,14 @@
*/
package org.apache.iceberg.aws;
+import java.io.IOException;
import java.util.Map;
import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.TestHelpers;
+import org.apache.iceberg.aws.lakeformation.LakeFormationAwsClientFactory;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.SerializationUtil;
import org.junit.Assert;
import org.junit.Test;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
@@ -71,6 +75,80 @@ public class TestAwsClientFactories {
() -> AwsClientFactories.from(properties));
}
+ @Test
+ public void testDefaultAwsClientFactorySerializable() {
+ Map<String, String> properties = Maps.newHashMap();
+ AwsClientFactory defaultAwsClientFactory = AwsClientFactories.from(properties);
+ try {
+ AwsClientFactory roundTripResult =
+ TestHelpers.KryoHelpers.roundTripSerialize(defaultAwsClientFactory);
+ Assert.assertTrue(
+ "DefaultAwsClientFactory should be serializable",
+ roundTripResult instanceof AwsClientFactories.DefaultAwsClientFactory);
+ } catch (IOException e) {
+ Assert.fail("kryoSerializer should serialize and deserialize DefaultAwsClientFactory");
+ }
+ byte[] serializedFactoryBytes = SerializationUtil.serializeToBytes(defaultAwsClientFactory);
+ AwsClientFactory deserializedClientFactory =
+ SerializationUtil.deserializeFromBytes(serializedFactoryBytes);
+ Assert.assertTrue(
+ "DefaultAwsClientFactory should be serializable",
+ deserializedClientFactory instanceof AwsClientFactories.DefaultAwsClientFactory);
+ }
+
+ @Test
+ public void testAssumeRoleAwsClientFactorySerializable() {
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put(AwsProperties.CLIENT_FACTORY, AssumeRoleAwsClientFactory.class.getName());
+ properties.put(AwsProperties.CLIENT_ASSUME_ROLE_ARN, "arn::test");
+ properties.put(AwsProperties.CLIENT_ASSUME_ROLE_REGION, "us-east-1");
+ AwsClientFactory assumeRoleAwsClientFactory = AwsClientFactories.from(properties);
+ try {
+ AwsClientFactory roundTripResult =
+ TestHelpers.KryoHelpers.roundTripSerialize(assumeRoleAwsClientFactory);
+ Assert.assertTrue(
+ "AssumeRoleAwsClientFactory should be serializable",
+ roundTripResult instanceof AssumeRoleAwsClientFactory);
+ } catch (IOException e) {
+ Assert.fail("kryoSerializer should serialize and deserialize AssumeRoleAwsClientFactory");
+ }
+ byte[] serializedFactoryBytes = SerializationUtil.serializeToBytes(assumeRoleAwsClientFactory);
+ AwsClientFactory deserializedClientFactory =
+ SerializationUtil.deserializeFromBytes(serializedFactoryBytes);
+ Assert.assertTrue(
+ "AssumeRoleAwsClientFactory should be serializable",
+ deserializedClientFactory instanceof AssumeRoleAwsClientFactory);
+ }
+
+ @Test
+ public void testLakeFormationAwsClientFactorySerializable() {
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put(AwsProperties.CLIENT_FACTORY, LakeFormationAwsClientFactory.class.getName());
+ properties.put(AwsProperties.CLIENT_ASSUME_ROLE_ARN, "arn::test");
+ properties.put(AwsProperties.CLIENT_ASSUME_ROLE_REGION, "us-east-1");
+ properties.put(
+ AwsProperties.CLIENT_ASSUME_ROLE_TAGS_PREFIX
+ + LakeFormationAwsClientFactory.LF_AUTHORIZED_CALLER,
+ "emr");
+ AwsClientFactory lakeFormationAwsClientFactory = AwsClientFactories.from(properties);
+ try {
+ AwsClientFactory roundTripResult =
+ TestHelpers.KryoHelpers.roundTripSerialize(lakeFormationAwsClientFactory);
+ Assert.assertTrue(
+ "LakeFormationAwsClientFactory should be serializable",
+ roundTripResult instanceof LakeFormationAwsClientFactory);
+ } catch (IOException e) {
+ Assert.fail("kryoSerializer should serialize and deserialize LakeFormationAwsClientFactory");
+ }
+ byte[] serializedFactoryBytes =
+ SerializationUtil.serializeToBytes(lakeFormationAwsClientFactory);
+ AwsClientFactory deserializedClientFactory =
+ SerializationUtil.deserializeFromBytes(serializedFactoryBytes);
+ Assert.assertTrue(
+ "LakeFormationAwsClientFactory should be serializable",
+ deserializedClientFactory instanceof LakeFormationAwsClientFactory);
+ }
+
public static class CustomFactory implements AwsClientFactory {
public CustomFactory() {}