Posted to commits@iceberg.apache.org by ru...@apache.org on 2022/06/01 14:38:00 UTC
[iceberg] branch 0.13.x updated: Spark: Extend commit unknown exception handling to SparkPositionDeltaWrite (#4893)
This is an automated email from the ASF dual-hosted git repository.
russellspitzer pushed a commit to branch 0.13.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/0.13.x by this push:
new 82be15951 Spark: Extend commit unknown exception handling to SparkPositionDeltaWrite (#4893)
82be15951 is described below
commit 82be159518e4ade8a49f066dfff26fabf1d17f78
Author: Prashant Singh <35...@users.noreply.github.com>
AuthorDate: Wed Jun 1 20:07:52 2022 +0530
Spark: Extend commit unknown exception handling to SparkPositionDeltaWrite (#4893)
Co-authored-by: Prashant Singh <ps...@amazon.com>
---
.../spark/extensions/TestMergeOnReadDelete.java | 66 ++++++++++++++++++++++
.../spark/source/SparkPositionDeltaWrite.java | 20 +++++--
.../iceberg/spark/source/TestSparkCatalog.java | 17 ++++++
3 files changed, 99 insertions(+), 4 deletions(-)
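For context: when a metastore call fails ambiguously (e.g. a timeout after the request was sent), Iceberg surfaces a CommitStateUnknownException, meaning the commit may or may not have succeeded. In that state the writer must not delete the data files it produced, because a successful commit would already reference them. The sketch below (simplified names, not the exact Iceberg classes) shows the general pattern this commit extends to SparkPositionDeltaWrite:

    class PositionDeltaBatchWrite {  // simplified sketch of the pattern
      private boolean cleanupOnAbort = true;

      void commitOperation(SnapshotUpdate<?> operation) {
        try {
          operation.commit();
        } catch (CommitStateUnknownException e) {
          // The commit may have succeeded; keep the files so a live
          // snapshot is never left pointing at deleted data.
          cleanupOnAbort = false;
          throw e;
        }
      }

      void abort(WriterCommitMessage[] messages) {
        if (!cleanupOnAbort) {
          return; // outcome unknown: leave written files in place
        }
        // normal path: delete the files recorded in each commit message
      }
    }

The full diff follows.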
diff --git a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadDelete.java b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadDelete.java
index 2316adf66..ae9fac864 100644
--- a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadDelete.java
+++ b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadDelete.java
@@ -20,8 +20,23 @@
package org.apache.iceberg.spark.extensions;
import java.util.Map;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.RowDelta;
+import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.exceptions.CommitStateUnknownException;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.spark.source.SparkTable;
+import org.apache.iceberg.spark.source.TestSparkCatalog;
+import org.apache.spark.SparkException;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.junit.Test;
+import org.junit.runners.Parameterized;
+
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
public class TestMergeOnReadDelete extends TestDelete {
@@ -37,4 +52,55 @@ public class TestMergeOnReadDelete extends TestDelete {
TableProperties.DELETE_MODE, "merge-on-read"
);
}
+
+
+ @Parameterized.AfterParam
+ public static void clearTestSparkCatalogCache() {
+ TestSparkCatalog.clearTables();
+ }
+
+ @Test
+ public void testCommitUnknownException() {
+ createAndInitTable("id INT, dep STRING, category STRING");
+
+ // write unpartitioned files
+ append(tableName, "{ \"id\": 1, \"dep\": \"hr\", \"category\": \"c1\"}");
+ append(tableName, "{ \"id\": 2, \"dep\": \"hr\", \"category\": \"c1\" }\n" +
+ "{ \"id\": 3, \"dep\": \"hr\", \"category\": \"c1\" }");
+
+ Table table = validationCatalog.loadTable(tableIdent);
+
+ RowDelta newRowDelta = table.newRowDelta();
+ RowDelta spyNewRowDelta = spy(newRowDelta);
+ doAnswer(invocation -> {
+ newRowDelta.commit();
+ throw new CommitStateUnknownException(new RuntimeException("Datacenter on Fire"));
+ }).when(spyNewRowDelta).commit();
+
+ Table spyTable = spy(table);
+ when(spyTable.newRowDelta()).thenReturn(spyNewRowDelta);
+ SparkTable sparkTable = new SparkTable(spyTable, false);
+
+ ImmutableMap<String, String> config = ImmutableMap.of(
+ "type", "hive",
+ "default-namespace", "default"
+ );
+ spark.conf().set("spark.sql.catalog.dummy_catalog", "org.apache.iceberg.spark.source.TestSparkCatalog");
+ config.forEach((key, value) -> spark.conf().set("spark.sql.catalog.dummy_catalog." + key, value));
+ Identifier ident = Identifier.of(new String[]{"default"}, "table");
+ TestSparkCatalog.setTable(ident, sparkTable);
+
+ // Although an exception is thrown here, write and commit have succeeded
+ AssertHelpers.assertThrowsWithCause("Should throw a Commit State Unknown Exception",
+ SparkException.class,
+ "Writing job aborted",
+ CommitStateUnknownException.class,
+ "Datacenter on Fire",
+ () -> sql("DELETE FROM %s WHERE id = 2", "dummy_catalog.default.table"));
+
+ // Since write and commit succeeded, the rows should be readable
+ assertEquals("Should have expected rows",
+ ImmutableList.of(row(1, "hr", "c1"), row(3, "hr", "c1")),
+ sql("SELECT * FROM %s ORDER BY id", "dummy_catalog.default.table"));
+ }
}
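The spy above is what makes the test faithful: the real RowDelta commit actually goes through, then the spy throws, reproducing the case where the commit lands but the client never learns the outcome. A minimal standalone sketch of that Mockito pattern (the "simulated" message is illustrative):

    RowDelta real = table.newRowDelta();
    RowDelta spyDelta = Mockito.spy(real);
    Mockito.doAnswer(invocation -> {
      real.commit(); // the server-side commit succeeds...
      // ...but the caller only sees an ambiguous failure
      throw new CommitStateUnknownException(new RuntimeException("simulated"));
    }).when(spyDelta).commit();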
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
index 8e26ad80b..558b678bf 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
@@ -38,6 +38,7 @@ import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.SnapshotUpdate;
import org.apache.iceberg.Table;
import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.ClusteredPositionDeleteWriter;
@@ -92,6 +93,8 @@ class SparkPositionDeltaWrite implements DeltaWrite, RequiresDistributionAndOrde
private final Distribution requiredDistribution;
private final SortOrder[] requiredOrdering;
+ private boolean cleanupOnAbort = true;
+
SparkPositionDeltaWrite(SparkSession spark, Table table, Command command, SparkBatchQueryScan scan,
IsolationLevel isolationLevel, SparkWriteConf writeConf,
ExtendedLogicalWriteInfo info, Schema dataSchema,
@@ -215,6 +218,10 @@ class SparkPositionDeltaWrite implements DeltaWrite, RequiresDistributionAndOrde
@Override
public void abort(WriterCommitMessage[] messages) {
+ if (!cleanupOnAbort) {
+ return;
+ }
+
for (WriterCommitMessage message : messages) {
if (message != null) {
DeltaTaskCommit taskCommit = (DeltaTaskCommit) message;
@@ -239,10 +246,15 @@ class SparkPositionDeltaWrite implements DeltaWrite, RequiresDistributionAndOrde
operation.stageOnly();
}
- long start = System.currentTimeMillis();
- operation.commit(); // abort is automatically called if this fails
- long duration = System.currentTimeMillis() - start;
- LOG.info("Committed in {} ms", duration);
+ try {
+ long start = System.currentTimeMillis();
+ operation.commit(); // abort is automatically called if this fails
+ long duration = System.currentTimeMillis() - start;
+ LOG.info("Committed in {} ms", duration);
+ } catch (CommitStateUnknownException commitStateUnknownException) {
+ cleanupOnAbort = false;
+ throw commitStateUnknownException;
+ }
}
}
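The early return in abort() is sufficient because Spark's DataSourceV2 write path invokes abort(messages) when commit(messages) throws. A rough sketch of that driver-side flow (not Spark's exact code):

    try {
      batchWrite.commit(messages);
    } catch (Exception e) {
      // with cleanupOnAbort == false this becomes a no-op,
      // so files from a possibly-successful commit survive
      batchWrite.abort(messages);
      throw e;
    }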
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalog.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalog.java
index 0e5280875..6b4362b98 100644
--- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalog.java
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalog.java
@@ -19,8 +19,11 @@
package org.apache.iceberg.spark.source;
+import java.util.Map;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.SparkSessionCatalog;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
@@ -31,8 +34,18 @@ import org.apache.spark.sql.connector.catalog.TableCatalog;
public class TestSparkCatalog<T extends TableCatalog & SupportsNamespaces> extends SparkSessionCatalog<T> {
+ private static final Map<Identifier, Table> tableMap = Maps.newHashMap();
+
+ public static void setTable(Identifier ident, Table table) {
+ Preconditions.checkArgument(!tableMap.containsKey(ident), "Cannot set " + ident + ". It is already set");
+ tableMap.put(ident, table);
+ }
+
@Override
public Table loadTable(Identifier ident) throws NoSuchTableException {
+ if (tableMap.containsKey(ident)) {
+ return tableMap.get(ident);
+ }
TableIdentifier tableIdentifier = Spark3Util.identifierToTableIdentifier(ident);
Namespace namespace = tableIdentifier.namespace();
@@ -43,4 +56,8 @@ public class TestSparkCatalog<T extends TableCatalog & SupportsNamespaces> exten
return new SparkTable(table, false);
}
+
+ public static void clearTables() {
+ tableMap.clear();
+ }
}
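Note that TestSparkCatalog holds the injected tables in a static map, so entries would leak across parameterized runs unless cleared; that is what the @Parameterized.AfterParam hook in TestMergeOnReadDelete handles. Hypothetical usage in a test (identifier names are examples):

    Identifier ident = Identifier.of(new String[]{"default"}, "table");
    TestSparkCatalog.setTable(ident, sparkTable); // loadTable(ident) now returns the injected table
    // ... run the SQL under test against dummy_catalog.default.table ...
    TestSparkCatalog.clearTables();               // reset for the next parameterized run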