You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by ja...@apache.org on 2022/10/11 15:52:07 UTC

[iceberg] branch master updated: AWS: Fix NotSerializableException when using AssumeRoleAwsClientFactory in Spark (#5939)

This is an automated email from the ASF dual-hosted git repository.

jackye pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new b71c37de34 AWS: Fix NotSerializableException when using AssumeRoleAwsClientFactory in Spark (#5939)
b71c37de34 is described below

commit b71c37de343ea46e191987f75ab19d88edca49c3
Author: Rushan Jiang <ru...@andrew.cmu.edu>
AuthorDate: Tue Oct 11 11:51:59 2022 -0400

    AWS: Fix NotSerializableException when using AssumeRoleAwsClientFactory in Spark (#5939)
---
 .../iceberg/aws/AssumeRoleAwsClientFactory.java    | 14 ++--
 .../apache/iceberg/aws/TestAwsClientFactories.java | 78 ++++++++++++++++++++++
 2 files changed, 85 insertions(+), 7 deletions(-)

diff --git a/aws/src/main/java/org/apache/iceberg/aws/AssumeRoleAwsClientFactory.java b/aws/src/main/java/org/apache/iceberg/aws/AssumeRoleAwsClientFactory.java
index e26f39a833..8c1b7f23ad 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/AssumeRoleAwsClientFactory.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/AssumeRoleAwsClientFactory.java
@@ -34,7 +34,7 @@ import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;
 
 public class AssumeRoleAwsClientFactory implements AwsClientFactory {
   private AwsProperties awsProperties;
-  private AssumeRoleRequest assumeRoleRequest;
+  private String roleSessionName;
 
   @Override
   public S3Client s3() {
@@ -74,25 +74,25 @@ public class AssumeRoleAwsClientFactory implements AwsClientFactory {
   @Override
   public void initialize(Map<String, String> properties) {
     this.awsProperties = new AwsProperties(properties);
+    this.roleSessionName = genSessionName();
     Preconditions.checkNotNull(
         awsProperties.clientAssumeRoleArn(),
         "Cannot initialize AssumeRoleClientConfigFactory with null role ARN");
     Preconditions.checkNotNull(
         awsProperties.clientAssumeRoleRegion(),
         "Cannot initialize AssumeRoleClientConfigFactory with null region");
+  }
 
-    this.assumeRoleRequest =
+  protected <T extends AwsClientBuilder & AwsSyncClientBuilder> T applyAssumeRoleConfigurations(
+      T clientBuilder) {
+    AssumeRoleRequest assumeRoleRequest =
         AssumeRoleRequest.builder()
             .roleArn(awsProperties.clientAssumeRoleArn())
-            .roleSessionName(genSessionName())
+            .roleSessionName(roleSessionName)
             .durationSeconds(awsProperties.clientAssumeRoleTimeoutSec())
             .externalId(awsProperties.clientAssumeRoleExternalId())
             .tags(awsProperties.stsClientAssumeRoleTags())
             .build();
-  }
-
-  protected <T extends AwsClientBuilder & AwsSyncClientBuilder> T applyAssumeRoleConfigurations(
-      T clientBuilder) {
     clientBuilder
         .credentialsProvider(
             StsAssumeRoleCredentialsProvider.builder()
diff --git a/aws/src/test/java/org/apache/iceberg/aws/TestAwsClientFactories.java b/aws/src/test/java/org/apache/iceberg/aws/TestAwsClientFactories.java
index ab85da667b..bc2d85c6b6 100644
--- a/aws/src/test/java/org/apache/iceberg/aws/TestAwsClientFactories.java
+++ b/aws/src/test/java/org/apache/iceberg/aws/TestAwsClientFactories.java
@@ -18,10 +18,14 @@
  */
 package org.apache.iceberg.aws;
 
+import java.io.IOException;
 import java.util.Map;
 import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.TestHelpers;
+import org.apache.iceberg.aws.lakeformation.LakeFormationAwsClientFactory;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.SerializationUtil;
 import org.junit.Assert;
 import org.junit.Test;
 import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
@@ -71,6 +75,80 @@ public class TestAwsClientFactories {
         () -> AwsClientFactories.from(properties));
   }
 
+  @Test
+  public void testDefaultAwsClientFactorySerializable() {
+    Map<String, String> properties = Maps.newHashMap();
+    AwsClientFactory defaultAwsClientFactory = AwsClientFactories.from(properties);
+    try {
+      AwsClientFactory roundTripResult =
+          TestHelpers.KryoHelpers.roundTripSerialize(defaultAwsClientFactory);
+      Assert.assertTrue(
+          "DefaultAwsClientFactory should be serializable",
+          roundTripResult instanceof AwsClientFactories.DefaultAwsClientFactory);
+    } catch (IOException e) {
+      Assert.fail("kryoSerializer should serialize and deserialize DefaultAwsClientFactory");
+    }
+    byte[] serializedFactoryBytes = SerializationUtil.serializeToBytes(defaultAwsClientFactory);
+    AwsClientFactory deserializedClientFactory =
+        SerializationUtil.deserializeFromBytes(serializedFactoryBytes);
+    Assert.assertTrue(
+        "DefaultAwsClientFactory should be serializable",
+        deserializedClientFactory instanceof AwsClientFactories.DefaultAwsClientFactory);
+  }
+
+  @Test
+  public void testAssumeRoleAwsClientFactorySerializable() {
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put(AwsProperties.CLIENT_FACTORY, AssumeRoleAwsClientFactory.class.getName());
+    properties.put(AwsProperties.CLIENT_ASSUME_ROLE_ARN, "arn::test");
+    properties.put(AwsProperties.CLIENT_ASSUME_ROLE_REGION, "us-east-1");
+    AwsClientFactory assumeRoleAwsClientFactory = AwsClientFactories.from(properties);
+    try {
+      AwsClientFactory roundTripResult =
+          TestHelpers.KryoHelpers.roundTripSerialize(assumeRoleAwsClientFactory);
+      Assert.assertTrue(
+          "AssumeRoleAwsClientFactory should be serializable",
+          roundTripResult instanceof AssumeRoleAwsClientFactory);
+    } catch (IOException e) {
+      Assert.fail("kryoSerializer should serialize and deserialize AssumeRoleAwsClientFactory");
+    }
+    byte[] serializedFactoryBytes = SerializationUtil.serializeToBytes(assumeRoleAwsClientFactory);
+    AwsClientFactory deserializedClientFactory =
+        SerializationUtil.deserializeFromBytes(serializedFactoryBytes);
+    Assert.assertTrue(
+        "AssumeRoleAwsClientFactory should be serializable",
+        deserializedClientFactory instanceof AssumeRoleAwsClientFactory);
+  }
+
+  @Test
+  public void testLakeFormationAwsClientFactorySerializable() {
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put(AwsProperties.CLIENT_FACTORY, LakeFormationAwsClientFactory.class.getName());
+    properties.put(AwsProperties.CLIENT_ASSUME_ROLE_ARN, "arn::test");
+    properties.put(AwsProperties.CLIENT_ASSUME_ROLE_REGION, "us-east-1");
+    properties.put(
+        AwsProperties.CLIENT_ASSUME_ROLE_TAGS_PREFIX
+            + LakeFormationAwsClientFactory.LF_AUTHORIZED_CALLER,
+        "emr");
+    AwsClientFactory lakeFormationAwsClientFactory = AwsClientFactories.from(properties);
+    try {
+      AwsClientFactory roundTripResult =
+          TestHelpers.KryoHelpers.roundTripSerialize(lakeFormationAwsClientFactory);
+      Assert.assertTrue(
+          "LakeFormationAwsClientFactory should be serializable",
+          roundTripResult instanceof LakeFormationAwsClientFactory);
+    } catch (IOException e) {
+      Assert.fail("kryoSerializer should serialize and deserialize LakeFormationAwsClientFactory");
+    }
+    byte[] serializedFactoryBytes =
+        SerializationUtil.serializeToBytes(lakeFormationAwsClientFactory);
+    AwsClientFactory deserializedClientFactory =
+        SerializationUtil.deserializeFromBytes(serializedFactoryBytes);
+    Assert.assertTrue(
+        "LakeFormationAwsClientFactory should be serializable",
+        deserializedClientFactory instanceof LakeFormationAwsClientFactory);
+  }
+
   public static class CustomFactory implements AwsClientFactory {
 
     public CustomFactory() {}