Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/05/03 22:49:14 UTC

[GitHub] [iceberg] RussellSpitzer opened a new pull request, #4687: Spark commit unknown handling

RussellSpitzer opened a new pull request, #4687:
URL: https://github.com/apache/iceberg/pull/4687

   The issue here was pointed out by @stevenzwu when we were discussing incorrect aborts triggered by non-runtime exceptions. He correctly noticed that the same sort of issue would appear if a commit threw a CommitStateUnknownException: the Spark writer would abort and delete the underlying data files, even though the commit might actually have succeeded.
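A minimal, self-contained sketch of the guarded-abort pattern described above. It assumes simplified stand-in types: `CommitStateUnknownException`, `Operation`, and `GuardedWrite` below are hypothetical and are not Iceberg's actual classes. The writer remembers that the commit outcome is unknown and skips deleting data files when abort is called afterwards.

```java
import java.util.ArrayList;
import java.util.List;

public class GuardedAbortSketch {

  // Hypothetical stand-in for org.apache.iceberg.exceptions.CommitStateUnknownException.
  static class CommitStateUnknownException extends RuntimeException {
    CommitStateUnknownException(Throwable cause) {
      super(cause);
    }
  }

  // Hypothetical stand-in for a snapshot update such as AppendFiles.
  interface Operation {
    void commit();
  }

  // Hypothetical writer: commit() may fail; abort() deletes written files only when that is safe.
  static class GuardedWrite {
    private final List<String> writtenFiles;
    private final List<String> deletedFiles = new ArrayList<>();
    private boolean cleanupOnAbort = true;

    GuardedWrite(List<String> writtenFiles) {
      this.writtenFiles = writtenFiles;
    }

    void commit(Operation operation) {
      try {
        operation.commit();
      } catch (CommitStateUnknownException e) {
        // The commit may have succeeded, so the data files might already be referenced by the table.
        cleanupOnAbort = false;
        throw e;
      }
    }

    void abort() {
      if (cleanupOnAbort) {
        deletedFiles.addAll(writtenFiles); // stands in for table.io().deleteFile(...)
      } else {
        System.out.println("Skipping cleanup: the final commit state is unknown");
      }
    }

    List<String> deletedFiles() {
      return deletedFiles;
    }
  }

  // Runs a commit that fails with the given exception, then aborts the way a failed job would.
  static List<String> commitThenAbort(RuntimeException failure) {
    GuardedWrite write = new GuardedWrite(List.of("data/a.parquet", "data/b.parquet"));
    try {
      write.commit(() -> { throw failure; });
    } catch (RuntimeException e) {
      write.abort();
    }
    return write.deletedFiles();
  }

  public static void main(String[] args) {
    System.out.println(commitThenAbort(new IllegalStateException("commit failed")));
    System.out.println(commitThenAbort(new CommitStateUnknownException(new RuntimeException("network"))));
  }
}
```

An ordinary failure still cleans up the written files; only the unknown-state case leaves them in place, at the cost of possible orphan files.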
   
   Closes #4686
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] RussellSpitzer commented on pull request #4687: Spark commit unknown handling

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on PR #4687:
URL: https://github.com/apache/iceberg/pull/4687#issuecomment-1120105616

   Thanks @flyrain, @stevenzwu, and @aokolnychyi for the reviews!



[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #4687: Spark commit unknown handling

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on code in PR #4687:
URL: https://github.com/apache/iceberg/pull/4687#discussion_r867179750


##########
spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java:
##########
@@ -588,6 +600,56 @@ public void partitionedCreateWithTargetFileSizeViaOption(IcebergOptionsType opti
     Assert.assertTrue("All DataFiles contain 1000 rows", files.stream().allMatch(d -> d.recordCount() == 1000));
   }
 
+  @Test
+  public void testCommitUnknownException() throws IOException {
+    File parent = temp.newFolder(format.toString());
+    File location = new File(parent, "commitunknown");
+
+    HadoopTables tables = new HadoopTables(CONF);
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
+    Table table = tables.create(SCHEMA, spec, location.toString());
+
+    List<SimpleRecord> records = Lists.newArrayList(
+        new SimpleRecord(1, "a"),
+        new SimpleRecord(2, "b"),
+        new SimpleRecord(3, "c")
+    );
+
+    Dataset<Row> df = spark.createDataFrame(records, SimpleRecord.class);
+
+    AppendFiles append = table.newFastAppend();
+    AppendFiles spyAppend = spy(append);
+    doAnswer(invocation -> {
+      append.commit();
+      throw new CommitStateUnknownException(new RuntimeException("Datacenter on Fire"));
+    }).when(spyAppend).commit();
+
+    Table spyTable = spy(table);
+    when(spyTable.newAppend()).thenReturn(spyAppend);
+    SparkTable sparkTable = new SparkTable(spyTable, false);
+
+    String manualTableName = "unknown_exception";
+    ManualSource.setTable(manualTableName, sparkTable);
+
+    // Although an exception is thrown here, write and commit have succeeded
+    AssertHelpers.assertThrowsWithCause("Should throw a Commit State Unknown Exception",
+        SparkException.class,

Review Comment:
   This tests everything in our normal path except for Spark catalog operations. All of the other pathways behave the same; only the "loadTable" method differs.




[GitHub] [iceberg] flyrain commented on pull request #4687: Spark commit unknown handling

Posted by GitBox <gi...@apache.org>.
flyrain commented on PR #4687:
URL: https://github.com/apache/iceberg/pull/4687#issuecomment-1116766515

   LGTM. Can we manage to add a test case?
   A small comment on the unknown exception. If the commit actually failed, which is more likely, the job fails and we are left with a bunch of orphan files. It is trickier when the commit actually succeeded: the unknown exception is still thrown and the job fails. What if users retry the job? On a retry, they may append the same data twice.
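One way a job could guard against the duplicate-append risk raised above is to tag each write with a unique job identifier and, before retrying, check whether any committed snapshot already carries that tag. The sketch below models snapshot summaries as plain maps; the `job-id` key and the `shouldRetry` helper are hypothetical. Iceberg's Spark writer appears to accept custom summary properties through `snapshot-property.*` write options, but verify that against the version you run, and note that the check itself can fail during the same outage that caused the unknown state.

```java
import java.util.List;
import java.util.Map;

public class RetryGuardSketch {

  // Hypothetical summary key under which each job records its unique identifier.
  static final String JOB_ID_KEY = "job-id";

  // Returns true only if no committed snapshot summary already carries this job's identifier.
  static boolean shouldRetry(List<Map<String, String>> snapshotSummaries, String jobId) {
    for (Map<String, String> summary : snapshotSummaries) {
      if (jobId.equals(summary.get(JOB_ID_KEY))) {
        // The "unknown" commit actually landed; retrying would append the same data twice.
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args) {
    List<Map<String, String>> summaries = List.of(
        Map.of("operation", "append", JOB_ID_KEY, "job-41"),
        Map.of("operation", "append", JOB_ID_KEY, "job-42"));
    System.out.println(shouldRetry(summaries, "job-42"));
    System.out.println(shouldRetry(summaries, "job-43"));
  }
}
```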



[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #4687: Spark commit unknown handling

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on code in PR #4687:
URL: https://github.com/apache/iceberg/pull/4687#discussion_r867048887


##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java:
##########
@@ -196,24 +199,33 @@ private void commitOperation(SnapshotUpdate<?> operation, String description) {
     }
 
     long start = System.currentTimeMillis();
-    operation.commit(); // abort is automatically called if this fails
-    long duration = System.currentTimeMillis() - start;
-    LOG.info("Committed in {} ms", duration);
+    try {
+      operation.commit(); // abort is automatically called if this fails
+      long duration = System.currentTimeMillis() - start;
+      LOG.info("Committed in {} ms", duration);
+    } catch (CommitStateUnknownException commitStateUnknownException) {

Review Comment:
   This is our current Javadoc for the underlying commit function:
   ```
      * Implementations must throw a {@link org.apache.iceberg.exceptions.CommitStateUnknownException}
      * in cases where it cannot be determined if the commit succeeded or failed.
      * For example if a network partition causes the confirmation of the commit to be lost,
      * the implementation should throw a CommitStateUnknownException. This is important because downstream users of
      * this API need to know whether they can clean up the commit or not, if the state is unknown then it is not safe
      * to remove any files. All other exceptions will be treated as if the commit has failed.
   ```
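To illustrate the contract quoted above from the implementer's side, here is a minimal sketch. It assumes a hypothetical in-memory catalog (`FakeCatalog`) that swaps a metadata pointer atomically and can lose the swap's confirmation. When the confirmation is lost, the sketch re-reads the pointer. It reports success only with proof, and throws the unknown-state exception otherwise. All types are local stand-ins, not Iceberg's code.

```java
import java.util.concurrent.TimeoutException;

public class CommitContractSketch {

  static class CommitFailedException extends RuntimeException {
    CommitFailedException(String message) {
      super(message);
    }
  }

  static class CommitStateUnknownException extends RuntimeException {
    CommitStateUnknownException(Throwable cause) {
      super(cause);
    }
  }

  // Hypothetical in-memory catalog holding a single metadata pointer.
  static class FakeCatalog {
    private String pointer;
    private final boolean loseSwapReply; // the swap is applied, but its confirmation is lost
    private final boolean unreachable;   // follow-up reads fail as well

    FakeCatalog(String pointer, boolean loseSwapReply, boolean unreachable) {
      this.pointer = pointer;
      this.loseSwapReply = loseSwapReply;
      this.unreachable = unreachable;
    }

    // Atomically replaces expected with next; returns false if another writer got there first.
    boolean swap(String expected, String next) throws TimeoutException {
      if (!pointer.equals(expected)) {
        return false;
      }
      pointer = next;
      if (loseSwapReply) {
        throw new TimeoutException("confirmation lost");
      }
      return true;
    }

    String current() throws TimeoutException {
      if (unreachable) {
        throw new TimeoutException("catalog unreachable");
      }
      return pointer;
    }
  }

  // Commits base -> next, mapping each situation to the outcomes the quoted contract describes.
  static void commit(FakeCatalog catalog, String base, String next) {
    try {
      if (!catalog.swap(base, next)) {
        throw new CommitFailedException("Concurrent commit replaced " + base);
      }
    } catch (TimeoutException lostReply) {
      // The swap may or may not have been applied, so check before reporting anything.
      String observed;
      try {
        observed = catalog.current();
      } catch (TimeoutException stillUnreachable) {
        throw new CommitStateUnknownException(stillUnreachable);
      }
      if (!next.equals(observed)) {
        // Conservative: without proof of success, report unknown rather than failed.
        throw new CommitStateUnknownException(lostReply);
      }
    }
  }

  // Returns "committed", "failed", or "unknown" for a v1 -> v2 commit attempt.
  static String outcome(FakeCatalog catalog) {
    try {
      commit(catalog, "v1", "v2");
      return "committed";
    } catch (CommitFailedException e) {
      return "failed";
    } catch (CommitStateUnknownException e) {
      return "unknown";
    }
  }

  public static void main(String[] args) {
    System.out.println(outcome(new FakeCatalog("v1", false, false)));
    System.out.println(outcome(new FakeCatalog("v0", false, false)));
    System.out.println(outcome(new FakeCatalog("v1", true, false)));
    System.out.println(outcome(new FakeCatalog("v1", true, true)));
  }
}
```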




[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #4687: Spark commit unknown handling

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on code in PR #4687:
URL: https://github.com/apache/iceberg/pull/4687#discussion_r867179215


##########
spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java:
##########
@@ -588,6 +600,56 @@ public void partitionedCreateWithTargetFileSizeViaOption(IcebergOptionsType opti
     Assert.assertTrue("All DataFiles contain 1000 rows", files.stream().allMatch(d -> d.recordCount() == 1000));
   }
 
+  @Test
+  public void testCommitUnknownException() throws IOException {
+    File parent = temp.newFolder(format.toString());
+    File location = new File(parent, "commitunknown");
+
+    HadoopTables tables = new HadoopTables(CONF);
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
+    Table table = tables.create(SCHEMA, spec, location.toString());
+
+    List<SimpleRecord> records = Lists.newArrayList(
+        new SimpleRecord(1, "a"),
+        new SimpleRecord(2, "b"),
+        new SimpleRecord(3, "c")
+    );
+
+    Dataset<Row> df = spark.createDataFrame(records, SimpleRecord.class);
+
+    AppendFiles append = table.newFastAppend();
+    AppendFiles spyAppend = spy(append);
+    doAnswer(invocation -> {
+      append.commit();
+      throw new CommitStateUnknownException(new RuntimeException("Datacenter on Fire"));
+    }).when(spyAppend).commit();
+
+    Table spyTable = spy(table);

Review Comment:
   We need a spy Table here so that it returns our spy append.
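Without Mockito, the fault injection that the test sets up with `spy(...)`, `doAnswer(...)`, and `when(...).thenReturn(...)` can be sketched with plain delegating wrappers. The append really commits and then throws, and the table wrapper hands out that append. `Append`, `SimpleTable`, `InMemoryTable`, and the helper methods are hypothetical stand-ins for Iceberg's `AppendFiles` and `Table`.

```java
import java.util.ArrayList;
import java.util.List;

public class FaultInjectionSketch {

  static class CommitStateUnknownException extends RuntimeException {
    CommitStateUnknownException(Throwable cause) {
      super(cause);
    }
  }

  interface Append {
    void commit();
  }

  interface SimpleTable {
    Append newAppend();
  }

  // Real behavior: each successful commit records one snapshot.
  static class InMemoryTable implements SimpleTable {
    final List<String> snapshots = new ArrayList<>();

    @Override
    public Append newAppend() {
      return () -> snapshots.add("snapshot-" + (snapshots.size() + 1));
    }
  }

  // Mirrors doAnswer(invocation -> { append.commit(); throw ...; }): commit succeeds, then reports unknown state.
  static Append commitThenThrow(Append delegate) {
    return () -> {
      delegate.commit();
      throw new CommitStateUnknownException(new RuntimeException("Datacenter on Fire"));
    };
  }

  // Mirrors when(spyTable.newAppend()).thenReturn(spyAppend): the table hands out the faulty append.
  static SimpleTable withFaultyAppends(SimpleTable delegate) {
    return () -> commitThenThrow(delegate.newAppend());
  }

  public static void main(String[] args) {
    InMemoryTable table = new InMemoryTable();
    SimpleTable faulty = withFaultyAppends(table);
    try {
      faulty.newAppend().commit();
    } catch (CommitStateUnknownException e) {
      System.out.println("Caller sees unknown state; snapshots = " + table.snapshots);
    }
  }
}
```

The data is committed even though the caller sees the exception, which is exactly the situation in which abort must not delete files.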




[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #4687: Spark commit unknown handling

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #4687:
URL: https://github.com/apache/iceberg/pull/4687#discussion_r867196390


##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java:
##########
@@ -196,24 +199,33 @@ private void commitOperation(SnapshotUpdate<?> operation, String description) {
     }
 
     long start = System.currentTimeMillis();
-    operation.commit(); // abort is automatically called if this fails
-    long duration = System.currentTimeMillis() - start;
-    LOG.info("Committed in {} ms", duration);
+    try {
+      operation.commit(); // abort is automatically called if this fails
+      long duration = System.currentTimeMillis() - start;
+      LOG.info("Committed in {} ms", duration);
+    } catch (CommitStateUnknownException commitStateUnknownException) {
+      cleanupOnAbort = false;
+      throw commitStateUnknownException;
+    }
   }
 
   private void abort(WriterCommitMessage[] messages) {
-    Map<String, String> props = table.properties();
-    Tasks.foreach(files(messages))
-        .retry(PropertyUtil.propertyAsInt(props, COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT))
-        .exponentialBackoff(
-            PropertyUtil.propertyAsInt(props, COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT),
-            PropertyUtil.propertyAsInt(props, COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT),
-            PropertyUtil.propertyAsInt(props, COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT),
-            2.0 /* exponential */)
-        .throwFailureWhenFinished()
-        .run(file -> {
-          table.io().deleteFile(file.path().toString());
-        });
+    if (cleanupOnAbort) {
+      Map<String, String> props = table.properties();
+      Tasks.foreach(files(messages))
+          .retry(PropertyUtil.propertyAsInt(props, COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT))
+          .exponentialBackoff(
+              PropertyUtil.propertyAsInt(props, COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT),
+              PropertyUtil.propertyAsInt(props, COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT),
+              PropertyUtil.propertyAsInt(props, COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT),
+              2.0 /* exponential */)
+          .throwFailureWhenFinished()
+          .run(file -> {
+            table.io().deleteFile(file.path().toString());
+          });
+    } else {
+      LOG.error("Skipping cleaning up of data files because Iceberg was unable to determine the final commit state");

Review Comment:
   nit: would warn be more appropriate?




[GitHub] [iceberg] stevenzwu commented on a diff in pull request #4687: Spark commit unknown handling

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on code in PR #4687:
URL: https://github.com/apache/iceberg/pull/4687#discussion_r867054937


##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java:
##########
@@ -196,24 +199,33 @@ private void commitOperation(SnapshotUpdate<?> operation, String description) {
     }
 
     long start = System.currentTimeMillis();
-    operation.commit(); // abort is automatically called if this fails
-    long duration = System.currentTimeMillis() - start;
-    LOG.info("Committed in {} ms", duration);
+    try {
+      operation.commit(); // abort is automatically called if this fails
+      long duration = System.currentTimeMillis() - start;
+      LOG.info("Committed in {} ms", duration);
+    } catch (CommitStateUnknownException commitStateUnknownException) {

Review Comment:
   a separate follow-up sounds good to me




[GitHub] [iceberg] flyrain commented on pull request #4687: Spark commit unknown handling

Posted by GitBox <gi...@apache.org>.
flyrain commented on PR #4687:
URL: https://github.com/apache/iceberg/pull/4687#issuecomment-1119993307

   The test looks good!!



[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #4687: Spark commit unknown handling

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #4687:
URL: https://github.com/apache/iceberg/pull/4687#discussion_r867195583


##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java:
##########
@@ -196,24 +199,33 @@ private void commitOperation(SnapshotUpdate<?> operation, String description) {
     }
 
     long start = System.currentTimeMillis();
-    operation.commit(); // abort is automatically called if this fails
-    long duration = System.currentTimeMillis() - start;
-    LOG.info("Committed in {} ms", duration);
+    try {
+      operation.commit(); // abort is automatically called if this fails
+      long duration = System.currentTimeMillis() - start;
+      LOG.info("Committed in {} ms", duration);
+    } catch (CommitStateUnknownException commitStateUnknownException) {

Review Comment:
   It is an interesting idea to interpret all exceptions other than `CommitFailedException` as commit-unknown exceptions, but I worry that is not how the core behaves right now. I'd prefer the current approach for now.
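The two policies being weighed here can be contrasted in a short sketch. The `Policy` enum and `safeToDelete` helper are hypothetical, and `CommitFailedException` and `CommitStateUnknownException` are local stand-ins for Iceberg's exception types. Under the current contract, only an explicit unknown-state exception blocks cleanup. Under the suggested conservative policy, anything other than a definite commit failure does.

```java
public class CleanupPolicySketch {

  static class CommitFailedException extends RuntimeException {
    CommitFailedException(String message) {
      super(message);
    }
  }

  static class CommitStateUnknownException extends RuntimeException {
    CommitStateUnknownException(String message) {
      super(message);
    }
  }

  enum Policy {
    // Current contract: every exception except CommitStateUnknownException means the commit failed.
    UNKNOWN_ONLY,
    // Suggested alternative: only CommitFailedException proves the commit failed.
    FAILED_ONLY
  }

  // Decides whether written data files may be deleted after commit() threw the given exception.
  static boolean safeToDelete(RuntimeException thrown, Policy policy) {
    switch (policy) {
      case UNKNOWN_ONLY:
        return !(thrown instanceof CommitStateUnknownException);
      case FAILED_ONLY:
        return thrown instanceof CommitFailedException;
      default:
        throw new IllegalArgumentException("Unsupported policy: " + policy);
    }
  }

  public static void main(String[] args) {
    RuntimeException[] samples = {
        new CommitFailedException("conflict"),
        new CommitStateUnknownException("lost reply"),
        new IllegalStateException("unexpected")
    };
    for (RuntimeException e : samples) {
      System.out.printf("%s: current=%b, conservative=%b%n",
          e.getClass().getSimpleName(),
          safeToDelete(e, Policy.UNKNOWN_ONLY),
          safeToDelete(e, Policy.FAILED_ONLY));
    }
  }
}
```

The policies disagree only on unexpected exceptions such as `IllegalStateException`. That is the case where the conservative policy trades orphan files for safety.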




[GitHub] [iceberg] kbendick commented on a diff in pull request #4687: Spark commit unknown handling

Posted by GitBox <gi...@apache.org>.
kbendick commented on code in PR #4687:
URL: https://github.com/apache/iceberg/pull/4687#discussion_r865174975


##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java:
##########
@@ -196,24 +199,33 @@ private void commitOperation(SnapshotUpdate<?> operation, String description) {
     }
 
     long start = System.currentTimeMillis();
-    operation.commit(); // abort is automatically called if this fails
-    long duration = System.currentTimeMillis() - start;
-    LOG.info("Committed in {} ms", duration);
+    try {
+      operation.commit(); // abort is automatically called if this fails
+      long duration = System.currentTimeMillis() - start;
+      LOG.info("Committed in {} ms", duration);
+    } catch (CommitStateUnknownException commitStateUnknownException) {
+      cleanupOnAbort = false;
+      throw commitStateUnknownException;
+    }
   }
 
   private void abort(WriterCommitMessage[] messages) {
-    Map<String, String> props = table.properties();
-    Tasks.foreach(files(messages))
-        .retry(PropertyUtil.propertyAsInt(props, COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT))
-        .exponentialBackoff(
-            PropertyUtil.propertyAsInt(props, COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT),
-            PropertyUtil.propertyAsInt(props, COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT),
-            PropertyUtil.propertyAsInt(props, COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT),
-            2.0 /* exponential */)
-        .throwFailureWhenFinished()
-        .run(file -> {
-          table.io().deleteFile(file.path().toString());
-        });
+    if (cleanupOnAbort) {
+      Map<String, String> props = table.properties();
+      Tasks.foreach(files(messages))
+          .retry(PropertyUtil.propertyAsInt(props, COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT))
+          .exponentialBackoff(
+              PropertyUtil.propertyAsInt(props, COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT),
+              PropertyUtil.propertyAsInt(props, COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT),
+              PropertyUtil.propertyAsInt(props, COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT),
+              2.0 /* exponential */)
+          .throwFailureWhenFinished()
+          .run(file -> {
+            table.io().deleteFile(file.path().toString());
+          });
+    } else {
+      LOG.error("Skipping cleaning up of data files, CommitStateUnknown");

Review Comment:
   +1 to using plain English in log messages.
   
   When people see technical terms, they tend to get more concerned, or they think they've hit some sort of novel issue.
   
   I would even change `CommitStateUnknown` in that message, but it's fine, relatively speaking.




[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #4687: Spark commit unknown handling

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on code in PR #4687:
URL: https://github.com/apache/iceberg/pull/4687#discussion_r866989317


##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java:
##########
@@ -196,24 +199,33 @@ private void commitOperation(SnapshotUpdate<?> operation, String description) {
     }
 
     long start = System.currentTimeMillis();
-    operation.commit(); // abort is automatically called if this fails
-    long duration = System.currentTimeMillis() - start;
-    LOG.info("Committed in {} ms", duration);
+    try {
+      operation.commit(); // abort is automatically called if this fails
+      long duration = System.currentTimeMillis() - start;
+      LOG.info("Committed in {} ms", duration);
+    } catch (CommitStateUnknownException commitStateUnknownException) {

Review Comment:
   Let's leave this for a follow-up then? I'm a little worried about changing SnapshotProducer further right now.
   




[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #4687: Spark commit unknown handling

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #4687:
URL: https://github.com/apache/iceberg/pull/4687#discussion_r867196629


##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java:
##########
@@ -196,24 +199,33 @@ private void commitOperation(SnapshotUpdate<?> operation, String description) {
     }
 
     long start = System.currentTimeMillis();

Review Comment:
   nit: you could have grabbed this line too as they belong together.




[GitHub] [iceberg] RussellSpitzer merged pull request #4687: Spark commit unknown handling

Posted by GitBox <gi...@apache.org>.
RussellSpitzer merged PR #4687:
URL: https://github.com/apache/iceberg/pull/4687



[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #4687: Spark commit unknown handling

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on code in PR #4687:
URL: https://github.com/apache/iceberg/pull/4687#discussion_r867178537


##########
spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/ManualSource.java:
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.util.Map;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.spark.sql.connector.catalog.Table;
+import org.apache.spark.sql.connector.catalog.TableProvider;
+import org.apache.spark.sql.connector.expressions.Transform;
+import org.apache.spark.sql.sources.DataSourceRegister;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+public class ManualSource implements TableProvider, DataSourceRegister {

Review Comment:
   This is a fake source into which tables can be loaded at runtime. This matters because the tests need to load mocks that behave in specific ways.
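The core of such a source can be sketched as a name-keyed registry that a test fills before running a write and that the source consults when Spark asks for a table. The stdlib-only sketch below is hypothetical: `TableRegistry` and its methods only echo the `ManualSource.setTable(name, table)` call used in the test. A real source would hold the registry in a static field and would also implement Spark's `TableProvider` methods (`inferSchema`, `getTable`), reading the table name from the options.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class TableRegistry<T> {

  // Tables registered by tests, keyed by the name passed as a read/write option.
  private final Map<String, T> tables = new ConcurrentHashMap<>();

  // Registers (or replaces) the table returned for this name.
  public void setTable(String name, T table) {
    tables.put(name, table);
  }

  // Looks up a registered table, failing loudly if a test forgot to register it.
  public T getTable(String name) {
    T table = tables.get(name);
    if (table == null) {
      throw new IllegalArgumentException("No table registered under name: " + name);
    }
    return table;
  }

  // Removes all registrations so tests do not leak mocks into each other.
  public void clearTables() {
    tables.clear();
  }
}
```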




[GitHub] [iceberg] RussellSpitzer commented on pull request #4687: Spark commit unknown handling

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on PR #4687:
URL: https://github.com/apache/iceberg/pull/4687#issuecomment-1119964591

   @stevenzwu, @flyrain, and @kbendick: I updated the PR based on your comments.
   
   @aokolnychyi could you also take a look?



[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #4687: Spark commit unknown handling

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on code in PR #4687:
URL: https://github.com/apache/iceberg/pull/4687#discussion_r867251585


##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java:
##########
@@ -196,24 +199,33 @@ private void commitOperation(SnapshotUpdate<?> operation, String description) {
     }
 
     long start = System.currentTimeMillis();
-    operation.commit(); // abort is automatically called if this fails
-    long duration = System.currentTimeMillis() - start;
-    LOG.info("Committed in {} ms", duration);
+    try {
+      operation.commit(); // abort is automatically called if this fails
+      long duration = System.currentTimeMillis() - start;
+      LOG.info("Committed in {} ms", duration);
+    } catch (CommitStateUnknownException commitStateUnknownException) {
+      cleanupOnAbort = false;
+      throw commitStateUnknownException;
+    }
   }
 
   private void abort(WriterCommitMessage[] messages) {
-    Map<String, String> props = table.properties();
-    Tasks.foreach(files(messages))
-        .retry(PropertyUtil.propertyAsInt(props, COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT))
-        .exponentialBackoff(
-            PropertyUtil.propertyAsInt(props, COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT),
-            PropertyUtil.propertyAsInt(props, COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT),
-            PropertyUtil.propertyAsInt(props, COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT),
-            2.0 /* exponential */)
-        .throwFailureWhenFinished()
-        .run(file -> {
-          table.io().deleteFile(file.path().toString());
-        });
+    if (cleanupOnAbort) {
+      Map<String, String> props = table.properties();
+      Tasks.foreach(files(messages))
+          .retry(PropertyUtil.propertyAsInt(props, COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT))
+          .exponentialBackoff(
+              PropertyUtil.propertyAsInt(props, COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT),
+              PropertyUtil.propertyAsInt(props, COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT),
+              PropertyUtil.propertyAsInt(props, COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT),
+              2.0 /* exponential */)
+          .throwFailureWhenFinished()
+          .run(file -> {
+            table.io().deleteFile(file.path().toString());
+          });
+    } else {
+      LOG.error("Skipping cleaning up of data files because Iceberg was unable to determine the final commit state");

Review Comment:
   We are about to abort, so I had mixed feelings: something is going wrong and we will be failing. I can make it a warning.





[GitHub] [iceberg] stevenzwu commented on a diff in pull request #4687: Spark commit unknown handling

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on code in PR #4687:
URL: https://github.com/apache/iceberg/pull/4687#discussion_r864411205


##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java:
##########
@@ -196,24 +199,33 @@ private void commitOperation(SnapshotUpdate<?> operation, String description) {
     }
 
     long start = System.currentTimeMillis();
-    operation.commit(); // abort is automatically called if this fails
-    long duration = System.currentTimeMillis() - start;
-    LOG.info("Committed in {} ms", duration);
+    try {
+      operation.commit(); // abort is automatically called if this fails
+      long duration = System.currentTimeMillis() - start;
+      LOG.info("Committed in {} ms", duration);
+    } catch (CommitStateUnknownException commitStateUnknownException) {
+      cleanupOnAbort = false;
+      throw commitStateUnknownException;
+    }
   }
 
   private void abort(WriterCommitMessage[] messages) {
-    Map<String, String> props = table.properties();
-    Tasks.foreach(files(messages))
-        .retry(PropertyUtil.propertyAsInt(props, COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT))
-        .exponentialBackoff(
-            PropertyUtil.propertyAsInt(props, COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT),
-            PropertyUtil.propertyAsInt(props, COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT),
-            PropertyUtil.propertyAsInt(props, COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT),
-            2.0 /* exponential */)
-        .throwFailureWhenFinished()
-        .run(file -> {
-          table.io().deleteFile(file.path().toString());
-        });
+    if (cleanupOnAbort) {
+      Map<String, String> props = table.properties();
+      Tasks.foreach(files(messages))
+          .retry(PropertyUtil.propertyAsInt(props, COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT))
+          .exponentialBackoff(
+              PropertyUtil.propertyAsInt(props, COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT),
+              PropertyUtil.propertyAsInt(props, COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT),
+              PropertyUtil.propertyAsInt(props, COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT),
+              2.0 /* exponential */)
+          .throwFailureWhenFinished()
+          .run(file -> {
+            table.io().deleteFile(file.path().toString());
+          });
+    } else {
+      LOG.error("Skipping cleaning up of data files, CommitStateUnknown");

Review Comment:
   nit: I have been getting review feedback that error messages shouldn't contain variable or class names. Maybe `Skipping cleaning up of data files as commit state is unknown`.
   
   If, in the future, we have other reasons for skipping the abort cleanup, we can rename `cleanupOnAbort` to `skipAbortCleanupReason` and make it a string that carries the reason. If it is `null`, we continue with the regular abort process.
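
   A minimal sketch of the suggested refactor in plain Java, with no Iceberg or Spark dependencies. `ReasonTrackingWrite` is a hypothetical stand-in for `SparkWrite`, the local `CommitStateUnknownException` stands in for Iceberg's exception of the same name, and the file-deletion callback replaces the retrying `Tasks.foreach(...).run(...)` loop from the diff. A `null` reason means the regular abort cleanup runs; any other value is logged using the wording suggested above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for Iceberg's CommitStateUnknownException (assumption for this sketch).
class CommitStateUnknownException extends RuntimeException {
  CommitStateUnknownException(String message) {
    super(message);
  }
}

// Hypothetical, simplified writer illustrating a String-valued "skip cleanup" flag.
class ReasonTrackingWrite {
  // null means there is no reason to skip: abort deletes the written data files as usual.
  private String skipAbortCleanupReason = null;
  // Collected log lines, used here instead of a real logger so the behavior is observable.
  final List<String> log = new ArrayList<>();

  void commit(Runnable operation) {
    try {
      operation.run();
    } catch (CommitStateUnknownException e) {
      // The files may already be referenced by a committed snapshot; remember why.
      skipAbortCleanupReason = "commit state is unknown";
      throw e;
    }
  }

  void abort(List<String> files, Consumer<String> deleteFile) {
    if (skipAbortCleanupReason == null) {
      files.forEach(deleteFile);
    } else {
      log.add("Skipping cleaning up of data files as " + skipAbortCleanupReason);
    }
  }
}
```

   New skip reasons would then only need a new assignment in the relevant `catch`, without adding another boolean field.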





[GitHub] [iceberg] stevenzwu commented on a diff in pull request #4687: Spark commit unknown handling

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on code in PR #4687:
URL: https://github.com/apache/iceberg/pull/4687#discussion_r865079497


##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java:
##########
@@ -196,24 +199,33 @@ private void commitOperation(SnapshotUpdate<?> operation, String description) {
     }
 
     long start = System.currentTimeMillis();
-    operation.commit(); // abort is automatically called if this fails
-    long duration = System.currentTimeMillis() - start;
-    LOG.info("Committed in {} ms", duration);
+    try {
+      operation.commit(); // abort is automatically called if this fails
+      long duration = System.currentTimeMillis() - start;
+      LOG.info("Committed in {} ms", duration);
+    } catch (CommitStateUnknownException commitStateUnknownException) {

Review Comment:
   @RussellSpitzer here is what I meant in the meeting. Instead of catching the exceptions for which we should skip abort, we could catch only `CommitFailedException`, the one case where SparkWrite should execute the abort action. For all other exceptions, we skip abort, since we don't know whether it is safe. That may require a change in iceberg-core to wrap all exceptions from the commit action in `CommitFailedException`, if that is not already the case.
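
   A sketch of this allow-list alternative, under stated assumptions: the local `CommitFailedException` stands in for Iceberg's `org.apache.iceberg.exceptions.CommitFailedException`, and it is assumed to be thrown only when the commit definitely did not happen. `AllowListWrite` is hypothetical. Cleanup stays enabled until a commit starts, so an abort that happens before any commit attempt still deletes files; once the commit starts, only a definite failure re-enables cleanup.

```java
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for Iceberg's CommitFailedException (assumption for this sketch):
// it signals that the commit definitely did not happen.
class CommitFailedException extends RuntimeException {
  CommitFailedException(String message) {
    super(message);
  }
}

// Hypothetical writer that only cleans up when it is known to be safe.
class AllowListWrite {
  // Before any commit attempt, nothing can reference the new files, so cleanup is safe.
  private boolean cleanupOnAbort = true;

  void commit(Runnable operation) {
    // Once the commit starts, its outcome is uncertain until proven otherwise.
    cleanupOnAbort = false;
    try {
      operation.run();
    } catch (CommitFailedException e) {
      // The commit definitely failed, so the data files are not part of the table.
      cleanupOnAbort = true;
      throw e;
    }
    // Any other RuntimeException propagates with cleanupOnAbort still false.
  }

  void abort(List<String> files, Consumer<String> deleteFile) {
    if (cleanupOnAbort) {
      files.forEach(deleteFile);
    }
  }
}
```

   The trade-off is that an unexpected exception type now leaves orphaned files behind rather than risking deletion of files a committed snapshot may reference.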





[GitHub] [iceberg] RussellSpitzer commented on pull request #4687: Spark commit unknown handling

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on PR #4687:
URL: https://github.com/apache/iceberg/pull/4687#issuecomment-1116787267

   I do want to. I think I need to mock the table, writerBuilder, and writer in order to get the exception in the right place. I'll try tomorrow.




[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #4687: Spark commit unknown handling

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on code in PR #4687:
URL: https://github.com/apache/iceberg/pull/4687#discussion_r867179071


##########
spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java:
##########
@@ -588,6 +600,56 @@ public void partitionedCreateWithTargetFileSizeViaOption(IcebergOptionsType opti
     Assert.assertTrue("All DataFiles contain 1000 rows", files.stream().allMatch(d -> d.recordCount() == 1000));
   }
 
+  @Test
+  public void testCommitUnknownException() throws IOException {
+    File parent = temp.newFolder(format.toString());
+    File location = new File(parent, "commitunknown");
+
+    HadoopTables tables = new HadoopTables(CONF);
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
+    Table table = tables.create(SCHEMA, spec, location.toString());
+
+    List<SimpleRecord> records = Lists.newArrayList(
+        new SimpleRecord(1, "a"),
+        new SimpleRecord(2, "b"),
+        new SimpleRecord(3, "c")
+    );
+
+    Dataset<Row> df = spark.createDataFrame(records, SimpleRecord.class);
+
+    AppendFiles append = table.newFastAppend();

Review Comment:
   The spied append here does the real work of throwing the exception.
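
   A dependency-free sketch of how such a spy could simulate an unknown commit state, using a hand-written decorator instead of a mocking library. `Append`, `InMemoryAppend`, and `CommitThenFailSpy` are hypothetical simplifications, not Iceberg's `AppendFiles`, and the snapshot list is an in-memory stand-in for table metadata. Performing the real commit before throwing is an assumption chosen to show the dangerous case: the snapshot exists, but the caller cannot tell.

```java
import java.util.List;

// Hypothetical minimal stand-ins; Iceberg's real AppendFiles and
// CommitStateUnknownException have richer APIs.
interface Append {
  void commit();
}

class CommitStateUnknownException extends RuntimeException {
  CommitStateUnknownException(String message) {
    super(message);
  }
}

// A "real" append that records a new snapshot in an in-memory list.
class InMemoryAppend implements Append {
  private final List<String> snapshots;

  InMemoryAppend(List<String> snapshots) {
    this.snapshots = snapshots;
  }

  @Override
  public void commit() {
    snapshots.add("snapshot-" + (snapshots.size() + 1));
  }
}

// Hand-written spy: delegate to the real commit, then report that the outcome is unknown,
// mimicking a commit that succeeded but whose acknowledgement was lost.
class CommitThenFailSpy implements Append {
  private final Append delegate;

  CommitThenFailSpy(Append delegate) {
    this.delegate = delegate;
  }

  @Override
  public void commit() {
    delegate.commit();
    throw new CommitStateUnknownException("simulated lost commit response");
  }
}
```

   After `commit()` throws, the snapshot list still holds one entry. Deleting the data files at that point would leave the committed snapshot pointing at missing files, which is the failure this PR guards against.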





[GitHub] [iceberg] stevenzwu commented on a diff in pull request #4687: Spark commit unknown handling

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on code in PR #4687:
URL: https://github.com/apache/iceberg/pull/4687#discussion_r864410262


##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java:
##########
@@ -196,24 +199,33 @@ private void commitOperation(SnapshotUpdate<?> operation, String description) {
     }
 
     long start = System.currentTimeMillis();
-    operation.commit(); // abort is automatically called if this fails
-    long duration = System.currentTimeMillis() - start;
-    LOG.info("Committed in {} ms", duration);
+    try {
+      operation.commit(); // abort is automatically called if this fails
+      long duration = System.currentTimeMillis() - start;
+      LOG.info("Committed in {} ms", duration);
+    } catch (CommitStateUnknownException commitStateUnknownException) {
+      cleanupOnAbort = false;
+      throw commitStateUnknownException;
+    }
   }
 
   private void abort(WriterCommitMessage[] messages) {
-    Map<String, String> props = table.properties();
-    Tasks.foreach(files(messages))
-        .retry(PropertyUtil.propertyAsInt(props, COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT))
-        .exponentialBackoff(
-            PropertyUtil.propertyAsInt(props, COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT),
-            PropertyUtil.propertyAsInt(props, COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT),
-            PropertyUtil.propertyAsInt(props, COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT),
-            2.0 /* exponential */)
-        .throwFailureWhenFinished()
-        .run(file -> {
-          table.io().deleteFile(file.path().toString());
-        });
+    if (cleanupOnAbort) {

Review Comment:
   Clever way of working around the Spark interface for abort handling. I assume a `SparkWrite` object is only used for a one-time write, right? If it can be used for multiple writes/commits, we would need to reset this variable.
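
   The field is needed because the engine, not the writer, decides to call abort after a failed commit, so object state is the only channel between the two calls. A sketch of that control flow, assuming a hypothetical `Driver.runJob` that, roughly like Spark's DataSourceV2 write path, calls `abort()` on the same write object when `commit()` throws. `FlaggedWrite` mirrors the boolean `cleanupOnAbort` from the diff; all names here are illustrative, not Iceberg or Spark APIs.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Iceberg's CommitStateUnknownException.
class CommitStateUnknownException extends RuntimeException {
  CommitStateUnknownException(String message) {
    super(message);
  }
}

// Hypothetical two-phase write shaped like the PR's SparkWrite change.
class FlaggedWrite {
  private boolean cleanupOnAbort = true;
  private final List<String> files;
  final List<String> deleted = new ArrayList<>();

  FlaggedWrite(List<String> files) {
    this.files = files;
  }

  void commit(Runnable operation) {
    try {
      operation.run();
    } catch (CommitStateUnknownException e) {
      cleanupOnAbort = false; // remembered for the later, separate abort() call
      throw e;
    }
  }

  void abort() {
    if (cleanupOnAbort) {
      deleted.addAll(files); // stands in for deleting each file through table.io()
    }
  }
}

// Hypothetical driver: it alone decides to call abort() after a failed commit.
final class Driver {
  static boolean runJob(FlaggedWrite write, Runnable operation) {
    try {
      write.commit(operation);
      return true;
    } catch (RuntimeException e) {
      write.abort();
      return false;
    }
  }
}
```

   Because each job constructs a fresh `FlaggedWrite`, the flag always starts as `true`; an object reused across commits would carry a stale `false` into the next job, which is the reset concern raised above.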





[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #4687: Spark commit unknown handling

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on code in PR #4687:
URL: https://github.com/apache/iceberg/pull/4687#discussion_r864851603


##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java:
##########
@@ -196,24 +199,33 @@ private void commitOperation(SnapshotUpdate<?> operation, String description) {
     }
 
     long start = System.currentTimeMillis();
-    operation.commit(); // abort is automatically called if this fails
-    long duration = System.currentTimeMillis() - start;
-    LOG.info("Committed in {} ms", duration);
+    try {
+      operation.commit(); // abort is automatically called if this fails
+      long duration = System.currentTimeMillis() - start;
+      LOG.info("Committed in {} ms", duration);
+    } catch (CommitStateUnknownException commitStateUnknownException) {
+      cleanupOnAbort = false;
+      throw commitStateUnknownException;
+    }
   }
 
   private void abort(WriterCommitMessage[] messages) {
-    Map<String, String> props = table.properties();
-    Tasks.foreach(files(messages))
-        .retry(PropertyUtil.propertyAsInt(props, COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT))
-        .exponentialBackoff(
-            PropertyUtil.propertyAsInt(props, COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT),
-            PropertyUtil.propertyAsInt(props, COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT),
-            PropertyUtil.propertyAsInt(props, COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT),
-            2.0 /* exponential */)
-        .throwFailureWhenFinished()
-        .run(file -> {
-          table.io().deleteFile(file.path().toString());
-        });
+    if (cleanupOnAbort) {
+      Map<String, String> props = table.properties();
+      Tasks.foreach(files(messages))
+          .retry(PropertyUtil.propertyAsInt(props, COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT))
+          .exponentialBackoff(
+              PropertyUtil.propertyAsInt(props, COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT),
+              PropertyUtil.propertyAsInt(props, COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT),
+              PropertyUtil.propertyAsInt(props, COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT),
+              2.0 /* exponential */)
+          .throwFailureWhenFinished()
+          .run(file -> {
+            table.io().deleteFile(file.path().toString());
+          });
+    } else {
+      LOG.error("Skipping cleaning up of data files, CommitStateUnknown");

Review Comment:
   That's a good call. I wasn't sure what to do here, but I had the same instinct you mention. I really should change it.





[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #4687: Spark commit unknown handling

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on code in PR #4687:
URL: https://github.com/apache/iceberg/pull/4687#discussion_r864851002


##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java:
##########
@@ -196,24 +199,33 @@ private void commitOperation(SnapshotUpdate<?> operation, String description) {
     }
 
     long start = System.currentTimeMillis();
-    operation.commit(); // abort is automatically called if this fails
-    long duration = System.currentTimeMillis() - start;
-    LOG.info("Committed in {} ms", duration);
+    try {
+      operation.commit(); // abort is automatically called if this fails
+      long duration = System.currentTimeMillis() - start;
+      LOG.info("Committed in {} ms", duration);
+    } catch (CommitStateUnknownException commitStateUnknownException) {
+      cleanupOnAbort = false;
+      throw commitStateUnknownException;
+    }
   }
 
   private void abort(WriterCommitMessage[] messages) {
-    Map<String, String> props = table.properties();
-    Tasks.foreach(files(messages))
-        .retry(PropertyUtil.propertyAsInt(props, COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT))
-        .exponentialBackoff(
-            PropertyUtil.propertyAsInt(props, COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT),
-            PropertyUtil.propertyAsInt(props, COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT),
-            PropertyUtil.propertyAsInt(props, COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT),
-            2.0 /* exponential */)
-        .throwFailureWhenFinished()
-        .run(file -> {
-          table.io().deleteFile(file.path().toString());
-        });
+    if (cleanupOnAbort) {

Review Comment:
   Yep, one `SparkWrite` per Spark job sink. The object cannot be reused in another plan.


