Posted to commits@iceberg.apache.org by bl...@apache.org on 2022/05/17 16:49:30 UTC

[iceberg] branch 0.13.x updated: Spark: Update commit state unknown handling (backport to 0.13) (#4787)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch 0.13.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/0.13.x by this push:
     new 304fc89d8 Spark: Update commit state unknown handling (backport to 0.13) (#4787)
304fc89d8 is described below

commit 304fc89d833e084dd59edb7f0aaa431199c65cb7
Author: Eduard Tudenhöfner <et...@gmail.com>
AuthorDate: Tue May 17 18:49:25 2022 +0200

    Spark: Update commit state unknown handling (backport to 0.13) (#4787)
    
    Co-authored-by: Russell Spitzer <rs...@apple.com>
---
 .../java/org/apache/iceberg/PendingUpdate.java     |  3 +
 .../org/apache/iceberg/spark/source/Writer.java    | 44 ++++++----
 .../apache/iceberg/spark/source/ManualSource.java  | 99 ++++++++++++++++++++++
 .../iceberg/spark/source/TestSparkDataWrite.java   | 61 +++++++++++++
 .../apache/iceberg/spark/source/SparkWrite.java    | 44 ++++++----
 .../apache/iceberg/spark/source/ManualSource.java  | 74 ++++++++++++++++
 .../iceberg/spark/source/TestSparkDataWrite.java   | 62 ++++++++++++++
 .../apache/iceberg/spark/source/SparkWrite.java    | 44 ++++++----
 .../apache/iceberg/spark/source/ManualSource.java  | 74 ++++++++++++++++
 .../iceberg/spark/source/TestSparkDataWrite.java   | 62 ++++++++++++++
 .../apache/iceberg/spark/source/SparkWrite.java    | 44 ++++++----
 .../apache/iceberg/spark/source/ManualSource.java  | 74 ++++++++++++++++
 .../iceberg/spark/source/TestSparkDataWrite.java   | 62 ++++++++++++++
 13 files changed, 683 insertions(+), 64 deletions(-)

diff --git a/api/src/main/java/org/apache/iceberg/PendingUpdate.java b/api/src/main/java/org/apache/iceberg/PendingUpdate.java
index 8e47dd006..9c1b18434 100644
--- a/api/src/main/java/org/apache/iceberg/PendingUpdate.java
+++ b/api/src/main/java/org/apache/iceberg/PendingUpdate.java
@@ -20,6 +20,7 @@
 package org.apache.iceberg;
 
 import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.exceptions.CommitStateUnknownException;
 import org.apache.iceberg.exceptions.ValidationException;
 
 /**
@@ -49,6 +50,8 @@ public interface PendingUpdate<T> {
    *
    * @throws ValidationException If the update cannot be applied to the current table metadata.
    * @throws CommitFailedException If the update cannot be committed due to conflicts.
+   * @throws CommitStateUnknownException If the update success or failure is unknown, no cleanup should be done in
+   * this case.
    */
   void commit();
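
    The new @throws clause tells callers of commit() that when the outcome cannot be
    determined they must not delete the data files they wrote. For orientation, a minimal
    sketch of the producing side follows; CatalogClient and swapMetadataLocation are
    hypothetical names, and only the exception type and its Throwable-taking constructor
    come from this change.

        import java.net.SocketTimeoutException;
        import org.apache.iceberg.exceptions.CommitStateUnknownException;

        class UnknownCommitStateSketch {
          // Hypothetical metadata-pointer swap; the RPC outcome can be ambiguous.
          interface CatalogClient {
            void swapMetadataLocation(String base, String updated) throws SocketTimeoutException;
          }

          void commitMetadata(CatalogClient client, String base, String updated) {
            try {
              client.swapMetadataLocation(base, updated);
            } catch (SocketTimeoutException e) {
              // The swap may or may not have been applied server-side, so report the
              // ambiguity instead of a plain CommitFailedException.
              throw new CommitStateUnknownException(e);
            }
          }
        }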
 
diff --git a/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java b/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
index 451487ecf..c4259a9bf 100644
--- a/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
+++ b/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
@@ -33,6 +33,7 @@ import org.apache.iceberg.SnapshotSummary;
 import org.apache.iceberg.SnapshotUpdate;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.exceptions.CommitStateUnknownException;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.OutputFileFactory;
 import org.apache.iceberg.io.UnpartitionedWriter;
@@ -78,6 +79,8 @@ class Writer implements DataSourceWriter {
   private final Map<String, String> extraSnapshotMetadata;
   private final boolean partitionedFanoutEnabled;
 
+  private boolean cleanupOnAbort = true;
+
   Writer(SparkSession spark, Table table, SparkWriteConf writeConf, boolean replacePartitions,
          String applicationId, Schema writeSchema, StructType dsSchema) {
     this(spark, table, writeConf, replacePartitions, applicationId, null, writeSchema, dsSchema);
@@ -136,10 +139,15 @@ class Writer implements DataSourceWriter {
       operation.stageOnly();
     }
 
-    long start = System.currentTimeMillis();
-    operation.commit(); // abort is automatically called if this fails
-    long duration = System.currentTimeMillis() - start;
-    LOG.info("Committed in {} ms", duration);
+    try {
+      long start = System.currentTimeMillis();
+      operation.commit(); // abort is automatically called if this fails
+      long duration = System.currentTimeMillis() - start;
+      LOG.info("Committed in {} ms", duration);
+    } catch (CommitStateUnknownException commitStateUnknownException) {
+      cleanupOnAbort = false;
+      throw commitStateUnknownException;
+    }
   }
 
   private void append(WriterCommitMessage[] messages) {
@@ -175,18 +183,22 @@ class Writer implements DataSourceWriter {
 
   @Override
   public void abort(WriterCommitMessage[] messages) {
-    Map<String, String> props = table.properties();
-    Tasks.foreach(files(messages))
-        .retry(PropertyUtil.propertyAsInt(props, COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT))
-        .exponentialBackoff(
-            PropertyUtil.propertyAsInt(props, COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT),
-            PropertyUtil.propertyAsInt(props, COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT),
-            PropertyUtil.propertyAsInt(props, COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT),
-            2.0 /* exponential */)
-        .throwFailureWhenFinished()
-        .run(file -> {
-          table.io().deleteFile(file.path().toString());
-        });
+    if (cleanupOnAbort) {
+      Map<String, String> props = table.properties();
+      Tasks.foreach(files(messages))
+          .retry(PropertyUtil.propertyAsInt(props, COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT))
+          .exponentialBackoff(
+              PropertyUtil.propertyAsInt(props, COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT),
+              PropertyUtil.propertyAsInt(props, COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT),
+              PropertyUtil.propertyAsInt(props, COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT),
+              2.0 /* exponential */)
+          .throwFailureWhenFinished()
+          .run(file -> {
+            table.io().deleteFile(file.path().toString());
+          });
+    } else {
+      LOG.warn("Skipping cleaning up of data files because Iceberg was unable to determine the final commit state");
+    }
   }
 
   protected Table table() {
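
    The guarded abort() above reuses the table's commit retry settings for file cleanup.
    Below is a minimal sketch of tuning those knobs on a test table; the constants are the
    same ones read in the hunk above (they live in org.apache.iceberg.TableProperties), and
    the values are illustrative only.

        import org.apache.iceberg.Table;
        import org.apache.iceberg.TableProperties;

        class CommitRetryTuning {
          // Tighten the retry/backoff used by both commit and the abort() cleanup loop.
          static void shortenCommitRetries(Table table) {
            table.updateProperties()
                .set(TableProperties.COMMIT_NUM_RETRIES, "2")
                .set(TableProperties.COMMIT_MIN_RETRY_WAIT_MS, "10")
                .set(TableProperties.COMMIT_MAX_RETRY_WAIT_MS, "100")
                .set(TableProperties.COMMIT_TOTAL_RETRY_TIME_MS, "1000")
                .commit();
          }
        }
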
diff --git a/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/source/ManualSource.java b/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/source/ManualSource.java
new file mode 100644
index 000000000..158d57473
--- /dev/null
+++ b/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/source/ManualSource.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.util.Map;
+import java.util.Optional;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.spark.SparkUtil;
+import org.apache.iceberg.spark.SparkWriteConf;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.sources.DataSourceRegister;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
+import org.apache.spark.sql.sources.v2.DataSourceV2;
+import org.apache.spark.sql.sources.v2.WriteSupport;
+import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
+import org.apache.spark.sql.types.StructType;
+
+public class ManualSource implements WriteSupport, DataSourceRegister, DataSourceV2 {
+  public static final String SHORT_NAME = "manual_source";
+  public static final String TABLE_NAME = "table_name";
+  private static final Map<String, Table> tableMap = Maps.newHashMap();
+
+  private SparkSession lazySpark = null;
+  private Configuration lazyConf = null;
+
+  public static void setTable(String name, Table table) {
+    Preconditions.checkArgument(!tableMap.containsKey(name), "Cannot set " + name + ". It is already set");
+    tableMap.put(name, table);
+  }
+
+  public static void clearTables() {
+    tableMap.clear();
+  }
+
+  @Override
+  public String shortName() {
+    return SHORT_NAME;
+  }
+
+  @Override
+  public Optional<DataSourceWriter> createWriter(String writeUUID, StructType dsStruct, SaveMode mode,
+                                                 DataSourceOptions options) {
+
+    Map<String, String> properties = options.asMap();
+    Preconditions.checkArgument(properties.containsKey(TABLE_NAME), "Missing property " + TABLE_NAME);
+    String tableName = properties.get(TABLE_NAME);
+    Preconditions.checkArgument(tableMap.containsKey(tableName), "Table missing " + tableName);
+    Table table = tableMap.get(tableName);
+
+    SparkWriteConf writeConf = new SparkWriteConf(lazySparkSession(), table, options.asMap());
+    Schema writeSchema = SparkSchemaUtil.convert(table.schema(), dsStruct);
+    TypeUtil.validateWriteSchema(table.schema(), writeSchema, writeConf.checkNullability(), writeConf.checkOrdering());
+    SparkUtil.validatePartitionTransforms(table.spec());
+    String appId = lazySparkSession().sparkContext().applicationId();
+    String wapId = writeConf.wapId();
+    boolean replacePartitions = mode == SaveMode.Overwrite;
+
+    return Optional.of(new Writer(
+        lazySparkSession(), table, writeConf, replacePartitions, appId, wapId, writeSchema, dsStruct));
+  }
+
+  private SparkSession lazySparkSession() {
+    if (lazySpark == null) {
+      this.lazySpark = SparkSession.builder().getOrCreate();
+    }
+    return lazySpark;
+  }
+
+  private Configuration lazyBaseConf() {
+    if (lazyConf == null) {
+      this.lazyConf = lazySparkSession().sessionState().newHadoopConf();
+    }
+    return lazyConf;
+  }
+}
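
    ManualSource lets a test hand a pre-built (typically Mockito-spied) Iceberg Table
    straight to the write path, bypassing any catalog lookup. A short usage sketch,
    mirroring the test added below; "unknown_exception", spyTable, df and location are the
    test's own names:

        ManualSource.setTable("unknown_exception", spyTable);
        df.write()
            .format("org.apache.iceberg.spark.source.ManualSource")
            .option(ManualSource.TABLE_NAME, "unknown_exception")
            .mode(SaveMode.Append)
            .save(location.toString());
        ManualSource.clearTables();   // the suite does this in a @Parameterized.AfterParam hook
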
diff --git a/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java b/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
index 5e99dac5c..6d1d9df63 100644
--- a/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
+++ b/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
@@ -24,6 +24,8 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Locale;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.ManifestFile;
@@ -32,11 +34,13 @@ import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.exceptions.CommitStateUnknownException;
 import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.SparkWriteOptions;
 import org.apache.iceberg.types.Types;
+import org.apache.spark.SparkException;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
@@ -54,6 +58,9 @@ import org.junit.runners.Parameterized;
 
 import static org.apache.iceberg.TableProperties.SPARK_WRITE_PARTITIONED_FANOUT_ENABLED;
 import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
 
 @RunWith(Parameterized.class)
 public class TestSparkDataWrite {
@@ -78,6 +85,11 @@ public class TestSparkDataWrite {
     TestSparkDataWrite.spark = SparkSession.builder().master("local[2]").getOrCreate();
   }
 
+  @Parameterized.AfterParam
+  public static void clearSourceCache() {
+    ManualSource.clearTables();
+  }
+
   @AfterClass
   public static void stopSpark() {
     SparkSession currentSpark = TestSparkDataWrite.spark;
@@ -592,6 +604,55 @@ public class TestSparkDataWrite {
     }
   }
 
+  @Test
+  public void testCommitUnknownException() throws IOException {
+    File parent = temp.newFolder(format.toString());
+    File location = new File(parent, "commitunknown");
+
+    HadoopTables tables = new HadoopTables(CONF);
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
+    Table table = tables.create(SCHEMA, spec, location.toString());
+
+    List<SimpleRecord> records = Lists.newArrayList(
+        new SimpleRecord(1, "a"),
+        new SimpleRecord(2, "b"),
+        new SimpleRecord(3, "c")
+    );
+
+    Dataset<Row> df = spark.createDataFrame(records, SimpleRecord.class);
+
+    AppendFiles append = table.newFastAppend();
+    AppendFiles spyAppend = spy(append);
+    doAnswer(invocation -> {
+      append.commit();
+      throw new CommitStateUnknownException(new RuntimeException("Datacenter on Fire"));
+    }).when(spyAppend).commit();
+
+    Table spyTable = spy(table);
+    when(spyTable.newAppend()).thenReturn(spyAppend);
+
+    String manualTableName = "unknown_exception";
+    ManualSource.setTable(manualTableName, spyTable);
+
+    // Although an exception is thrown here, write and commit have succeeded
+    AssertHelpers.assertThrowsWithCause("Should throw a Commit State Unknown Exception",
+        SparkException.class,
+        "Writing job aborted",
+        CommitStateUnknownException.class,
+        "Datacenter on Fire",
+        () -> df.select("id", "data").sort("data").write()
+            .format("org.apache.iceberg.spark.source.ManualSource")
+            .option(ManualSource.TABLE_NAME, manualTableName)
+            .mode(SaveMode.Append)
+            .save(location.toString()));
+
+    // Since write and commit succeeded, the rows should be readable
+    Dataset<Row> result = spark.read().format("iceberg").load(location.toString());
+    List<SimpleRecord> actual = result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+    Assert.assertEquals("Number of rows should match", records.size(), actual.size());
+    Assert.assertEquals("Result rows should match", records, actual);
+  }
+
   public enum IcebergOptionsType {
     NONE,
     TABLE,
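
    The essential trick in testCommitUnknownException is that the spy's answer first runs
    the real append commit, so the snapshot genuinely lands, and only then throws;
    table.newAppend() is the stubbed call because that is how the write path obtains its
    AppendFiles. A condensed restatement of the idiom, with the simulated cause clearly
    test-only:

        AppendFiles realAppend = table.newFastAppend();
        AppendFiles spyAppend = spy(realAppend);
        doAnswer(invocation -> {
          realAppend.commit();                                   // the snapshot really commits
          throw new CommitStateUnknownException(                 // the writer only sees ambiguity
              new RuntimeException("simulated catalog outage")); // test-only cause
        }).when(spyAppend).commit();

        Table spyTable = spy(table);
        when(spyTable.newAppend()).thenReturn(spyAppend);        // injected into the write path
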
diff --git a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
index 24475899e..87fcd9cad 100644
--- a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
+++ b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
@@ -44,6 +44,7 @@ import org.apache.iceberg.SnapshotUpdate;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.exceptions.CommitStateUnknownException;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.io.ClusteredDataWriter;
@@ -103,6 +104,8 @@ class SparkWrite {
   private final Map<String, String> extraSnapshotMetadata;
   private final boolean partitionedFanoutEnabled;
 
+  private boolean cleanupOnAbort = true;
+
   SparkWrite(SparkSession spark, Table table, SparkWriteConf writeConf,
              LogicalWriteInfo writeInfo, String applicationId,
              Schema writeSchema, StructType dsSchema) {
@@ -176,25 +179,34 @@ class SparkWrite {
       operation.stageOnly();
     }
 
-    long start = System.currentTimeMillis();
-    operation.commit(); // abort is automatically called if this fails
-    long duration = System.currentTimeMillis() - start;
-    LOG.info("Committed in {} ms", duration);
+    try {
+      long start = System.currentTimeMillis();
+      operation.commit(); // abort is automatically called if this fails
+      long duration = System.currentTimeMillis() - start;
+      LOG.info("Committed in {} ms", duration);
+    } catch (CommitStateUnknownException commitStateUnknownException) {
+      cleanupOnAbort = false;
+      throw commitStateUnknownException;
+    }
   }
 
   private void abort(WriterCommitMessage[] messages) {
-    Map<String, String> props = table.properties();
-    Tasks.foreach(files(messages))
-        .retry(PropertyUtil.propertyAsInt(props, COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT))
-        .exponentialBackoff(
-            PropertyUtil.propertyAsInt(props, COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT),
-            PropertyUtil.propertyAsInt(props, COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT),
-            PropertyUtil.propertyAsInt(props, COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT),
-            2.0 /* exponential */)
-        .throwFailureWhenFinished()
-        .run(file -> {
-          table.io().deleteFile(file.path().toString());
-        });
+    if (cleanupOnAbort) {
+      Map<String, String> props = table.properties();
+      Tasks.foreach(files(messages))
+          .retry(PropertyUtil.propertyAsInt(props, COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT))
+          .exponentialBackoff(
+              PropertyUtil.propertyAsInt(props, COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT),
+              PropertyUtil.propertyAsInt(props, COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT),
+              PropertyUtil.propertyAsInt(props, COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT),
+              2.0 /* exponential */)
+          .throwFailureWhenFinished()
+          .run(file -> {
+            table.io().deleteFile(file.path().toString());
+          });
+    } else {
+      LOG.warn("Skipping cleaning up of data files because Iceberg was unable to determine the final commit state");
+    }
   }
 
   private Iterable<DataFile> files(WriterCommitMessage[] messages) {
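
    The cleanupOnAbort flag is an instance field, which works because Spark drives commit
    and, on failure, abort against the same write object. A hedged paraphrase of the
    driver-side sequence (this is not Spark source, just the shape implied by the "abort is
    automatically called if this fails" comment above):

        try {
          batchWrite.commit(messages);
        } catch (Throwable t) {
          batchWrite.abort(messages);   // same instance, so it observes cleanupOnAbort == false
          throw t;
        }
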
diff --git a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/ManualSource.java b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/ManualSource.java
new file mode 100644
index 000000000..0f8c8b3b6
--- /dev/null
+++ b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/ManualSource.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.util.Map;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.spark.sql.connector.catalog.Table;
+import org.apache.spark.sql.connector.catalog.TableProvider;
+import org.apache.spark.sql.connector.expressions.Transform;
+import org.apache.spark.sql.sources.DataSourceRegister;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+public class ManualSource implements TableProvider, DataSourceRegister {
+  public static final String SHORT_NAME = "manual_source";
+  public static final String TABLE_NAME = "TABLE_NAME";
+  private static final Map<String, Table> tableMap = Maps.newHashMap();
+
+  public static void setTable(String name, Table table) {
+    Preconditions.checkArgument(!tableMap.containsKey(name), "Cannot set " + name + ". It is already set");
+    tableMap.put(name, table);
+  }
+
+  public static void clearTables() {
+    tableMap.clear();
+  }
+
+  @Override
+  public String shortName() {
+    return SHORT_NAME;
+  }
+
+  @Override
+  public StructType inferSchema(CaseInsensitiveStringMap options) {
+    return getTable(null, null, options).schema();
+  }
+
+  @Override
+  public Transform[] inferPartitioning(CaseInsensitiveStringMap options) {
+    return getTable(null, null, options).partitioning();
+  }
+
+  @Override
+  public org.apache.spark.sql.connector.catalog.Table getTable(
+      StructType schema, Transform[] partitioning, Map<String, String> properties) {
+    Preconditions.checkArgument(properties.containsKey(TABLE_NAME), "Missing property " + TABLE_NAME);
+    String tableName = properties.get(TABLE_NAME);
+    Preconditions.checkArgument(tableMap.containsKey(tableName), "Table missing " + tableName);
+    return tableMap.get(tableName);
+  }
+
+  @Override
+  public boolean supportsExternalMetadata() {
+    return false;
+  }
+}
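
    Unlike the Spark 2.4 variant, this ManualSource stores Spark connector Table objects
    (org.apache.spark.sql.connector.catalog.Table), so the 3.x tests wrap the spied Iceberg
    table in a SparkTable before registering it. The lines below are lifted from the test
    that follows; the second SparkTable constructor argument is copied verbatim from there:

        org.apache.iceberg.Table spyTable = spy(table);
        when(spyTable.newAppend()).thenReturn(spyAppend);
        SparkTable sparkTable = new SparkTable(spyTable, false);
        ManualSource.setTable("unknown_exception", sparkTable);
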
diff --git a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
index 5e99dac5c..0082394fb 100644
--- a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
+++ b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
@@ -24,6 +24,8 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Locale;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.ManifestFile;
@@ -32,11 +34,13 @@ import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.exceptions.CommitStateUnknownException;
 import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.SparkWriteOptions;
 import org.apache.iceberg.types.Types;
+import org.apache.spark.SparkException;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
@@ -54,6 +58,9 @@ import org.junit.runners.Parameterized;
 
 import static org.apache.iceberg.TableProperties.SPARK_WRITE_PARTITIONED_FANOUT_ENABLED;
 import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
 
 @RunWith(Parameterized.class)
 public class TestSparkDataWrite {
@@ -78,6 +85,11 @@ public class TestSparkDataWrite {
     TestSparkDataWrite.spark = SparkSession.builder().master("local[2]").getOrCreate();
   }
 
+  @Parameterized.AfterParam
+  public static void clearSourceCache() {
+    ManualSource.clearTables();
+  }
+
   @AfterClass
   public static void stopSpark() {
     SparkSession currentSpark = TestSparkDataWrite.spark;
@@ -592,6 +604,56 @@ public class TestSparkDataWrite {
     }
   }
 
+  @Test
+  public void testCommitUnknownException() throws IOException {
+    File parent = temp.newFolder(format.toString());
+    File location = new File(parent, "commitunknown");
+
+    HadoopTables tables = new HadoopTables(CONF);
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
+    Table table = tables.create(SCHEMA, spec, location.toString());
+
+    List<SimpleRecord> records = Lists.newArrayList(
+        new SimpleRecord(1, "a"),
+        new SimpleRecord(2, "b"),
+        new SimpleRecord(3, "c")
+    );
+
+    Dataset<Row> df = spark.createDataFrame(records, SimpleRecord.class);
+
+    AppendFiles append = table.newFastAppend();
+    AppendFiles spyAppend = spy(append);
+    doAnswer(invocation -> {
+      append.commit();
+      throw new CommitStateUnknownException(new RuntimeException("Datacenter on Fire"));
+    }).when(spyAppend).commit();
+
+    Table spyTable = spy(table);
+    when(spyTable.newAppend()).thenReturn(spyAppend);
+    SparkTable sparkTable = new SparkTable(spyTable, false);
+
+    String manualTableName = "unknown_exception";
+    ManualSource.setTable(manualTableName, sparkTable);
+
+    // Although an exception is thrown here, write and commit have succeeded
+    AssertHelpers.assertThrowsWithCause("Should throw a Commit State Unknown Exception",
+        SparkException.class,
+        "Writing job aborted",
+        CommitStateUnknownException.class,
+        "Datacenter on Fire",
+        () -> df.select("id", "data").sort("data").write()
+            .format("org.apache.iceberg.spark.source.ManualSource")
+            .option(ManualSource.TABLE_NAME, manualTableName)
+            .mode(SaveMode.Append)
+            .save(location.toString()));
+
+    // Since write and commit succeeded, the rows should be readable
+    Dataset<Row> result = spark.read().format("iceberg").load(location.toString());
+    List<SimpleRecord> actual = result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+    Assert.assertEquals("Number of rows should match", records.size(), actual.size());
+    Assert.assertEquals("Result rows should match", records, actual);
+  }
+
   public enum IcebergOptionsType {
     NONE,
     TABLE,
diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
index 24475899e..87fcd9cad 100644
--- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
+++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
@@ -44,6 +44,7 @@ import org.apache.iceberg.SnapshotUpdate;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.exceptions.CommitStateUnknownException;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.io.ClusteredDataWriter;
@@ -103,6 +104,8 @@ class SparkWrite {
   private final Map<String, String> extraSnapshotMetadata;
   private final boolean partitionedFanoutEnabled;
 
+  private boolean cleanupOnAbort = true;
+
   SparkWrite(SparkSession spark, Table table, SparkWriteConf writeConf,
              LogicalWriteInfo writeInfo, String applicationId,
              Schema writeSchema, StructType dsSchema) {
@@ -176,25 +179,34 @@ class SparkWrite {
       operation.stageOnly();
     }
 
-    long start = System.currentTimeMillis();
-    operation.commit(); // abort is automatically called if this fails
-    long duration = System.currentTimeMillis() - start;
-    LOG.info("Committed in {} ms", duration);
+    try {
+      long start = System.currentTimeMillis();
+      operation.commit(); // abort is automatically called if this fails
+      long duration = System.currentTimeMillis() - start;
+      LOG.info("Committed in {} ms", duration);
+    } catch (CommitStateUnknownException commitStateUnknownException) {
+      cleanupOnAbort = false;
+      throw commitStateUnknownException;
+    }
   }
 
   private void abort(WriterCommitMessage[] messages) {
-    Map<String, String> props = table.properties();
-    Tasks.foreach(files(messages))
-        .retry(PropertyUtil.propertyAsInt(props, COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT))
-        .exponentialBackoff(
-            PropertyUtil.propertyAsInt(props, COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT),
-            PropertyUtil.propertyAsInt(props, COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT),
-            PropertyUtil.propertyAsInt(props, COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT),
-            2.0 /* exponential */)
-        .throwFailureWhenFinished()
-        .run(file -> {
-          table.io().deleteFile(file.path().toString());
-        });
+    if (cleanupOnAbort) {
+      Map<String, String> props = table.properties();
+      Tasks.foreach(files(messages))
+          .retry(PropertyUtil.propertyAsInt(props, COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT))
+          .exponentialBackoff(
+              PropertyUtil.propertyAsInt(props, COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT),
+              PropertyUtil.propertyAsInt(props, COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT),
+              PropertyUtil.propertyAsInt(props, COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT),
+              2.0 /* exponential */)
+          .throwFailureWhenFinished()
+          .run(file -> {
+            table.io().deleteFile(file.path().toString());
+          });
+    } else {
+      LOG.warn("Skipping cleaning up of data files because Iceberg was unable to determine the final commit state");
+    }
   }
 
   private Iterable<DataFile> files(WriterCommitMessage[] messages) {
diff --git a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/ManualSource.java b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/ManualSource.java
new file mode 100644
index 000000000..0f8c8b3b6
--- /dev/null
+++ b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/ManualSource.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.util.Map;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.spark.sql.connector.catalog.Table;
+import org.apache.spark.sql.connector.catalog.TableProvider;
+import org.apache.spark.sql.connector.expressions.Transform;
+import org.apache.spark.sql.sources.DataSourceRegister;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+public class ManualSource implements TableProvider, DataSourceRegister {
+  public static final String SHORT_NAME = "manual_source";
+  public static final String TABLE_NAME = "TABLE_NAME";
+  private static final Map<String, Table> tableMap = Maps.newHashMap();
+
+  public static void setTable(String name, Table table) {
+    Preconditions.checkArgument(!tableMap.containsKey(name), "Cannot set " + name + ". It is already set");
+    tableMap.put(name, table);
+  }
+
+  public static void clearTables() {
+    tableMap.clear();
+  }
+
+  @Override
+  public String shortName() {
+    return SHORT_NAME;
+  }
+
+  @Override
+  public StructType inferSchema(CaseInsensitiveStringMap options) {
+    return getTable(null, null, options).schema();
+  }
+
+  @Override
+  public Transform[] inferPartitioning(CaseInsensitiveStringMap options) {
+    return getTable(null, null, options).partitioning();
+  }
+
+  @Override
+  public org.apache.spark.sql.connector.catalog.Table getTable(
+      StructType schema, Transform[] partitioning, Map<String, String> properties) {
+    Preconditions.checkArgument(properties.containsKey(TABLE_NAME), "Missing property " + TABLE_NAME);
+    String tableName = properties.get(TABLE_NAME);
+    Preconditions.checkArgument(tableMap.containsKey(tableName), "Table missing " + tableName);
+    return tableMap.get(tableName);
+  }
+
+  @Override
+  public boolean supportsExternalMetadata() {
+    return false;
+  }
+}
diff --git a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
index 5e99dac5c..0082394fb 100644
--- a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
+++ b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
@@ -24,6 +24,8 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Locale;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.ManifestFile;
@@ -32,11 +34,13 @@ import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.exceptions.CommitStateUnknownException;
 import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.SparkWriteOptions;
 import org.apache.iceberg.types.Types;
+import org.apache.spark.SparkException;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
@@ -54,6 +58,9 @@ import org.junit.runners.Parameterized;
 
 import static org.apache.iceberg.TableProperties.SPARK_WRITE_PARTITIONED_FANOUT_ENABLED;
 import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
 
 @RunWith(Parameterized.class)
 public class TestSparkDataWrite {
@@ -78,6 +85,11 @@ public class TestSparkDataWrite {
     TestSparkDataWrite.spark = SparkSession.builder().master("local[2]").getOrCreate();
   }
 
+  @Parameterized.AfterParam
+  public static void clearSourceCache() {
+    ManualSource.clearTables();
+  }
+
   @AfterClass
   public static void stopSpark() {
     SparkSession currentSpark = TestSparkDataWrite.spark;
@@ -592,6 +604,56 @@ public class TestSparkDataWrite {
     }
   }
 
+  @Test
+  public void testCommitUnknownException() throws IOException {
+    File parent = temp.newFolder(format.toString());
+    File location = new File(parent, "commitunknown");
+
+    HadoopTables tables = new HadoopTables(CONF);
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
+    Table table = tables.create(SCHEMA, spec, location.toString());
+
+    List<SimpleRecord> records = Lists.newArrayList(
+        new SimpleRecord(1, "a"),
+        new SimpleRecord(2, "b"),
+        new SimpleRecord(3, "c")
+    );
+
+    Dataset<Row> df = spark.createDataFrame(records, SimpleRecord.class);
+
+    AppendFiles append = table.newFastAppend();
+    AppendFiles spyAppend = spy(append);
+    doAnswer(invocation -> {
+      append.commit();
+      throw new CommitStateUnknownException(new RuntimeException("Datacenter on Fire"));
+    }).when(spyAppend).commit();
+
+    Table spyTable = spy(table);
+    when(spyTable.newAppend()).thenReturn(spyAppend);
+    SparkTable sparkTable = new SparkTable(spyTable, false);
+
+    String manualTableName = "unknown_exception";
+    ManualSource.setTable(manualTableName, sparkTable);
+
+    // Although an exception is thrown here, write and commit have succeeded
+    AssertHelpers.assertThrowsWithCause("Should throw a Commit State Unknown Exception",
+        SparkException.class,
+        "Writing job aborted",
+        CommitStateUnknownException.class,
+        "Datacenter on Fire",
+        () -> df.select("id", "data").sort("data").write()
+            .format("org.apache.iceberg.spark.source.ManualSource")
+            .option(ManualSource.TABLE_NAME, manualTableName)
+            .mode(SaveMode.Append)
+            .save(location.toString()));
+
+    // Since write and commit succeeded, the rows should be readable
+    Dataset<Row> result = spark.read().format("iceberg").load(location.toString());
+    List<SimpleRecord> actual = result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+    Assert.assertEquals("Number of rows should match", records.size(), actual.size());
+    Assert.assertEquals("Result rows should match", records, actual);
+  }
+
   public enum IcebergOptionsType {
     NONE,
     TABLE,
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
index 1504ba9a8..e1f0ae9fd 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
@@ -43,6 +43,7 @@ import org.apache.iceberg.SnapshotSummary;
 import org.apache.iceberg.SnapshotUpdate;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.exceptions.CommitStateUnknownException;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.io.ClusteredDataWriter;
@@ -109,6 +110,8 @@ abstract class SparkWrite implements Write, RequiresDistributionAndOrdering {
   private final Distribution requiredDistribution;
   private final SortOrder[] requiredOrdering;
 
+  private boolean cleanupOnAbort = true;
+
   SparkWrite(SparkSession spark, Table table, SparkWriteConf writeConf,
              LogicalWriteInfo writeInfo, String applicationId,
              Schema writeSchema, StructType dsSchema,
@@ -191,25 +194,34 @@ abstract class SparkWrite implements Write, RequiresDistributionAndOrdering {
       operation.stageOnly();
     }
 
-    long start = System.currentTimeMillis();
-    operation.commit(); // abort is automatically called if this fails
-    long duration = System.currentTimeMillis() - start;
-    LOG.info("Committed in {} ms", duration);
+    try {
+      long start = System.currentTimeMillis();
+      operation.commit(); // abort is automatically called if this fails
+      long duration = System.currentTimeMillis() - start;
+      LOG.info("Committed in {} ms", duration);
+    } catch (CommitStateUnknownException commitStateUnknownException) {
+      cleanupOnAbort = false;
+      throw commitStateUnknownException;
+    }
   }
 
   private void abort(WriterCommitMessage[] messages) {
-    Map<String, String> props = table.properties();
-    Tasks.foreach(files(messages))
-        .retry(PropertyUtil.propertyAsInt(props, COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT))
-        .exponentialBackoff(
-            PropertyUtil.propertyAsInt(props, COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT),
-            PropertyUtil.propertyAsInt(props, COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT),
-            PropertyUtil.propertyAsInt(props, COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT),
-            2.0 /* exponential */)
-        .throwFailureWhenFinished()
-        .run(file -> {
-          table.io().deleteFile(file.path().toString());
-        });
+    if (cleanupOnAbort) {
+      Map<String, String> props = table.properties();
+      Tasks.foreach(files(messages))
+          .retry(PropertyUtil.propertyAsInt(props, COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT))
+          .exponentialBackoff(
+              PropertyUtil.propertyAsInt(props, COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT),
+              PropertyUtil.propertyAsInt(props, COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT),
+              PropertyUtil.propertyAsInt(props, COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT),
+              2.0 /* exponential */)
+          .throwFailureWhenFinished()
+          .run(file -> {
+            table.io().deleteFile(file.path().toString());
+          });
+    } else {
+      LOG.warn("Skipping cleaning up of data files because Iceberg was unable to determine the final commit state");
+    }
   }
 
   private Iterable<DataFile> files(WriterCommitMessage[] messages) {
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/ManualSource.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/ManualSource.java
new file mode 100644
index 000000000..0f8c8b3b6
--- /dev/null
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/ManualSource.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.util.Map;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.spark.sql.connector.catalog.Table;
+import org.apache.spark.sql.connector.catalog.TableProvider;
+import org.apache.spark.sql.connector.expressions.Transform;
+import org.apache.spark.sql.sources.DataSourceRegister;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+public class ManualSource implements TableProvider, DataSourceRegister {
+  public static final String SHORT_NAME = "manual_source";
+  public static final String TABLE_NAME = "TABLE_NAME";
+  private static final Map<String, Table> tableMap = Maps.newHashMap();
+
+  public static void setTable(String name, Table table) {
+    Preconditions.checkArgument(!tableMap.containsKey(name), "Cannot set " + name + ". It is already set");
+    tableMap.put(name, table);
+  }
+
+  public static void clearTables() {
+    tableMap.clear();
+  }
+
+  @Override
+  public String shortName() {
+    return SHORT_NAME;
+  }
+
+  @Override
+  public StructType inferSchema(CaseInsensitiveStringMap options) {
+    return getTable(null, null, options).schema();
+  }
+
+  @Override
+  public Transform[] inferPartitioning(CaseInsensitiveStringMap options) {
+    return getTable(null, null, options).partitioning();
+  }
+
+  @Override
+  public org.apache.spark.sql.connector.catalog.Table getTable(
+      StructType schema, Transform[] partitioning, Map<String, String> properties) {
+    Preconditions.checkArgument(properties.containsKey(TABLE_NAME), "Missing property " + TABLE_NAME);
+    String tableName = properties.get(TABLE_NAME);
+    Preconditions.checkArgument(tableMap.containsKey(tableName), "Table missing " + tableName);
+    return tableMap.get(tableName);
+  }
+
+  @Override
+  public boolean supportsExternalMetadata() {
+    return false;
+  }
+}
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
index 5e99dac5c..0082394fb 100644
--- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
@@ -24,6 +24,8 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Locale;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.ManifestFile;
@@ -32,11 +34,13 @@ import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.exceptions.CommitStateUnknownException;
 import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.SparkWriteOptions;
 import org.apache.iceberg.types.Types;
+import org.apache.spark.SparkException;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
@@ -54,6 +58,9 @@ import org.junit.runners.Parameterized;
 
 import static org.apache.iceberg.TableProperties.SPARK_WRITE_PARTITIONED_FANOUT_ENABLED;
 import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
 
 @RunWith(Parameterized.class)
 public class TestSparkDataWrite {
@@ -78,6 +85,11 @@ public class TestSparkDataWrite {
     TestSparkDataWrite.spark = SparkSession.builder().master("local[2]").getOrCreate();
   }
 
+  @Parameterized.AfterParam
+  public static void clearSourceCache() {
+    ManualSource.clearTables();
+  }
+
   @AfterClass
   public static void stopSpark() {
     SparkSession currentSpark = TestSparkDataWrite.spark;
@@ -592,6 +604,56 @@ public class TestSparkDataWrite {
     }
   }
 
+  @Test
+  public void testCommitUnknownException() throws IOException {
+    File parent = temp.newFolder(format.toString());
+    File location = new File(parent, "commitunknown");
+
+    HadoopTables tables = new HadoopTables(CONF);
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
+    Table table = tables.create(SCHEMA, spec, location.toString());
+
+    List<SimpleRecord> records = Lists.newArrayList(
+        new SimpleRecord(1, "a"),
+        new SimpleRecord(2, "b"),
+        new SimpleRecord(3, "c")
+    );
+
+    Dataset<Row> df = spark.createDataFrame(records, SimpleRecord.class);
+
+    AppendFiles append = table.newFastAppend();
+    AppendFiles spyAppend = spy(append);
+    doAnswer(invocation -> {
+      append.commit();
+      throw new CommitStateUnknownException(new RuntimeException("Datacenter on Fire"));
+    }).when(spyAppend).commit();
+
+    Table spyTable = spy(table);
+    when(spyTable.newAppend()).thenReturn(spyAppend);
+    SparkTable sparkTable = new SparkTable(spyTable, false);
+
+    String manualTableName = "unknown_exception";
+    ManualSource.setTable(manualTableName, sparkTable);
+
+    // Although an exception is thrown here, write and commit have succeeded
+    AssertHelpers.assertThrowsWithCause("Should throw a Commit State Unknown Exception",
+        SparkException.class,
+        "Writing job aborted",
+        CommitStateUnknownException.class,
+        "Datacenter on Fire",
+        () -> df.select("id", "data").sort("data").write()
+            .format("org.apache.iceberg.spark.source.ManualSource")
+            .option(ManualSource.TABLE_NAME, manualTableName)
+            .mode(SaveMode.Append)
+            .save(location.toString()));
+
+    // Since write and commit succeeded, the rows should be readable
+    Dataset<Row> result = spark.read().format("iceberg").load(location.toString());
+    List<SimpleRecord> actual = result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+    Assert.assertEquals("Number of rows should match", records.size(), actual.size());
+    Assert.assertEquals("Result rows should match", records, actual);
+  }
+
   public enum IcebergOptionsType {
     NONE,
     TABLE,