You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2022/09/03 19:09:41 UTC

[iceberg] branch 0.14.x updated (6d2edd6284 -> 3c9600f899)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a change to branch 0.14.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git


    from 6d2edd6284 Spark 3.2: Expose action classes in SparkActions (#5261)
     new adb4c2c9df API: Fix ID assignment in schema merging (#5395)
     new 60d5ff7b3e AWS: S3OutputStream - failure to close should persist on subsequent close calls (#5311)
     new 7bb15a2d88 Core: Fix snapshot log with intermediate transaction snapshots (#5568)
     new 0c45e96583 Core: Fix exception handling in BaseTaskWriter (#5683)
     new 950e6c01f9 Core: Support deleting tables without metadata files (#5510)
     new c171830a61 Core, AWS: Fix Kryo serialization failure for FileIO (#5437)
     new 8a296de24a Parquet: Close zstd input stream early to avoid memory pressure (#5681)
     new 7feb9457bd Spark: Fix stats in rewrite metadata action (#5691)
     new 3c9600f899 Core: Add CommitStateUnknownException handling to REST (#5694)

The 9 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../exceptions/ServiceFailureException.java        |  10 +-
 ...ption.java => ServiceUnavailableException.java} |  11 +--
 .../java/org/apache/iceberg/types/TypeUtil.java    |   5 +-
 .../test/java/org/apache/iceberg/TestHelpers.java  |  35 +++++++
 .../org/apache/iceberg/types/TestTypeUtil.java     |  24 +++++
 .../iceberg/aws/dynamodb/DynamoDbCatalog.java      |  26 +++--
 .../org/apache/iceberg/aws/glue/GlueCatalog.java   |  27 ++++--
 .../java/org/apache/iceberg/aws/s3/S3FileIO.java   |  11 ++-
 .../org/apache/iceberg/aws/s3/S3OutputStream.java  |   1 -
 .../org/apache/iceberg/aws/s3/TestS3FileIO.java    |  23 +++++
 .../apache/iceberg/aws/s3/TestS3OutputStream.java  |  18 ++++
 build.gradle                                       |   4 +
 .../java/org/apache/iceberg/TableMetadata.java     |   8 +-
 .../java/org/apache/iceberg/io/BaseTaskWriter.java |  96 ++++++++++++-------
 .../org/apache/iceberg/io/ResolvingFileIO.java     |   7 +-
 .../apache/iceberg/io/SortedPosDeleteWriter.java   |  18 +++-
 .../java/org/apache/iceberg/jdbc/JdbcCatalog.java  |  16 +++-
 .../org/apache/iceberg/rest/ErrorHandlers.java     |  12 ++-
 .../java/org/apache/iceberg/TestTransaction.java   |  50 ++++++----
 .../apache/iceberg/hadoop/HadoopFileIOTest.java    |  26 +++++
 .../org/apache/iceberg/io/TestResolvingIO.java     |  52 ++++++++++
 .../org/apache/iceberg/jdbc/TestJdbcCatalog.java   |  15 +++
 .../java/org/apache/iceberg/gcp/gcs/GCSFileIO.java |  20 ++--
 .../org/apache/iceberg/gcp/gcs/GCSFileIOTest.java  |  26 +++++
 .../java/org/apache/iceberg/hive/HiveCatalog.java  |  16 +++-
 .../org/apache/iceberg/hive/TestHiveCatalog.java   |  18 ++++
 .../java/org/apache/iceberg/parquet/Parquet.java   |   4 +
 .../iceberg/parquet/ParquetCodecFactory.java       | 102 ++++++++++++++++++++
 .../extensions/TestRewriteManifestsProcedure.java  |  29 ++++++
 .../org/apache/iceberg/spark/SparkDataFile.java    |  21 ++++-
 .../spark/actions/RewriteManifestsSparkAction.java | 105 ++++++++++++++++-----
 .../extensions/TestRewriteManifestsProcedure.java  |  29 ++++++
 .../org/apache/iceberg/spark/SparkDataFile.java    |  21 ++++-
 .../spark/actions/RewriteManifestsSparkAction.java | 105 ++++++++++++++++-----
 versions.props                                     |   1 +
 35 files changed, 830 insertions(+), 162 deletions(-)
 copy api/src/main/java/org/apache/iceberg/exceptions/{NotAuthorizedException.java => ServiceUnavailableException.java} (75%)
 create mode 100644 core/src/test/java/org/apache/iceberg/io/TestResolvingIO.java
 create mode 100644 parquet/src/main/java/org/apache/iceberg/parquet/ParquetCodecFactory.java


[iceberg] 05/09: Core: Support deleting tables without metadata files (#5510)

Posted by bl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch 0.14.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git

commit 950e6c01f94f9156c6321b462ebc7036b55ac06b
Author: ChenLiang <31...@users.noreply.github.com>
AuthorDate: Fri Sep 2 03:48:32 2022 +0800

    Core: Support deleting tables without metadata files (#5510)
---
 .../iceberg/aws/dynamodb/DynamoDbCatalog.java      | 26 +++++++++++++++------
 .../org/apache/iceberg/aws/glue/GlueCatalog.java   | 27 ++++++++++++++++------
 .../java/org/apache/iceberg/jdbc/JdbcCatalog.java  | 16 +++++++++----
 .../org/apache/iceberg/jdbc/TestJdbcCatalog.java   | 15 ++++++++++++
 .../java/org/apache/iceberg/hive/HiveCatalog.java  | 16 +++++++++----
 .../org/apache/iceberg/hive/TestHiveCatalog.java   | 18 +++++++++++++++
 6 files changed, 94 insertions(+), 24 deletions(-)

diff --git a/aws/src/main/java/org/apache/iceberg/aws/dynamodb/DynamoDbCatalog.java b/aws/src/main/java/org/apache/iceberg/aws/dynamodb/DynamoDbCatalog.java
index cf9a21ac57..1a69442707 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/dynamodb/DynamoDbCatalog.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/dynamodb/DynamoDbCatalog.java
@@ -43,6 +43,7 @@ import org.apache.iceberg.exceptions.AlreadyExistsException;
 import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
 import org.apache.iceberg.exceptions.NoSuchNamespaceException;
 import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.exceptions.NotFoundException;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.io.CloseableGroup;
 import org.apache.iceberg.io.FileIO;
@@ -351,13 +352,24 @@ public class DynamoDbCatalog extends BaseMetastoreCatalog implements Closeable,
       }
 
       TableOperations ops = newTableOps(identifier);
-      TableMetadata lastMetadata = ops.current();
-      dynamo.deleteItem(DeleteItemRequest.builder()
-          .tableName(awsProperties.dynamoDbTableName())
-          .key(tablePrimaryKey(identifier))
-          .conditionExpression(COL_VERSION + " = :v")
-          .expressionAttributeValues(ImmutableMap.of(":v", response.item().get(COL_VERSION)))
-          .build());
+      TableMetadata lastMetadata = null;
+      if (purge) {
+        try {
+          lastMetadata = ops.current();
+        } catch (NotFoundException e) {
+          LOG.warn(
+              "Failed to load table metadata for table: {}, continuing drop without purge",
+              identifier,
+              e);
+        }
+      }
+      dynamo.deleteItem(
+          DeleteItemRequest.builder()
+              .tableName(awsProperties.dynamoDbTableName())
+              .key(tablePrimaryKey(identifier))
+              .conditionExpression(COL_VERSION + " = :v")
+              .expressionAttributeValues(ImmutableMap.of(":v", response.item().get(COL_VERSION)))
+              .build());
       LOG.info("Successfully dropped table {} from DynamoDb catalog", identifier);
 
       if (purge && lastMetadata != null) {
diff --git a/aws/src/main/java/org/apache/iceberg/aws/glue/GlueCatalog.java b/aws/src/main/java/org/apache/iceberg/aws/glue/GlueCatalog.java
index 755005abe6..5e03cdd978 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/glue/GlueCatalog.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/glue/GlueCatalog.java
@@ -46,6 +46,7 @@ import org.apache.iceberg.exceptions.AlreadyExistsException;
 import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
 import org.apache.iceberg.exceptions.NoSuchNamespaceException;
 import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.exceptions.NotFoundException;
 import org.apache.iceberg.hadoop.Configurable;
 import org.apache.iceberg.io.CloseableGroup;
 import org.apache.iceberg.io.FileIO;
@@ -273,13 +274,25 @@ public class GlueCatalog extends BaseMetastoreCatalog
   public boolean dropTable(TableIdentifier identifier, boolean purge) {
     try {
       TableOperations ops = newTableOps(identifier);
-      TableMetadata lastMetadata = ops.current();
-      glue.deleteTable(DeleteTableRequest.builder()
-          .catalogId(awsProperties.glueCatalogId())
-          .databaseName(IcebergToGlueConverter.getDatabaseName(
-              identifier, awsProperties.glueCatalogSkipNameValidation()))
-          .name(identifier.name())
-          .build());
+      TableMetadata lastMetadata = null;
+      if (purge) {
+        try {
+          lastMetadata = ops.current();
+        } catch (NotFoundException e) {
+          LOG.warn(
+              "Failed to load table metadata for table: {}, continuing drop without purge",
+              identifier,
+              e);
+        }
+      }
+      glue.deleteTable(
+          DeleteTableRequest.builder()
+              .catalogId(awsProperties.glueCatalogId())
+              .databaseName(
+                  IcebergToGlueConverter.getDatabaseName(
+                      identifier, awsProperties.glueCatalogSkipNameValidation()))
+              .name(identifier.name())
+              .build());
       LOG.info("Successfully dropped table {} from Glue", identifier);
       if (purge && lastMetadata != null) {
         CatalogUtil.dropTableData(ops.io(), lastMetadata);
diff --git a/core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java b/core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java
index a5ffa2f7c0..f415f8c757 100644
--- a/core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java
+++ b/core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java
@@ -49,6 +49,7 @@ import org.apache.iceberg.exceptions.AlreadyExistsException;
 import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
 import org.apache.iceberg.exceptions.NoSuchNamespaceException;
 import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.exceptions.NotFoundException;
 import org.apache.iceberg.hadoop.Configurable;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.relocated.com.google.common.base.Joiner;
@@ -154,11 +155,16 @@ public class JdbcCatalog extends BaseMetastoreCatalog
   @Override
   public boolean dropTable(TableIdentifier identifier, boolean purge) {
     TableOperations ops = newTableOps(identifier);
-    TableMetadata lastMetadata;
-    if (purge && ops.current() != null) {
-      lastMetadata = ops.current();
-    } else {
-      lastMetadata = null;
+    TableMetadata lastMetadata = null;
+    if (purge) {
+      try {
+        lastMetadata = ops.current();
+      } catch (NotFoundException e) {
+        LOG.warn(
+            "Failed to load table metadata for table: {}, continuing drop without purge",
+            identifier,
+            e);
+      }
     }
 
     int deletedRecords = execute(
diff --git a/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcCatalog.java b/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcCatalog.java
index ec8c3671aa..7e4e1fbf9f 100644
--- a/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcCatalog.java
+++ b/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcCatalog.java
@@ -41,6 +41,7 @@ import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.SortOrder;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableOperations;
 import org.apache.iceberg.Transaction;
 import org.apache.iceberg.catalog.CatalogTests;
 import org.apache.iceberg.catalog.Namespace;
@@ -66,6 +67,7 @@ import org.junit.jupiter.api.io.TempDir;
 import static org.apache.iceberg.NullOrder.NULLS_FIRST;
 import static org.apache.iceberg.SortDirection.ASC;
 import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 public class TestJdbcCatalog extends CatalogTests<JdbcCatalog> {
 
@@ -372,6 +374,19 @@ public class TestJdbcCatalog extends CatalogTests<JdbcCatalog> {
     Assert.assertFalse(catalog.dropTable(TableIdentifier.of("db", "tbl-not-exists")));
   }
 
+  @Test
+  public void testDropTableWithoutMetadataFile() {
+    TableIdentifier testTable = TableIdentifier.of("db", "ns1", "ns2", "tbl");
+    catalog.createTable(testTable, SCHEMA, PartitionSpec.unpartitioned());
+    String metadataFileLocation = catalog.newTableOps(testTable).current().metadataFileLocation();
+    TableOperations ops = catalog.newTableOps(testTable);
+    ops.io().deleteFile(metadataFileLocation);
+    Assert.assertTrue(catalog.dropTable(testTable));
+    assertThatThrownBy(() -> catalog.loadTable(testTable))
+        .isInstanceOf(NoSuchTableException.class)
+        .hasMessageContaining("Table does not exist:");
+  }
+
   @Test
   public void testRenameTable() {
     TableIdentifier from = TableIdentifier.of("db", "tbl1");
diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
index 62c70438e6..20c7746f98 100644
--- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
+++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
@@ -49,6 +49,7 @@ import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
 import org.apache.iceberg.exceptions.NoSuchNamespaceException;
 import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.exceptions.NotFoundException;
 import org.apache.iceberg.hadoop.HadoopFileIO;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.InputFile;
@@ -157,11 +158,16 @@ public class HiveCatalog extends BaseMetastoreCatalog implements SupportsNamespa
     String database = identifier.namespace().level(0);
 
     TableOperations ops = newTableOps(identifier);
-    TableMetadata lastMetadata;
-    if (purge && ops.current() != null) {
-      lastMetadata = ops.current();
-    } else {
-      lastMetadata = null;
+    TableMetadata lastMetadata = null;
+    if (purge) {
+      try {
+        lastMetadata = ops.current();
+      } catch (NotFoundException e) {
+        LOG.warn(
+            "Failed to load table metadata for table: {}, continuing drop without purge",
+            identifier,
+            e);
+      }
     }
 
     try {
diff --git a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java
index 46ccb571a5..24487612bf 100644
--- a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java
+++ b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java
@@ -41,6 +41,7 @@ import org.apache.iceberg.SortOrder;
 import org.apache.iceberg.SortOrderParser;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.Transaction;
 import org.apache.iceberg.UpdateSchema;
@@ -50,6 +51,7 @@ import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.exceptions.AlreadyExistsException;
 import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
 import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
@@ -74,6 +76,7 @@ import static org.apache.iceberg.TableProperties.DEFAULT_PARTITION_SPEC;
 import static org.apache.iceberg.TableProperties.DEFAULT_SORT_ORDER;
 import static org.apache.iceberg.expressions.Expressions.bucket;
 import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -440,6 +443,21 @@ public class TestHiveCatalog extends HiveMetastoreTest {
         });
   }
 
+  @Test
+  public void testDropTableWithoutMetadataFile() {
+    TableIdentifier identifier = TableIdentifier.of(DB_NAME, "tbl");
+    Schema tableSchema =
+        new Schema(Types.StructType.of(required(1, "id", Types.LongType.get())).fields());
+    catalog.createTable(identifier, tableSchema);
+    String metadataFileLocation = catalog.newTableOps(identifier).current().metadataFileLocation();
+    TableOperations ops = catalog.newTableOps(identifier);
+    ops.io().deleteFile(metadataFileLocation);
+    Assert.assertTrue(catalog.dropTable(identifier));
+    assertThatThrownBy(() -> catalog.loadTable(identifier))
+        .isInstanceOf(NoSuchTableException.class)
+        .hasMessageContaining("Table does not exist:");
+  }
+
   @Test
   public void testTableName() {
     Schema schema = new Schema(


[iceberg] 06/09: Core, AWS: Fix Kryo serialization failure for FileIO (#5437)

Posted by bl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch 0.14.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git

commit c171830a61af71fbcadc3d4ba2628643b179e097
Author: Prashant Singh <35...@users.noreply.github.com>
AuthorDate: Fri Sep 2 04:07:16 2022 +0530

    Core, AWS: Fix Kryo serialization failure for FileIO (#5437)
---
 .../test/java/org/apache/iceberg/TestHelpers.java  | 35 +++++++++++++++
 .../java/org/apache/iceberg/aws/s3/S3FileIO.java   | 11 ++---
 .../org/apache/iceberg/aws/s3/TestS3FileIO.java    | 23 ++++++++++
 build.gradle                                       |  4 ++
 .../org/apache/iceberg/io/ResolvingFileIO.java     |  7 +--
 .../apache/iceberg/hadoop/HadoopFileIOTest.java    | 26 +++++++++++
 .../org/apache/iceberg/io/TestResolvingIO.java     | 52 ++++++++++++++++++++++
 .../java/org/apache/iceberg/gcp/gcs/GCSFileIO.java | 20 ++++++---
 .../org/apache/iceberg/gcp/gcs/GCSFileIOTest.java  | 26 +++++++++++
 versions.props                                     |  1 +
 10 files changed, 190 insertions(+), 15 deletions(-)

diff --git a/api/src/test/java/org/apache/iceberg/TestHelpers.java b/api/src/test/java/org/apache/iceberg/TestHelpers.java
index f82269060e..77e7acf2f6 100644
--- a/api/src/test/java/org/apache/iceberg/TestHelpers.java
+++ b/api/src/test/java/org/apache/iceberg/TestHelpers.java
@@ -19,11 +19,16 @@
 
 package org.apache.iceberg;
 
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.esotericsoftware.kryo.serializers.ClosureSerializer;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
+import java.lang.invoke.SerializedLambda;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.List;
@@ -36,6 +41,7 @@ import org.apache.iceberg.expressions.ExpressionVisitors;
 import org.apache.iceberg.expressions.UnboundPredicate;
 import org.apache.iceberg.util.ByteBuffers;
 import org.junit.Assert;
+import org.objenesis.strategy.StdInstantiatorStrategy;
 
 public class TestHelpers {
 
@@ -147,6 +153,35 @@ public class TestHelpers {
     });
   }
 
+  public static class KryoHelpers {
+    private KryoHelpers() {
+    }
+
+    @SuppressWarnings("unchecked")
+    public static <T> T roundTripSerialize(T obj) throws IOException {
+      Kryo kryo = new Kryo();
+
+      // required for avoiding requirement of zero arg constructor
+      kryo.setInstantiatorStrategy(
+          new Kryo.DefaultInstantiatorStrategy(new StdInstantiatorStrategy()));
+
+      // required for serializing and deserializing $$Lambda$ Anonymous Classes
+      kryo.register(SerializedLambda.class);
+      kryo.register(ClosureSerializer.Closure.class, new ClosureSerializer());
+
+      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
+
+      try (Output out = new Output(new ObjectOutputStream(bytes))) {
+        kryo.writeClassAndObject(out, obj);
+      }
+
+      try (Input in =
+          new Input(new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray())))) {
+        return (T) kryo.readClassAndObject(in);
+      }
+    }
+  }
+
   private static class CheckReferencesBound extends ExpressionVisitors.ExpressionVisitor<Void> {
     private final String message;
 
diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java b/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java
index 5d5d88eafb..917b25af48 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java
@@ -45,6 +45,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
 import org.apache.iceberg.relocated.com.google.common.collect.SetMultimap;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.relocated.com.google.common.collect.Streams;
+import org.apache.iceberg.util.SerializableMap;
 import org.apache.iceberg.util.SerializableSupplier;
 import org.apache.iceberg.util.Tasks;
 import org.apache.iceberg.util.ThreadPools;
@@ -79,7 +80,7 @@ public class S3FileIO implements FileIO, SupportsBulkOperations, SupportsPrefixO
   private String credential = null;
   private SerializableSupplier<S3Client> s3;
   private AwsProperties awsProperties;
-  private Map<String, String> properties = null;
+  private SerializableMap<String, String> properties = null;
   private transient volatile S3Client client;
   private MetricsContext metrics = MetricsContext.nullMetrics();
   private final AtomicBoolean isResourceClosed = new AtomicBoolean(false);
@@ -152,7 +153,7 @@ public class S3FileIO implements FileIO, SupportsBulkOperations, SupportsPrefixO
 
   @Override
   public Map<String, String> properties() {
-    return properties;
+    return properties.immutableMap();
   }
 
   /**
@@ -314,8 +315,8 @@ public class S3FileIO implements FileIO, SupportsBulkOperations, SupportsPrefixO
 
   @Override
   public void initialize(Map<String, String> props) {
-    this.awsProperties = new AwsProperties(props);
-    this.properties = props;
+    this.properties = SerializableMap.copyOf(props);
+    this.awsProperties = new AwsProperties(properties);
 
     // Do not override s3 client if it was provided
     if (s3 == null) {
@@ -334,7 +335,7 @@ public class S3FileIO implements FileIO, SupportsBulkOperations, SupportsPrefixO
               .hiddenImpl(DEFAULT_METRICS_IMPL, String.class)
               .buildChecked();
       MetricsContext context = ctor.newInstance("s3");
-      context.initialize(props);
+      context.initialize(properties);
       this.metrics = context;
     } catch (NoClassDefFoundError | NoSuchMethodException | ClassCastException e) {
       LOG.warn("Unable to load metrics class: '{}', falling back to null metrics", DEFAULT_METRICS_IMPL, e);
diff --git a/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIO.java b/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIO.java
index 972df9da89..4d4d06c84c 100644
--- a/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIO.java
+++ b/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIO.java
@@ -30,6 +30,7 @@ import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.SerializationUtils;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.TestHelpers;
 import org.apache.iceberg.io.BulkDeletionFailureException;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.FileIOParser;
@@ -244,6 +245,28 @@ public class TestS3FileIO {
     }
   }
 
+  @Test
+  public void testS3FileIOKryoSerialization() throws IOException {
+    FileIO testS3FileIO = new S3FileIO();
+
+    // s3 fileIO should be serializable when properties are passed as immutable map
+    testS3FileIO.initialize(ImmutableMap.of("k1", "v1"));
+    FileIO roundTripSerializedFileIO = TestHelpers.KryoHelpers.roundTripSerialize(testS3FileIO);
+
+    Assert.assertEquals(testS3FileIO.properties(), roundTripSerializedFileIO.properties());
+  }
+
+  @Test
+  public void testS3FileIOJavaSerialization() throws IOException, ClassNotFoundException {
+    FileIO testS3FileIO = new S3FileIO();
+
+    // s3 fileIO should be serializable when properties are passed as immutable map
+    testS3FileIO.initialize(ImmutableMap.of("k1", "v1"));
+    FileIO roundTripSerializedFileIO = TestHelpers.roundTripSerialize(testS3FileIO);
+
+    Assert.assertEquals(testS3FileIO.properties(), roundTripSerializedFileIO.properties());
+  }
+
   private void createRandomObjects(String prefix, int count) {
     S3URI s3URI = new S3URI(prefix);
 
diff --git a/build.gradle b/build.gradle
index d9a6effddc..3a6468ba3c 100644
--- a/build.gradle
+++ b/build.gradle
@@ -222,6 +222,7 @@ project(':iceberg-api') {
     implementation project(path: ':iceberg-bundled-guava', configuration: 'shadow')
     compileOnly "com.google.errorprone:error_prone_annotations"
     testImplementation "org.apache.avro:avro"
+    testImplementation "com.esotericsoftware:kryo"
   }
 
   tasks.processTestResources.dependsOn rootProject.tasks.buildInfo
@@ -269,6 +270,7 @@ project(':iceberg-core') {
     testImplementation 'org.mock-server:mockserver-client-java'
     testImplementation "org.xerial:sqlite-jdbc"
     testImplementation project(path: ':iceberg-api', configuration: 'testArtifacts')
+    testImplementation "com.esotericsoftware:kryo"
   }
 }
 
@@ -381,6 +383,7 @@ project(':iceberg-aws') {
       exclude module: "logback-classic"
       exclude group: 'junit'
     }
+    testImplementation "com.esotericsoftware:kryo"
   }
 
   sourceSets {
@@ -423,6 +426,7 @@ project(':iceberg-gcp') {
       exclude group: 'javax.servlet', module: 'servlet-api'
       exclude group: 'com.google.code.gson', module: 'gson'
     }
+    testImplementation "com.esotericsoftware:kryo"
   }
 }
 
diff --git a/core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java b/core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java
index 3815d5da54..8a29cadf27 100644
--- a/core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java
+++ b/core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java
@@ -29,6 +29,7 @@ import org.apache.iceberg.hadoop.SerializableConfiguration;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.SerializableMap;
 import org.apache.iceberg.util.SerializableSupplier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -47,7 +48,7 @@ public class ResolvingFileIO implements FileIO, HadoopConfigurable {
   );
 
   private final Map<String, FileIO> ioInstances = Maps.newHashMap();
-  private Map<String, String> properties;
+  private SerializableMap<String, String> properties;
   private SerializableSupplier<Configuration> hadoopConf;
 
   /**
@@ -80,13 +81,13 @@ public class ResolvingFileIO implements FileIO, HadoopConfigurable {
 
   @Override
   public Map<String, String> properties() {
-    return properties;
+    return properties.immutableMap();
   }
 
   @Override
   public void initialize(Map<String, String> newProperties) {
     close(); // close and discard any existing FileIO instances
-    this.properties = newProperties;
+    this.properties = SerializableMap.copyOf(newProperties);
   }
 
   @Override
diff --git a/core/src/test/java/org/apache/iceberg/hadoop/HadoopFileIOTest.java b/core/src/test/java/org/apache/iceberg/hadoop/HadoopFileIOTest.java
index 0721d69997..12de675dd5 100644
--- a/core/src/test/java/org/apache/iceberg/hadoop/HadoopFileIOTest.java
+++ b/core/src/test/java/org/apache/iceberg/hadoop/HadoopFileIOTest.java
@@ -27,8 +27,12 @@ import java.util.Random;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.TestHelpers;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Streams;
+import org.junit.Assert;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
@@ -91,6 +95,28 @@ public class HadoopFileIOTest {
     assertThrows(UncheckedIOException.class, () -> hadoopFileIO.listPrefix(parent.toUri().toString()).iterator());
   }
 
+  @Test
+  public void testHadoopFileIOKryoSerialization() throws IOException {
+    FileIO testHadoopFileIO = new HadoopFileIO();
+
+    // hadoop fileIO should be serializable when properties are passed as immutable map
+    testHadoopFileIO.initialize(ImmutableMap.of("k1", "v1"));
+    FileIO roundTripSerializedFileIO = TestHelpers.KryoHelpers.roundTripSerialize(testHadoopFileIO);
+
+    Assert.assertEquals(testHadoopFileIO.properties(), roundTripSerializedFileIO.properties());
+  }
+
+  @Test
+  public void testHadoopFileIOJavaSerialization() throws IOException, ClassNotFoundException {
+    FileIO testHadoopFileIO = new HadoopFileIO();
+
+    // hadoop fileIO should be serializable when properties are passed as immutable map
+    testHadoopFileIO.initialize(ImmutableMap.of("k1", "v1"));
+    FileIO roundTripSerializedFileIO = TestHelpers.roundTripSerialize(testHadoopFileIO);
+
+    Assert.assertEquals(testHadoopFileIO.properties(), roundTripSerializedFileIO.properties());
+  }
+
   private void createRandomFiles(Path parent, int count) {
     random.ints(count).parallel().forEach(i -> {
           try {
diff --git a/core/src/test/java/org/apache/iceberg/io/TestResolvingIO.java b/core/src/test/java/org/apache/iceberg/io/TestResolvingIO.java
new file mode 100644
index 0000000000..e79c31db1b
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/io/TestResolvingIO.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.IOException;
+import org.apache.iceberg.TestHelpers;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestResolvingIO {
+
+  @Test
+  public void testResolvingFileIOKryoSerialization() throws IOException {
+    FileIO testResolvingFileIO = new ResolvingFileIO();
+
+    // resolving fileIO should be serializable when properties are passed as immutable map
+    testResolvingFileIO.initialize(ImmutableMap.of("k1", "v1"));
+    FileIO roundTripSerializedFileIO =
+        TestHelpers.KryoHelpers.roundTripSerialize(testResolvingFileIO);
+
+    Assert.assertEquals(testResolvingFileIO.properties(), roundTripSerializedFileIO.properties());
+  }
+
+  @Test
+  public void testResolvingFileIOJavaSerialization() throws IOException, ClassNotFoundException {
+    FileIO testResolvingFileIO = new ResolvingFileIO();
+
+    // resolving fileIO should be serializable when properties are passed as immutable map
+    testResolvingFileIO.initialize(ImmutableMap.of("k1", "v1"));
+    FileIO roundTripSerializedFileIO = TestHelpers.roundTripSerialize(testResolvingFileIO);
+
+    Assert.assertEquals(testResolvingFileIO.properties(), roundTripSerializedFileIO.properties());
+  }
+}
diff --git a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSFileIO.java b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSFileIO.java
index ecb520f1d2..7bc37edc92 100644
--- a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSFileIO.java
+++ b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSFileIO.java
@@ -30,6 +30,7 @@ import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.io.OutputFile;
 import org.apache.iceberg.metrics.MetricsContext;
+import org.apache.iceberg.util.SerializableMap;
 import org.apache.iceberg.util.SerializableSupplier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -52,7 +53,7 @@ public class GCSFileIO implements FileIO {
   private transient volatile Storage storage;
   private MetricsContext metrics = MetricsContext.nullMetrics();
   private final AtomicBoolean isResourceClosed = new AtomicBoolean(false);
-  private Map<String, String> properties = null;
+  private SerializableMap<String, String> properties = null;
 
   /**
    * No-arg constructor to load the FileIO dynamically.
@@ -102,7 +103,7 @@ public class GCSFileIO implements FileIO {
 
   @Override
   public Map<String, String> properties() {
-    return properties;
+    return properties.immutableMap();
   }
 
   private Storage client() {
@@ -118,8 +119,8 @@ public class GCSFileIO implements FileIO {
 
   @Override
   public void initialize(Map<String, String> props) {
-    this.properties = props;
-    this.gcpProperties = new GCPProperties(props);
+    this.properties = SerializableMap.copyOf(props);
+    this.gcpProperties = new GCPProperties(properties);
 
     this.storageSupplier = () -> {
       StorageOptions.Builder builder = StorageOptions.newBuilder();
@@ -131,12 +132,17 @@ public class GCSFileIO implements FileIO {
       // Report Hadoop metrics if Hadoop is available
       try {
         DynConstructors.Ctor<MetricsContext> ctor =
-            DynConstructors.builder(MetricsContext.class).hiddenImpl(DEFAULT_METRICS_IMPL, String.class).buildChecked();
+            DynConstructors.builder(MetricsContext.class)
+                .hiddenImpl(DEFAULT_METRICS_IMPL, String.class)
+                .buildChecked();
         MetricsContext context = ctor.newInstance("gcs");
-        context.initialize(props);
+        context.initialize(properties);
         this.metrics = context;
       } catch (NoClassDefFoundError | NoSuchMethodException | ClassCastException e) {
-        LOG.warn("Unable to load metrics class: '{}', falling back to null metrics", DEFAULT_METRICS_IMPL, e);
+        LOG.warn(
+            "Unable to load metrics class: '{}', falling back to null metrics",
+            DEFAULT_METRICS_IMPL,
+            e);
       }
 
       return builder.build().getService();
diff --git a/gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSFileIOTest.java b/gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSFileIOTest.java
index 99bb08d4bf..2d31d74c6d 100644
--- a/gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSFileIOTest.java
+++ b/gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSFileIOTest.java
@@ -28,9 +28,13 @@ import java.io.OutputStream;
 import java.util.Random;
 import java.util.stream.StreamSupport;
 import org.apache.commons.io.IOUtils;
+import org.apache.iceberg.TestHelpers;
 import org.apache.iceberg.gcp.GCPProperties;
+import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -93,4 +97,26 @@ public class GCSFileIOTest {
     assertThat(StreamSupport.stream(storage.list(TEST_BUCKET).iterateAll().spliterator(), false).count())
         .isZero();
   }
+
+  @Test
+  public void testGCSFileIOKryoSerialization() throws IOException {
+    FileIO testGCSFileIO = new GCSFileIO();
+
+    // gcs fileIO should be serializable when properties are passed as immutable map
+    testGCSFileIO.initialize(ImmutableMap.of("k1", "v1"));
+    FileIO roundTripSerializedFileIO = TestHelpers.KryoHelpers.roundTripSerialize(testGCSFileIO);
+
+    Assert.assertEquals(testGCSFileIO.properties(), roundTripSerializedFileIO.properties());
+  }
+
+  @Test
+  public void testGCSFileIOJavaSerialization() throws IOException, ClassNotFoundException {
+    FileIO testGCSFileIO = new GCSFileIO();
+
+    // gcs fileIO should be serializable when properties are passed as immutable map
+    testGCSFileIO.initialize(ImmutableMap.of("k1", "v1"));
+    FileIO roundTripSerializedFileIO = TestHelpers.roundTripSerialize(testGCSFileIO);
+
+    Assert.assertEquals(testGCSFileIO.properties(), roundTripSerializedFileIO.properties());
+  }
 }
diff --git a/versions.props b/versions.props
index 69a08ca22b..c6e157193b 100644
--- a/versions.props
+++ b/versions.props
@@ -44,3 +44,4 @@ org.springframework:* = 5.3.9
 org.springframework.boot:* = 2.5.4
 org.mock-server:mockserver-netty = 5.11.1
 org.mock-server:mockserver-client-java = 5.11.1
+com.esotericsoftware:kryo = 4.0.2


[iceberg] 07/09: Parquet: Close zstd input stream early to avoid memory pressure (#5681)

Posted by bl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch 0.14.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git

commit 8a296de24a0e093b05e4332130dc45c8e665af3b
Author: Bryan Keller <br...@gmail.com>
AuthorDate: Thu Sep 1 15:54:53 2022 -0700

    Parquet: Close zstd input stream early to avoid memory pressure (#5681)
---
 .../java/org/apache/iceberg/parquet/Parquet.java   |   4 +
 .../iceberg/parquet/ParquetCodecFactory.java       | 102 +++++++++++++++++++++
 2 files changed, 106 insertions(+)

diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
index 1561283feb..5503ffd458 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
@@ -898,8 +898,12 @@ public class Parquet {
             conf.unset(property);
           }
           optionsBuilder = HadoopReadOptions.builder(conf);
+          // page size not used by decompressors
+          optionsBuilder.withCodecFactory(new ParquetCodecFactory(conf, 0));
         } else {
           optionsBuilder = ParquetReadOptions.builder();
+          // page size not used by decompressors
+          optionsBuilder.withCodecFactory(new ParquetCodecFactory(new Configuration(), 0));
         }
 
         for (Map.Entry<String, String> entry : properties.entrySet()) {
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetCodecFactory.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetCodecFactory.java
new file mode 100644
index 0000000000..d112d5f189
--- /dev/null
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetCodecFactory.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.parquet;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.CodecPool;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.hadoop.CodecFactory;
+import org.apache.parquet.hadoop.codec.ZstandardCodec;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+
+/**
+ * This class implements a codec factory that is used when reading from Parquet. It adds a
+ * workaround for memory issues encountered when reading from zstd-compressed files.
+ */
+public class ParquetCodecFactory extends CodecFactory {
+
+  public ParquetCodecFactory(Configuration configuration, int pageSize) {
+    super(configuration, pageSize);
+  }
+
+  /** Copied and modified from CodecFactory.HeapBytesDecompressor */
+  class HeapBytesDecompressor extends BytesDecompressor {
+
+    private final CompressionCodec codec;
+    private final Decompressor decompressor;
+
+    HeapBytesDecompressor(CompressionCodecName codecName) {
+      this.codec = getCodec(codecName);
+      if (codec != null) {
+        decompressor = CodecPool.getDecompressor(codec);
+      } else {
+        decompressor = null;
+      }
+    }
+
+    @Override
+    public BytesInput decompress(BytesInput bytes, int uncompressedSize) throws IOException {
+      final BytesInput decompressed;
+      if (codec != null) {
+        if (decompressor != null) {
+          decompressor.reset();
+        }
+        if (codec instanceof ZstandardCodec) {
+          // we need to close the zstd input stream ASAP to free up native resources, so
+          // read everything into a buffer and then close it
+          try (InputStream is = codec.createInputStream(bytes.toInputStream(), decompressor)) {
+            decompressed = BytesInput.copy(BytesInput.from(is, uncompressedSize));
+          }
+        } else {
+          InputStream is = codec.createInputStream(bytes.toInputStream(), decompressor);
+          decompressed = BytesInput.from(is, uncompressedSize);
+        }
+      } else {
+        decompressed = bytes;
+      }
+      return decompressed;
+    }
+
+    @Override
+    public void decompress(
+        ByteBuffer input, int compressedSize, ByteBuffer output, int uncompressedSize)
+        throws IOException {
+      ByteBuffer decompressed = decompress(BytesInput.from(input), uncompressedSize).toByteBuffer();
+      output.put(decompressed);
+    }
+
+    @Override
+    public void release() {
+      if (decompressor != null) {
+        CodecPool.returnDecompressor(decompressor);
+      }
+    }
+  }
+
+  @Override
+  protected BytesDecompressor createDecompressor(CompressionCodecName codecName) {
+    return new HeapBytesDecompressor(codecName);
+  }
+}


[iceberg] 09/09: Core: Add CommitStateUnknownException handling to REST (#5694)

Posted by bl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch 0.14.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git

commit 3c9600f899cdd57ffcc78f46bac59c51b11af7a4
Author: Ryan Blue <bl...@apache.org>
AuthorDate: Fri Sep 2 12:50:39 2022 -0700

    Core: Add CommitStateUnknownException handling to REST (#5694)
---
 .../apache/iceberg/exceptions/ServiceFailureException.java | 10 ++++------
 ...lureException.java => ServiceUnavailableException.java} | 14 ++++++--------
 .../main/java/org/apache/iceberg/rest/ErrorHandlers.java   | 12 ++++++++++--
 3 files changed, 20 insertions(+), 16 deletions(-)

diff --git a/api/src/main/java/org/apache/iceberg/exceptions/ServiceFailureException.java b/api/src/main/java/org/apache/iceberg/exceptions/ServiceFailureException.java
index 9049d725ef..b8ef3b411a 100644
--- a/api/src/main/java/org/apache/iceberg/exceptions/ServiceFailureException.java
+++ b/api/src/main/java/org/apache/iceberg/exceptions/ServiceFailureException.java
@@ -21,17 +21,15 @@ package org.apache.iceberg.exceptions;
 
 import com.google.errorprone.annotations.FormatMethod;
 
-/**
- * Exception thrown on HTTP 5XX Server Error.
- */
-public class ServiceFailureException extends RuntimeException {
+/** Exception thrown on HTTP 5XX Server Error. */
+public class ServiceFailureException extends RESTException {
   @FormatMethod
   public ServiceFailureException(String message, Object... args) {
-    super(String.format(message, args));
+    super(message, args);
   }
 
   @FormatMethod
   public ServiceFailureException(Throwable cause, String message, Object... args) {
-    super(String.format(message, args), cause);
+    super(cause, message, args);
   }
 }
diff --git a/api/src/main/java/org/apache/iceberg/exceptions/ServiceFailureException.java b/api/src/main/java/org/apache/iceberg/exceptions/ServiceUnavailableException.java
similarity index 72%
copy from api/src/main/java/org/apache/iceberg/exceptions/ServiceFailureException.java
copy to api/src/main/java/org/apache/iceberg/exceptions/ServiceUnavailableException.java
index 9049d725ef..74877214ac 100644
--- a/api/src/main/java/org/apache/iceberg/exceptions/ServiceFailureException.java
+++ b/api/src/main/java/org/apache/iceberg/exceptions/ServiceUnavailableException.java
@@ -21,17 +21,15 @@ package org.apache.iceberg.exceptions;
 
 import com.google.errorprone.annotations.FormatMethod;
 
-/**
- * Exception thrown on HTTP 5XX Server Error.
- */
-public class ServiceFailureException extends RuntimeException {
+/** Exception thrown on HTTP 503: service is unavailable */
+public class ServiceUnavailableException extends RESTException {
   @FormatMethod
-  public ServiceFailureException(String message, Object... args) {
-    super(String.format(message, args));
+  public ServiceUnavailableException(String message, Object... args) {
+    super(message, args);
   }
 
   @FormatMethod
-  public ServiceFailureException(Throwable cause, String message, Object... args) {
-    super(String.format(message, args), cause);
+  public ServiceUnavailableException(Throwable cause, String message, Object... args) {
+    super(cause, message, args);
   }
 }
diff --git a/core/src/main/java/org/apache/iceberg/rest/ErrorHandlers.java b/core/src/main/java/org/apache/iceberg/rest/ErrorHandlers.java
index d72ee460aa..33cce5dd96 100644
--- a/core/src/main/java/org/apache/iceberg/rest/ErrorHandlers.java
+++ b/core/src/main/java/org/apache/iceberg/rest/ErrorHandlers.java
@@ -23,12 +23,14 @@ import java.util.function.Consumer;
 import org.apache.iceberg.exceptions.AlreadyExistsException;
 import org.apache.iceberg.exceptions.BadRequestException;
 import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.exceptions.CommitStateUnknownException;
 import org.apache.iceberg.exceptions.ForbiddenException;
 import org.apache.iceberg.exceptions.NoSuchNamespaceException;
 import org.apache.iceberg.exceptions.NoSuchTableException;
 import org.apache.iceberg.exceptions.NotAuthorizedException;
 import org.apache.iceberg.exceptions.RESTException;
 import org.apache.iceberg.exceptions.ServiceFailureException;
+import org.apache.iceberg.exceptions.ServiceUnavailableException;
 import org.apache.iceberg.rest.responses.ErrorResponse;
 
 /**
@@ -59,6 +61,10 @@ public class ErrorHandlers {
           throw new NoSuchTableException("%s", error.message());
         case 409:
           throw new CommitFailedException("Commit failed: %s", error.message());
+        case 500:
+        case 504:
+          throw new CommitStateUnknownException(
+              new ServiceFailureException("Service failed: %s: %s", error.code(), error.message()));
       }
     };
   }
@@ -115,10 +121,12 @@ public class ErrorHandlers {
         case 405:
         case 406:
           break;
-        case 501:
-          throw new UnsupportedOperationException(error.message());
         case 500:
           throw new ServiceFailureException("Server error: %s: %s", error.type(), error.message());
+        case 501:
+          throw new UnsupportedOperationException(error.message());
+        case 503:
+          throw new ServiceUnavailableException("Service unavailable: %s", error.message());
       }
 
       throw new RESTException("Unable to process: %s", error.message());


[iceberg] 01/09: API: Fix ID assignment in schema merging (#5395)

Posted by bl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch 0.14.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git

commit adb4c2c9dfe7535a500fdd66edaf084cac8238de
Author: Karuppayya <ka...@gmail.com>
AuthorDate: Mon Aug 1 12:36:13 2022 -0700

    API: Fix ID assignment in schema merging (#5395)
---
 .../java/org/apache/iceberg/types/TypeUtil.java    |  5 +++--
 .../org/apache/iceberg/types/TestTypeUtil.java     | 24 ++++++++++++++++++++++
 2 files changed, 27 insertions(+), 2 deletions(-)

diff --git a/api/src/main/java/org/apache/iceberg/types/TypeUtil.java b/api/src/main/java/org/apache/iceberg/types/TypeUtil.java
index e4791ee02c..b63908ec16 100644
--- a/api/src/main/java/org/apache/iceberg/types/TypeUtil.java
+++ b/api/src/main/java/org/apache/iceberg/types/TypeUtil.java
@@ -278,8 +278,9 @@ public class TypeUtil {
   }
 
   public static Schema reassignOrRefreshIds(Schema schema, Schema idSourceSchema) {
-    AtomicInteger highest = new AtomicInteger(schema.highestFieldId());
-    Types.StructType struct = visit(schema, new ReassignIds(idSourceSchema, highest::incrementAndGet)).asStructType();
+    AtomicInteger highest = new AtomicInteger(idSourceSchema.highestFieldId());
+    Types.StructType struct =
+        visit(schema, new ReassignIds(idSourceSchema, highest::incrementAndGet)).asStructType();
     return new Schema(struct.fields(), refreshIdentifierFields(struct, schema));
   }
 
diff --git a/api/src/test/java/org/apache/iceberg/types/TestTypeUtil.java b/api/src/test/java/org/apache/iceberg/types/TestTypeUtil.java
index 210efd352f..cd96fe5eaf 100644
--- a/api/src/test/java/org/apache/iceberg/types/TestTypeUtil.java
+++ b/api/src/test/java/org/apache/iceberg/types/TestTypeUtil.java
@@ -482,4 +482,28 @@ public class TestTypeUtil {
     Schema actualNoStruct = TypeUtil.selectNot(schema, Sets.newHashSet(2));
     Assert.assertEquals(schema.asStruct(), actualNoStruct.asStruct());
   }
+
+  @Test
+  public void testReassignOrRefreshIds() {
+    Schema schema =
+        new Schema(
+            Lists.newArrayList(
+                required(10, "a", Types.IntegerType.get()),
+                required(11, "c", Types.IntegerType.get()),
+                required(12, "B", Types.IntegerType.get())),
+            Sets.newHashSet(10));
+    Schema sourceSchema =
+        new Schema(
+            Lists.newArrayList(
+                required(1, "a", Types.IntegerType.get()),
+                required(15, "B", Types.IntegerType.get())));
+    final Schema actualSchema = TypeUtil.reassignOrRefreshIds(schema, sourceSchema);
+    final Schema expectedSchema =
+        new Schema(
+            Lists.newArrayList(
+                required(1, "a", Types.IntegerType.get()),
+                required(16, "c", Types.IntegerType.get()),
+                required(15, "B", Types.IntegerType.get())));
+    Assert.assertEquals(expectedSchema.asStruct(), actualSchema.asStruct());
+  }
 }


[iceberg] 03/09: Core: Fix snapshot log with intermediate transaction snapshots (#5568)

Posted by bl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch 0.14.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git

commit 7bb15a2d88654ed0346557db3f1d6935f83320f8
Author: Eduard Tudenhöfner <et...@gmail.com>
AuthorDate: Thu Aug 18 21:06:36 2022 +0200

    Core: Fix snapshot log with intermediate transaction snapshots (#5568)
---
 .../java/org/apache/iceberg/TableMetadata.java     |  8 ++--
 .../java/org/apache/iceberg/TestTransaction.java   | 50 ++++++++++++++--------
 2 files changed, 36 insertions(+), 22 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java b/core/src/main/java/org/apache/iceberg/TableMetadata.java
index 8c1d8f5dbf..f49b64dcb5 100644
--- a/core/src/main/java/org/apache/iceberg/TableMetadata.java
+++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java
@@ -1481,9 +1481,11 @@ public class TableMetadata implements Serializable {
       List<HistoryEntry> newSnapshotLog = Lists.newArrayList();
       for (HistoryEntry logEntry : snapshotLog) {
         long snapshotId = logEntry.snapshotId();
-        if (snapshotsById.containsKey(snapshotId) && !intermediateSnapshotIds.contains(snapshotId)) {
-          // copy the log entries that are still valid
-          newSnapshotLog.add(logEntry);
+        if (snapshotsById.containsKey(snapshotId)) {
+          if (!intermediateSnapshotIds.contains(snapshotId)) {
+            // copy the log entries that are still valid
+            newSnapshotLog.add(logEntry);
+          }
         } else {
           // any invalid entry causes the history before it to be removed. otherwise, there could be
           // history gaps that cause time-travel queries to produce incorrect results. for example,
diff --git a/core/src/test/java/org/apache/iceberg/TestTransaction.java b/core/src/test/java/org/apache/iceberg/TestTransaction.java
index 6c0fd69312..a11df6b628 100644
--- a/core/src/test/java/org/apache/iceberg/TestTransaction.java
+++ b/core/src/test/java/org/apache/iceberg/TestTransaction.java
@@ -92,22 +92,25 @@ public class TestTransaction extends TableTestBase {
   public void testMultipleOperationTransaction() {
     Assert.assertEquals("Table should be on version 0", 0, (int) version());
 
+    table.newAppend().appendFile(FILE_C).commit();
+    List<HistoryEntry> initialHistory = table.history();
+
     TableMetadata base = readMetadata();
 
     Transaction txn = table.newTransaction();
 
-    Assert.assertSame("Base metadata should not change when commit is created",
-        base, readMetadata());
-    Assert.assertEquals("Table should be on version 0 after txn create", 0, (int) version());
+    Assert.assertSame(
+        "Base metadata should not change when commit is created", base, readMetadata());
+    Assert.assertEquals("Table should be on version 1 after txn create", 1, (int) version());
 
     txn.newAppend()
         .appendFile(FILE_A)
         .appendFile(FILE_B)
         .commit();
 
-    Assert.assertSame("Base metadata should not change when commit is created",
-        base, readMetadata());
-    Assert.assertEquals("Table should be on version 0 after txn create", 0, (int) version());
+    Assert.assertSame(
+        "Base metadata should not change when commit is created", base, readMetadata());
+    Assert.assertEquals("Table should be on version 1 after txn create", 1, (int) version());
 
     Snapshot appendSnapshot = txn.table().currentSnapshot();
 
@@ -117,26 +120,35 @@ public class TestTransaction extends TableTestBase {
 
     Snapshot deleteSnapshot = txn.table().currentSnapshot();
 
-    Assert.assertSame("Base metadata should not change when an append is committed",
-        base, readMetadata());
-    Assert.assertEquals("Table should be on version 0 after append", 0, (int) version());
+    Assert.assertSame(
+        "Base metadata should not change when an append is committed", base, readMetadata());
+    Assert.assertEquals("Table should be on version 1 after append", 1, (int) version());
 
     txn.commitTransaction();
 
-    Assert.assertEquals("Table should be on version 1 after commit", 1, (int) version());
-    Assert.assertEquals("Table should have one manifest after commit",
-        1, readMetadata().currentSnapshot().allManifests(table.io()).size());
-    Assert.assertEquals("Table snapshot should be the delete snapshot",
-        deleteSnapshot, readMetadata().currentSnapshot());
-    validateManifestEntries(readMetadata().currentSnapshot().allManifests(table.io()).get(0),
+    Assert.assertEquals("Table should be on version 2 after commit", 2, (int) version());
+    Assert.assertEquals(
+        "Table should have two manifest after commit",
+        2,
+        readMetadata().currentSnapshot().allManifests(table.io()).size());
+    Assert.assertEquals(
+        "Table snapshot should be the delete snapshot",
+        deleteSnapshot,
+        readMetadata().currentSnapshot());
+    validateManifestEntries(
+        readMetadata().currentSnapshot().allManifests(table.io()).get(0),
         ids(deleteSnapshot.snapshotId(), appendSnapshot.snapshotId()),
         files(FILE_A, FILE_B), statuses(Status.DELETED, Status.EXISTING));
 
-    Assert.assertEquals("Table should have a snapshot for each operation",
-        2, readMetadata().snapshots().size());
-    validateManifestEntries(readMetadata().snapshots().get(0).allManifests(table.io()).get(0),
+    Assert.assertEquals(
+        "Table should have a snapshot for each operation", 3, readMetadata().snapshots().size());
+    validateManifestEntries(
+        readMetadata().snapshots().get(1).allManifests(table.io()).get(0),
         ids(appendSnapshot.snapshotId(), appendSnapshot.snapshotId()),
-        files(FILE_A, FILE_B), statuses(Status.ADDED, Status.ADDED));
+        files(FILE_A, FILE_B),
+        statuses(Status.ADDED, Status.ADDED));
+
+    org.assertj.core.api.Assertions.assertThat(table.history()).containsAll(initialHistory);
   }
 
   @Test


[iceberg] 02/09: AWS: S3OutputStream - failure to close should persist on subsequent close calls (#5311)

Posted by bl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch 0.14.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git

commit 60d5ff7b3ee60a92cd80047aace513b18974500a
Author: Abid Mohammed <69...@users.noreply.github.com>
AuthorDate: Tue Aug 2 10:33:52 2022 -0700

    AWS: S3OutputStream - failure to close should persist on subsequent close calls (#5311)
---
 .../org/apache/iceberg/aws/s3/S3OutputStream.java     | 14 +++++++++++++-
 .../org/apache/iceberg/aws/s3/TestS3OutputStream.java | 19 +++++++++++++++++++
 2 files changed, 32 insertions(+), 1 deletion(-)

diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java b/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
index 555a9e1634..80ed982367 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
@@ -103,6 +103,7 @@ class S3OutputStream extends PositionOutputStream {
 
   private long pos = 0;
   private boolean closed = false;
+  private Throwable closeFailureException;
 
   @SuppressWarnings("StaticAssignmentInConstructor")
   S3OutputStream(S3Client s3, S3URI location, AwsProperties awsProperties, MetricsContext metrics)
@@ -240,6 +241,15 @@ class S3OutputStream extends PositionOutputStream {
 
   @Override
   public void close() throws IOException {
+
+    // A failed s3 close removes state that is required for a successful close.
+    // Any future close on this stream should fail.
+    if (closeFailureException != null) {
+      throw new IOException(
+          "Attempted to close an S3 output stream that failed to close earlier",
+          closeFailureException);
+    }
+
     if (closed) {
       return;
     }
@@ -249,8 +259,10 @@ class S3OutputStream extends PositionOutputStream {
 
     try {
       stream.close();
-
       completeUploads();
+    } catch (Exception e) {
+      closeFailureException = e;
+      throw e;
     } finally {
       cleanUpStagingFiles();
     }
diff --git a/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3OutputStream.java b/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3OutputStream.java
index c9b6043ccd..28d9c5da6c 100644
--- a/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3OutputStream.java
+++ b/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3OutputStream.java
@@ -42,6 +42,7 @@ import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
 import org.mockito.junit.MockitoJUnitRunner;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -160,6 +161,24 @@ public class TestS3OutputStream {
     writeTest();
   }
 
+  @Test
+  public void testCloseFailureShouldPersistOnFutureClose() throws IOException {
+    IllegalStateException mockException =
+        new IllegalStateException("mock failure to completeUploads on close");
+    Mockito.doThrow(mockException)
+        .when(s3mock)
+        .putObject(any(PutObjectRequest.class), any(RequestBody.class));
+    S3OutputStream stream = new S3OutputStream(s3mock, randomURI(), properties, nullMetrics());
+
+    Assertions.assertThatThrownBy(stream::close)
+        .isInstanceOf(mockException.getClass())
+        .hasMessageContaining(mockException.getMessage());
+
+    Assertions.assertThatThrownBy(stream::close)
+        .isInstanceOf(IOException.class)
+        .hasCause(mockException);
+  }
+
   private void writeTest() {
     // Run tests for both byte and array write paths
     Stream.of(true, false).forEach(arrayWrite -> {


[iceberg] 08/09: Spark: Fix stats in rewrite metadata action (#5691)

Posted by bl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch 0.14.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git

commit 7feb9457bd095b1d5e3d6224db4780cc5b833a36
Author: Ryan Blue <bl...@apache.org>
AuthorDate: Thu Sep 1 23:20:57 2022 -0700

    Spark: Fix stats in rewrite metadata action (#5691)
    
    * Core: Don't show dropped fields from the partition spec
    
    * Use projection instead
    
    * Use StructProjection in SparkDataFile.
    
    Co-authored-by: Fokko Driesprong <fo...@apache.org>
---
 .../extensions/TestRewriteManifestsProcedure.java  |  29 ++++++
 .../org/apache/iceberg/spark/SparkDataFile.java    |  21 ++++-
 .../spark/actions/RewriteManifestsSparkAction.java | 105 ++++++++++++++++-----
 .../extensions/TestRewriteManifestsProcedure.java  |  29 ++++++
 .../org/apache/iceberg/spark/SparkDataFile.java    |  21 ++++-
 .../spark/actions/RewriteManifestsSparkAction.java | 105 ++++++++++++++++-----
 6 files changed, 262 insertions(+), 48 deletions(-)

diff --git a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
index dcf0a2d91e..0d10cb0d7d 100644
--- a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
+++ b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
@@ -19,6 +19,8 @@
 
 package org.apache.iceberg.spark.extensions;
 
+import java.sql.Date;
+import java.sql.Timestamp;
 import java.util.List;
 import java.util.Map;
 import org.apache.iceberg.AssertHelpers;
@@ -171,4 +173,31 @@ public class TestRewriteManifestsProcedure extends SparkExtensionsTestBase {
         IllegalArgumentException.class, "Cannot handle an empty identifier",
         () -> sql("CALL %s.system.rewrite_manifests('')", catalogName));
   }
+
+  @Test
+  public void testReplacePartitionField() {
+    sql(
+        "CREATE TABLE %s (id int, ts timestamp, day_of_ts date) USING iceberg PARTITIONED BY (day_of_ts)",
+        tableName);
+
+    sql("ALTER TABLE %s SET TBLPROPERTIES ('format-version' = '2')", tableName);
+    sql("ALTER TABLE %s REPLACE PARTITION FIELD day_of_ts WITH days(ts)\n", tableName);
+    sql(
+        "INSERT INTO %s VALUES (1, CAST('2022-01-01 10:00:00' AS TIMESTAMP), CAST('2022-01-01' AS DATE))",
+        tableName);
+
+    assertEquals(
+        "Should have expected rows",
+        ImmutableList.of(
+            row(1, Timestamp.valueOf("2022-01-01 10:00:00"), Date.valueOf("2022-01-01"))),
+        sql("SELECT * FROM %s WHERE ts < current_timestamp()", tableName));
+
+    sql("CALL %s.system.rewrite_manifests(table => '%s')", catalogName, tableName);
+
+    assertEquals(
+        "Should have expected rows",
+        ImmutableList.of(
+            row(1, Timestamp.valueOf("2022-01-01 10:00:00"), Date.valueOf("2022-01-01"))),
+        sql("SELECT * FROM %s WHERE ts < current_timestamp()", tableName));
+  }
 }
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java
index a6390d39c5..5fe0cd86a4 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java
@@ -29,6 +29,7 @@ import org.apache.iceberg.StructLike;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.StructProjection;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.types.StructType;
 
@@ -53,13 +54,29 @@ public class SparkDataFile implements DataFile {
   private final Type keyMetadataType;
 
   private final SparkStructLike wrappedPartition;
+  private final StructLike partitionProjection;
   private Row wrapped;
 
   public SparkDataFile(Types.StructType type, StructType sparkType) {
+    this(type, null, sparkType);
+  }
+
+  public SparkDataFile(
+      Types.StructType type, Types.StructType projectedType, StructType sparkType) {
     this.lowerBoundsType = type.fieldType("lower_bounds");
     this.upperBoundsType = type.fieldType("upper_bounds");
     this.keyMetadataType = type.fieldType("key_metadata");
-    this.wrappedPartition = new SparkStructLike(type.fieldType("partition").asStructType());
+
+    Types.StructType partitionType = type.fieldType("partition").asStructType();
+    this.wrappedPartition = new SparkStructLike(partitionType);
+
+    if (projectedType != null) {
+      Types.StructType projectedPartitionType = projectedType.fieldType("partition").asStructType();
+      this.partitionProjection =
+          StructProjection.create(partitionType, projectedPartitionType).wrap(wrappedPartition);
+    } else {
+      this.partitionProjection = wrappedPartition;
+    }
 
     Map<String, Integer> positions = Maps.newHashMap();
     type.fields().forEach(field -> {
@@ -114,7 +131,7 @@ public class SparkDataFile implements DataFile {
 
   @Override
   public StructLike partition() {
-    return wrappedPartition;
+    return partitionProjection;
   }
 
   @Override
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
index 99e51a37aa..030532fa94 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
@@ -34,6 +34,7 @@ import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.ManifestFiles;
 import org.apache.iceberg.ManifestWriter;
 import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Partitioning;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableOperations;
@@ -200,6 +201,7 @@ public class RewriteManifestsSparkAction
   private List<ManifestFile> writeManifestsForUnpartitionedTable(Dataset<Row> manifestEntryDF, int numManifests) {
     Broadcast<FileIO> io = sparkContext().broadcast(fileIO);
     StructType sparkType = (StructType) manifestEntryDF.schema().apply("data_file").dataType();
+    Types.StructType combinedPartitionType = Partitioning.partitionType(table);
 
     // we rely only on the target number of manifests for unpartitioned tables
     // as we should not worry about having too much metadata per partition
@@ -208,9 +210,15 @@ public class RewriteManifestsSparkAction
     return manifestEntryDF
         .repartition(numManifests)
         .mapPartitions(
-            toManifests(io, maxNumManifestEntries, stagingLocation, formatVersion, spec, sparkType),
-            manifestEncoder
-        )
+            toManifests(
+                io,
+                maxNumManifestEntries,
+                stagingLocation,
+                formatVersion,
+                combinedPartitionType,
+                spec,
+                sparkType),
+            manifestEncoder)
         .collectAsList();
   }
 
@@ -220,20 +228,29 @@ public class RewriteManifestsSparkAction
 
     Broadcast<FileIO> io = sparkContext().broadcast(fileIO);
     StructType sparkType = (StructType) manifestEntryDF.schema().apply("data_file").dataType();
+    Types.StructType combinedPartitionType = Partitioning.partitionType(table);
 
     // we allow the actual size of manifests to be 10% higher if the estimation is not precise enough
     long maxNumManifestEntries = (long) (1.1 * targetNumManifestEntries);
 
-    return withReusableDS(manifestEntryDF, df -> {
-      Column partitionColumn = df.col("data_file.partition");
-      return df.repartitionByRange(numManifests, partitionColumn)
-          .sortWithinPartitions(partitionColumn)
-          .mapPartitions(
-              toManifests(io, maxNumManifestEntries, stagingLocation, formatVersion, spec, sparkType),
-              manifestEncoder
-          )
-          .collectAsList();
-    });
+    return withReusableDS(
+        manifestEntryDF,
+        df -> {
+          Column partitionColumn = df.col("data_file.partition");
+          return df.repartitionByRange(numManifests, partitionColumn)
+              .sortWithinPartitions(partitionColumn)
+              .mapPartitions(
+                  toManifests(
+                      io,
+                      maxNumManifestEntries,
+                      stagingLocation,
+                      formatVersion,
+                      combinedPartitionType,
+                      spec,
+                      sparkType),
+                  manifestEncoder)
+              .collectAsList();
+        });
   }
 
   private <T, U> U withReusableDS(Dataset<T> ds, Function<Dataset<T>, U> func) {
@@ -317,15 +334,24 @@ public class RewriteManifestsSparkAction
   }
 
   private static ManifestFile writeManifest(
-      List<Row> rows, int startIndex, int endIndex, Broadcast<FileIO> io,
-      String location, int format, PartitionSpec spec, StructType sparkType) throws IOException {
+      List<Row> rows,
+      int startIndex,
+      int endIndex,
+      Broadcast<FileIO> io,
+      String location,
+      int format,
+      Types.StructType combinedPartitionType,
+      PartitionSpec spec,
+      StructType sparkType)
+      throws IOException {
 
     String manifestName = "optimized-m-" + UUID.randomUUID();
     Path manifestPath = new Path(location, manifestName);
     OutputFile outputFile = io.value().newOutputFile(FileFormat.AVRO.addExtension(manifestPath.toString()));
 
-    Types.StructType dataFileType = DataFile.getType(spec.partitionType());
-    SparkDataFile wrapper = new SparkDataFile(dataFileType, sparkType);
+    Types.StructType combinedFileType = DataFile.getType(combinedPartitionType);
+    Types.StructType manifestFileType = DataFile.getType(spec.partitionType());
+    SparkDataFile wrapper = new SparkDataFile(combinedFileType, manifestFileType, sparkType);
 
     ManifestWriter<DataFile> writer = ManifestFiles.write(format, spec, outputFile, null);
 
@@ -345,8 +371,13 @@ public class RewriteManifestsSparkAction
   }
 
   private static MapPartitionsFunction<Row, ManifestFile> toManifests(
-      Broadcast<FileIO> io, long maxNumManifestEntries, String location,
-      int format, PartitionSpec spec, StructType sparkType) {
+      Broadcast<FileIO> io,
+      long maxNumManifestEntries,
+      String location,
+      int format,
+      Types.StructType combinedPartitionType,
+      PartitionSpec spec,
+      StructType sparkType) {
 
     return rows -> {
       List<Row> rowsAsList = Lists.newArrayList(rows);
@@ -357,11 +388,41 @@ public class RewriteManifestsSparkAction
 
       List<ManifestFile> manifests = Lists.newArrayList();
       if (rowsAsList.size() <= maxNumManifestEntries) {
-        manifests.add(writeManifest(rowsAsList, 0, rowsAsList.size(), io, location, format, spec, sparkType));
+        manifests.add(
+            writeManifest(
+                rowsAsList,
+                0,
+                rowsAsList.size(),
+                io,
+                location,
+                format,
+                combinedPartitionType,
+                spec,
+                sparkType));
       } else {
         int midIndex = rowsAsList.size() / 2;
-        manifests.add(writeManifest(rowsAsList, 0, midIndex, io, location, format, spec, sparkType));
-        manifests.add(writeManifest(rowsAsList,  midIndex, rowsAsList.size(), io, location, format, spec, sparkType));
+        manifests.add(
+            writeManifest(
+                rowsAsList,
+                0,
+                midIndex,
+                io,
+                location,
+                format,
+                combinedPartitionType,
+                spec,
+                sparkType));
+        manifests.add(
+            writeManifest(
+                rowsAsList,
+                midIndex,
+                rowsAsList.size(),
+                io,
+                location,
+                format,
+                combinedPartitionType,
+                spec,
+                sparkType));
       }
 
       return manifests.iterator();
diff --git a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
index dcf0a2d91e..0d10cb0d7d 100644
--- a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
+++ b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
@@ -19,6 +19,8 @@
 
 package org.apache.iceberg.spark.extensions;
 
+import java.sql.Date;
+import java.sql.Timestamp;
 import java.util.List;
 import java.util.Map;
 import org.apache.iceberg.AssertHelpers;
@@ -171,4 +173,31 @@ public class TestRewriteManifestsProcedure extends SparkExtensionsTestBase {
         IllegalArgumentException.class, "Cannot handle an empty identifier",
         () -> sql("CALL %s.system.rewrite_manifests('')", catalogName));
   }
+
+  @Test
+  public void testReplacePartitionField() {
+    sql(
+        "CREATE TABLE %s (id int, ts timestamp, day_of_ts date) USING iceberg PARTITIONED BY (day_of_ts)",
+        tableName);
+
+    sql("ALTER TABLE %s SET TBLPROPERTIES ('format-version' = '2')", tableName);
+    sql("ALTER TABLE %s REPLACE PARTITION FIELD day_of_ts WITH days(ts)\n", tableName);
+    sql(
+        "INSERT INTO %s VALUES (1, CAST('2022-01-01 10:00:00' AS TIMESTAMP), CAST('2022-01-01' AS DATE))",
+        tableName);
+
+    assertEquals(
+        "Should have expected rows",
+        ImmutableList.of(
+            row(1, Timestamp.valueOf("2022-01-01 10:00:00"), Date.valueOf("2022-01-01"))),
+        sql("SELECT * FROM %s WHERE ts < current_timestamp()", tableName));
+
+    sql("CALL %s.system.rewrite_manifests(table => '%s')", catalogName, tableName);
+
+    assertEquals(
+        "Should have expected rows",
+        ImmutableList.of(
+            row(1, Timestamp.valueOf("2022-01-01 10:00:00"), Date.valueOf("2022-01-01"))),
+        sql("SELECT * FROM %s WHERE ts < current_timestamp()", tableName));
+  }
 }
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java
index a6390d39c5..5fe0cd86a4 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java
@@ -29,6 +29,7 @@ import org.apache.iceberg.StructLike;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.StructProjection;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.types.StructType;
 
@@ -53,13 +54,29 @@ public class SparkDataFile implements DataFile {
   private final Type keyMetadataType;
 
   private final SparkStructLike wrappedPartition;
+  private final StructLike partitionProjection;
   private Row wrapped;
 
   public SparkDataFile(Types.StructType type, StructType sparkType) {
+    this(type, null, sparkType);
+  }
+
+  public SparkDataFile(
+      Types.StructType type, Types.StructType projectedType, StructType sparkType) {
     this.lowerBoundsType = type.fieldType("lower_bounds");
     this.upperBoundsType = type.fieldType("upper_bounds");
     this.keyMetadataType = type.fieldType("key_metadata");
-    this.wrappedPartition = new SparkStructLike(type.fieldType("partition").asStructType());
+
+    Types.StructType partitionType = type.fieldType("partition").asStructType();
+    this.wrappedPartition = new SparkStructLike(partitionType);
+
+    if (projectedType != null) {
+      Types.StructType projectedPartitionType = projectedType.fieldType("partition").asStructType();
+      this.partitionProjection =
+          StructProjection.create(partitionType, projectedPartitionType).wrap(wrappedPartition);
+    } else {
+      this.partitionProjection = wrappedPartition;
+    }
 
     Map<String, Integer> positions = Maps.newHashMap();
     type.fields().forEach(field -> {
@@ -114,7 +131,7 @@ public class SparkDataFile implements DataFile {
 
   @Override
   public StructLike partition() {
-    return wrappedPartition;
+    return partitionProjection;
   }
 
   @Override
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
index 99e51a37aa..030532fa94 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
@@ -34,6 +34,7 @@ import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.ManifestFiles;
 import org.apache.iceberg.ManifestWriter;
 import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Partitioning;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableOperations;
@@ -200,6 +201,7 @@ public class RewriteManifestsSparkAction
   private List<ManifestFile> writeManifestsForUnpartitionedTable(Dataset<Row> manifestEntryDF, int numManifests) {
     Broadcast<FileIO> io = sparkContext().broadcast(fileIO);
     StructType sparkType = (StructType) manifestEntryDF.schema().apply("data_file").dataType();
+    Types.StructType combinedPartitionType = Partitioning.partitionType(table);
 
     // we rely only on the target number of manifests for unpartitioned tables
     // as we should not worry about having too much metadata per partition
@@ -208,9 +210,15 @@ public class RewriteManifestsSparkAction
     return manifestEntryDF
         .repartition(numManifests)
         .mapPartitions(
-            toManifests(io, maxNumManifestEntries, stagingLocation, formatVersion, spec, sparkType),
-            manifestEncoder
-        )
+            toManifests(
+                io,
+                maxNumManifestEntries,
+                stagingLocation,
+                formatVersion,
+                combinedPartitionType,
+                spec,
+                sparkType),
+            manifestEncoder)
         .collectAsList();
   }
 
@@ -220,20 +228,29 @@ public class RewriteManifestsSparkAction
 
     Broadcast<FileIO> io = sparkContext().broadcast(fileIO);
     StructType sparkType = (StructType) manifestEntryDF.schema().apply("data_file").dataType();
+    Types.StructType combinedPartitionType = Partitioning.partitionType(table);
 
     // we allow the actual size of manifests to be 10% higher if the estimation is not precise enough
     long maxNumManifestEntries = (long) (1.1 * targetNumManifestEntries);
 
-    return withReusableDS(manifestEntryDF, df -> {
-      Column partitionColumn = df.col("data_file.partition");
-      return df.repartitionByRange(numManifests, partitionColumn)
-          .sortWithinPartitions(partitionColumn)
-          .mapPartitions(
-              toManifests(io, maxNumManifestEntries, stagingLocation, formatVersion, spec, sparkType),
-              manifestEncoder
-          )
-          .collectAsList();
-    });
+    return withReusableDS(
+        manifestEntryDF,
+        df -> {
+          Column partitionColumn = df.col("data_file.partition");
+          return df.repartitionByRange(numManifests, partitionColumn)
+              .sortWithinPartitions(partitionColumn)
+              .mapPartitions(
+                  toManifests(
+                      io,
+                      maxNumManifestEntries,
+                      stagingLocation,
+                      formatVersion,
+                      combinedPartitionType,
+                      spec,
+                      sparkType),
+                  manifestEncoder)
+              .collectAsList();
+        });
   }
 
   private <T, U> U withReusableDS(Dataset<T> ds, Function<Dataset<T>, U> func) {
@@ -317,15 +334,24 @@ public class RewriteManifestsSparkAction
   }
 
   private static ManifestFile writeManifest(
-      List<Row> rows, int startIndex, int endIndex, Broadcast<FileIO> io,
-      String location, int format, PartitionSpec spec, StructType sparkType) throws IOException {
+      List<Row> rows,
+      int startIndex,
+      int endIndex,
+      Broadcast<FileIO> io,
+      String location,
+      int format,
+      Types.StructType combinedPartitionType,
+      PartitionSpec spec,
+      StructType sparkType)
+      throws IOException {
 
     String manifestName = "optimized-m-" + UUID.randomUUID();
     Path manifestPath = new Path(location, manifestName);
     OutputFile outputFile = io.value().newOutputFile(FileFormat.AVRO.addExtension(manifestPath.toString()));
 
-    Types.StructType dataFileType = DataFile.getType(spec.partitionType());
-    SparkDataFile wrapper = new SparkDataFile(dataFileType, sparkType);
+    Types.StructType combinedFileType = DataFile.getType(combinedPartitionType);
+    Types.StructType manifestFileType = DataFile.getType(spec.partitionType());
+    SparkDataFile wrapper = new SparkDataFile(combinedFileType, manifestFileType, sparkType);
 
     ManifestWriter<DataFile> writer = ManifestFiles.write(format, spec, outputFile, null);
 
@@ -345,8 +371,13 @@ public class RewriteManifestsSparkAction
   }
 
   private static MapPartitionsFunction<Row, ManifestFile> toManifests(
-      Broadcast<FileIO> io, long maxNumManifestEntries, String location,
-      int format, PartitionSpec spec, StructType sparkType) {
+      Broadcast<FileIO> io,
+      long maxNumManifestEntries,
+      String location,
+      int format,
+      Types.StructType combinedPartitionType,
+      PartitionSpec spec,
+      StructType sparkType) {
 
     return rows -> {
       List<Row> rowsAsList = Lists.newArrayList(rows);
@@ -357,11 +388,41 @@ public class RewriteManifestsSparkAction
 
       List<ManifestFile> manifests = Lists.newArrayList();
       if (rowsAsList.size() <= maxNumManifestEntries) {
-        manifests.add(writeManifest(rowsAsList, 0, rowsAsList.size(), io, location, format, spec, sparkType));
+        manifests.add(
+            writeManifest(
+                rowsAsList,
+                0,
+                rowsAsList.size(),
+                io,
+                location,
+                format,
+                combinedPartitionType,
+                spec,
+                sparkType));
       } else {
         int midIndex = rowsAsList.size() / 2;
-        manifests.add(writeManifest(rowsAsList, 0, midIndex, io, location, format, spec, sparkType));
-        manifests.add(writeManifest(rowsAsList,  midIndex, rowsAsList.size(), io, location, format, spec, sparkType));
+        manifests.add(
+            writeManifest(
+                rowsAsList,
+                0,
+                midIndex,
+                io,
+                location,
+                format,
+                combinedPartitionType,
+                spec,
+                sparkType));
+        manifests.add(
+            writeManifest(
+                rowsAsList,
+                midIndex,
+                rowsAsList.size(),
+                io,
+                location,
+                format,
+                combinedPartitionType,
+                spec,
+                sparkType));
       }
 
       return manifests.iterator();


[iceberg] 04/09: Core: Fix exception handling in BaseTaskWriter (#5683)

Posted by bl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch 0.14.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git

commit 0c45e9658392872c6f60fa6f4ba29ad06e26c2ff
Author: Ryan Blue <bl...@apache.org>
AuthorDate: Thu Sep 1 12:44:19 2022 -0700

    Core: Fix exception handling in BaseTaskWriter (#5683)
    
    * Core: Fix exception handling in BaseTaskWriter.
    
    * Fix state check.
---
 .../org/apache/iceberg/aws/s3/S3OutputStream.java  | 13 ---
 .../apache/iceberg/aws/s3/TestS3OutputStream.java  |  7 +-
 .../java/org/apache/iceberg/io/BaseTaskWriter.java | 96 ++++++++++++++--------
 .../apache/iceberg/io/SortedPosDeleteWriter.java   | 18 +++-
 4 files changed, 82 insertions(+), 52 deletions(-)

diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java b/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
index 80ed982367..ebb1ad82f6 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
@@ -103,7 +103,6 @@ class S3OutputStream extends PositionOutputStream {
 
   private long pos = 0;
   private boolean closed = false;
-  private Throwable closeFailureException;
 
   @SuppressWarnings("StaticAssignmentInConstructor")
   S3OutputStream(S3Client s3, S3URI location, AwsProperties awsProperties, MetricsContext metrics)
@@ -241,15 +240,6 @@ class S3OutputStream extends PositionOutputStream {
 
   @Override
   public void close() throws IOException {
-
-    // A failed s3 close removes state that is required for a successful close.
-    // Any future close on this stream should fail.
-    if (closeFailureException != null) {
-      throw new IOException(
-          "Attempted to close an S3 output stream that failed to close earlier",
-          closeFailureException);
-    }
-
     if (closed) {
       return;
     }
@@ -260,9 +250,6 @@ class S3OutputStream extends PositionOutputStream {
     try {
       stream.close();
       completeUploads();
-    } catch (Exception e) {
-      closeFailureException = e;
-      throw e;
     } finally {
       cleanUpStagingFiles();
     }
diff --git a/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3OutputStream.java b/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3OutputStream.java
index 28d9c5da6c..465f8c50f7 100644
--- a/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3OutputStream.java
+++ b/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3OutputStream.java
@@ -36,6 +36,7 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.iceberg.aws.AwsProperties;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.assertj.core.api.Assertions;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.ClassRule;
@@ -162,7 +163,7 @@ public class TestS3OutputStream {
   }
 
   @Test
-  public void testCloseFailureShouldPersistOnFutureClose() throws IOException {
+  public void testDoubleClose() throws IOException {
     IllegalStateException mockException =
         new IllegalStateException("mock failure to completeUploads on close");
     Mockito.doThrow(mockException)
@@ -174,9 +175,7 @@ public class TestS3OutputStream {
         .isInstanceOf(mockException.getClass())
         .hasMessageContaining(mockException.getMessage());
 
-    Assertions.assertThatThrownBy(stream::close)
-        .isInstanceOf(IOException.class)
-        .hasCause(mockException);
+    Assertions.assertThatNoException().isThrownBy(stream::close);
   }
 
   private void writeTest() {
diff --git a/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java b/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java
index c80084f3d3..fe96b2e994 100644
--- a/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java
+++ b/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java
@@ -53,6 +53,7 @@ public abstract class BaseTaskWriter<T> implements TaskWriter<T> {
   private final OutputFileFactory fileFactory;
   private final FileIO io;
   private final long targetFileSize;
+  private Throwable failure;
 
   protected BaseTaskWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory<T> appenderFactory,
                            OutputFileFactory fileFactory, FileIO io, long targetFileSize) {
@@ -68,6 +69,12 @@ public abstract class BaseTaskWriter<T> implements TaskWriter<T> {
     return spec;
   }
 
+  protected void setFailure(Throwable throwable) {
+    if (failure == null) {
+      this.failure = throwable;
+    }
+  }
+
   @Override
   public void abort() throws IOException {
     close();
@@ -84,6 +91,8 @@ public abstract class BaseTaskWriter<T> implements TaskWriter<T> {
   public WriteResult complete() throws IOException {
     close();
 
+    Preconditions.checkState(failure == null, "Cannot return results from failed writer", failure);
+
     return WriteResult.builder()
         .addDataFiles(completedDataFiles)
         .addDeleteFiles(completedDeleteFiles)
@@ -181,28 +190,43 @@ public abstract class BaseTaskWriter<T> implements TaskWriter<T> {
 
     @Override
     public void close() throws IOException {
-      // Close data writer and add completed data files.
-      if (dataWriter != null) {
-        dataWriter.close();
-        dataWriter = null;
-      }
+      try {
+        // Close data writer and add completed data files.
+        if (dataWriter != null) {
+          try {
+            dataWriter.close();
+          } finally {
+            dataWriter = null;
+          }
+        }
 
-      // Close eq-delete writer and add completed equality-delete files.
-      if (eqDeleteWriter != null) {
-        eqDeleteWriter.close();
-        eqDeleteWriter = null;
-      }
+        // Close eq-delete writer and add completed equality-delete files.
+        if (eqDeleteWriter != null) {
+          try {
+            eqDeleteWriter.close();
+          } finally {
+            eqDeleteWriter = null;
+          }
+        }
 
-      if (insertedRowMap != null) {
-        insertedRowMap.clear();
-        insertedRowMap = null;
-      }
+        if (insertedRowMap != null) {
+          insertedRowMap.clear();
+          insertedRowMap = null;
+        }
 
-      // Add the completed pos-delete files.
-      if (posDeleteWriter != null) {
-        completedDeleteFiles.addAll(posDeleteWriter.complete());
-        referencedDataFiles.addAll(posDeleteWriter.referencedDataFiles());
-        posDeleteWriter = null;
+        // Add the completed pos-delete files.
+        if (posDeleteWriter != null) {
+          try {
+            // complete will call close
+            completedDeleteFiles.addAll(posDeleteWriter.complete());
+            referencedDataFiles.addAll(posDeleteWriter.referencedDataFiles());
+          } finally {
+            posDeleteWriter = null;
+          }
+        }
+      } catch (IOException | RuntimeException e) {
+        setFailure(e);
+        throw e;
       }
     }
   }
@@ -287,21 +311,29 @@ public abstract class BaseTaskWriter<T> implements TaskWriter<T> {
 
     private void closeCurrent() throws IOException {
       if (currentWriter != null) {
-        currentWriter.close();
-
-        if (currentRows == 0L) {
-          try {
-            io.deleteFile(currentFile.encryptingOutputFile());
-          } catch (UncheckedIOException e) {
-            // the file may not have been created, and it isn't worth failing the job to clean up, skip deleting
+        try {
+          currentWriter.close();
+
+          if (currentRows == 0L) {
+            try {
+              io.deleteFile(currentFile.encryptingOutputFile());
+            } catch (UncheckedIOException e) {
+              // the file may not have been created, and it isn't worth failing the job to clean up,
+              // skip deleting
+            }
+          } else {
+            complete(currentWriter);
           }
-        } else {
-          complete(currentWriter);
-        }
 
-        this.currentFile = null;
-        this.currentWriter = null;
-        this.currentRows = 0;
+        } catch (IOException | RuntimeException e) {
+          setFailure(e);
+          throw e;
+
+        } finally {
+          this.currentFile = null;
+          this.currentWriter = null;
+          this.currentRows = 0;
+        }
       }
     }
 
diff --git a/core/src/main/java/org/apache/iceberg/io/SortedPosDeleteWriter.java b/core/src/main/java/org/apache/iceberg/io/SortedPosDeleteWriter.java
index 36a0313a4e..1fd5c00792 100644
--- a/core/src/main/java/org/apache/iceberg/io/SortedPosDeleteWriter.java
+++ b/core/src/main/java/org/apache/iceberg/io/SortedPosDeleteWriter.java
@@ -53,6 +53,7 @@ class SortedPosDeleteWriter<T> implements FileWriter<PositionDelete<T>, DeleteWr
 
   private int records = 0;
   private boolean closed = false;
+  private Throwable failure;
 
   SortedPosDeleteWriter(FileAppenderFactory<T> appenderFactory,
                         OutputFileFactory fileFactory,
@@ -73,6 +74,12 @@ class SortedPosDeleteWriter<T> implements FileWriter<PositionDelete<T>, DeleteWr
     this(appenderFactory, fileFactory, format, partition, DEFAULT_RECORDS_NUM_THRESHOLD);
   }
 
+  protected void setFailure(Throwable throwable) {
+    if (failure == null) {
+      this.failure = throwable;
+    }
+  }
+
   @Override
   public long length() {
     throw new UnsupportedOperationException(this.getClass().getName() + " does not implement length");
@@ -106,6 +113,8 @@ class SortedPosDeleteWriter<T> implements FileWriter<PositionDelete<T>, DeleteWr
   public List<DeleteFile> complete() throws IOException {
     close();
 
+    Preconditions.checkState(failure == null, "Cannot return results from failed writer", failure);
+
     return completedFiles;
   }
 
@@ -116,8 +125,8 @@ class SortedPosDeleteWriter<T> implements FileWriter<PositionDelete<T>, DeleteWr
   @Override
   public void close() throws IOException {
     if (!closed) {
-      flushDeletes();
       this.closed = true;
+      flushDeletes();
     }
   }
 
@@ -157,8 +166,11 @@ class SortedPosDeleteWriter<T> implements FileWriter<PositionDelete<T>, DeleteWr
         positions.forEach(posRow -> closeableWriter.delete(path, posRow.pos(), posRow.row()));
       }
     } catch (IOException e) {
-      throw new UncheckedIOException("Failed to write the sorted path/pos pairs to pos-delete file: " +
-          outputFile.encryptingOutputFile().location(), e);
+      setFailure(e);
+      throw new UncheckedIOException(
+          "Failed to write the sorted path/pos pairs to pos-delete file: " +
+              outputFile.encryptingOutputFile().location(),
+          e);
     }
 
     // Clear the buffered pos-deletions.