You are viewing a plain text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain text copy.
Posted to commits@iceberg.apache.org by ru...@apache.org on 2022/06/01 14:38:00 UTC

[iceberg] branch 0.13.x updated: Spark: Extend commit unknown exception handling to SparkPositionDeltaWrite (#4893)

This is an automated email from the ASF dual-hosted git repository.

russellspitzer pushed a commit to branch 0.13.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/0.13.x by this push:
     new 82be15951 Spark: Extend commit unknown exception handling to SparkPositionDeltaWrite (#4893)
82be15951 is described below

commit 82be159518e4ade8a49f066dfff26fabf1d17f78
Author: Prashant Singh <35...@users.noreply.github.com>
AuthorDate: Wed Jun 1 20:07:52 2022 +0530

    Spark: Extend commit unknown exception handling to SparkPositionDeltaWrite (#4893)
    
    Co-authored-by: Prashant Singh <ps...@amazon.com>
---
 .../spark/extensions/TestMergeOnReadDelete.java    | 66 ++++++++++++++++++++++
 .../spark/source/SparkPositionDeltaWrite.java      | 20 +++++--
 .../iceberg/spark/source/TestSparkCatalog.java     | 17 ++++++
 3 files changed, 99 insertions(+), 4 deletions(-)

diff --git a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadDelete.java b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadDelete.java
index 2316adf66..ae9fac864 100644
--- a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadDelete.java
+++ b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadDelete.java
@@ -20,8 +20,23 @@
 package org.apache.iceberg.spark.extensions;
 
 import java.util.Map;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.RowDelta;
+import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.exceptions.CommitStateUnknownException;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.spark.source.SparkTable;
+import org.apache.iceberg.spark.source.TestSparkCatalog;
+import org.apache.spark.SparkException;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.junit.Test;
+import org.junit.runners.Parameterized;
+
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
 
 public class TestMergeOnReadDelete extends TestDelete {
 
@@ -37,4 +52,55 @@ public class TestMergeOnReadDelete extends TestDelete {
         TableProperties.DELETE_MODE, "merge-on-read"
     );
   }
+
+
+  @Parameterized.AfterParam
+  public static void clearTestSparkCatalogCache() {
+    TestSparkCatalog.clearTables();
+  }
+
+  @Test
+  public void testCommitUnknownException() {
+    createAndInitTable("id INT, dep STRING, category STRING");
+
+    // write unpartitioned files
+    append(tableName, "{ \"id\": 1, \"dep\": \"hr\", \"category\": \"c1\"}");
+    append(tableName, "{ \"id\": 2, \"dep\": \"hr\", \"category\": \"c1\" }\n" +
+        "{ \"id\": 3, \"dep\": \"hr\", \"category\": \"c1\" }");
+
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    RowDelta newRowDelta = table.newRowDelta();
+    RowDelta spyNewRowDelta = spy(newRowDelta);
+    doAnswer(invocation -> {
+      newRowDelta.commit();
+      throw new CommitStateUnknownException(new RuntimeException("Datacenter on Fire"));
+    }).when(spyNewRowDelta).commit();
+
+    Table spyTable = spy(table);
+    when(spyTable.newRowDelta()).thenReturn(spyNewRowDelta);
+    SparkTable sparkTable = new SparkTable(spyTable, false);
+
+    ImmutableMap<String, String> config = ImmutableMap.of(
+        "type", "hive",
+        "default-namespace", "default"
+    );
+    spark.conf().set("spark.sql.catalog.dummy_catalog", "org.apache.iceberg.spark.source.TestSparkCatalog");
+    config.forEach((key, value) -> spark.conf().set("spark.sql.catalog.dummy_catalog." + key, value));
+    Identifier ident = Identifier.of(new String[]{"default"}, "table");
+    TestSparkCatalog.setTable(ident, sparkTable);
+
+    // Although an exception is thrown here, write and commit have succeeded
+    AssertHelpers.assertThrowsWithCause("Should throw a Commit State Unknown Exception",
+        SparkException.class,
+        "Writing job aborted",
+        CommitStateUnknownException.class,
+        "Datacenter on Fire",
+        () -> sql("DELETE FROM %s WHERE id = 2", "dummy_catalog.default.table"));
+
+    // Since write and commit succeeded, the rows should be readable
+    assertEquals("Should have expected rows",
+        ImmutableList.of(row(1, "hr", "c1"), row(3, "hr", "c1")),
+        sql("SELECT * FROM %s ORDER BY id", "dummy_catalog.default.table"));
+  }
 }
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
index 8e26ad80b..558b678bf 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
@@ -38,6 +38,7 @@ import org.apache.iceberg.SnapshotSummary;
 import org.apache.iceberg.SnapshotUpdate;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.exceptions.CommitStateUnknownException;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.io.ClusteredPositionDeleteWriter;
@@ -92,6 +93,8 @@ class SparkPositionDeltaWrite implements DeltaWrite, RequiresDistributionAndOrde
   private final Distribution requiredDistribution;
   private final SortOrder[] requiredOrdering;
 
+  private boolean cleanupOnAbort = true;
+
   SparkPositionDeltaWrite(SparkSession spark, Table table, Command command, SparkBatchQueryScan scan,
                           IsolationLevel isolationLevel, SparkWriteConf writeConf,
                           ExtendedLogicalWriteInfo info, Schema dataSchema,
@@ -215,6 +218,10 @@ class SparkPositionDeltaWrite implements DeltaWrite, RequiresDistributionAndOrde
 
     @Override
     public void abort(WriterCommitMessage[] messages) {
+      if (!cleanupOnAbort) {
+        return;
+      }
+
       for (WriterCommitMessage message : messages) {
         if (message != null) {
           DeltaTaskCommit taskCommit = (DeltaTaskCommit) message;
@@ -239,10 +246,15 @@ class SparkPositionDeltaWrite implements DeltaWrite, RequiresDistributionAndOrde
         operation.stageOnly();
       }
 
-      long start = System.currentTimeMillis();
-      operation.commit(); // abort is automatically called if this fails
-      long duration = System.currentTimeMillis() - start;
-      LOG.info("Committed in {} ms", duration);
+      try {
+        long start = System.currentTimeMillis();
+        operation.commit(); // abort is automatically called if this fails
+        long duration = System.currentTimeMillis() - start;
+        LOG.info("Committed in {} ms", duration);
+      } catch (CommitStateUnknownException commitStateUnknownException) {
+        cleanupOnAbort = false;
+        throw commitStateUnknownException;
+      }
     }
   }
 
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalog.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalog.java
index 0e5280875..6b4362b98 100644
--- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalog.java
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalog.java
@@ -19,8 +19,11 @@
 
 package org.apache.iceberg.spark.source;
 
+import java.util.Map;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.spark.Spark3Util;
 import org.apache.iceberg.spark.SparkSessionCatalog;
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
@@ -31,8 +34,18 @@ import org.apache.spark.sql.connector.catalog.TableCatalog;
 
 public class TestSparkCatalog<T extends TableCatalog & SupportsNamespaces> extends SparkSessionCatalog<T> {
 
+  private static final Map<Identifier, Table> tableMap = Maps.newHashMap();
+
+  public static void setTable(Identifier ident, Table table) {
+    Preconditions.checkArgument(!tableMap.containsKey(ident), "Cannot set " + ident + ". It is already set");
+    tableMap.put(ident, table);
+  }
+
   @Override
   public Table loadTable(Identifier ident) throws NoSuchTableException {
+    if (tableMap.containsKey(ident)) {
+      return tableMap.get(ident);
+    }
     TableIdentifier tableIdentifier = Spark3Util.identifierToTableIdentifier(ident);
     Namespace namespace = tableIdentifier.namespace();
 
@@ -43,4 +56,8 @@ public class TestSparkCatalog<T extends TableCatalog & SupportsNamespaces> exten
 
     return new SparkTable(table, false);
   }
+
+  public static void clearTables() {
+    tableMap.clear();
+  }
 }