Posted to commits@iceberg.apache.org by dw...@apache.org on 2023/04/04 17:37:50 UTC

[iceberg] branch 1.2.x updated (e340ad5be0 -> 3b47c49c4c)

This is an automated email from the ASF dual-hosted git repository.

dweeks pushed a change to branch 1.2.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git


    from e340ad5be0 Core: Allow passing identity object through RESTSessionCatalog (#7088)
     new 527e8da6a0 Core: Parse snapshot-id as long in remove-statistics update (#7235)
     new ffe8756870 AWS: Prevent token refresh scheduling on every sign request (#7270)
     new fb60c3d4ff AWS: Disable local credentials if remote signing is enabled (#7230)
     new f09f1e16a3 Revert "Spark: Add "Iceberg" prefix to SparkTable name string for SparkUI (#5629)" (#7273)
     new 29e60c43e1 Spark: broadcast table instead of file IO in rewrite manifests (#7263)
     new 3b47c49c4c AWS: abort S3 input stream on close if not EOS (#7262)

The 6 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../java/org/apache/iceberg/aws/AwsProperties.java |  5 +-
 .../org/apache/iceberg/aws/s3/S3InputStream.java   | 25 ++++++++-
 .../aws/s3/signer/S3V4RestSignerClient.java        | 60 +++++++++++++++++-----
 .../iceberg/aws/s3/signer/S3SignerServlet.java     |  2 +
 .../iceberg/aws/s3/signer/TestS3RestSigner.java    | 52 ++++++++++++++-----
 .../org/apache/iceberg/MetadataUpdateParser.java   |  2 +-
 .../apache/iceberg/TestMetadataUpdateParser.java   | 22 ++++----
 .../spark/actions/RewriteManifestsSparkAction.java | 32 ++++++------
 .../apache/iceberg/spark/source/SparkTable.java    |  2 +-
 .../iceberg/spark/source/TestSparkTable.java       | 10 ----
 10 files changed, 145 insertions(+), 67 deletions(-)


[iceberg] 01/06: Core: Parse snapshot-id as long in remove-statistics update (#7235)

Posted by dw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dweeks pushed a commit to branch 1.2.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git

commit 527e8da6a09799a72dd510f226b1f91093e7fcae
Author: Eduard Tudenhöfner <et...@gmail.com>
AuthorDate: Thu Mar 30 17:10:54 2023 +0200

    Core: Parse snapshot-id as long in remove-statistics update (#7235)
---
 .../org/apache/iceberg/MetadataUpdateParser.java   |  2 +-
 .../apache/iceberg/TestMetadataUpdateParser.java   | 22 ++++++++++++----------
 2 files changed, 13 insertions(+), 11 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/MetadataUpdateParser.java b/core/src/main/java/org/apache/iceberg/MetadataUpdateParser.java
index 16bc5e685d..1ecf40cc40 100644
--- a/core/src/main/java/org/apache/iceberg/MetadataUpdateParser.java
+++ b/core/src/main/java/org/apache/iceberg/MetadataUpdateParser.java
@@ -436,7 +436,7 @@ public class MetadataUpdateParser {
   }
 
   private static MetadataUpdate readRemoveStatistics(JsonNode node) {
-    int snapshotId = JsonUtil.getInt(SNAPSHOT_ID, node);
+    long snapshotId = JsonUtil.getLong(SNAPSHOT_ID, node);
     return new MetadataUpdate.RemoveStatistics(snapshotId);
   }
 
diff --git a/core/src/test/java/org/apache/iceberg/TestMetadataUpdateParser.java b/core/src/test/java/org/apache/iceberg/TestMetadataUpdateParser.java
index 80faccc5f2..c6baead8a7 100644
--- a/core/src/test/java/org/apache/iceberg/TestMetadataUpdateParser.java
+++ b/core/src/test/java/org/apache/iceberg/TestMetadataUpdateParser.java
@@ -773,24 +773,26 @@ public class TestMetadataUpdateParser {
   @Test
   public void testSetStatistics() {
     String json =
-        "{\"action\":\"set-statistics\",\"snapshot-id\":42,\"statistics\":{\"snapshot-id\":42,"
+        "{\"action\":\"set-statistics\",\"snapshot-id\":1940541653261589030,\"statistics\":{\"snapshot-id\":1940541653261589030,"
             + "\"statistics-path\":\"s3://bucket/warehouse/stats.puffin\",\"file-size-in-bytes\":124,"
             + "\"file-footer-size-in-bytes\":27,\"blob-metadata\":[{\"type\":\"boring-type\","
-            + "\"snapshot-id\":42,\"sequence-number\":2,\"fields\":[1],"
+            + "\"snapshot-id\":1940541653261589030,\"sequence-number\":2,\"fields\":[1],"
             + "\"properties\":{\"prop-key\":\"prop-value\"}}]}}";
+
+    long snapshotId = 1940541653261589030L;
     MetadataUpdate expected =
         new MetadataUpdate.SetStatistics(
-            42,
+            snapshotId,
             new GenericStatisticsFile(
-                42,
+                snapshotId,
                 "s3://bucket/warehouse/stats.puffin",
-                124,
-                27,
+                124L,
+                27L,
                 ImmutableList.of(
                     new GenericBlobMetadata(
                         "boring-type",
-                        42,
-                        2,
+                        snapshotId,
+                        2L,
                         ImmutableList.of(1),
                         ImmutableMap.of("prop-key", "prop-value")))));
     assertEquals(
@@ -803,8 +805,8 @@ public class TestMetadataUpdateParser {
 
   @Test
   public void testRemoveStatistics() {
-    String json = "{\"action\":\"remove-statistics\",\"snapshot-id\":42}";
-    MetadataUpdate expected = new MetadataUpdate.RemoveStatistics(42);
+    String json = "{\"action\":\"remove-statistics\",\"snapshot-id\":1940541653261589030}";
+    MetadataUpdate expected = new MetadataUpdate.RemoveStatistics(1940541653261589030L);
     assertEquals(
         MetadataUpdateParser.REMOVE_STATISTICS, expected, MetadataUpdateParser.fromJson(json));
     Assert.assertEquals(
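
For readers skimming the diff: Iceberg snapshot IDs are 64-bit values, so reading
the "snapshot-id" field with JsonUtil.getInt overflows for realistic IDs such as
1940541653261589030, which is far beyond Integer.MAX_VALUE (2147483647). A minimal
sketch of the failure mode using plain Jackson (illustrative only, not taken from
the commit):

    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;

    public class SnapshotIdParsing {
      public static void main(String[] args) throws Exception {
        JsonNode node =
            new ObjectMapper()
                .readTree("{\"action\":\"remove-statistics\",\"snapshot-id\":1940541653261589030}");

        JsonNode id = node.get("snapshot-id");
        System.out.println(id.canConvertToInt());  // false: the value needs 64 bits
        System.out.println(id.canConvertToLong()); // true
        System.out.println(id.asLong());           // 1940541653261589030
      }
    }

This is also why the updated tests replace the old snapshot-id of 42, which fits in
an int and therefore never exercised the overflow.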


[iceberg] 04/06: Revert "Spark: Add "Iceberg" prefix to SparkTable name string for SparkUI (#5629)" (#7273)

Posted by dw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dweeks pushed a commit to branch 1.2.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git

commit f09f1e16a3b97869daed44e174ab6a6591d7c90c
Author: Amogh Jahagirdar <ja...@amazon.com>
AuthorDate: Mon Apr 3 15:02:47 2023 -0700

    Revert "Spark: Add "Iceberg" prefix to SparkTable name string for SparkUI (#5629)" (#7273)
---
 .../main/java/org/apache/iceberg/spark/source/SparkTable.java  |  2 +-
 .../java/org/apache/iceberg/spark/source/TestSparkTable.java   | 10 ----------
 2 files changed, 1 insertion(+), 11 deletions(-)

diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
index cd501ccb52..fef2f4540a 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
@@ -163,7 +163,7 @@ public class SparkTable
 
   @Override
   public String name() {
-    return String.format("Iceberg %s", icebergTable.name());
+    return icebergTable.toString();
   }
 
   public Long snapshotId() {
diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkTable.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkTable.java
index 4e4d84e760..616a196872 100644
--- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkTable.java
+++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkTable.java
@@ -57,14 +57,4 @@ public class TestSparkTable extends SparkCatalogTestBase {
     Assert.assertNotSame("References must be different", table1, table2);
     Assert.assertEquals("Tables must be equivalent", table1, table2);
   }
-
-  @Test
-  public void testTableName() throws NoSuchTableException {
-    CatalogManager catalogManager = spark.sessionState().catalogManager();
-    TableCatalog catalog = (TableCatalog) catalogManager.catalog(catalogName);
-    Identifier identifier = Identifier.of(tableIdent.namespace().levels(), tableIdent.name());
-    String actualTableName = catalog.loadTable(identifier).name();
-    String expectedTableName = String.format("Iceberg %s.%s", catalogName, tableIdent);
-    Assert.assertEquals("Table name mismatched", expectedTableName, actualTableName);
-  }
 }


[iceberg] 02/06: AWS: Prevent token refresh scheduling on every sign request (#7270)

Posted by dw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dweeks pushed a commit to branch 1.2.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git

commit ffe875687024572e0c5e37caf98568c78c1f89da
Author: Eduard Tudenhöfner <et...@gmail.com>
AuthorDate: Mon Apr 3 19:42:10 2023 +0200

    AWS: Prevent token refresh scheduling on every sign request (#7270)
---
 .../aws/s3/signer/S3V4RestSignerClient.java        | 60 +++++++++++++++++-----
 .../iceberg/aws/s3/signer/S3SignerServlet.java     |  2 +
 .../iceberg/aws/s3/signer/TestS3RestSigner.java    | 52 ++++++++++++++-----
 3 files changed, 87 insertions(+), 27 deletions(-)

diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/signer/S3V4RestSignerClient.java b/aws/src/main/java/org/apache/iceberg/aws/s3/signer/S3V4RestSignerClient.java
index 032cb03c4f..a0920774af 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/s3/signer/S3V4RestSignerClient.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/s3/signer/S3V4RestSignerClient.java
@@ -20,7 +20,9 @@ package org.apache.iceberg.aws.s3.signer;
 
 import com.github.benmanes.caffeine.cache.Cache;
 import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.RemovalListener;
 import java.net.URI;
+import java.time.Duration;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -30,6 +32,7 @@ import java.util.function.Consumer;
 import java.util.function.Supplier;
 import javax.annotation.Nullable;
 import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
@@ -115,7 +118,8 @@ public abstract class S3V4RestSignerClient
         OAuth2Properties.TOKEN_REFRESH_ENABLED_DEFAULT);
   }
 
-  private ScheduledExecutorService tokenRefreshExecutor() {
+  @VisibleForTesting
+  ScheduledExecutorService tokenRefreshExecutor() {
     if (!keepTokenRefreshed()) {
       return null;
     }
@@ -131,6 +135,26 @@ public abstract class S3V4RestSignerClient
     return tokenRefreshExecutor;
   }
 
+  @Value.Lazy
+  Cache<String, AuthSession> authSessionCache() {
+    long expirationIntervalMs =
+        PropertyUtil.propertyAsLong(
+            properties(),
+            CatalogProperties.AUTH_SESSION_TIMEOUT_MS,
+            CatalogProperties.AUTH_SESSION_TIMEOUT_MS_DEFAULT);
+
+    return Caffeine.newBuilder()
+        .expireAfterAccess(Duration.ofMillis(expirationIntervalMs))
+        .removalListener(
+            (RemovalListener<String, AuthSession>)
+                (id, auth, cause) -> {
+                  if (null != auth) {
+                    auth.stopRefreshing();
+                  }
+                })
+        .build();
+  }
+
   private RESTClient httpClient() {
     if (null == httpClient) {
       synchronized (S3V4RestSignerClient.class) {
@@ -150,21 +174,31 @@ public abstract class S3V4RestSignerClient
   private AuthSession authSession() {
     String token = token().get();
     if (null != token) {
-      return AuthSession.fromAccessToken(
-          httpClient(),
-          tokenRefreshExecutor(),
-          token,
-          expiresAtMillis(properties()),
-          new AuthSession(ImmutableMap.of(), token, null, credential(), SCOPE));
+      return authSessionCache()
+          .get(
+              token,
+              id ->
+                  AuthSession.fromAccessToken(
+                      httpClient(),
+                      tokenRefreshExecutor(),
+                      token,
+                      expiresAtMillis(properties()),
+                      new AuthSession(ImmutableMap.of(), token, null, credential(), SCOPE)));
     }
 
     if (credentialProvided()) {
-      AuthSession session = new AuthSession(ImmutableMap.of(), null, null, credential(), SCOPE);
-      long startTimeMillis = System.currentTimeMillis();
-      OAuthTokenResponse authResponse =
-          OAuth2Util.fetchToken(httpClient(), session.headers(), credential(), SCOPE);
-      return AuthSession.fromTokenResponse(
-          httpClient(), tokenRefreshExecutor(), authResponse, startTimeMillis, session);
+      return authSessionCache()
+          .get(
+              credential(),
+              id -> {
+                AuthSession session =
+                    new AuthSession(ImmutableMap.of(), null, null, credential(), SCOPE);
+                long startTimeMillis = System.currentTimeMillis();
+                OAuthTokenResponse authResponse =
+                    OAuth2Util.fetchToken(httpClient(), session.headers(), credential(), SCOPE);
+                return AuthSession.fromTokenResponse(
+                    httpClient(), tokenRefreshExecutor(), authResponse, startTimeMillis, session);
+              });
     }
 
     return AuthSession.empty();
diff --git a/aws/src/test/java/org/apache/iceberg/aws/s3/signer/S3SignerServlet.java b/aws/src/test/java/org/apache/iceberg/aws/s3/signer/S3SignerServlet.java
index ad2177a01a..6240efa2ad 100644
--- a/aws/src/test/java/org/apache/iceberg/aws/s3/signer/S3SignerServlet.java
+++ b/aws/src/test/java/org/apache/iceberg/aws/s3/signer/S3SignerServlet.java
@@ -112,6 +112,7 @@ public class S3SignerServlet extends HttpServlet {
                 .withToken("client-credentials-token:sub=" + requestMap.get("client_id"))
                 .withIssuedTokenType("urn:ietf:params:oauth:token-type:access_token")
                 .withTokenType("Bearer")
+                .setExpirationInSeconds(100)
                 .build());
 
       case "urn:ietf:params:oauth:grant-type:token-exchange":
@@ -126,6 +127,7 @@ public class S3SignerServlet extends HttpServlet {
                 .withToken(token)
                 .withIssuedTokenType("urn:ietf:params:oauth:token-type:access_token")
                 .withTokenType("Bearer")
+                .setExpirationInSeconds(100)
                 .build());
 
       default:
diff --git a/aws/src/test/java/org/apache/iceberg/aws/s3/signer/TestS3RestSigner.java b/aws/src/test/java/org/apache/iceberg/aws/s3/signer/TestS3RestSigner.java
index 02914542b7..c19e3bc6d4 100644
--- a/aws/src/test/java/org/apache/iceberg/aws/s3/signer/TestS3RestSigner.java
+++ b/aws/src/test/java/org/apache/iceberg/aws/s3/signer/TestS3RestSigner.java
@@ -18,14 +18,16 @@
  */
 package org.apache.iceberg.aws.s3.signer;
 
+import static org.assertj.core.api.Assertions.assertThat;
+
 import java.nio.file.Paths;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.stream.Collectors;
 import org.apache.iceberg.aws.s3.MinioContainer;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.rest.auth.OAuth2Properties;
-import org.assertj.core.api.Assertions;
 import org.eclipse.jetty.server.Server;
 import org.eclipse.jetty.server.handler.gzip.GzipHandler;
 import org.eclipse.jetty.servlet.ServletContextHandler;
@@ -33,6 +35,7 @@ import org.eclipse.jetty.servlet.ServletHolder;
 import org.jetbrains.annotations.NotNull;
 import org.junit.AfterClass;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -69,6 +72,7 @@ public class TestS3RestSigner {
           AwsBasicCredentials.create("accessKeyId", "secretAccessKey"));
 
   private static Server httpServer;
+  private static ValidatingSigner validatingSigner;
   private S3Client s3;
 
   @Rule public TemporaryFolder temp = new TemporaryFolder();
@@ -77,20 +81,13 @@ public class TestS3RestSigner {
   public MinioContainer minioContainer =
       new MinioContainer(CREDENTIALS_PROVIDER.resolveCredentials());
 
-  @AfterClass
-  public static void afterClass() throws Exception {
-    if (null != httpServer) {
-      httpServer.stop();
-    }
-  }
-
-  @Before
-  public void before() throws Exception {
+  @BeforeClass
+  public static void beforeClass() throws Exception {
     if (null == httpServer) {
       httpServer = initHttpServer();
     }
 
-    ValidatingSigner validatingSigner =
+    validatingSigner =
         new ValidatingSigner(
             ImmutableS3V4RestSignerClient.builder()
                 .properties(
@@ -101,7 +98,34 @@ public class TestS3RestSigner {
                         "catalog:12345"))
                 .build(),
             new CustomAwsS3V4Signer());
+  }
 
+  @AfterClass
+  public static void afterClass() throws Exception {
+    assertThat(validatingSigner.icebergSigner.tokenRefreshExecutor())
+        .isInstanceOf(ScheduledThreadPoolExecutor.class);
+
+    ScheduledThreadPoolExecutor executor =
+        ((ScheduledThreadPoolExecutor) validatingSigner.icebergSigner.tokenRefreshExecutor());
+    // token expiration is set to 100s so there should be exactly one token scheduled for refresh
+    assertThat(executor.getPoolSize()).isEqualTo(1);
+    assertThat(executor.getQueue())
+        .as("should only have a single token scheduled for refresh")
+        .hasSize(1);
+    assertThat(executor.getActiveCount())
+        .as("should not have any token being refreshed")
+        .isEqualTo(0);
+    assertThat(executor.getCompletedTaskCount())
+        .as("should not have any expired token that required a refresh")
+        .isEqualTo(0);
+
+    if (null != httpServer) {
+      httpServer.stop();
+    }
+  }
+
+  @Before
+  public void before() throws Exception {
     s3 =
         S3Client.builder()
             .region(REGION)
@@ -128,7 +152,7 @@ public class TestS3RestSigner {
         CreateMultipartUploadRequest.builder().bucket(BUCKET).key("random/multipart-key").build());
   }
 
-  private Server initHttpServer() throws Exception {
+  private static Server initHttpServer() throws Exception {
     S3SignerServlet servlet = new S3SignerServlet(S3ObjectMapper.mapper());
     ServletContextHandler servletContext =
         new ServletContextHandler(ServletContextHandler.NO_SESSIONS);
@@ -260,10 +284,10 @@ public class TestS3RestSigner {
 
       SdkHttpFullRequest awsResult = signWithAwsSigner(request, signerParams);
 
-      Assertions.assertThat(awsResult.headers().get("Authorization"))
+      assertThat(awsResult.headers().get("Authorization"))
           .isEqualTo(icebergResult.headers().get("Authorization"));
 
-      Assertions.assertThat(awsResult.headers()).isEqualTo(icebergResult.headers());
+      assertThat(awsResult.headers()).isEqualTo(icebergResult.headers());
       return awsResult;
     }
 

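For context on the caching above: keying AuthSession by token (or credential) in a
Caffeine cache means a refresh task is scheduled once per session instead of on
every sign request, and the removal listener stops refreshing once a session ages
out. A condensed, self-contained sketch of that pattern (the Session type and names
are illustrative, not Iceberg's):

    import com.github.benmanes.caffeine.cache.Cache;
    import com.github.benmanes.caffeine.cache.Caffeine;
    import com.github.benmanes.caffeine.cache.RemovalListener;
    import java.time.Duration;

    public class SessionCacheSketch {
      // hypothetical stand-in for Iceberg's AuthSession
      static class Session {
        final String token;

        Session(String token) {
          this.token = token;
          System.out.println("scheduled refresh for " + token); // happens once per token
        }

        void stopRefreshing() {
          System.out.println("stopped refresh for " + token);
        }
      }

      public static void main(String[] args) throws InterruptedException {
        Cache<String, Session> sessions =
            Caffeine.newBuilder()
                .expireAfterAccess(Duration.ofMillis(100))
                .removalListener(
                    (RemovalListener<String, Session>)
                        (token, session, cause) -> {
                          if (session != null) {
                            session.stopRefreshing(); // cancel refresh on eviction
                          }
                        })
                .build();

        // repeated sign requests with the same token reuse one cached session,
        // so only a single refresh task is ever scheduled for it
        for (int i = 0; i < 3; i++) {
          sessions.get("token-A", Session::new);
        }

        Thread.sleep(150); // let the entry expire
        sessions.cleanUp(); // triggers the removal listener -> stopRefreshing
      }
    }

The afterClass assertions in TestS3RestSigner rely on exactly this behavior: with a
100s token expiration, one token means one scheduled refresh task and zero completed
refreshes.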

[iceberg] 06/06: AWS: abort S3 input stream on close if not EOS (#7262)

Posted by dw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dweeks pushed a commit to branch 1.2.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git

commit 3b47c49c4c9270e746938e07cf9edda3ae02a86d
Author: Bryan Keller <br...@gmail.com>
AuthorDate: Tue Apr 4 08:31:51 2023 -0700

    AWS: abort S3 input stream on close if not EOS (#7262)
    
    * AWS: abort S3 input stream on close if not EOS
    
    * Close the stream for backwards compatibility
    
    * undo unrelated change
    
    * add trace log
    
    * comment update
    
    * logger updates
    
    * handle connection closed exception
---
 .../org/apache/iceberg/aws/s3/S3InputStream.java   | 25 +++++++++++++++++++++-
 1 file changed, 24 insertions(+), 1 deletion(-)

diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3InputStream.java b/aws/src/main/java/org/apache/iceberg/aws/s3/S3InputStream.java
index 1a45ad0d0c..7d83cea3f1 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3InputStream.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3InputStream.java
@@ -36,6 +36,7 @@ import org.apache.iceberg.relocated.com.google.common.io.ByteStreams;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import software.amazon.awssdk.core.sync.ResponseTransformer;
+import software.amazon.awssdk.http.Abortable;
 import software.amazon.awssdk.services.s3.S3Client;
 import software.amazon.awssdk.services.s3.model.GetObjectRequest;
 import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
@@ -196,7 +197,29 @@ class S3InputStream extends SeekableInputStream implements RangeReadable {
 
   private void closeStream() throws IOException {
     if (stream != null) {
-      stream.close();
+      // if we aren't at the end of the stream, and the stream is abortable, then
+      // call abort() so we don't read the remaining data with the Apache HTTP client
+      abortStream();
+      try {
+        stream.close();
+      } catch (IOException e) {
+        // the Apache HTTP client will throw a ConnectionClosedException
+        // when closing an aborted stream, which is expected
+        if (!e.getClass().getSimpleName().equals("ConnectionClosedException")) {
+          throw e;
+        }
+      }
+      stream = null;
+    }
+  }
+
+  private void abortStream() {
+    try {
+      if (stream instanceof Abortable && stream.read() != -1) {
+        ((Abortable) stream).abort();
+      }
+    } catch (Exception e) {
+      LOG.warn("An error occurred while aborting the stream", e);
     }
   }
 

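Background on closeStream() above: with the AWS SDK's Apache HTTP client, a plain
close() on a partially-read response drains the remaining bytes so the connection
can be returned to the pool, which is costly when only a small range of a large
object was read; abort() gives the connection up instead. A minimal sketch of the
same close-vs-abort decision as a standalone helper (hypothetical, not part of the
commit):

    import java.io.IOException;
    import java.io.InputStream;
    import software.amazon.awssdk.http.Abortable;

    public final class StreamCloser {
      private StreamCloser() {}

      // mirrors the idea in S3InputStream.closeStream(): abort instead of
      // draining when the caller stops before end-of-stream
      public static void close(InputStream stream) throws IOException {
        if (stream == null) {
          return;
        }

        try {
          if (stream instanceof Abortable && stream.read() != -1) {
            ((Abortable) stream).abort(); // drop the connection, skip the drain
          }
        } catch (Exception e) {
          // best effort; fall through to close()
        }

        try {
          stream.close();
        } catch (IOException e) {
          // the Apache HTTP client throws ConnectionClosedException when
          // closing an aborted stream, which is expected here
          if (!e.getClass().getSimpleName().equals("ConnectionClosedException")) {
            throw e;
          }
        }
      }
    }

The stream.read() != -1 probe costs at most one byte and avoids aborting a fully
consumed stream, which would needlessly discard a reusable connection.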

[iceberg] 05/06: Spark: broadcast table instead of file IO in rewrite manifests (#7263)

Posted by dw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dweeks pushed a commit to branch 1.2.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git

commit 29e60c43e1935c6cddd628b805094fcbc4b5fe6a
Author: Bryan Keller <br...@gmail.com>
AuthorDate: Mon Apr 3 20:38:42 2023 -0700

    Spark: broadcast table instead of file IO in rewrite manifests (#7263)
---
 .../spark/actions/RewriteManifestsSparkAction.java | 32 +++++++++++-----------
 1 file changed, 16 insertions(+), 16 deletions(-)

diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
index 42977b0be5..860168ae0a 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
@@ -36,6 +36,7 @@ import org.apache.iceberg.ManifestFiles;
 import org.apache.iceberg.ManifestWriter;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Partitioning;
+import org.apache.iceberg.SerializableTable;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableOperations;
@@ -44,7 +45,6 @@ import org.apache.iceberg.actions.BaseRewriteManifestsActionResult;
 import org.apache.iceberg.actions.RewriteManifests;
 import org.apache.iceberg.exceptions.CommitStateUnknownException;
 import org.apache.iceberg.exceptions.ValidationException;
-import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.OutputFile;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
@@ -52,7 +52,6 @@ import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.JobGroupInfo;
 import org.apache.iceberg.spark.SparkDataFile;
-import org.apache.iceberg.spark.SparkUtil;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.PropertyUtil;
 import org.apache.iceberg.util.Tasks;
@@ -90,7 +89,6 @@ public class RewriteManifestsSparkAction
   private final Encoder<ManifestFile> manifestEncoder;
   private final Table table;
   private final int formatVersion;
-  private final FileIO fileIO;
   private final long targetManifestSizeBytes;
 
   private PartitionSpec spec = null;
@@ -107,7 +105,6 @@ public class RewriteManifestsSparkAction
             table.properties(),
             TableProperties.MANIFEST_TARGET_SIZE_BYTES,
             TableProperties.MANIFEST_TARGET_SIZE_BYTES_DEFAULT);
-    this.fileIO = SparkUtil.serializableFileIO(table);
 
     // default the staging location to the metadata location
     TableOperations ops = ((HasTableOperations) table).operations();
@@ -216,7 +213,7 @@ public class RewriteManifestsSparkAction
 
   private List<ManifestFile> writeManifestsForUnpartitionedTable(
       Dataset<Row> manifestEntryDF, int numManifests) {
-    Broadcast<FileIO> io = sparkContext().broadcast(fileIO);
+    Broadcast<Table> tableBroadcast = sparkContext().broadcast(SerializableTable.copyOf(table));
     StructType sparkType = (StructType) manifestEntryDF.schema().apply("data_file").dataType();
     Types.StructType combinedPartitionType = Partitioning.partitionType(table);
 
@@ -228,7 +225,7 @@ public class RewriteManifestsSparkAction
         .repartition(numManifests)
         .mapPartitions(
             toManifests(
-                io,
+                tableBroadcast,
                 maxNumManifestEntries,
                 stagingLocation,
                 formatVersion,
@@ -242,7 +239,7 @@ public class RewriteManifestsSparkAction
   private List<ManifestFile> writeManifestsForPartitionedTable(
       Dataset<Row> manifestEntryDF, int numManifests, int targetNumManifestEntries) {
 
-    Broadcast<FileIO> io = sparkContext().broadcast(fileIO);
+    Broadcast<Table> tableBroadcast = sparkContext().broadcast(SerializableTable.copyOf(table));
     StructType sparkType = (StructType) manifestEntryDF.schema().apply("data_file").dataType();
     Types.StructType combinedPartitionType = Partitioning.partitionType(table);
 
@@ -258,7 +255,7 @@ public class RewriteManifestsSparkAction
               .sortWithinPartitions(partitionColumn)
               .mapPartitions(
                   toManifests(
-                      io,
+                      tableBroadcast,
                       maxNumManifestEntries,
                       stagingLocation,
                       formatVersion,
@@ -298,7 +295,7 @@ public class RewriteManifestsSparkAction
       return ImmutableList.of();
     }
 
-    return currentSnapshot.dataManifests(fileIO).stream()
+    return currentSnapshot.dataManifests(table.io()).stream()
         .filter(manifest -> manifest.partitionSpecId() == spec.specId() && predicate.test(manifest))
         .collect(Collectors.toList());
   }
@@ -351,14 +348,14 @@ public class RewriteManifestsSparkAction
         .noRetry()
         .suppressFailureWhenFinished()
         .onFailure((location, exc) -> LOG.warn("Failed to delete: {}", location, exc))
-        .run(fileIO::deleteFile);
+        .run(location -> table.io().deleteFile(location));
   }
 
   private static ManifestFile writeManifest(
       List<Row> rows,
       int startIndex,
       int endIndex,
-      Broadcast<FileIO> io,
+      Broadcast<Table> tableBroadcast,
       String location,
       int format,
       Types.StructType combinedPartitionType,
@@ -369,7 +366,10 @@ public class RewriteManifestsSparkAction
     String manifestName = "optimized-m-" + UUID.randomUUID();
     Path manifestPath = new Path(location, manifestName);
     OutputFile outputFile =
-        io.value().newOutputFile(FileFormat.AVRO.addExtension(manifestPath.toString()));
+        tableBroadcast
+            .value()
+            .io()
+            .newOutputFile(FileFormat.AVRO.addExtension(manifestPath.toString()));
 
     Types.StructType combinedFileType = DataFile.getType(combinedPartitionType);
     Types.StructType manifestFileType = DataFile.getType(spec.partitionType());
@@ -394,7 +394,7 @@ public class RewriteManifestsSparkAction
   }
 
   private static MapPartitionsFunction<Row, ManifestFile> toManifests(
-      Broadcast<FileIO> io,
+      Broadcast<Table> tableBroadcast,
       long maxNumManifestEntries,
       String location,
       int format,
@@ -416,7 +416,7 @@ public class RewriteManifestsSparkAction
                 rowsAsList,
                 0,
                 rowsAsList.size(),
-                io,
+                tableBroadcast,
                 location,
                 format,
                 combinedPartitionType,
@@ -429,7 +429,7 @@ public class RewriteManifestsSparkAction
                 rowsAsList,
                 0,
                 midIndex,
-                io,
+                tableBroadcast,
                 location,
                 format,
                 combinedPartitionType,
@@ -440,7 +440,7 @@ public class RewriteManifestsSparkAction
                 rowsAsList,
                 midIndex,
                 rowsAsList.size(),
-                io,
+                tableBroadcast,
                 location,
                 format,
                 combinedPartitionType,
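
For context on the pattern above: broadcasting SerializableTable.copyOf(table)
ships one immutable snapshot of the table (including a serializable FileIO) to each
executor, replacing the separate fileIO field and SparkUtil.serializableFileIO call
this commit removes; tasks then resolve the IO via tableBroadcast.value().io(). A
condensed sketch of the broadcast-and-use shape (hypothetical helper names, not the
action code):

    import org.apache.iceberg.SerializableTable;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.io.OutputFile;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.broadcast.Broadcast;

    class BroadcastTableSketch {
      // driver side: broadcast a serializable view of the table once
      static Broadcast<Table> broadcastTable(JavaSparkContext sparkContext, Table table) {
        return sparkContext.broadcast(SerializableTable.copyOf(table));
      }

      // executor side: the broadcast value is deserialized once per executor,
      // and its io() is used to create output files for new manifests
      static OutputFile newOutputFile(Broadcast<Table> tableBroadcast, String path) {
        return tableBroadcast.value().io().newOutputFile(path);
      }
    }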


[iceberg] 03/06: AWS: Disable local credentials if remote signing is enabled (#7230)

Posted by dw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dweeks pushed a commit to branch 1.2.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git

commit fb60c3d4ffdfaa9c152e760a063db717b6c5a4ce
Author: Daniel Weeks <dw...@apache.org>
AuthorDate: Mon Apr 3 12:06:36 2023 -0700

    AWS: Disable local credentials if remote signing is enabled (#7230)
---
 aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java b/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java
index 94bc7e6ceb..b1ab6c0d5d 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java
@@ -39,6 +39,7 @@ import org.apache.iceberg.util.PropertyUtil;
 import org.apache.iceberg.util.SerializableMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider;
 import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
 import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
 import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
@@ -1126,7 +1127,9 @@ public class AwsProperties implements Serializable {
    */
   public <T extends S3ClientBuilder> void applyS3CredentialConfigurations(T builder) {
     builder.credentialsProvider(
-        credentialsProvider(s3AccessKeyId, s3SecretAccessKey, s3SessionToken));
+        s3RemoteSigningEnabled
+            ? AnonymousCredentialsProvider.create()
+            : credentialsProvider(s3AccessKeyId, s3SecretAccessKey, s3SessionToken));
   }
 
   /**
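
For context on applyS3CredentialConfigurations() above: when remote signing is
enabled, the client never computes a local SigV4 signature, so locally configured
credentials serve no purpose and AnonymousCredentialsProvider is used instead. A
minimal sketch of building an S3 client the same way outside Iceberg
(DefaultCredentialsProvider stands in for the credentialsProvider(...) helper in
the diff; the remote signer wiring itself is omitted):

    import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider;
    import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
    import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
    import software.amazon.awssdk.regions.Region;
    import software.amazon.awssdk.services.s3.S3Client;

    public class RemoteSignedS3ClientSketch {
      public static void main(String[] args) {
        boolean s3RemoteSigningEnabled = true; // would come from catalog properties

        AwsCredentialsProvider provider =
            s3RemoteSigningEnabled
                ? AnonymousCredentialsProvider.create()
                : DefaultCredentialsProvider.create();

        S3Client s3 = S3Client.builder()
            .region(Region.US_WEST_2)
            .credentialsProvider(provider)
            .build();

        System.out.println("client built for " + s3.serviceName());
      }
    }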