Posted to commits@iceberg.apache.org by ru...@apache.org on 2022/05/09 22:09:40 UTC

[iceberg] branch master updated: Spark 2.4 - 3.X: Backport Fix CommitStateUnknown Handling (#4687) (#4719)

This is an automated email from the ASF dual-hosted git repository.

russellspitzer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 56933c179 Spark 2.4 - 3.X: Backport Fix CommitStateUnknown Handling (#4687) (#4719)
56933c179 is described below

commit 56933c179aedcfd3433f599111099785d2abb575
Author: Russell Spitzer <rs...@apple.com>
AuthorDate: Mon May 9 17:09:34 2022 -0500

    Spark 2.4 - 3.X: Backport Fix CommitStateUnknown Handling (#4687) (#4719)
---
 .../org/apache/iceberg/spark/source/Writer.java    | 44 ++++++----
 .../apache/iceberg/spark/source/ManualSource.java  | 99 ++++++++++++++++++++++
 .../iceberg/spark/source/TestSparkDataWrite.java   | 61 +++++++++++++
 .../apache/iceberg/spark/source/SparkWrite.java    | 44 ++++++----
 .../apache/iceberg/spark/source/ManualSource.java  | 74 ++++++++++++++++
 .../iceberg/spark/source/TestSparkDataWrite.java   | 62 ++++++++++++++
 .../apache/iceberg/spark/source/SparkWrite.java    | 44 ++++++----
 .../apache/iceberg/spark/source/ManualSource.java  | 74 ++++++++++++++++
 .../iceberg/spark/source/TestSparkDataWrite.java   | 62 ++++++++++++++
 9 files changed, 516 insertions(+), 48 deletions(-)
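
The diff below applies the same guard in all three Spark modules (Writer for 2.4, SparkWrite for 3.0 and 3.1): the metadata commit is wrapped so that a CommitStateUnknownException flips a cleanupOnAbort flag to false, and the abort path only deletes data files while that flag is still true, since an unknown commit outcome means the snapshot may actually have landed. A minimal sketch of the idea, using hypothetical stand-in types rather than the real Iceberg and Spark classes:

    // Hypothetical sketch of the cleanupOnAbort pattern used by this commit.
    // CommitOutcomeUnknownException stands in for Iceberg's CommitStateUnknownException;
    // GuardedWrite stands in for Writer/SparkWrite.
    class CommitOutcomeUnknownException extends RuntimeException {
      CommitOutcomeUnknownException(Throwable cause) {
        super(cause);
      }
    }

    class GuardedWrite {
      private boolean cleanupOnAbort = true;  // assume cleanup is safe until proven otherwise

      // commitFn performs the metadata commit; it may throw if the outcome cannot be confirmed.
      void commit(Runnable commitFn) {
        try {
          commitFn.run();
        } catch (CommitOutcomeUnknownException e) {
          // The commit may have succeeded server-side; deleting data files now could corrupt the table.
          cleanupOnAbort = false;
          throw e;
        }
      }

      // deleteFn removes one data file path; it is invoked only when cleanup is known to be safe.
      void abort(Iterable<String> files, java.util.function.Consumer<String> deleteFn) {
        if (cleanupOnAbort) {
          files.forEach(deleteFn);
        } else {
          System.err.println("Skipping data file cleanup: final commit state is unknown");
        }
      }
    }

The new ManualSource test sources exist to exercise exactly this path: they let the tests register a spied Table whose append commits successfully and then throws CommitStateUnknownException, so the tests can verify that the written rows survive the aborted Spark job.
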

diff --git a/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java b/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
index 451487ecf..c4259a9bf 100644
--- a/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
+++ b/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
@@ -33,6 +33,7 @@ import org.apache.iceberg.SnapshotSummary;
 import org.apache.iceberg.SnapshotUpdate;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.exceptions.CommitStateUnknownException;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.OutputFileFactory;
 import org.apache.iceberg.io.UnpartitionedWriter;
@@ -78,6 +79,8 @@ class Writer implements DataSourceWriter {
   private final Map<String, String> extraSnapshotMetadata;
   private final boolean partitionedFanoutEnabled;
 
+  private boolean cleanupOnAbort = true;
+
   Writer(SparkSession spark, Table table, SparkWriteConf writeConf, boolean replacePartitions,
          String applicationId, Schema writeSchema, StructType dsSchema) {
     this(spark, table, writeConf, replacePartitions, applicationId, null, writeSchema, dsSchema);
@@ -136,10 +139,15 @@ class Writer implements DataSourceWriter {
       operation.stageOnly();
     }
 
-    long start = System.currentTimeMillis();
-    operation.commit(); // abort is automatically called if this fails
-    long duration = System.currentTimeMillis() - start;
-    LOG.info("Committed in {} ms", duration);
+    try {
+      long start = System.currentTimeMillis();
+      operation.commit(); // abort is automatically called if this fails
+      long duration = System.currentTimeMillis() - start;
+      LOG.info("Committed in {} ms", duration);
+    } catch (CommitStateUnknownException commitStateUnknownException) {
+      cleanupOnAbort = false;
+      throw commitStateUnknownException;
+    }
   }
 
   private void append(WriterCommitMessage[] messages) {
@@ -175,18 +183,22 @@ class Writer implements DataSourceWriter {
 
   @Override
   public void abort(WriterCommitMessage[] messages) {
-    Map<String, String> props = table.properties();
-    Tasks.foreach(files(messages))
-        .retry(PropertyUtil.propertyAsInt(props, COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT))
-        .exponentialBackoff(
-            PropertyUtil.propertyAsInt(props, COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT),
-            PropertyUtil.propertyAsInt(props, COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT),
-            PropertyUtil.propertyAsInt(props, COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT),
-            2.0 /* exponential */)
-        .throwFailureWhenFinished()
-        .run(file -> {
-          table.io().deleteFile(file.path().toString());
-        });
+    if (cleanupOnAbort) {
+      Map<String, String> props = table.properties();
+      Tasks.foreach(files(messages))
+          .retry(PropertyUtil.propertyAsInt(props, COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT))
+          .exponentialBackoff(
+              PropertyUtil.propertyAsInt(props, COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT),
+              PropertyUtil.propertyAsInt(props, COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT),
+              PropertyUtil.propertyAsInt(props, COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT),
+              2.0 /* exponential */)
+          .throwFailureWhenFinished()
+          .run(file -> {
+            table.io().deleteFile(file.path().toString());
+          });
+    } else {
+      LOG.warn("Skipping cleaning up of data files because Iceberg was unable to determine the final commit state");
+    }
   }
 
   protected Table table() {
diff --git a/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/source/ManualSource.java b/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/source/ManualSource.java
new file mode 100644
index 000000000..158d57473
--- /dev/null
+++ b/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/source/ManualSource.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.util.Map;
+import java.util.Optional;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.spark.SparkUtil;
+import org.apache.iceberg.spark.SparkWriteConf;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.sources.DataSourceRegister;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
+import org.apache.spark.sql.sources.v2.DataSourceV2;
+import org.apache.spark.sql.sources.v2.WriteSupport;
+import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
+import org.apache.spark.sql.types.StructType;
+
+public class ManualSource implements WriteSupport, DataSourceRegister, DataSourceV2 {
+  public static final String SHORT_NAME = "manual_source";
+  public static final String TABLE_NAME = "table_name";
+  private static final Map<String, Table> tableMap = Maps.newHashMap();
+
+  private SparkSession lazySpark = null;
+  private Configuration lazyConf = null;
+
+  public static void setTable(String name, Table table) {
+    Preconditions.checkArgument(!tableMap.containsKey(name), "Cannot set " + name + ". It is already set");
+    tableMap.put(name, table);
+  }
+
+  public static void clearTables() {
+    tableMap.clear();
+  }
+
+  @Override
+  public String shortName() {
+    return SHORT_NAME;
+  }
+
+  @Override
+  public Optional<DataSourceWriter> createWriter(String writeUUID, StructType dsStruct, SaveMode mode,
+                                                 DataSourceOptions options) {
+
+    Map<String, String> properties = options.asMap();
+    Preconditions.checkArgument(properties.containsKey(TABLE_NAME), "Missing property " + TABLE_NAME);
+    String tableName = properties.get(TABLE_NAME);
+    Preconditions.checkArgument(tableMap.containsKey(tableName), "Table missing " + tableName);
+    Table table = tableMap.get(tableName);
+
+    SparkWriteConf writeConf = new SparkWriteConf(lazySparkSession(), table, options.asMap());
+    Schema writeSchema = SparkSchemaUtil.convert(table.schema(), dsStruct);
+    TypeUtil.validateWriteSchema(table.schema(), writeSchema, writeConf.checkNullability(), writeConf.checkOrdering());
+    SparkUtil.validatePartitionTransforms(table.spec());
+    String appId = lazySparkSession().sparkContext().applicationId();
+    String wapId = writeConf.wapId();
+    boolean replacePartitions = mode == SaveMode.Overwrite;
+
+    return Optional.of(new Writer(
+        lazySparkSession(), table, writeConf, replacePartitions, appId, wapId, writeSchema, dsStruct));
+  }
+
+  private SparkSession lazySparkSession() {
+    if (lazySpark == null) {
+      this.lazySpark = SparkSession.builder().getOrCreate();
+    }
+    return lazySpark;
+  }
+
+  private Configuration lazyBaseConf() {
+    if (lazyConf == null) {
+      this.lazyConf = lazySparkSession().sessionState().newHadoopConf();
+    }
+    return lazyConf;
+  }
+}
diff --git a/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java b/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
index f9241758c..e7a6d1623 100644
--- a/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
+++ b/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
@@ -24,6 +24,8 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Locale;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.ManifestFile;
@@ -32,11 +34,13 @@ import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.exceptions.CommitStateUnknownException;
 import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.SparkWriteOptions;
 import org.apache.iceberg.types.Types;
+import org.apache.spark.SparkException;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
@@ -54,6 +58,9 @@ import org.junit.runners.Parameterized;
 
 import static org.apache.iceberg.TableProperties.SPARK_WRITE_PARTITIONED_FANOUT_ENABLED;
 import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
 
 @RunWith(Parameterized.class)
 public class TestSparkDataWrite {
@@ -78,6 +85,11 @@ public class TestSparkDataWrite {
     TestSparkDataWrite.spark = SparkSession.builder().master("local[2]").getOrCreate();
   }
 
+  @Parameterized.AfterParam
+  public static void clearSourceCache() {
+    ManualSource.clearTables();
+  }
+
   @AfterClass
   public static void stopSpark() {
     SparkSession currentSpark = TestSparkDataWrite.spark;
@@ -588,6 +600,55 @@ public class TestSparkDataWrite {
     Assert.assertTrue("All DataFiles contain 1000 rows", files.stream().allMatch(d -> d.recordCount() == 1000));
   }
 
+  @Test
+  public void testCommitUnknownException() throws IOException {
+    File parent = temp.newFolder(format.toString());
+    File location = new File(parent, "commitunknown");
+
+    HadoopTables tables = new HadoopTables(CONF);
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
+    Table table = tables.create(SCHEMA, spec, location.toString());
+
+    List<SimpleRecord> records = Lists.newArrayList(
+        new SimpleRecord(1, "a"),
+        new SimpleRecord(2, "b"),
+        new SimpleRecord(3, "c")
+    );
+
+    Dataset<Row> df = spark.createDataFrame(records, SimpleRecord.class);
+
+    AppendFiles append = table.newFastAppend();
+    AppendFiles spyAppend = spy(append);
+    doAnswer(invocation -> {
+      append.commit();
+      throw new CommitStateUnknownException(new RuntimeException("Datacenter on Fire"));
+    }).when(spyAppend).commit();
+
+    Table spyTable = spy(table);
+    when(spyTable.newAppend()).thenReturn(spyAppend);
+
+    String manualTableName = "unknown_exception";
+    ManualSource.setTable(manualTableName, spyTable);
+
+    // Although an exception is thrown here, write and commit have succeeded
+    AssertHelpers.assertThrowsWithCause("Should throw a Commit State Unknown Exception",
+        SparkException.class,
+        "Writing job aborted",
+        CommitStateUnknownException.class,
+        "Datacenter on Fire",
+        () -> df.select("id", "data").sort("data").write()
+            .format("org.apache.iceberg.spark.source.ManualSource")
+            .option(ManualSource.TABLE_NAME, manualTableName)
+            .mode(SaveMode.Append)
+            .save(location.toString()));
+
+    // Since write and commit succeeded, the rows should be readable
+    Dataset<Row> result = spark.read().format("iceberg").load(location.toString());
+    List<SimpleRecord> actual = result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+    Assert.assertEquals("Number of rows should match", records.size(), actual.size());
+    Assert.assertEquals("Result rows should match", records, actual);
+  }
+
   public enum IcebergOptionsType {
     NONE,
     TABLE,
diff --git a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
index 934a03490..80e6de96a 100644
--- a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
+++ b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
@@ -43,6 +43,7 @@ import org.apache.iceberg.SnapshotSummary;
 import org.apache.iceberg.SnapshotUpdate;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.exceptions.CommitStateUnknownException;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.io.ClusteredDataWriter;
@@ -105,6 +106,8 @@ class SparkWrite {
   private final Map<String, String> extraSnapshotMetadata;
   private final boolean partitionedFanoutEnabled;
 
+  private boolean cleanupOnAbort = true;
+
   SparkWrite(SparkSession spark, Table table, SparkWriteConf writeConf,
              LogicalWriteInfo writeInfo, String applicationId,
              Schema writeSchema, StructType dsSchema) {
@@ -178,25 +181,34 @@ class SparkWrite {
       operation.stageOnly();
     }
 
-    long start = System.currentTimeMillis();
-    operation.commit(); // abort is automatically called if this fails
-    long duration = System.currentTimeMillis() - start;
-    LOG.info("Committed in {} ms", duration);
+    try {
+      long start = System.currentTimeMillis();
+      operation.commit(); // abort is automatically called if this fails
+      long duration = System.currentTimeMillis() - start;
+      LOG.info("Committed in {} ms", duration);
+    } catch (CommitStateUnknownException commitStateUnknownException) {
+      cleanupOnAbort = false;
+      throw commitStateUnknownException;
+    }
   }
 
   private void abort(WriterCommitMessage[] messages) {
-    Map<String, String> props = table.properties();
-    Tasks.foreach(files(messages))
-        .retry(PropertyUtil.propertyAsInt(props, COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT))
-        .exponentialBackoff(
-            PropertyUtil.propertyAsInt(props, COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT),
-            PropertyUtil.propertyAsInt(props, COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT),
-            PropertyUtil.propertyAsInt(props, COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT),
-            2.0 /* exponential */)
-        .throwFailureWhenFinished()
-        .run(file -> {
-          table.io().deleteFile(file.path().toString());
-        });
+    if (cleanupOnAbort) {
+      Map<String, String> props = table.properties();
+      Tasks.foreach(files(messages))
+          .retry(PropertyUtil.propertyAsInt(props, COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT))
+          .exponentialBackoff(
+              PropertyUtil.propertyAsInt(props, COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT),
+              PropertyUtil.propertyAsInt(props, COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT),
+              PropertyUtil.propertyAsInt(props, COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT),
+              2.0 /* exponential */)
+          .throwFailureWhenFinished()
+          .run(file -> {
+            table.io().deleteFile(file.path().toString());
+          });
+    } else {
+      LOG.warn("Skipping cleaning up of data files because Iceberg was unable to determine the final commit state");
+    }
   }
 
   private Iterable<DataFile> files(WriterCommitMessage[] messages) {
diff --git a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/ManualSource.java b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/ManualSource.java
new file mode 100644
index 000000000..0f8c8b3b6
--- /dev/null
+++ b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/ManualSource.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.util.Map;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.spark.sql.connector.catalog.Table;
+import org.apache.spark.sql.connector.catalog.TableProvider;
+import org.apache.spark.sql.connector.expressions.Transform;
+import org.apache.spark.sql.sources.DataSourceRegister;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+public class ManualSource implements TableProvider, DataSourceRegister {
+  public static final String SHORT_NAME = "manual_source";
+  public static final String TABLE_NAME = "TABLE_NAME";
+  private static final Map<String, Table> tableMap = Maps.newHashMap();
+
+  public static void setTable(String name, Table table) {
+    Preconditions.checkArgument(!tableMap.containsKey(name), "Cannot set " + name + ". It is already set");
+    tableMap.put(name, table);
+  }
+
+  public static void clearTables() {
+    tableMap.clear();
+  }
+
+  @Override
+  public String shortName() {
+    return SHORT_NAME;
+  }
+
+  @Override
+  public StructType inferSchema(CaseInsensitiveStringMap options) {
+    return getTable(null, null, options).schema();
+  }
+
+  @Override
+  public Transform[] inferPartitioning(CaseInsensitiveStringMap options) {
+    return getTable(null, null, options).partitioning();
+  }
+
+  @Override
+  public org.apache.spark.sql.connector.catalog.Table getTable(
+      StructType schema, Transform[] partitioning, Map<String, String> properties) {
+    Preconditions.checkArgument(properties.containsKey(TABLE_NAME), "Missing property " + TABLE_NAME);
+    String tableName = properties.get(TABLE_NAME);
+    Preconditions.checkArgument(tableMap.containsKey(tableName), "Table missing " + tableName);
+    return tableMap.get(tableName);
+  }
+
+  @Override
+  public boolean supportsExternalMetadata() {
+    return false;
+  }
+}
diff --git a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
index f9241758c..8bf57ba65 100644
--- a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
+++ b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
@@ -24,6 +24,8 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Locale;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.ManifestFile;
@@ -32,11 +34,13 @@ import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.exceptions.CommitStateUnknownException;
 import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.SparkWriteOptions;
 import org.apache.iceberg.types.Types;
+import org.apache.spark.SparkException;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
@@ -54,6 +58,9 @@ import org.junit.runners.Parameterized;
 
 import static org.apache.iceberg.TableProperties.SPARK_WRITE_PARTITIONED_FANOUT_ENABLED;
 import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
 
 @RunWith(Parameterized.class)
 public class TestSparkDataWrite {
@@ -78,6 +85,11 @@ public class TestSparkDataWrite {
     TestSparkDataWrite.spark = SparkSession.builder().master("local[2]").getOrCreate();
   }
 
+  @Parameterized.AfterParam
+  public static void clearSourceCache() {
+    ManualSource.clearTables();
+  }
+
   @AfterClass
   public static void stopSpark() {
     SparkSession currentSpark = TestSparkDataWrite.spark;
@@ -588,6 +600,56 @@ public class TestSparkDataWrite {
     Assert.assertTrue("All DataFiles contain 1000 rows", files.stream().allMatch(d -> d.recordCount() == 1000));
   }
 
+  @Test
+  public void testCommitUnknownException() throws IOException {
+    File parent = temp.newFolder(format.toString());
+    File location = new File(parent, "commitunknown");
+
+    HadoopTables tables = new HadoopTables(CONF);
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
+    Table table = tables.create(SCHEMA, spec, location.toString());
+
+    List<SimpleRecord> records = Lists.newArrayList(
+        new SimpleRecord(1, "a"),
+        new SimpleRecord(2, "b"),
+        new SimpleRecord(3, "c")
+    );
+
+    Dataset<Row> df = spark.createDataFrame(records, SimpleRecord.class);
+
+    AppendFiles append = table.newFastAppend();
+    AppendFiles spyAppend = spy(append);
+    doAnswer(invocation -> {
+      append.commit();
+      throw new CommitStateUnknownException(new RuntimeException("Datacenter on Fire"));
+    }).when(spyAppend).commit();
+
+    Table spyTable = spy(table);
+    when(spyTable.newAppend()).thenReturn(spyAppend);
+    SparkTable sparkTable = new SparkTable(spyTable, false);
+
+    String manualTableName = "unknown_exception";
+    ManualSource.setTable(manualTableName, sparkTable);
+
+    // Although an exception is thrown here, write and commit have succeeded
+    AssertHelpers.assertThrowsWithCause("Should throw a Commit State Unknown Exception",
+        SparkException.class,
+        "Writing job aborted",
+        CommitStateUnknownException.class,
+        "Datacenter on Fire",
+        () -> df.select("id", "data").sort("data").write()
+            .format("org.apache.iceberg.spark.source.ManualSource")
+            .option(ManualSource.TABLE_NAME, manualTableName)
+            .mode(SaveMode.Append)
+            .save(location.toString()));
+
+    // Since write and commit succeeded, the rows should be readable
+    Dataset<Row> result = spark.read().format("iceberg").load(location.toString());
+    List<SimpleRecord> actual = result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+    Assert.assertEquals("Number of rows should match", records.size(), actual.size());
+    Assert.assertEquals("Result rows should match", records, actual);
+  }
+
   public enum IcebergOptionsType {
     NONE,
     TABLE,
diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
index 934a03490..80e6de96a 100644
--- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
+++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
@@ -43,6 +43,7 @@ import org.apache.iceberg.SnapshotSummary;
 import org.apache.iceberg.SnapshotUpdate;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.exceptions.CommitStateUnknownException;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.io.ClusteredDataWriter;
@@ -105,6 +106,8 @@ class SparkWrite {
   private final Map<String, String> extraSnapshotMetadata;
   private final boolean partitionedFanoutEnabled;
 
+  private boolean cleanupOnAbort = true;
+
   SparkWrite(SparkSession spark, Table table, SparkWriteConf writeConf,
              LogicalWriteInfo writeInfo, String applicationId,
              Schema writeSchema, StructType dsSchema) {
@@ -178,25 +181,34 @@ class SparkWrite {
       operation.stageOnly();
     }
 
-    long start = System.currentTimeMillis();
-    operation.commit(); // abort is automatically called if this fails
-    long duration = System.currentTimeMillis() - start;
-    LOG.info("Committed in {} ms", duration);
+    try {
+      long start = System.currentTimeMillis();
+      operation.commit(); // abort is automatically called if this fails
+      long duration = System.currentTimeMillis() - start;
+      LOG.info("Committed in {} ms", duration);
+    } catch (CommitStateUnknownException commitStateUnknownException) {
+      cleanupOnAbort = false;
+      throw commitStateUnknownException;
+    }
   }
 
   private void abort(WriterCommitMessage[] messages) {
-    Map<String, String> props = table.properties();
-    Tasks.foreach(files(messages))
-        .retry(PropertyUtil.propertyAsInt(props, COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT))
-        .exponentialBackoff(
-            PropertyUtil.propertyAsInt(props, COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT),
-            PropertyUtil.propertyAsInt(props, COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT),
-            PropertyUtil.propertyAsInt(props, COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT),
-            2.0 /* exponential */)
-        .throwFailureWhenFinished()
-        .run(file -> {
-          table.io().deleteFile(file.path().toString());
-        });
+    if (cleanupOnAbort) {
+      Map<String, String> props = table.properties();
+      Tasks.foreach(files(messages))
+          .retry(PropertyUtil.propertyAsInt(props, COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT))
+          .exponentialBackoff(
+              PropertyUtil.propertyAsInt(props, COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT),
+              PropertyUtil.propertyAsInt(props, COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT),
+              PropertyUtil.propertyAsInt(props, COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT),
+              2.0 /* exponential */)
+          .throwFailureWhenFinished()
+          .run(file -> {
+            table.io().deleteFile(file.path().toString());
+          });
+    } else {
+      LOG.warn("Skipping cleaning up of data files because Iceberg was unable to determine the final commit state");
+    }
   }
 
   private Iterable<DataFile> files(WriterCommitMessage[] messages) {
diff --git a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/ManualSource.java b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/ManualSource.java
new file mode 100644
index 000000000..0f8c8b3b6
--- /dev/null
+++ b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/ManualSource.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.util.Map;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.spark.sql.connector.catalog.Table;
+import org.apache.spark.sql.connector.catalog.TableProvider;
+import org.apache.spark.sql.connector.expressions.Transform;
+import org.apache.spark.sql.sources.DataSourceRegister;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+public class ManualSource implements TableProvider, DataSourceRegister {
+  public static final String SHORT_NAME = "manual_source";
+  public static final String TABLE_NAME = "TABLE_NAME";
+  private static final Map<String, Table> tableMap = Maps.newHashMap();
+
+  public static void setTable(String name, Table table) {
+    Preconditions.checkArgument(!tableMap.containsKey(name), "Cannot set " + name + ". It is already set");
+    tableMap.put(name, table);
+  }
+
+  public static void clearTables() {
+    tableMap.clear();
+  }
+
+  @Override
+  public String shortName() {
+    return SHORT_NAME;
+  }
+
+  @Override
+  public StructType inferSchema(CaseInsensitiveStringMap options) {
+    return getTable(null, null, options).schema();
+  }
+
+  @Override
+  public Transform[] inferPartitioning(CaseInsensitiveStringMap options) {
+    return getTable(null, null, options).partitioning();
+  }
+
+  @Override
+  public org.apache.spark.sql.connector.catalog.Table getTable(
+      StructType schema, Transform[] partitioning, Map<String, String> properties) {
+    Preconditions.checkArgument(properties.containsKey(TABLE_NAME), "Missing property " + TABLE_NAME);
+    String tableName = properties.get(TABLE_NAME);
+    Preconditions.checkArgument(tableMap.containsKey(tableName), "Table missing " + tableName);
+    return tableMap.get(tableName);
+  }
+
+  @Override
+  public boolean supportsExternalMetadata() {
+    return false;
+  }
+}
diff --git a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
index f9241758c..8bf57ba65 100644
--- a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
+++ b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
@@ -24,6 +24,8 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Locale;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.ManifestFile;
@@ -32,11 +34,13 @@ import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.exceptions.CommitStateUnknownException;
 import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.SparkWriteOptions;
 import org.apache.iceberg.types.Types;
+import org.apache.spark.SparkException;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
@@ -54,6 +58,9 @@ import org.junit.runners.Parameterized;
 
 import static org.apache.iceberg.TableProperties.SPARK_WRITE_PARTITIONED_FANOUT_ENABLED;
 import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
 
 @RunWith(Parameterized.class)
 public class TestSparkDataWrite {
@@ -78,6 +85,11 @@ public class TestSparkDataWrite {
     TestSparkDataWrite.spark = SparkSession.builder().master("local[2]").getOrCreate();
   }
 
+  @Parameterized.AfterParam
+  public static void clearSourceCache() {
+    ManualSource.clearTables();
+  }
+
   @AfterClass
   public static void stopSpark() {
     SparkSession currentSpark = TestSparkDataWrite.spark;
@@ -588,6 +600,56 @@ public class TestSparkDataWrite {
     Assert.assertTrue("All DataFiles contain 1000 rows", files.stream().allMatch(d -> d.recordCount() == 1000));
   }
 
+  @Test
+  public void testCommitUnknownException() throws IOException {
+    File parent = temp.newFolder(format.toString());
+    File location = new File(parent, "commitunknown");
+
+    HadoopTables tables = new HadoopTables(CONF);
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
+    Table table = tables.create(SCHEMA, spec, location.toString());
+
+    List<SimpleRecord> records = Lists.newArrayList(
+        new SimpleRecord(1, "a"),
+        new SimpleRecord(2, "b"),
+        new SimpleRecord(3, "c")
+    );
+
+    Dataset<Row> df = spark.createDataFrame(records, SimpleRecord.class);
+
+    AppendFiles append = table.newFastAppend();
+    AppendFiles spyAppend = spy(append);
+    doAnswer(invocation -> {
+      append.commit();
+      throw new CommitStateUnknownException(new RuntimeException("Datacenter on Fire"));
+    }).when(spyAppend).commit();
+
+    Table spyTable = spy(table);
+    when(spyTable.newAppend()).thenReturn(spyAppend);
+    SparkTable sparkTable = new SparkTable(spyTable, false);
+
+    String manualTableName = "unknown_exception";
+    ManualSource.setTable(manualTableName, sparkTable);
+
+    // Although an exception is thrown here, write and commit have succeeded
+    AssertHelpers.assertThrowsWithCause("Should throw a Commit State Unknown Exception",
+        SparkException.class,
+        "Writing job aborted",
+        CommitStateUnknownException.class,
+        "Datacenter on Fire",
+        () -> df.select("id", "data").sort("data").write()
+            .format("org.apache.iceberg.spark.source.ManualSource")
+            .option(ManualSource.TABLE_NAME, manualTableName)
+            .mode(SaveMode.Append)
+            .save(location.toString()));
+
+    // Since write and commit succeeded, the rows should be readable
+    Dataset<Row> result = spark.read().format("iceberg").load(location.toString());
+    List<SimpleRecord> actual = result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+    Assert.assertEquals("Number of rows should match", records.size(), actual.size());
+    Assert.assertEquals("Result rows should match", records, actual);
+  }
+
   public enum IcebergOptionsType {
     NONE,
     TABLE,